Kubeflow Pipelinesの資料整理
Kubeflow Pipelines
SDKのインストール
KFPのPython SDKの各種パッケージ
-
kfp.compiler
: Python DSLで記述されたパイプラインをYAML形式のワークフローにコンパイルするためのパッケージ -
kfp.components
: コンポーネントを扱う(読み込み、書き出しなど)ためのパッケージ -
kfp.dsl
: パイプラインやコンポーネントを定義するためのDSL(domain-specific language) -
kfp.Client
: KFPをPythonから操作するためのクライアントパッケージ - KFP extension modules: 特定のプラットフォーム(各種クラウドやオンプレ環境など)のためのパッケージ
- KFP diagnose_me module: 環境診断のためのパッケージ
コンポーネントとパイプラインの構築
KFPのコンポーネントは以下の仕様を持つ
- インターフェース:入力と出力
- 実装:コンテナイメージと実行コマンド
- メタデータ:名前や説明など
コンポーネントの実装について
- パイプライン構築のチュートリアル
- Pythonベースの軽量コンポーネントを構築することもできる。
コンポーネントのディレクトリ構成
Kubeflow Pipelinesのインストール
下記のインストールオプション(KFP基盤の利用方法)がある
- Kubeflow Pipelines Standalone: KFPに関連する最小限の依存関係を用いた利用
- Kubeflow deploymentの一部としての利用。Kubeflowに関連する全ての依存関係が含まれる
- (Beta)Google Cloud AI Platform Pipelinesからの利用
- AI Platform PipelinesはVertex Pipelinesになった。Vertex Pipelinesの概要
- テストのためのローカル利用
Metadata and Metrics
KFPのコンポーネント間で受け渡す型(?)
MetricsとVisualize Result
ローカルでのKubeflow Pipelinesのデプロイ
クラスタの展開
手元のMacBook Pro(Intel CPU)で検証。ドキュメントではkind, K3s, K3ai [alpha]のどれかを使えると書いてある。kindで試してみる。
- kindのインストール:
brew install kind
- クラスタの作成:
kind create cluster
Kubeflow Pipelinesのデプロイ
# env/platform-agnostic-pns hasn't been publically released, so you will install it from master
export PIPELINE_VERSION=1.6.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"
デプロイが完了するまでに数分かかる。デプロイの進行はkubectl -n kubeflow get deployment
から確認できる。完了後、ポート転送することでブラウザからKFPのUIにアクセスできる。
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
停止
export PIPELINE_VERSION=1.6.0
kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"
kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
クラスタの削除
kind delete cluster
Cloud Workflows
Google Cloud Workflowsはサーバーレスで動作するワークフローサービス。Kubeflow Pipelinesとは直接関係ない。おしゃれなGUIなどはないが、シンプルなワークフローであればKFPよりも少ない学習コストで利用できそう。
Vertex AIのカスタムトレーニングジョブと組み合わせれば、少ない学習コストでそこそこ汎用な構成ができるかもしれない。
- Cloud RunとCloud FunctionsでWorkflowsを使用するチュートリアル
- カスタムトレーニングジョブの作成 | Vertex AI
- pythonクイックスタート | Cloud Functions
- Cloud RunでサーバレスMLOpsを試してみた
- Connectors reference
- APIとリファレンス
他サービスへの認証
ワークフロー内で認証付きリクエストを行うには、呼び出すサービスに応じてOAuth2またはOpenID Connect(OIDC)のどちらかを使う。
-
OAuth2
: Compute Engine APIやFirestore APIなどのすべてのGoogle Cloud APIsとの接続 -
OIDC
: Cloud RunやCloud Functionsとの接続に利用できる
ワークフローからVertex AIのカスタムジョブを実行するには、ワークフロー用にサービスアカウントを用意して、「Vertex AIユーザー」のロールを付与、リクエスト時の認証にtype: OAuth2
を指定すれば良い。
Vertex AI Custom Jobs
カスタムジョブを実行できる。処理内容の実装方法は2通り
- コードのみを実装し、
gcloud beta ai custom-jobs local-run
コマンドを通じてコンテナイメージのビルド、テストを行う。自分でDockerをいじる必要がない。 - Dockerfileまで自分で作成し、コンテナイメージをビルド、テストする。柔軟性が高い。
Container Registryへのイメージのプッシュ
docker tag <local-image> gcr.io/<project id>/<image>:<tag>
docker push gcr.io/<project id>/<image>:<tag>
カスタムジョブのコンフィグ
GPUを利用できるマシンタイプ
AI PlatformとVertex AIの違い
Vertex AIのAPIエンドポイント
Vertex AIのCloud Storageディレクトリ
baseOutputDirectory
フィールドを'gs://<bucket-name>/<optional-prefix>'
指定すると、コードの実行時に以下の環境変数が設定される。
-
AIP_MODEL_DIR
: モデル アーティファクトの保存を目的としたディレクトリのCloud Storage URI。<baseOutputDirtectory>/model/
-
AIP_CHECKPOINT_DIR
: チェックポイントの保存を目的としたディレクトリのCloud Storage URI。<baseOutputDirectory>/checkpoints/
-
AIP_TENSORBOARD_LOG_DIR
: TensorBoard ログの保存を目的としたディレクトリのCloud Storage URI。<baseOutputDirectory>/logs/
CustomJobSpecの指定時に、workerPoolSpecs
などと同様に指定する。
{
"workerPoolSpecs": [
{
object (WorkerPoolSpec)
}
],
"scheduling": {
object (Scheduling)
},
"serviceAccount": string,
"network": string,
"baseOutputDirectory": {
object (GcsDestination)
}
}
ドキュメントによればKerasのモデル書き出しやTensorBoardのログ出力は直接指定できる。一方でXGBoostやScikit-Learnのモデル書き出しなどはコード内でgoogle-cloud-storageライブラリを使用してアップロードする必要がある。
2021/09/19時点でのプレビュー機能として、Cloud Storage FUSEを使うとバケットをインスタンスのファイルシステムにマウントできる。例えばgs://<bucket>/data.csv
のデータをコード上でfile = oepn('/gcs/<bucket>/data.csv', 'r')
として読み込むことができる。上記のbaseOutputDirectory
も/gcs/<bucket>
と指定すればローカル・クラウドの差異を意識せずにファイル読み書きできるかも?(要検証)
DevOps・MLOpsサンプル
Vertex Pipelines
ワークフローの各処理はKubeflow Pipelines SDKによって記述する。
Google Cloudコンポーネント
GCPのサービスとやり取りするための基本的なコンポーネントはパッケージとして提供されている。
pip install -U google-cloud-pipeline-components
Vertex AIのカスタムジョブを投下するコンポーネントは2021年8月現在でexperimental
Vertex Pipelinesの各コンポーネントはVertex AIのジョブとして実行されるので、実行時のスペックとして指定できそう。
train_op = create_component_from_func(
func=...,
output_component_file='...',
packages_to_install=[...]
])
train_task = train_op(...)
worker_pool_specs = [
{
"containerSpec": {
"imageUri":train_task.container.image,
"args": train_task.arguments,
},
"replicaCount": TRAINING_REPLICA_COUNT,
"machineSpec": {
"machineType": TRAINING_MACHINE_TYPE,
"acceleratorType": TRAINING_ACCELERATOR_TYPE,
"acceleratorCount": TRAINING_ACCELERATOR_COUNT,
},
},
]
train_task.custom_job_spec = {
"displayName": train_task.name,
"jobSpec": {
"workerPoolSpecs": worker_pool_specs,
}
}
x_task.custom_job_spec = {...}
で指定する方法はドキュメントにはほぼ記載がない。もっと直感的に指定できるようになると嬉しい。
component_spec
はKubeflow PipelinesのContainerOp
、YAML形式の定義ファイルからkfp.components.load_component_from_file
でロードできる。
サンプルのコンポーネント定義
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: Submitting a CustomJob
inputs:
- {name: type, type: String}
- {name: gcp_project, type: String}
- {name: gcp_region, type: String}
- {name: payload, type: String}
outputs:
- {name: gcp_resources, type: String}
implementation:
container:
image: gcr.io/managed-pipeline-test/launcher:v8
command: [python, /launcher.py]
args: [
--type, {inputValue: type},
--gcp_project, {inputValue: gcp_project},
--gcp_region, {inputValue: gcp_region},
--payload, {inputValue: payload},
--gcp_resources, {outputPath: gcp_resources},
]
コンポーネントの入出力で使える型は以下
その他のサンプル
Vertex AI and MLOps
Vertex AIとその他サービスを組み合わせてCI/CD、パイプライン化、モデルモニタリングを行うデモ
Serving prediction via Vertex AI
Vertex AI上での学習したモデルのサービングは以下の3ステップからなる。
- モデルのアップロード(インポート)
- APIエンドポイントの作成
- エンドポイントへのモデルのデプロイ
モデルのアップロードは、パラメータなどのアーティファクトとサービング用のコンテナを指定する。
サービング用コンテナは、学習済みモデルがTensorFlow, scikit-learn、XGBoostの場合ビルド済みコンテナを利用できる。カスタムコンテナを用いる場合は、以下の仕様を満たしていれば任意のコンテナを利用できる。
エンドポイントを作成、アップロード済みモデルのデプロイすることで予測を提供できるようになる。一つのエンドポイントに複数のモデルをデプロイすることも、モデルを複数のエンドポイントにデプロイすることもできる。同一エンドポイント内でのトラフィックの分割や、GPUを利用することもできる。
Vertex AI上の予測サービングにおける最小ノード数は1のよう。GPUが不要でトラフィックがないタイミングでゼロスケールしたい場合にはCloud Runでも十分かもしれない(Cloud Runでもトラフィック分割などは可能)。