Open8

Kubeflow Pipelinesの資料整理

Kubeflow Pipelines

SDKのインストール

https://www.kubeflow.org/docs/components/pipelines/sdk/install-sdk/

KFPのPython SDKの各種パッケージ

https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/#sdk-packages
  • kfp.compiler: Python DSLで記述されたパイプラインをYAML形式のワークフローにコンパイルするためのパッケージ
  • kfp.components: コンポーネントを扱う(読み込み、書き出しなど)ためのパッケージ
  • kfp.dsl: パイプラインやコンポーネントを定義するためのDSL(domain-specific language)
  • kfp.Client: KFPをPythonから操作するためのクライアントパッケージ
  • KFP extension modules: 特定のプラットフォーム(各種クラウドやオンプレ環境など)のためのパッケージ
  • KFP diagnose_me module: 環境診断のためのパッケージ

コンポーネントとパイプラインの構築

https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/#building-pipelines-and-components

KFPのコンポーネントは以下の仕様を持つ

  • インターフェース:入力と出力
  • 実装:コンテナイメージと実行コマンド
  • メタデータ:名前や説明など

コンポーネントの実装について

コンポーネントのディレクトリ構成

https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/#organizing-the-component-files

Kubeflow Pipelinesのインストール

https://www.kubeflow.org/docs/components/pipelines/installation/overview/

下記のインストールオプション(KFP基盤の利用方法)がある

Metadata and Metrics

https://v0-5.kubeflow.org/docs/pipelines/metrics/

KFPのコンポーネント間で受け渡す型(?)

https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/v2/dsl

MetricsとVisualize Result

https://www.kubeflow.org/docs/components/pipelines/sdk/pipelines-metrics/

https://www.kubeflow.org/docs/components/pipelines/sdk/output-viewer/

ローカルでのKubeflow Pipelinesのデプロイ

クラスタの展開

https://www.kubeflow.org/docs/components/pipelines/installation/localcluster-deployment/

手元のMacBook Pro(Intel CPU)で検証。ドキュメントではkind, K3s, K3ai [alpha]のどれかを使えると書いてある。kindで試してみる。

  • kindのインストール:brew install kind
  • クラスタの作成:kind create cluster

Kubeflow Pipelinesのデプロイ

# env/platform-agnostic-pns hasn't been publically released, so you will install it from master
export PIPELINE_VERSION=1.6.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"

デプロイが完了するまでに数分かかる。デプロイの進行はkubectl -n kubeflow get deploymentから確認できる。完了後、ポート転送することでブラウザからKFPのUIにアクセスできる。

kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

停止

export PIPELINE_VERSION=1.6.0
kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"
kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"

クラスタの削除

kind delete cluster

Cloud Workflows

Google Cloud Workflowsはサーバーレスで動作するワークフローサービス。Kubeflow Pipelinesとは直接関係ない。おしゃれなGUIなどはないが、シンプルなワークフローであればKFPよりも少ない学習コストで利用できそう。

https://medium.com/google-cloud-jp/advent2020jp-workflows-a4c56595d977

https://cloud.google.com/workflows/docs/executing-workflow?hl=ja#gcloud

Vertex AIのカスタムトレーニングジョブと組み合わせれば、少ない学習コストでそこそこ汎用な構成ができるかもしれない。

他サービスへの認証

https://cloud.google.com/workflows/docs/authentication?hl=ja#making_authenticated_requests_to_apis

ワークフロー内で認証付きリクエストを行うには、呼び出すサービスに応じてOAuth2またはOpenID Connect(OIDC)のどちらかを使う。

  • OAuth2: Compute Engine APIやFirestore APIなどのすべてのGoogle Cloud APIsとの接続
  • OIDC: Cloud RunやCloud Functionsとの接続に利用できる

ワークフローからVertex AIのカスタムジョブを実行するには、ワークフロー用にサービスアカウントを用意して、「Vertex AIユーザー」のロールを付与、リクエスト時の認証にtype: OAuth2を指定すれば良い。

Vertex AI Custom Jobs

カスタムジョブを実行できる。処理内容の実装方法は2通り

  • コードのみを実装し、gcloud beta ai custom-jobs local-runコマンドを通じてコンテナイメージのビルド、テストを行う。自分でDockerをいじる必要がない。
  • Dockerfileまで自分で作成し、コンテナイメージをビルド、テストする。柔軟性が高い。

https://cloud.google.com/vertex-ai/docs/training/create-custom-container?hl=ja

Container Registryへのイメージのプッシュ

docker tag <local-image> gcr.io/<project id>/<image>:<tag>
docker push gcr.io/<project id>/<image>:<tag>

https://cloud.google.com/container-registry/docs/pushing-and-pulling?hl=ja

カスタムジョブのコンフィグ

https://cloud.google.com/vertex-ai/docs/training/create-custom-job?hl=ja#create

https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec?hl=ja

GPUを利用できるマシンタイプ

https://cloud.google.com/vertex-ai/docs/training/configure-compute?hl=ja#specifying_gpus

AI PlatformとVertex AIの違い

https://cloud.google.com/vertex-ai/docs/start/migrating-to-vertex-ai?_ga=2.236980995.-2082941595.1597042029&_gac=1.57359064.1628498353.CjwKCAjwpMOIBhBAEiwAy5M6YLbmvlZ_sQ--Y33B05VVfq50qWBZCD98WOdyaS-v9gNjubsuMKDvsBoCNuUQAvD_BwE#updating-scripts

Vertex AIのAPIエンドポイント

Vertex AIのCloud Storageディレクトリ

baseOutputDirectoryフィールドを'gs://<bucket-name>/<optional-prefix>'指定すると、コードの実行時に以下の環境変数が設定される。

  • AIP_MODEL_DIR: モデル アーティファクトの保存を目的としたディレクトリのCloud Storage URI。<baseOutputDirtectory>/model/
  • AIP_CHECKPOINT_DIR: チェックポイントの保存を目的としたディレクトリのCloud Storage URI。<baseOutputDirectory>/checkpoints/
  • AIP_TENSORBOARD_LOG_DIR: TensorBoard ログの保存を目的としたディレクトリのCloud Storage URI。<baseOutputDirectory>/logs/

CustomJobSpecの指定時に、workerPoolSpecsなどと同様に指定する。

{
  "workerPoolSpecs": [
    {
      object (WorkerPoolSpec)
    }
  ],
  "scheduling": {
    object (Scheduling)
  },
  "serviceAccount": string,
  "network": string,
  "baseOutputDirectory": {
    object (GcsDestination)
  }
}

ドキュメントによればKerasのモデル書き出しやTensorBoardのログ出力は直接指定できる。一方でXGBoostやScikit-Learnのモデル書き出しなどはコード内でgoogle-cloud-storageライブラリを使用してアップロードする必要がある。

https://cloud.google.com/vertex-ai/docs/training/exporting-model-artifacts

https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training

2021/09/19時点でのプレビュー機能として、Cloud Storage FUSEを使うとバケットをインスタンスのファイルシステムにマウントできる。例えばgs://<bucket>/data.csvのデータをコード上でfile = oepn('/gcs/<bucket>/data.csv', 'r')として読み込むことができる。上記のbaseOutputDirectory/gcs/<bucket>と指定すればローカル・クラウドの差異を意識せずにファイル読み書きできるかも?(要検証)

https://cloud.google.com/vertex-ai/docs/training/code-requirements#fuse

Vertex Pipelines

https://youtu.be/1ykDWsnL2LE

ワークフローの各処理はKubeflow Pipelines SDKによって記述する。

Google Cloudコンポーネント

GCPのサービスとやり取りするための基本的なコンポーネントはパッケージとして提供されている。

pip install -U google-cloud-pipeline-components

https://github.com/kubeflow/pipelines/tree/master/components/google-cloud

Vertex AIのカスタムジョブを投下するコンポーネントは2021年8月現在でexperimental
Vertex Pipelinesの各コンポーネントはVertex AIのジョブとして実行されるので、実行時のスペックとして指定できそう。

train_op = create_component_from_func(
      func=...,
      output_component_file='...',
      packages_to_install=[...]
])
train_task = train_op(...)

worker_pool_specs = [
    {
        "containerSpec": {
            "imageUri":train_task.container.image,
            "args": train_task.arguments,
        },
        "replicaCount": TRAINING_REPLICA_COUNT,
        "machineSpec": {
            "machineType": TRAINING_MACHINE_TYPE,
            "acceleratorType": TRAINING_ACCELERATOR_TYPE,
            "acceleratorCount": TRAINING_ACCELERATOR_COUNT,
        },
    },
]
train_task.custom_job_spec = {
    "displayName": train_task.name,
    "jobSpec": {
        "workerPoolSpecs": worker_pool_specs,
    }
}

x_task.custom_job_spec = {...}で指定する方法はドキュメントにはほぼ記載がない。もっと直感的に指定できるようになると嬉しい。

component_specはKubeflow PipelinesのContainerOp、YAML形式の定義ファイルからkfp.components.load_component_from_fileでロードできる。

サンプルのコンポーネント定義
google_cloud_pipeline_components/experimental/custom_job/component.yaml
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Submitting a CustomJob
inputs:
- {name: type, type: String}
- {name: gcp_project, type: String}
- {name: gcp_region, type: String}
- {name: payload, type: String}
outputs:
- {name: gcp_resources, type: String}
implementation:
  container:
    image: gcr.io/managed-pipeline-test/launcher:v8
    command: [python, /launcher.py]
    args: [
      --type, {inputValue: type},
      --gcp_project, {inputValue: gcp_project},
      --gcp_region, {inputValue: gcp_region},
      --payload, {inputValue: payload},
      --gcp_resources, {outputPath: gcp_resources},
    ]

https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/#using-your-component-in-a-pipeline

https://github.com/kubeflow/pipelines/tree/master/components/google-cloud/google_cloud_pipeline_components/aiplatform

コンポーネントの入出力で使える型は以下

https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/types.py

その他のサンプル

https://tech.repro.io/entry/2021/06/22/125113

https://gist.github.com/AseiSugiyama/d189a43f656a3313837e820bc54f873b

https://towardsdatascience.com/serverless-machine-learning-pipelines-with-vertex-ai-an-introduction-30af8b53188e

Serving prediction via Vertex AI

Vertex AI上での学習したモデルのサービングは以下の3ステップからなる。

  1. モデルのアップロード(インポート)
  2. APIエンドポイントの作成
  3. エンドポイントへのモデルのデプロイ

モデルのアップロードは、パラメータなどのアーティファクトとサービング用のコンテナを指定する。

https://cloud.google.com/vertex-ai/docs/general/import-model#aiplatform_upload_model_sample-gcloud

サービング用コンテナは、学習済みモデルがTensorFlow, scikit-learn、XGBoostの場合ビルド済みコンテナを利用できる。カスタムコンテナを用いる場合は、以下の仕様を満たしていれば任意のコンテナを利用できる。

https://cloud.google.com/vertex-ai/docs/predictions/custom-container-requirements

エンドポイントを作成、アップロード済みモデルのデプロイすることで予測を提供できるようになる。一つのエンドポイントに複数のモデルをデプロイすることも、モデルを複数のエンドポイントにデプロイすることもできる。同一エンドポイント内でのトラフィックの分割や、GPUを利用することもできる。

https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api

Vertex AI上の予測サービングにおける最小ノード数は1のよう。GPUが不要でトラフィックがないタイミングでゼロスケールしたい場合にはCloud Runでも十分かもしれない(Cloud Runでもトラフィック分割などは可能)。

https://cloud.google.com/blog/ja/products/serverless/cloud-run-now-supports-gradual-rollouts-and-rollbacks
作成者以外のコメントは許可されていません