🛳️

WorkflowsとVertex AIカスタムジョブによるお手軽MLワークフロー

2021/08/19に公開

はじめに

開発プロセスにおける定型的な作業を自動化することは、DevOpsを実現するための一つの要素と言えます。機械学習においても同様に、前処理や学習などといった一連の処理工程をワークフローとして自動化することは、MLOpsの実現する上で重要です。

本記事では実際にMLワークフローを構築しますが、特に以下の項目を意識します。

  • サーバーレス
  • 汎用なサービスを組み合わせたシンプルな構成

サーバーレスとは、インフラの管理を意識することなくサービスを利用できるという性質のことです[1]。必要なタイミングだけ計算リソースを利用することで、コストを抑えることができます。また今回利用するVertex AIのカスタムジョブでは、メモリやGPUなどを細かく設定できるため、実施する処理に応じて柔軟に計算リソースを利用することができます。

汎用なサービスを組み合わせて用いることは、技術導入のハードルを下げることに貢献できます。今回のデモは根本的にはコンテナ実行基盤とワークフローサービスから構成しますが、個人的にこれらの組み合わせだけでも多くのユースケースに対応できると感じています。また用いる技術要素を少なくすることで、新しい開発メンバーのキャッチアップも容易になると考えられます。

利用する技術要素

  • Python:機械学習の処理(データ前処理、モデル学習)の実装
  • Docker:各処理のコンテナアプリケーション化
  • Google Cloud Vertex AI(カスタムトレーニング):サーバーレスのコンテナ実行基盤
  • Google Cloud Workflows:サーバーレスのワークフローツール

実装内容は以下の記事をベースとしました。今回はテーブルデータの前処理とモデルの学習を行います。

https://qiita.com/kazunori279/items/b0a4d427b95ee785366d#cloud-workflowsでサーバレス-オーケストレーション

コードは以下に配置しました。実際に試してみる際には、gcloudコマンドなどのセットアップは公式ドキュメントを参照してください。GCPの利用は有料ですが、私の場合は数十円もかかりませんでした。

https://github.com/daigo0927/blog/tree/master/ml-workflow

Vertex AIのカスタムジョブ

Vertex AIはGCPにおけるMLサービス群の統合プラットフォームです。AWSであればSageMakerに相当すると言えるでしょう。

Vertex AIが提供するサービスの一つであるカスタムトレーニングを用いると、ユーザーが用意した様々な処理をDockerアプリケーションとしてクラウド上で実行できます。必要に応じてメモリやGPUの有無などを柔軟に設定できるほか、処理が終わったら自動的にマシンを停止してくれるため、余計な費用がかからずに済む。またカスタム”トレーニング”という名前ですが、コンテナの実行基盤として様々な用途に利用できます。

準備

コンテナイメージはGoogle Container Registryに格納します。ワークフローの各処理結果を格納するために、GCS(Google Cloud Storage)に3つのバケットを作成します[2]

  • gs://workflow-example-dataset: 生データの格納用
  • gs://workflow-example-preprocess: 前処理済みデータの格納用
  • gs://workflow-example-train: 学習結果のモデル格納用

今回はサンプルデータとしてペンギンのデータセットを利用しました。以下からCSVをダウンロードして生データ格納用のバケットに配置します。

https://github.com/mwaskom/seaborn-data/blob/master/penguins.csv

検証としてローカルからGCSにアクセスするには、サービスアカウントを通じて認証を構成する必要があります。手順は以下を参照してください。

https://cloud.google.com/docs/authentication/production?hl=ja#manually

ML処理のDockerize

今回は簡単な前処理と学習を、2つのDockerアプリケーションとして実装します。

  • 前処理:カラムを整理。データをtrain,valに分割して前処理バケットに格納
  • 学習:前処理済みのtrain,valデータをLightGBMで学習。モデルを学習結果バケットに格納
前処理スクリプト
ml-workflow/preprocess/main.py
import argparse
import pandas as pd
from sklearn.model_selection import StratifiedKFold


SEED = 42


def run(csv_path: str, output_dir: str, n_splits: int) -> None:
    df = pd.read_csv(csv_path)
    print(f"Load CSV from: {csv_path}")

    df['target'] = df['species'].map({'Adelie': 0, 'Chinstrap': 1, 'Gentoo': 2})
    df = df.drop(['species', 'island', 'sex'], axis=1)
    
    cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=SEED)
    idx_train, idx_val = next(cv.split(df, df['target']))
    df_train = df.iloc[idx_train]
    df_val = df.iloc[idx_val]

    df_train.to_csv(f'{output_dir}/train.csv', index=False)
    df_val.to_csv(f'{output_dir}/val.csv', index=False)
    print(f'Save train/val data to: {output_dir}')

    
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Preprocess')
    parser.add_argument('--csv-path', type=str, required=True,
                        help='Source CSV file location')
    parser.add_argument('--output-dir', type=str, required=True,
                        help='GCS location for writing outputs')    
    parser.add_argument('--n-splits', type=int, default=3,
                        help='Number of train/val splits. [3] as default')
    args = parser.parse_args()

    run(**vars(args))
前処理のDockerfile
ml-workflow/preprocess/Dockerfile
FROM python:3.8-slim
WORKDIR /root

RUN pip install poetry
COPY poetry.lock pyproject.toml .
RUN poetry config virtualenvs.create false \
    && poetry install --no-interaction --no-ansi

COPY . .

ENTRYPOINT ["python", "main.py"]
学習スクリプト
ml-workflow/train/main.py
import argparse
import pandas as pd
import lightgbm as lgb
from google.cloud import storage
from sklearn.metrics import accuracy_score


SEED = 42


def run(datadir: str,
        out_bucket: str,
        learning_rate: float,
        max_depth: int,
        bagging_fraction: float,
        feature_fraction: float,
        lambda_l1: float,
        lambda_l2: float,
        min_data_in_leaf: int,
        num_leaves: int) -> None:
    df_train = pd.read_csv(f'{datadir}/train.csv')
    df_val = pd.read_csv(f'{datadir}/val.csv')
    print(f'Data size: train: {df_train.shape}, val: {df_val.shape}')

    x_train, y_train = df_train.drop(['target'], axis=1), df_train['target']
    x_val, y_val = df_val.drop(['target'], axis=1), df_val['target']

    ds_train = lgb.Dataset(x_train, label=y_train)
    ds_val = lgb.Dataset(x_val, label=y_val)

    params = {
        'objective': 'multiclass',
        'num_class': 3,
        'learning_rate': learning_rate,
        'max_depth': max_depth,
        'bagging_fraction': bagging_fraction,
        'feature_fraction': feature_fraction,
        'lambda_l1': lambda_l1,
        'lambda_l2': lambda_l2,
        'min_data_in_leaf': min_data_in_leaf,
        'num_leaves': num_leaves,
        'random_state': SEED,
        'verbose': -1
    }

    model = lgb.train(params,
                      ds_train,
                      num_boost_round=1000,
                      early_stopping_rounds=10,
                      valid_sets=[ds_train, ds_val],
                      verbose_eval=50)

    y_pred = model.predict(x_val, num_iteration=model.best_iteration)
    y_pred = y_pred.argmax(axis=-1)
    acc_val = accuracy_score(y_val, y_pred)
    print(f'Validation accuracy: {acc_val}')

    savefile = 'model.lgb'
    model.save_model(savefile)

    bucket = storage.Client().bucket(out_bucket)
    blob = bucket.blob(savefile)
    blob.upload_from_filename(savefile)
    print(f'Upload model to: gs://{out_bucket}/{savefile}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Train')
    parser.add_argument('--datadir', type=str, required=True,
                        help='GCS location for train/val data')
    parser.add_argument('--out-bucket', type=str, required=True,
                        help='GCS location for outputs')
    parser.add_argument('--learning-rate', type=float, default=0.1)
    parser.add_argument('--max-depth', type=int, default=10)
    parser.add_argument('--bagging-fraction', type=float, default=0.7)
    parser.add_argument('--feature-fraction', type=float, default=0.7)
    parser.add_argument('--lambda_l1', type=float, default=1.0)
    parser.add_argument('--lambda_l2', type=float, default=1.0)
    parser.add_argument('--min-data-in-leaf', type=int, default=10)
    parser.add_argument('--num-leaves', type=int, default=40)

    args = parser.parse_args()
    run(**vars(args))
学習のDockerfile
ml-workflow/train/Dockerfile
FROM python:3.8-slim
WORKDIR /root

RUN apt-get update -y \
    && apt-get install -y build-essential libgomp1 \
    && apt-get -y clean all

RUN pip install poetry
COPY poetry.lock pyproject.toml .
RUN poetry config virtualenvs.create false \
    && poetry install --no-interaction --no-ansi

COPY . .

ENTRYPOINT ["python", "main.py"]

ENTRYPOINT ["python", "main.py"]によってPythonスクリプトを起動することで、コンテナ実行時に引数を渡すことができる。例えば前処理コンテナを以下のように実行すると、--csv-path, --output-dirで指定した値をmain.pyに渡すことができる。

docker container run <コンテナ起動時引数> <前処理のDockerイメージ> --csv-path gs://<source-bucket>/penguins.csv --output-dir gs://<preprocess-bucket>/<directory>

今回の処理内容は単純なため、わざわざ前処理と学習を分けて実装する必要性は薄いと言えます。一方で処理内容が高度になってくると、前処理はハイメモリマシン、学習はGPU付きマシンなどと処理ごとに適切に計算リソースを割り当てることで、コストを抑えることができます。

カスタムジョブの実行

カスタムジョブはgcloudコマンドや各種クライアントなど、いろいろな方法で実行できます。ローカルから単発のジョブを実行するなら、gcloudコマンドとコンフィグファイルによって実行するのがおすすめです。

gcloud ai custom-jobs create --region <ジョブを実行するリージョン> --display-name <ジョブの名前> --config config.yaml

例えば前処理実行用のコンフィグは以下のようになります(実際にはプロジェクトIDなどを記入し、YAMLファイルとして利用します)。対象のコンテナイメージや実行時のマシンタイプ、スクリプトに渡す引数argsなどを指定できます。

ml-workflow/preprocess/config.yaml.template
workerPoolSpecs:
  machineSpec:
    machineType: e2-standard-4
    # acceleratorType: ACCELERATOR_TYPE
    # acceleratorCount: ACCELERATOR_COUNT
  replicaCount: 1
  containerSpec:
    imageUri: gcr.io/<project-id>/<image>:<tag>
    args:
      - '--csv-path=gs://<dataset-bucket>/penguins.csv'
      - '--output-dir=gs://<preprocess-bucket>/test'

全てのパラメータは以下のドキュメントに記載されています。

https://cloud.google.com/sdk/gcloud/reference/ai/custom-jobs/create?hl=ja

実行状況はCloud Consoleから確認できます。各ジョブをクリックすることで、実行時の引数やログを確認することもできます。

コマンドラインからジョブの状況を確認することもできます。ジョブIDは各ジョブに付与される数字です。条件によってフィルタリングすることも可能です。

# 特定のジョブの実行結果を確認
gcloud ai custom-jobs describe <job-id> --region=<region>

# 成功したジョブ一覧を表示
gcloud ai custom-jobs list --region=<region> --filter state=JOB_STATE_SUCCEEDED
特定ジョブの実行結果
createTime: '2021-08-18T14:25:03.570172Z'
displayName: workflow-preprocess
endTime: '2021-08-18T14:31:20Z'
jobSpec:
  workerPoolSpecs:
  - containerSpec:
      args:
      - --csv-path=gs://workflow-example-dataset/penguins.csv
      - --output-dir=gs://workflow-example-preprocess/test
      imageUri: gcr.io/<project-id>/workflow-example-preprocess:latest
    diskSpec:
      bootDiskSizeGb: 100
      bootDiskType: pd-ssd
    machineSpec:
      machineType: e2-standard-4
    replicaCount: '1'
name: projects/<project-id>/locations/us-east1/customJobs/6858102623183044608
startTime: '2021-08-18T14:31:20Z'
state: JOB_STATE_SUCCEEDED
updateTime: '2021-08-18T14:31:21.405263Z'

後述のワークフロー作成時には、このジョブ実行状況を確認することで処理の進行を管理します。

Workflowsによるパイプライン構築

Google Cloud Workflowsは、GCPの各種サービスやHTTPベースのAPIをサーバーレスで実行できるワークフローサービスです。今回はVertex AIのカスタムジョブを実行しますが、機械学習以外にも様々なユースケースに対応できます。ワークフローはYAMLファイルで定義するため、学習コストが低いと感じます。

以降このサービスをWorkflows、作成・実行する工程の実体をワークフローとして表記します。

準備

Workflowsからカスタムジョブを実行するために、ジョブ実行権限を持つサービスアカウントを作成します。Cloud Consoleの「IAMと管理>サービスアカウント」からサービスアカウントを作成し、Vertex AIユーザーのロールを付与することで準備完了です。もちろんgcloudコマンドを通じて作成することも可能です。


Vertex AIユーザーロールを付与したサービスアカウント

ワークフローの作成

Cloud Consoleからワークフローを作成します。この時に適切な権限を持つサービスアカウントを付与することができます。

処理の中身はYAML形式で記述します。今回のワークフローの大まかな流れは以下のようになります。

ml-workflow/workflows/workflow.yaml
main:
  params: [args]  # ワークフロー実行時の引数
  steps:
    - assign_vars: ...  # 利用する変数を定義
    - preprocess: ...  # 前処理ジョブを投下
    - sleep_preprocess: ...  # 一定時間待機
    - get_preprocess_status: ...  # 前処理ジョブの進行状況を確認
    - wait_preprocess: ...  # 前処理ジョブが完了→学習へ、進行中→再度待機
    - train: ...  # 学習ジョブを投下
    - sleep_train: ...  # 一定時間待機
    - get_train_status: ...  # 学習ジョブの進行状況を確認
    - wait_train: ...  # 学習ジョブが完了→終了へ、進行中→再度待機
    - finish: ...  # 学習ジョブのステータスを返して終了

paramsはワークフロー実行時の引数です。今回は実行時のロケーション(リージョン)とプロジェクトIDを設定しました。

preprocess, trainはそれぞれPOSTリクエストを通じて前処理と学習のジョブを実行しています。マシンスペックの設定やスクリプトの引数はリクエストボディに記載します。auth: type: OAuth2はVertex AIのジョブ実行における認証です。権限を持つサービスアカウントを紐つけることで、OAuth 2.0による認証が可能になります。

ml-workflow/workflows/workflow.yaml
main:
  steps:
    ...
    - preprocess:
        call: http.post
        args:
          url: ${"https://"+location+"-aiplatform.googleapis.com/v1/projects/"+project_id+"/locations/"+location+"/customJobs"}
          body:
            displayName: workflow-preprocess
            jobSpec:
              workerPoolSpecs:
                machineSpec:
                  machineType: e2-standard-4
                replicaCount: 1
                containerSpec:
                  imageUri: ${"gcr.io/"+project_id+"/workflow-example-preprocess:latest"}
                  args:
                    - '--csv-path=gs://workflow-example-dataset/penguins.csv'
                    - '--output-dir=gs://workflow-example-preprocess/test'
          auth:
            type: OAuth2
        result: preprocess_resp    
    - sleep_preprocess: ...

ジョブが完了するまでは後続の処理を待機する必要があります。Workflowsでは現状この待機処理をワンステップで実行する方法はないため、一定時間ごとにジョブの状態を確認、完了していたら次の処理に進む、という形で実現します。

ml-workflow/workflows/workflow.yaml
main:
  params: [args]
  steps:
    ...
    - preprocess: ...
    - sleep_preprocess: # 60秒待機
        call: sys.sleep
        args:
          seconds: 60
    - get_preprocess_status: # ジョブの状態確認
        call: http.get
        args:
          url: ${"https://"+location+"-aiplatform.googleapis.com/v1/"+preprocess_resp.body.name}
          auth:
            type: OAuth2
        result: preprocess_status
    - wait_preprocess: # ジョブが成功:次ステップ、失敗:終了、進行中:再び待機
        switch:
          - condition: ${preprocess_status.body.state == "JOB_STATE_SUCCEEDED"}
            next: train
          - condition: ${preprocess_status.body.state == "JOB_STATE_FAILED"}
            return: ${preprocess_status.body}
        next: sleep_preprocess
    - train: ...

コンソール上でこれらを記入すると、処理の流れを可視化できます。最後にデプロイすることでワークフローの作成が完了します。

補足として、Workflowsでは今回の待機→ジョブの状態確認のような共通処理をサブワークフローとして切り出すことができます。サブワークフローを用いたワークフロー定義はリポジトリのml-workflow/workflows/workflow-slim.yamlを参照してください。

https://github.com/daigo0927/blog/tree/feature/ml-workflow/ml-workflow/workflows

https://cloud.google.com/workflows/docs/apis?hl=ja

ワークフローの実行

作成したワークフローを選択、実行することでワークフローを実行できます。ワークフローの実行時引数はこのタイミングで指定することができます。

実行結果もコンソールから確認できます。各実行IDをクリックすることで、実行時に渡した引数やワークフローからの返り値、処理が失敗した場合にはエラーなどが表示されます。

いくつかの実行で失敗していますが、今回の検証ではワークフローの定義ミスが主な原因でした。ワークフロー内では各処理間で変数を渡すことができ、${var}のようにして値を評価できるのですが、${}を忘れて変数が文字列として扱われてしまうケースなどがありました。実装の際には注意しましょう。

まとめ

WorkflowsとVertex AIのカスタムジョブを用いて、簡単なMLワークフローを構築することができました。これらはサーバーレスのサービスなので、インフラリソースは必要な分だけ確保され、コストを抑えることができます。要素技術としてもPythonとDockerがメインなので、経験の浅いMLエンジニアやデータサイエンティストでも十分理解できるレベルだと思います。

今回はシンプルさを優先してWorkflowsを用いましたが、Vertex AIでもMLワークフローをオーケストレートするための、Vertex Pipelinesというサービスが提供されています。これはKubeflow Pipelines(またはTensorFlow Extended)で定義されたパイプラインの実行基盤であり、Workflowsと同様にサーバーレス方式の実行環境となります。

Vertex Pipelinesを利用すれば、カスタムジョブを含めてVertex AIエコシステムの恩恵を最大限に受けられます。一方で簡単なワークフローであれば、今回のようにシンプルなワークフローツールとカスタムジョブの連携だけでも対応できます。MLシステムの成熟度や開発メンバーのスキルセットに応じて、適した構成を選択し、育てていくことが重要だと言えます。

参考リンク集

https://zenn.dev/dhirooka/scraps/fdcfa1f4b40de2

脚注
  1. サーバーレスコンピューティング | Google Cloud ↩︎

  2. バケット名はグローバルで一意なので、試す場合には他の名前で置き換えてください ↩︎

Discussion