💬

Vertex AI Pipelinesを利用してエンドポイントをデプロイした

に公開

今回はVertex AI Pipelinesを利用してモデルを開発してエンドポイントにデプロイするためのチュートリアルを実施しました。Google Cloudがオフィシャルで提供しているノートブックがあり、それを利用してデプロイするための方法を解説しようと思います。なお、Pipeline構成コードはサンプルに則って進めますが、インフラ構成についてはTerraformで作成します。

https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/google_cloud_pipeline_components_model_train_upload_deploy.ipynb

システム構成

今回実装するシステムは以下のようになたています。ざっくり以下のようになります。

  • Cloud Storage: パイプラインの結果を保存するためのバケット
  • Vertex AI Pipelines: 学習からアップロード、デプロイまでを実装
  • IAM: PipelinesがCloud Storageやジョブ実行ができるようにするためのサービスアカウント

ジョブの実装についてはPythonで記述し、インフラの構成はTerraformで行います。

インフラ実装

Cloud StorageとIAMの設定についてTerraformを利用して実装します。ファイル構成は以下になります。

main.tf
variables.tf
modules/
  gcs/
     main.tf
     variables.tf
  service-account/
     main.tf 
     variables.tf

GCSモジュール作成

パイプライン結果の保存のためのバケットを実装します。まずは変数の定義をvariables.tfに実装します。

modules/gcs/variables.tf
variable "project_id" {
  description = "The Google Cloud project ID"
  type        = string
}

variable "location" {
  description = "Location"
  type        = string
  default     = "ASIA-NORTHEAST1"
}

次にmain.tfを実装します。バケット名はgs://<project id>-pipeline-bucketとして作成します。

modules/gcs/main.tf
resource "google_storage_bucket" "my_bucket" {
  name          = "${var.project_id}-pipeline-bucket"
  location      = var.location
  force_destroy = true
}

サービスアカウントの作成

Cloud Storageに対する権限およびVertex AI上で動かせるためのサービスアカウントを定義します。まずは変数をvariables.tfに定義します。ロールについてはrolesdefaultに定義している3つを付与します。

modules/service-account/variables.tf
variable "project_id" {
  description = "The Google Cloud project ID"
  type        = string
}

variable "region" {
  description = "The Google Cloud region"
  type        = string
}

variable "service_account_id" {
  description = "Service account's id"
  type        = string
  default     = "create-pipeline"
}

variable "roles" {
  type    = list(string)
  default = ["roles/storage.objectCreator", "roles/storage.objectViewer", "roles/aiplatform.user"]
}

次にmain.tfを実装します。rolesには3つのロールを定義しているので、for_eachを使って3つのロールを付与されるようにリソース定義しています。

modules/service-account/main.tf
resource "google_service_account" "my_sa" {
  account_id   = var.service_account_id
  display_name = "My Service Account"
}

resource "google_project_iam_member" "my_sa_roles" {
  for_each = toset(var.roles)
  project  = var.project_id
  role     = each.value
  member   = "serviceAccount:${google_service_account.my_sa.email}"
}

ルートファイルの実装

最後にルートファイルを実装します。まずルートのvariables.tfを以下のように実装します。

variables.tf
variable "project_id" {
  description = "The Google Cloud project ID"
  type        = string
}

variable "region" {
  description = "The Google Cloud region"
  type        = string
  default     = "asia-northeast1"
}

次にmain.tfを実装します。main.tfではGCSモジュールとサービスアカウントモジュールを参照します。

main.tf
provider "google" {
  project = var.project_id
  region  = var.region
}

module "gcs" {
  source = "./modules/gcs"

  project_id = var.project_id
}

module "service-account" {
  source = "./modules/service-account"

  project_id = var.project_id
  region     = var.region
}

インフラの作成

ここまで実装したインフラを適用するためにterraform applyを実行します。

パイプラインの実装

まずはコード全体像は以下になります。

.env
PROJECT_ID=...
REGION=asia-northeast1
main.py
from typing import Any
from dotenv import load_dotenv
load_dotenv()
import os
import random
import string
import google.cloud.aiplatform as aip
import kfp
from kfp import compiler  # noqa: F811


# Generate a uuid of a specifed length(default=8)
def generate_uuid(length: int = 8) -> str:
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


UUID = generate_uuid()
PROJECT_ID = os.environ["PROJECT_ID"]
REGION = os.environ["REGION"]
BUCKET_URI = f"gs://{PROJECT_ID}-pipeline-bucket"
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/bikes_weather"

aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

hp_dict: str = '{"num_hidden_layers": 3, "hidden_size": 32, "learning_rate": 0.01, "epochs": 1, "steps_per_epoch": -1}'
data_dir: str = (
    "gs://cloud-samples-data/vertex-ai/pipeline-deployment/datasets/bikes_weather/"
)
TRAINER_ARGS = ["--data-dir", data_dir, "--hptune-dict", hp_dict]

# create working dir to pass to job spec
WORKING_DIR = f"{PIPELINE_ROOT}/{UUID}"

MODEL_DISPLAY_NAME = f"train_deploy{UUID}"
print(TRAINER_ARGS, WORKING_DIR, MODEL_DISPLAY_NAME)


@kfp.dsl.pipeline(name="train-endpoint-deploy" + UUID)
def pipeline(
    project: str = PROJECT_ID,
    location: str = REGION,
    model_display_name: str = MODEL_DISPLAY_NAME,
    serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
):
    from google_cloud_pipeline_components.types import artifact_types
    from google_cloud_pipeline_components.v1.custom_job import \
        CustomTrainingJobOp
    from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
                                                              ModelDeployOp)
    from google_cloud_pipeline_components.v1.model import ModelUploadOp
    from kfp.dsl import importer_node

    custom_job_task = CustomTrainingJobOp(
        project=project,
        display_name="model-training",
        worker_pool_specs=[
            {
                "containerSpec": {
                    "args": TRAINER_ARGS,
                    "env": [{"name": "AIP_MODEL_DIR", "value": WORKING_DIR}],
                    "imageUri": "gcr.io/google-samples/bw-cc-train:latest",
                },
                "replicaCount": "1",
                "machineSpec": {
                    "machineType": "n1-standard-4",
                },
            }
        ],
    )

    import_unmanaged_model_task = importer_node.importer(
        artifact_uri=WORKING_DIR,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
            },
        },
    ).after(custom_job_task)

    model_upload_op = ModelUploadOp(
        project=project,
        location=location,
        display_name=model_display_name,
        unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
    )
    model_upload_op.after(import_unmanaged_model_task)

    endpoint_create_op = EndpointCreateOp(
        project=project,
        location=location,
        display_name="pipelines-created-endpoint",
    )

    ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name=model_display_name,
        dedicated_resources_machine_type="n1-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )


compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="tabular_regression_pipeline.json",
)


DISPLAY_NAME = "bikes_weather_" + UUID

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="tabular_regression_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
    location=REGION,
)

job.run(service_account="create-pipeline@<project-id>.iam.gserviceaccount.com")

環境構築

uvを利用して環境構築します。

uv init vertexai-pipeline -p 3.12 
cd vertexai-pipeline
uv add google-cloud-aiplatform google-cloud-storage kfp==2.7.0 google-cloud-pipeline-components

コード実装

今回の実装の肝となるのはpipeline関数になります。一つずつ紐解きます。

まずモデルを開発するカスタムトレイニングジョブになります。CustomTrainingJobOpを利用するとカスタムジョブを定義することができます。TRAINING_ARGSには学習に利用するデータなどの引数が指定されており、学習用のコンテナも提供済みのものをimageUriに指定しています。マシンスペックについてはn1-standard-4を利用するようにしています。また、importer_nodeではGCSに保存されたアーティファクトをパイプラインにインポートするために呼び出されています。afterでカスタムジョブを指定することで、学習が終了したらインポートされるようになっています。

custom_job_task = CustomTrainingJobOp(
    project=project,
    display_name="model-training",
    worker_pool_specs=[
        {
            "containerSpec": {
                "args": TRAINER_ARGS,
                "env": [{"name": "AIP_MODEL_DIR", "value": WORKING_DIR}],
                "imageUri": "gcr.io/google-samples/bw-cc-train:latest",
            },
            "replicaCount": "1",
            "machineSpec": {
                "machineType": "n1-standard-4",
            },
        }
    ],
)

import_unmanaged_model_task = importer_node.importer(
    artifact_uri=WORKING_DIR,
    artifact_class=artifact_types.UnmanagedContainerModel,
    metadata={
        "containerSpec": {
            "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
        },
    },
).after(custom_job_task)

その次は学習されたモデルをModelRegistryにアップロードしています。先ほどパイプラインにimporter_nodeを経由してアーティファクトを取得しているので、それを参照させるようにModelUploadOpを呼び出しています。なお、この処理は学習終了後にimporter_nodeを実行し、その後に実行させるため、afterにはimporter_nodeのタスクを指定しています。

model_upload_op = ModelUploadOp(
    project=project,
    display_name=model_display_name,
    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
)
model_upload_op.after(import_unmanaged_model_task)

3爪にVertex AIのエンドポイントを作成します。EndpointCreateOpを利用することでエンドポイントを作成できます。エンドポイントを作成するだけではまだモデルを読み込んでいないので、この処理は学習ジョブなどと並列して実行でき、afterの指定はしていません。

endpoint_create_op = EndpointCreateOp(
    project=project,
    display_name="pipelines-created-endpoint",
)

最後に、ModelUploadOpEndpointCreateOpの両方が完了したらModelRegistry上のモデルをエンドポイントにデプロイしてサービングします。ModelDeployOpを利用すると、指定したエンドポイントとモデルを参照してデプロイすることができます。今回はインスタンス数は常時1になるようにし、n1-standard-4(学習時と同じ)上でサービングさせます。

ModelDeployOp(
    endpoint=endpoint_create_op.outputs["endpoint"],
    model=model_upload_op.outputs["model"],
    deployed_model_display_name=model_display_name,
    dedicated_resources_machine_type="n1-standard-4",
    dedicated_resources_min_replica_count=1,
    dedicated_resources_max_replica_count=1,
)

ここまでできればあとはこのパイプラインをコンパイルして実際にジョブとして実行させます。ジョブごとに名前が変わるようにUUIDを利用しています。

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="tabular_regression_pipeline.json",
)


DISPLAY_NAME = "bikes_weather_" + UUID

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="tabular_regression_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
    location=REGION,
)

job.run(service_account="create-pipeline@<project-id>.iam.gserviceaccount.com")

こちらを実行すると、以下のようなパイプラインが作成されます。終了するまでに結構時間がかかりますが、完了すると以下のように表示されます。

推論の実行

それでは実際にデプロイしたエンドポイントに対して推論を実行してみましょう。今回は簡単のため、学習データのうちの一つのレコードの値を元に推論データを作ります。まずはinput_file.jsonという名前で以下のデータを作ります。

input_file.json
{
  "instances": [
     {
        "day_of_week": "2",
        "dewp": 45.4,
        "end_station_id": "53",
        "euclidean": 2221.156331718110,
        "loc_cross": "POINT(-0.15 51.49)POINT(-0.14 51.51)",
        "max": 67.3,
        "min": 52.3,
        "prcp": 0.05,
        "start_station_id": "259",
        "temp": 57.9,
        "ts": 1434337980
     }
  ]
}

次に以下の情報を変数として定義してください。なお、この内容はパイプラインのリクエストの例から参照できます。

ENDPOINT_ID="..."
PROJECT_ID="..."
INPUT_DATA_FILE="input_file.json"

これを設定した上で以下を実行すると、推論結果が取得できます。結果をみるとpredictionsに該当する推論結果の数値があり、それ以外の属性ではモデルのIDやモデルのパス、モデルのバージョンが取得されていることが確認できました。このように呼び出すことで、エンドポイント上のモデルを呼び出すことができます。

curl \
-X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://asia-northeast1-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/asia-northeast1/endpoints/${ENDPOINT_ID}:predict" \
-d "@${INPUT_DATA_FILE}"

# 結果
{
  "predictions": [
    [
      1513.93433
    ]
  ],
  "deployedModelId": "2638636591639166976",
  "model": "projects/.../locations/asia-northeast1/models/2130849136583376896",
  "modelDisplayName": "train_deployw0oy5xbx",
  "modelVersionId": "1"
}

まとめ

今回はサンプルが提供されている内容を元に、Vertex AI Pipelinesをデプロイし推論するところまで試してみました。今回利用したのはカスタムトレーニングジョブで実際に利用することも結構あると思うので、Vertex AI上でMLOpsを検討されている方はぜひ一度試してみてください。

Discussion