🙌

Vertex AI Pipelinesに入門してみよう

に公開

今回はVertex AI Pipelinesに入門するための最もシンプルなところを紹介しようと思います。

Vertex AI Pipelinesとは?

Vertex AI Pipelinesは、MLのパイプラインを記述することができるサービスになります。MLワークフローをおーけストレートすることにより、データ準備からモデルの学習、モニタリングまで全て自動で行うことができるようになります。MLOpsを実現するためにはMLのライフサイクルを可能な限り自動化するための取り組みが必要であり、そのための手段としてとても有益なツールとなります。

パイプラインを記述するためにはKubeflow PipelinesまたはTensorflow Extended(TFX)を利用する必要があり、今回はKubeflow Pipelinesを使ってみます。

https://cloud.google.com/vertex-ai/docs/pipelines/introduction?hl=ja

実際に使ってみよう

それでは実際にKubeflow Pipelinesを使ってみましょう。今回は名前を指定すると、その名前を含めた挨拶のコメントおよび実行時のタイムスタンプを返してくれるパイプラインを構築します。実際に利用する場合はMLのライフサイクルに沿ったパイプラインを実装しますが、今回は簡単のため、Kubeflow Pipelinesドキュメントで提供されているGetting Startedをベースに進めます。MLに沿ったパイプラインは次回以降で取り扱います。

https://www.kubeflow.org/docs/components/pipelines/getting-started/

パイプラインの実装

環境構築

まずは環境構築をしましょう。Kubeflow Pipleinesをuvで利用できる環境にしていきます。

uv init pipeline_kfp_quickstart -p 3.12
cd pipeline_kfp_quickstart
uv add kfp

コード全体像

次にコードの実装をしてみます。なお、Kubeflow Pipelinesが提供しているコードを少しいじっています。

create_pipeline.py
from typing import NamedTuple
from kfp import dsl, compiler

@dsl.component(packages_to_install=["datetime"])
def say_hello(name: str, greeting: dsl.OutputPath(str), execution_time: dsl.OutputPath(str)):
   with open(greeting, "w") as f:
       f.write(f"Hello, {name}! How are you?")

   from datetime import datetime
   now = datetime.now()
   with open(execution_time, "w") as f:
       f.write(now.strftime("%Y/%m/%d %H:%M:%S"))

Outputs = NamedTuple("Outputs", [("greeting", str), ("execution_time", str)])

@dsl.pipeline
def hello_pipeline(recipient: str) -> Outputs:
   t = say_hello(name=recipient)
   return Outputs(t.outputs["greeting"], t.outputs["execution_time"])

compiler.Compiler().compile(hello_pipeline, "pipeline.yaml")

それでは一つずつみていきましょう。

kfpについて

まず、Kubeflow Pipelinesを使うにはkfpをインポートする必要があり、今回はパイプラインのコンポーネントを定義するのとコンパイルするために以下の二つをインポートします。

from kfp import dsl, compiler

コンポーネント定義

次にパイプラインの実装を進めます。最初に定義するのはパイプラインの個々の要素であるコンポーネントになります。今回はsay_helloと言う関数名でパイプラインを定義しています。コンポーネントを定義するには@dsl.componentデコレータを関数に付与します。デコレータにも引数を色々設定できますが、今回はdatetimeライブラリが足りなかったのでそれを追加でインストールさせるためにpackages_to_install=["datetime"]のみ追加しています。

今回実装している中で重要な要素としてはdsl.OutputPath(str)があります。これはコンポーネントが出力として扱うための変数であり、コンポーネントは出力を書き込むことで呼び出し側のパイプライン本体に情報を伝えることができます。今回の例で言うとgreetingに挨拶を、execution_timeに実行タイムスタンプを保存してパイプラインに伝えています。

@dsl.component(packages_to_install=["datetime"])
def say_hello(name: str, greeting: dsl.OutputPath(str), execution_time: dsl.OutputPath(str)):
    with open(greeting, "w") as f:
        f.write(f"Hello, {name}! How are you?")

    from datetime import datetime
    now = datetime.now()
    with open(execution_time, "w") as f:
        f.write(now.strftime("%Y/%m/%d %H:%M:%S"))

コンポーネントが実装できれば、次はそれを呼び出すコンポーネントを定義します。まず、このパイプラインの結果としてさきほどコンポーネントで作成したgreetingexecution_timeを扱いたいと思います。パイプラインで複数の戻り値を記録したい場合は、一つの方法としてtyping.NamedTupleを用いる方法があります。今回の例では二つの出力を持てるように設定し、パイプライン関数の戻り値の型として設定しています。

パイプライン定義

パイプラインの定義は@dsl.pipelineデコレータを利用し、関数の引数がパイプライン実行時に指定されるパラメータになります。今回はrecipientを引数に受け取るようになっています。

from typing import NamedTuple


Outputs = NamedTuple("Outputs", [("greeting", str), ("execution_time", str)])

@dsl.pipeline
def hello_pipeline(recipient: str) -> Outputs:
    t = say_hello(name=recipient)
    return Outputs(t.outputs["greeting"], t.outputs["execution_time"])

パイプラインのコンパイル

最後に、パイプラインが実装できたらあとはそのパイプラインをコンパイルして、Vertex AI Pipelinesに与えるための方式に変換します。コンパイルするとpipeline.yamlと言うファイルでパイプラインがコンパイルされます。

compiler.Compiler().compile(hello_pipeline, "pipeline.yaml")

パイプラインファイルの生成

それではこのコードを実行してみましょう。実行するとpipeline.yamlが生成されます。

uv run create_pipeline.py
pipeline.yaml
# PIPELINE DEFINITION
# Name: hello-pipeline
# Inputs:
#    recipient: str
# Outputs:
#    execution_time: str
#    greeting: str
components:
  comp-say-hello:
    executorLabel: exec-say-hello
    inputDefinitions:
      parameters:
        name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        execution_time:
          parameterType: STRING
        greeting:
          parameterType: STRING
deploymentSpec:
  executors:
    exec-say-hello:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - say_hello
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'datetime'  &&\
          \  python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef say_hello(name: str, greeting: dsl.OutputPath(str), execution_time:\
          \ dsl.OutputPath(str)):\n    from datetime import datetime\n    text = f\"\
          Hello, {name}!\"\n    with open(greeting, \"w\") as f:\n        f.write(text)\
          \  # \u3053\u308C\u304C\u300C\u30B3\u30F3\u30DD\u30FC\u30CD\u30F3\u30C8\u51FA\
          \u529B\uFF08\u30D1\u30E9\u30E1\u30FC\u30BF\uFF09\u300D\u306B\u306A\u308B\
          \n\n    now = datetime.now()\n    with open(execution_time, \"w\") as f:\n\
          \        f.write(now.strftime(\"%Y/%m/%d %H:%M:%S\"))\n\n"
        image: python:3.9
pipelineInfo:
  name: hello-pipeline
root:
  dag:
    outputs:
      parameters:
        execution_time:
          valueFromParameter:
            outputParameterKey: execution_time
            producerSubtask: say-hello
        greeting:
          valueFromParameter:
            outputParameterKey: greeting
            producerSubtask: say-hello
    tasks:
      say-hello:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-say-hello
        inputs:
          parameters:
            name:
              componentInputParameter: recipient
        taskInfo:
          name: say-hello
  inputDefinitions:
    parameters:
      recipient:
        parameterType: STRING
  outputDefinitions:
    parameters:
      execution_time:
        parameterType: STRING
      greeting:
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.14.3

Vertex AI Pipelinesでパイプラインを実行する

それでは先ほど作ったパイプラインをVertex Ai Pipelines上で実行してみましょう。

パイプラインの設定

Vertex AI Pipelinesに移動すると以下のよう画面が表示されます。

パイプラインを作成するために「実行を作成」を選択しましょう。すると以下のような「実行の詳細」画面が表示されます。今回は先ほど生成されたpipeline.yamlをローカルPCからアップロードして読み込ませます。すると自動的に情報が設定されます。

次の画面に映ると「ランタイムの構成」画面が表示されます。この画面では出力結果を保存するためのCloud Storageバケットと入力となる名前を指定することになります。今回は名前はHogeとしています。

設定が完了すれば送信をします。

パイプラインの実行

先ほどの設定が完了すると自動的にパイプラインが実行されます。実行中は以下のような画面が表示されます。サイドバーでは実行に関する情報が表示されており、例えば実行パラメータをみると指定した名前がHogeであることがわかります。

少し待っていると実行が完了して、左側のグラフのステータスが完了になります。ノードの詳細をみると、出力としてgreetingexecution_timeが生成されており、両方ともNamedTupleで指定したstring型になっていることが確認できました。

まとめ

今回はKubeflow Pipelinesを用いてVertex AI Pipelines上にとてもシンプルかつMLとは関係ないですがパイプラインを構築してみました。実際に利用するときはデータ準備からモデル学習、デプロイまで幅広く対応でき、専用のオペレータを用いて構築していくことになります。次回以降の記事ではそちらを紹介していこうと思います。

Discussion