🕰️

Vertex AI Pipelines で scheduler API がプレビューになりました

2023/06/20に公開

要約

  • Vertex AI Pipelines の scheduler API がプレビューリリースされた。
  • これまで Vertex AI Pipelines の定期実行は、Cloud Scheduler などの他プロダクトを利用する必要があった。
  • scheduler API により、Vertex AI Pipelines の定期実行を Vertex AI だけで可能になった。

1. はじめに

こんにちは、クラウドエース データML ディビジョン所属の田中です。
クラウドエースのITエンジニアリングを担うシステム開発部の中で、特にデータ基盤構築・分析基盤構築からデータ分析までを含む一貫したデータ課題の解決を専門とするのがデータML ディビジョンです。

データML ディビジョンでは活動の一環として、毎週Google Cloud の新規リリースを調査・発表し、データ領域のプロダクトのキャッチアップをしています。その中でも重要と考えるリリースを本ページ含め記事として公開しています。

今回紹介するリリースは、Vertex AI Pipelines の「scheduler API」についてです。
このリリースにより Verte AI Pipelines の実行をスケジュールする機能が追加され、これまで Cloud Scheduler などの他プロダクトを利用する必要があったスケジュール実行が Vertex AI だけで完結できるようになりました。
なお、この機能はプレビュー段階になります。

2. サービスの概要

2.1. Vertex AI Pipelines とは?

Vertex AI Pipelines は、サーバーレス方式で機械学習(ML)ワークフローをオーケストレートするサービスです。
Kubeflow Pipelines SDK または TensorFlow Extended を使用して構築されたパイプラインを実行することができます。

2.2. scheduler API とは?

今回、ご紹介するリリースは2023年5月16日付に発表された Vertex AI Pipelines の scheduler API という追加機能についてです。

該当リリースノート: Vertex AI release notes

scheduler API により、1回または繰り返しの Vertex AI Pipelines の実行をスケジュールすることが可能です。

これまでは、Cloud Scheduler でパイプライン実行をスケジュールするに記載されているように、スケジュール実行のためには、Cloud Scheduler など他のプロダクトを利用する必要がありました。しかし、この scheduler API により、Vertex AI の設定だけで完結できるようになりました。

3. 試してみた

scheduler API を実際に試してみます。

3.1. 事前に必要なもの

  • Google Cloud アカウント
  • Google Cloud プロジェクト
  • gcloud CLI
  • Python

3.2. Cloud Storage バケット準備

パイプラインの出力ディレクトリとして利用する Cloud Storage バケットを作成しておきます。

gsutil mb -l us-central1 gs://<YOUR-BUCKET-NAME>

3.3. パイプライン準備

使用した環境は下記の通りです。

  • python: 3.11.3
  • Kubeflow Pipelines SDK: 2.0.0rc2

Kubeflow Pipelines SDK をインストールします。

pip install kfp==2.0.0rc2

今回は、Cloud Scheduler でパイプライン実行をスケジュールするに記載されているサンプルの hello-world-scheduled-pipeline をパイプラインとして利用します。(Kubeflow Pipelines SDK v2 でビルドするために一部書き換えています。)

hello-world-scheduled-pipeline.py
from kfp import compiler
from kfp import dsl
from kfp.dsl import component

# A simple component that prints and returns a greeting string
@component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(message=greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

プログラムの実行によりパイプラインをビルドし、hello-world-scheduled-pipeline.yaml を作成します。

python hello-world-scheduled-pipeline.py

3.4. パイプライン実行をスケジュール

Vertex AI の[パイプライン]ページから、[実行を作成] ボタンを選択します。

1

[パイプライン実行の作成] が表示されたら、[実行の詳細] を以下のように設定し(これら以外はデフォルトのまま)、[続行] ボタンを選択します。

  • ファイルをアップロード[参照] から作成したhello-world-scheduled-pipeline.yamlを選択)
  • リージョン: us-central1
  • 実行スケジュール: 定期的
  • 開始日時: オン(2023/06/14 12:00)
  • 頻度: * * * * *
  • 頻度のタイムゾーン: 日本標準時(JST)
  • 終了: 指定日時に終了(2023/06/14 12:10)

なお、実行スケジュールの設定は下記2通りで、定期的が今回のリリースにより追加された機能になります。

  • 1回限り: パイプラインを即実行します。こちらは scheduler API がリリースされる前と同様な実行です。
  • 定期的: 開始日時、頻度、終了日時を設定し、パイプラインを定期に実行します。

2

次に、[ランタイムの構成] を以下のように設定し、[送信] ボタンを選択します。

  • Cloud Storage のロケーション
    • 出力ディレクトリ: [参照] から作成しておいた Cloud Storage バケットを選択
  • パイプライン パラメータ
    • greet_name: 適当な文字列を入力

3

スケジュールが作成されると、[スケジュール] ページに表示されていることが確認できます。
4

3.5. パイプライン実行結果

パイプライン実行は、1分ごと、開始日時(2023/06/14 12:00)と終了日時(2023/06/14 12:10)を除いた時刻(12:01 ~ 12:09)に開始されます。(計9回実行)
5

終了日時を過ぎると、スケジュールのステータスは完了に変わります。
6

なお、Vertex AI Pipelines のキャッシュ により、すでに実行完了したパイプラインのステップがある場合、同じパイプラインのステップはスキップされ実行されません。今回の場合も、1回目の実行が完了済みになった時刻(12:04 ~ 12:09)のパイプライン実行はキャッシュによりステップがスキップされ、期間が1秒になっていることが確認できます。

キャッシュの条件に一致してスキップされると新しい出力が得られません。
実行による新しい出力を期待してスケジュール設定する場合は、キャッシュを無効にするのが良いと思います。(キャッシュを無効にする方法は実行キャッシュの構成を参照)

実際にキャッシュを無効にしたパイプラインを実行してみます。使用したパイプラインのコードは次の通りです。

hello-world-scheduled-pipeline.py
from kfp import compiler
from kfp import dsl
from kfp.dsl import component

# A simple component that prints and returns a greeting string
@component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(message=greet_name).set_caching_options(False)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

1回目の実行が完了済みになった時刻(16:03 ~ 16:09)のパイプライン実行は、キャッシュされていないことが確認できます
7

4. まとめ

今回の記事では Vertex AI Pipelines の scheduler API についてご紹介しました。
scheduler API により、Vertex AI Pipelines の定期実行を Vertex AI だけでできるようになりました。
プレビュー段階ではありますが、Vertex AI Pipelines を定期実行する場合に是非ご利用ください。

Discussion