Vertex AI Pipelinesを利用してエンドポイントをデプロイした
今回はVertex AI Pipelinesを利用してモデルを開発してエンドポイントにデプロイするためのチュートリアルを実施しました。Google Cloudがオフィシャルで提供しているノートブックがあり、それを利用してデプロイするための方法を解説しようと思います。なお、Pipeline構成コードはサンプルに則って進めますが、インフラ構成についてはTerraformで作成します。
システム構成
今回実装するシステムは以下のようになたています。ざっくり以下のようになります。
- Cloud Storage: パイプラインの結果を保存するためのバケット
- Vertex AI Pipelines: 学習からアップロード、デプロイまでを実装
- IAM: PipelinesがCloud Storageやジョブ実行ができるようにするためのサービスアカウント
ジョブの実装についてはPythonで記述し、インフラの構成はTerraformで行います。
インフラ実装
Cloud StorageとIAMの設定についてTerraformを利用して実装します。ファイル構成は以下になります。
main.tf
variables.tf
modules/
gcs/
main.tf
variables.tf
service-account/
main.tf
variables.tf
GCSモジュール作成
パイプライン結果の保存のためのバケットを実装します。まずは変数の定義をvariables.tf
に実装します。
variable "project_id" {
description = "The Google Cloud project ID"
type = string
}
variable "location" {
description = "Location"
type = string
default = "ASIA-NORTHEAST1"
}
次にmain.tf
を実装します。バケット名はgs://<project id>-pipeline-bucket
として作成します。
resource "google_storage_bucket" "my_bucket" {
name = "${var.project_id}-pipeline-bucket"
location = var.location
force_destroy = true
}
サービスアカウントの作成
Cloud Storageに対する権限およびVertex AI上で動かせるためのサービスアカウントを定義します。まずは変数をvariables.tf
に定義します。ロールについてはroles
のdefault
に定義している3つを付与します。
variable "project_id" {
description = "The Google Cloud project ID"
type = string
}
variable "region" {
description = "The Google Cloud region"
type = string
}
variable "service_account_id" {
description = "Service account's id"
type = string
default = "create-pipeline"
}
variable "roles" {
type = list(string)
default = ["roles/storage.objectCreator", "roles/storage.objectViewer", "roles/aiplatform.user"]
}
次にmain.tf
を実装します。roles
には3つのロールを定義しているので、for_each
を使って3つのロールを付与されるようにリソース定義しています。
resource "google_service_account" "my_sa" {
account_id = var.service_account_id
display_name = "My Service Account"
}
resource "google_project_iam_member" "my_sa_roles" {
for_each = toset(var.roles)
project = var.project_id
role = each.value
member = "serviceAccount:${google_service_account.my_sa.email}"
}
ルートファイルの実装
最後にルートファイルを実装します。まずルートのvariables.tf
を以下のように実装します。
variable "project_id" {
description = "The Google Cloud project ID"
type = string
}
variable "region" {
description = "The Google Cloud region"
type = string
default = "asia-northeast1"
}
次にmain.tf
を実装します。main.tf
ではGCSモジュールとサービスアカウントモジュールを参照します。
provider "google" {
project = var.project_id
region = var.region
}
module "gcs" {
source = "./modules/gcs"
project_id = var.project_id
}
module "service-account" {
source = "./modules/service-account"
project_id = var.project_id
region = var.region
}
インフラの作成
ここまで実装したインフラを適用するためにterraform apply
を実行します。
パイプラインの実装
まずはコード全体像は以下になります。
.env
PROJECT_ID=...
REGION=asia-northeast1
main.py
from typing import Any
from dotenv import load_dotenv
load_dotenv()
import os
import random
import string
import google.cloud.aiplatform as aip
import kfp
from kfp import compiler # noqa: F811
# Generate a uuid of a specifed length(default=8)
def generate_uuid(length: int = 8) -> str:
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
UUID = generate_uuid()
PROJECT_ID = os.environ["PROJECT_ID"]
REGION = os.environ["REGION"]
BUCKET_URI = f"gs://{PROJECT_ID}-pipeline-bucket"
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/bikes_weather"
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)
hp_dict: str = '{"num_hidden_layers": 3, "hidden_size": 32, "learning_rate": 0.01, "epochs": 1, "steps_per_epoch": -1}'
data_dir: str = (
"gs://cloud-samples-data/vertex-ai/pipeline-deployment/datasets/bikes_weather/"
)
TRAINER_ARGS = ["--data-dir", data_dir, "--hptune-dict", hp_dict]
# create working dir to pass to job spec
WORKING_DIR = f"{PIPELINE_ROOT}/{UUID}"
MODEL_DISPLAY_NAME = f"train_deploy{UUID}"
print(TRAINER_ARGS, WORKING_DIR, MODEL_DISPLAY_NAME)
@kfp.dsl.pipeline(name="train-endpoint-deploy" + UUID)
def pipeline(
project: str = PROJECT_ID,
location: str = REGION,
model_display_name: str = MODEL_DISPLAY_NAME,
serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
):
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.custom_job import \
CustomTrainingJobOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
ModelDeployOp)
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from kfp.dsl import importer_node
custom_job_task = CustomTrainingJobOp(
project=project,
display_name="model-training",
worker_pool_specs=[
{
"containerSpec": {
"args": TRAINER_ARGS,
"env": [{"name": "AIP_MODEL_DIR", "value": WORKING_DIR}],
"imageUri": "gcr.io/google-samples/bw-cc-train:latest",
},
"replicaCount": "1",
"machineSpec": {
"machineType": "n1-standard-4",
},
}
],
)
import_unmanaged_model_task = importer_node.importer(
artifact_uri=WORKING_DIR,
artifact_class=artifact_types.UnmanagedContainerModel,
metadata={
"containerSpec": {
"imageUri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
},
},
).after(custom_job_task)
model_upload_op = ModelUploadOp(
project=project,
location=location,
display_name=model_display_name,
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
model_upload_op.after(import_unmanaged_model_task)
endpoint_create_op = EndpointCreateOp(
project=project,
location=location,
display_name="pipelines-created-endpoint",
)
ModelDeployOp(
endpoint=endpoint_create_op.outputs["endpoint"],
model=model_upload_op.outputs["model"],
deployed_model_display_name=model_display_name,
dedicated_resources_machine_type="n1-standard-4",
dedicated_resources_min_replica_count=1,
dedicated_resources_max_replica_count=1,
)
compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="tabular_regression_pipeline.json",
)
DISPLAY_NAME = "bikes_weather_" + UUID
job = aip.PipelineJob(
display_name=DISPLAY_NAME,
template_path="tabular_regression_pipeline.json",
pipeline_root=PIPELINE_ROOT,
enable_caching=False,
location=REGION,
)
job.run(service_account="create-pipeline@<project-id>.iam.gserviceaccount.com")
環境構築
uv
を利用して環境構築します。
uv init vertexai-pipeline -p 3.12
cd vertexai-pipeline
uv add google-cloud-aiplatform google-cloud-storage kfp==2.7.0 google-cloud-pipeline-components
コード実装
今回の実装の肝となるのはpipeline
関数になります。一つずつ紐解きます。
まずモデルを開発するカスタムトレイニングジョブになります。CustomTrainingJobOp
を利用するとカスタムジョブを定義することができます。TRAINING_ARGS
には学習に利用するデータなどの引数が指定されており、学習用のコンテナも提供済みのものをimageUri
に指定しています。マシンスペックについてはn1-standard-4
を利用するようにしています。また、importer_node
ではGCSに保存されたアーティファクトをパイプラインにインポートするために呼び出されています。after
でカスタムジョブを指定することで、学習が終了したらインポートされるようになっています。
custom_job_task = CustomTrainingJobOp(
project=project,
display_name="model-training",
worker_pool_specs=[
{
"containerSpec": {
"args": TRAINER_ARGS,
"env": [{"name": "AIP_MODEL_DIR", "value": WORKING_DIR}],
"imageUri": "gcr.io/google-samples/bw-cc-train:latest",
},
"replicaCount": "1",
"machineSpec": {
"machineType": "n1-standard-4",
},
}
],
)
import_unmanaged_model_task = importer_node.importer(
artifact_uri=WORKING_DIR,
artifact_class=artifact_types.UnmanagedContainerModel,
metadata={
"containerSpec": {
"imageUri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
},
},
).after(custom_job_task)
その次は学習されたモデルをModelRegistryにアップロードしています。先ほどパイプラインにimporter_node
を経由してアーティファクトを取得しているので、それを参照させるようにModelUploadOp
を呼び出しています。なお、この処理は学習終了後にimporter_node
を実行し、その後に実行させるため、after
にはimporter_node
のタスクを指定しています。
model_upload_op = ModelUploadOp(
project=project,
display_name=model_display_name,
unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
model_upload_op.after(import_unmanaged_model_task)
3爪にVertex AIのエンドポイントを作成します。EndpointCreateOp
を利用することでエンドポイントを作成できます。エンドポイントを作成するだけではまだモデルを読み込んでいないので、この処理は学習ジョブなどと並列して実行でき、after
の指定はしていません。
endpoint_create_op = EndpointCreateOp(
project=project,
display_name="pipelines-created-endpoint",
)
最後に、ModelUploadOp
とEndpointCreateOp
の両方が完了したらModelRegistry上のモデルをエンドポイントにデプロイしてサービングします。ModelDeployOp
を利用すると、指定したエンドポイントとモデルを参照してデプロイすることができます。今回はインスタンス数は常時1になるようにし、n1-standard-4
(学習時と同じ)上でサービングさせます。
ModelDeployOp(
endpoint=endpoint_create_op.outputs["endpoint"],
model=model_upload_op.outputs["model"],
deployed_model_display_name=model_display_name,
dedicated_resources_machine_type="n1-standard-4",
dedicated_resources_min_replica_count=1,
dedicated_resources_max_replica_count=1,
)
ここまでできればあとはこのパイプラインをコンパイルして実際にジョブとして実行させます。ジョブごとに名前が変わるようにUUIDを利用しています。
compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="tabular_regression_pipeline.json",
)
DISPLAY_NAME = "bikes_weather_" + UUID
job = aip.PipelineJob(
display_name=DISPLAY_NAME,
template_path="tabular_regression_pipeline.json",
pipeline_root=PIPELINE_ROOT,
enable_caching=False,
location=REGION,
)
job.run(service_account="create-pipeline@<project-id>.iam.gserviceaccount.com")
こちらを実行すると、以下のようなパイプラインが作成されます。終了するまでに結構時間がかかりますが、完了すると以下のように表示されます。
推論の実行
それでは実際にデプロイしたエンドポイントに対して推論を実行してみましょう。今回は簡単のため、学習データのうちの一つのレコードの値を元に推論データを作ります。まずはinput_file.json
という名前で以下のデータを作ります。
{
"instances": [
{
"day_of_week": "2",
"dewp": 45.4,
"end_station_id": "53",
"euclidean": 2221.156331718110,
"loc_cross": "POINT(-0.15 51.49)POINT(-0.14 51.51)",
"max": 67.3,
"min": 52.3,
"prcp": 0.05,
"start_station_id": "259",
"temp": 57.9,
"ts": 1434337980
}
]
}
次に以下の情報を変数として定義してください。なお、この内容はパイプラインのリクエストの例から参照できます。
ENDPOINT_ID="..."
PROJECT_ID="..."
INPUT_DATA_FILE="input_file.json"
これを設定した上で以下を実行すると、推論結果が取得できます。結果をみるとpredictions
に該当する推論結果の数値があり、それ以外の属性ではモデルのIDやモデルのパス、モデルのバージョンが取得されていることが確認できました。このように呼び出すことで、エンドポイント上のモデルを呼び出すことができます。
curl \
-X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://asia-northeast1-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/asia-northeast1/endpoints/${ENDPOINT_ID}:predict" \
-d "@${INPUT_DATA_FILE}"
# 結果
{
"predictions": [
[
1513.93433
]
],
"deployedModelId": "2638636591639166976",
"model": "projects/.../locations/asia-northeast1/models/2130849136583376896",
"modelDisplayName": "train_deployw0oy5xbx",
"modelVersionId": "1"
}
まとめ
今回はサンプルが提供されている内容を元に、Vertex AI Pipelinesをデプロイし推論するところまで試してみました。今回利用したのはカスタムトレーニングジョブで実際に利用することも結構あると思うので、Vertex AI上でMLOpsを検討されている方はぜひ一度試してみてください。
Discussion