🕌

[連載 #1] Vertex AI Pipelinesの使い方(Kubeflow Pipelines v2)

2024/09/28に公開

この記事で学ぶこと

  • サンプルパイプラインを構築して実行する
  • 基本的なコンポーネントとパイプラインの書き方を学ぶ

なんの紹介をするのか:VertexAI Pipelines

KubeFlowはコンテナオーケストレーションシステムであるKubernetes上で稼働し、MLOps Toolkitを提供するOSSプラットフォームで、Kubeflow Pipelinesはその中のMLパイプライン機能を提供しています。

今回紹介するVertexAI Pipelinesは概ねVertex AI上で提供されるKubeflow Pipelines(kfp)のマネージドサービスと認識して良いでしょう。(TFXも使用できますが本記事ではkfpを使用します。)

Kubernetesの管理に工数を割くことなく、Kubeflow Pipelinesの機能を使用できるため、データサイエンティストやMLエンジニアのみでMLパイプラインを構築することができます。

なぜこの記事を作るか

私は2024の初頭にVertex AI Pipelinesを使用し始めたのですが、2023/06にkubeflow pipelines v2がリリースされ、Vertex AIではkfp v1のサポートが2024/12に終了するという状況であったため、v2から入門しました。

入門する中で感じたのは、やはりリリース直後であったためv2の情報が少ないということです。そこで、ある程度入門が終わった今、Vertex AI Pipelinesとkfp v2の使い方を記録していこうと思いました。

逐次、記事を作成していくので参考になれば幸いです。

参考になった資料

私が入門するにあたって参考になった資料をまず紹介しておきます。
いずれの筆者様にも感謝です。

事前準備

GoogleCloudプロジェクトを持っていることを前提にします。
(今回はVertexAI Pipelinesに必要なIAM permissionに関してスキップしてます。後日余裕があればまとめます)

開発環境はPython3.10を想定し下記のライブラリをインストールしてください。
バージョンは私が試した時のものです

google-cloud-aiplatform==1.51.0
kfp==2.7.0

私の開発環境はGoogleCloudのCloud Workstationsを使用しています。

gcloud auth login
gcloud auth application-default login

をターミナルで済ませておきましょう。

サンプルパイプライン

公式リポジトリのpipelines_intro_kfp.ipynbをベースに簡単なパイプラインからkfp v2の基本を学びましょう。

notebookから必要な部分だけ抽出して解説していきます。

Library Import

import json
from typing import NamedTuple
from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import component

認証系情報

PROJECT_ID = "[project name]"  
LOCATION = "[location]" 
BUCKET_URI = "[gs://storage_name]" 
SERVICE_ACCOUNT = "[service account for using VertexAI Pipelines]" 
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/intro"

aiplatform.init(
    project=PROJECT_ID, 
    staging_bucket=BUCKET_URI,
    location=LOCATION
)

aiplatformの初期化に、使用しているPROJECT_ID,LOCATIONを指定しましょう。
LOCATIONは特にこだわりがなければ"us-central1"で良いでしょう。

VertexAI Pipelinesは実行毎の中間生成物をGCSに保存する形で保管・管理するためGCS bucketが必要です。そういった背景からBUCKET_URIが必要です。
BUCKET_URIに指定するバケットはあらかじめ作成しておきましょう。
PIPELINE_ROOTは後ほど使用します。

SERVICE_ACCOUNTはPipeline jobの実行に使用するSAの指定に用います。

Componentの作成

本記事#1ではLightweight Python Componentsという形式のコンポーネントを使用します。

Lightweight Python ComponentsはPython関数の形式で定義するコンポーネントです。実例を見てみましょう。

@component()
def hello_world(text: str) -> str:
    print(text)
    return text

@componentデコレータを使用し、関数を定義することでコンポーネントを定義することができます。
@componentの主要な引数で

  • base_image:コンポーネントで使用するコンテナイメージを指定
  • packages_to_install:インストールするパッケージを指定、requirements.txtに近い

を行うことができます。

また、Lightweight Python Componentsではコンポーネントの制約としてスタンドアローンである必要があります。
この制約から、コンポーネントが使用するパッケージについては、import文をコンポーネントの関数内に入れる必要があります。

@component(base_image="python:3.9",packages_to_install=['numpy==1.21.6'])
def sin(val: float = 3.14) -> float:
    import numpy as np
    return np.sin(val).item()

こんな感じですね。
Lightweight Python Componentsは簡単な処理についてはラクにかけて便利です。

しかし、それなりに込み入った処理を作る場合には単一の関数内で処理を書き切る必要があるためコンポーネントの記述が膨張し、複雑度が増してしまうという課題もあります。

そうした場合には、

  • base_imageを作り込んで利用する
  • Container Componentsを使用する

といった方法がとられます。Container Componentsの説明は今後の記事に回します。

Pipelineの構築

pipelines_intro_kfp.ipynbに従い、コンポーネントを作成し、Pipelineを定義してみましょう。

まずコンポーネントです。

@component()
def hello_world(text: str) -> str:
    print(text)
    return text

@component(packages_to_install=["google-cloud-storage"])
def two_outputs(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # Return parameters
        ("output_two", str),
    ],
):
    # the import is not actually used for this simple example, but the import
    # is successful, as it was included in the `packages_to_install` list.
    from google.cloud import storage  # noqa: F401

    o1 = f"output one from text: {text}"
    o2 = f"output two from text: {text}"
    print("output one: {}; output_two: {}".format(o1, o2))
    return (o1, o2)

@component
def consumer(text1: str, text2: str, text3: str) -> str:
    print(f"text1: {text1}; text2: {text2}; text3: {text3}")
    return f"text1: {text1}; text2: {text2}; text3: {text3}"

簡単にコンポーネントの処理をみておきましょう。

hello_worldンポーネントはstrを受け取りそのまま返します。
two_outputsコンポーネントはstrを受け取り、2つのstrのNamedTupleを返します。
consumerコンポーネントは3つのstrを受け取り、1つのstrに結合して返します。

ここまでのコンポーネントを見て気づいた方もいるかもしれませんが、kfpではコンポーネントのin-outの型付けをしっかりと記載しておく必要があります。
この型付けには今後、kfp特有の型(dsl.Input,dsl.Datasetなど)でのデータ受け渡しなども入ってきます。

ではパイプラインの定義をしましょう

@dsl.pipeline(
    name="intro-pipeline-unique",
    description="A simple intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "hi there"):
    hw_task = hello_world(text=text)
    two_outputs_task = two_outputs(text=text)
    consumer_task = consumer( 
        text1=hw_task.output,
        text2=two_outputs_task.outputs["output_one"],
        text3=two_outputs_task.outputs["output_two"],
    )

パイプラインはpipelineデコレータをつけて関数の形で定義します。
この時引数でname,descriptionに加えて、pipeline_rootも引数としてとっており、ここで認証系情報で用意しておいたPIPELINE_ROOTを指定しています。

パイプラインの中身を見てみると

hw_task = hello_world(text=text)
...
consumer_task = consumer( 
    text1=hw_task.output,
    ...
)

このようにhello_worldコンポーネントの戻り値をhw_taskで受け取り
consumerコンポーネントにhw_task.outputで値を渡しています。

kfpではhw_taskのようなコンポーネントの戻り値はPipelineTaskオブジェクトとなっています。
PipelineTaskはoutputまたはoutputsという属性でコンポーネントの戻り値(returnの値)を取得することができます。

まずは、この方法を使うことでパイプライン内のコンポーネントの依存関係を記述することができます。

Pipelineの実行

それではパイプラインをどのように実行するかを見ていきましょう。

まず最初に行うのはパイプラインのコンパイルです。

compiler.Compiler().compile(
    pipeline_func=pipeline, 
    package_path="intro_pipeline.yaml"
)

定義したパイプラインを渡して、YAMLファイルにコンパイルします。JSONにもコンパイルできるようです。

そしてこのコンパイルしたファイルを指定してaiplatformで実行します

DISPLAY_NAME = "intro_pipeline_job_unique"
# Job定義
job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="[path to file]/intro_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
)
# 実行
job.run(service_account=SERVICE_ACCOUNT)

aiplatform.PipelineJobでjobを定義し、job.runで実行します。

実行すると、標準出力にパイプラインのリンクが表示されるのでそこからパイプラインを見てみましょう。


実行グラフ

実行グラフが確認できました。

ここで少し補足

  • パイプライン実行時にPIPELINE_STATE_PENDINGとなり実行が一向に進まないときは認証系に誤りがある可能性があります。PROJECT_ID,LOCATION,SERVICE_ACCOUNTなどに誤字がないか・GCSバケットは作成してあるか・SAのpermissionは足りているかなどを確認してください。
  • コンポーネントを実行するインスタンスタイプは指定がない場合はe2-standard-4で実行されます。
  • 実行後に指定したGCSバケットを見てみましょう。コンポーネントのoutputがjsonで出力されています。

終わり

今回はここまでです。

振り返りますと

  • Lightweight Python Componentsを学び
  • Pipelineの定義方法を学び
  • Pipelineを実行した

となります。

次回はContainer Componentsの使い方を紹介しようと思います。

Discussion