🚄

Dataflow による機械学習モデルの並列分散推論とその開発フロー

2024/12/21に公開

この記事は MLOps Advent Calendar 2024 21日目への投稿記事です。

GCP の Dataflow を利用して機械学習モデルによる推論を並列分散で行う方法と、その開発フローの一例を紹介します。

Dataflow はバッチでの並列分散処理や、データのストリーミング処理を行うことができるサービスです。公式サイトではストリーミング処理について強調されていますが、実際に利用してみてバッチでの並列分散処理でも有用に感じました。

この記事のソースコードは以下で公開しています。
https://github.com/st81/dataflow-example-ml-inference

執筆目的

以下のことがすぐに分かるようにすることが目的です。これらのことは、Dataflow の公式ドキュメントや実装サンプルを読みながら手を動かせば分かるのですが、それまでに時間がかかったためすぐに思い出せるように執筆しました。

  • Dataflow での機械学習モデルの並列分散推論の開発フローの一例が分かる
    • 公式ドキュメントや実装サンプルは Dataflow ジョブを実行する例であり、プロダクション向け開発で、開発のイテレーションをどう回すかは分からなかったため
  • PyTorch で Dataflow で並列分散推論するときの方法が分かる
    • 公式ドキュメントや実装サンプルでは TensorFlow での推論例しかないため
  • 自身で用意したデータセットやモデルを利用した Dataflow での推論方法が分かる
    • 公式ドキュメントや実装サンプルは、予め用意されたデータセットやモデルを扱っており、自身で用意したそれらを利用するにはもうひと手間必要であったため

後述するような課題を抱えていて、その選択肢の1つとして Dataflow の活用を考えられている方々にこの記事が参考になれば幸いです。

なお、上記が目的であるため、この記事では Dataflow や Apache Beam などに関する詳細な説明は行いません。別途公式のドキュメントなどをご参照ください。

この記事では以下のサイトを参考にしています。

想定課題

この記事での想定課題は、大規模データに対して機械学習モデルの推論を行う必要があるが、直列で処理すると要件とする処理時間を満たせないというものです。例えば、数百万件の画像データを月次で定期的にバッチで推論する必要があるが、直列で処理した場合要件とする時間以内(例:1ヶ月以内)に完了しないなどです。

Dataflow を採用するメリット

私見ですが採用するメリットは、既に社内などでストリーミング処理で利用実績があり有識者が近くにいたりナレッジが溜まっていたりすることと思いました。上述したようにストリーミング処理も可能であることから、GCP をデータや機械学習の基盤として活用されている場合は、既に Dataflow を社内などで利用されている可能性が高いと思います。

また、他のサービスと比較したわけではありませんが、実際に私が使ってみて感じたメリットとしては、

  • 1度開発すれば並列分散処理するワーカーを自由に増やせるため、データ量が増えても柔軟に対応できる
    • 例えば、100ワーカーで並列分散処理させることも簡単にできます
  • 機械学習モデルの推論以外のプロセスでも大量データを扱う場合に大幅な高速化が期待できる
    • 実際に業務データを扱う場合は、推論前後にデータのダウンロードやアップロードなどが必要になることがあり、例えば GCS とデータをやり取りする場合などはそこでかなりの処理時間を要しますが、並列分散処理させることで大幅な高速化ができます
  • Apache Beam の構文に慣れてさえしまえば、簡単に並列分散処理が書ける
    • Apache Beam の構文も個人的な感覚としてそれほど学習に時間がかかるものではなかったです

開発・推論フロー概要

開発・推論フローは以下図の通りです。

開発・推論フロー

開発フローでは、作成した Apach Beam パイプラインを Compute Engine インスタンス上で実行し動作確認を行います。推論フローでは、Dataflow 上で推論処理を行います。こうした理由は、 Dataflow 上でパイプラインを実行すると開始までにそれなりに時間がかかり(数分~十数分程度)、コード変更>動作確認のサイクルを素早く回せないためです。

また、ここでは開発フローの区切りが良いタイミングで推論フローを実行、正常に動作したら再びソースコード変更としています。理由は、区切りが良いタイミング(新しいコマンドライン引数追加、ライブラリ追加など)で定期的に推論フローを実行し、 Dataflow 上でも動作確認すると開発効率が良かったためです。具体的には、Compute Engine インスタンスではエラーが発生しないが Dataflow 上では発生するということがそれなりに多くあり、かつそのデバッグがやや難しく、変更をたくさん行ってから Dataflow 上で動作確認すると、どの変更によってエラーが発生するようになったのか特定に時間がかかったためです。

ちなみに、 Compute Engine インスタンス上で開発をしているのは、GPU を利用した開発時にローカルマシンに GPU がない場合を考慮してのことです(現在 GPU を利用する部分は TODO でありまだ未完です)。

ここからは、実際に上記フローを回すための事前準備から開発・推論フローで利用したコマンド群を記載します。

事前準備

GCP 環境変数設定

export PROJECT="<fill-this>"
export LOCATION="<fill-this>"

export MODELS_BUCKET="dataflow-example-ml-inference-models"
export IMAGES_BUCKET="dataflow-example-ml-inference-images"

export SERVICE_ACCOUNT_NAME="dataflow-example-ml-inference"

export BQ_DATASET="dataflow_example_ml_inference"
export BQ_TABLE="predictions"

export REPOSITORY="<fill-this>"

export DATAFLOW_JOB_NAME="example-ml-inference"
export TAG="0.0.1"
export SDK_CONTAINER_IMAGE="$LOCATION-docker.pkg.dev/$PROJECT/$REPOSITORY/$DATAFLOW_JOB_NAME:$TAG"

export BUCKET="$PROJECT-dataflow-example"
export TEMPLATE_FILE="gs://$BUCKET/$DATAFLOW_JOB_NAME-$TAG.json"

# 開発時に利用する Compute Engine インスタンス周りの変数
export INSTANCE_NAME="instance-`date +%Y%m%d-%H%M%S`"
export ZONE="<fill-this>"
export INSTANCE_SERVICE_ACCOUNT="<fill-this>"

サービスアカウント作成・ロール付与

Dataflow で利用します。

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME \
    --display-name $SERVICE_ACCOUNT_NAME
gcloud projects add-iam-policy-binding $PROJECT \
    --member serviceAccount:"$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com" \
    --role="roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding $PROJECT \
    --member serviceAccount:"$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"
gcloud projects add-iam-policy-binding $PROJECT \
    --member serviceAccount:"$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com" \
    --role="roles/bigquery.user"
gcloud projects add-iam-policy-binding $PROJECT \
    --member serviceAccount:"$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com" \
    --role="roles/dataflow.worker"
gcloud projects add-iam-policy-binding $PROJECT \
    --member serviceAccount:"$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com" \
    --role="roles/storage.objectUser"

Artifact Regirstry リポジトリ作成

Dataflow ジョブ実行に利用する Docker Image の格納先として利用します。

gcloud artifacts repositories create $REPOSITORY \
    --repository-format=docker \
    --location=$LOCATION \
    --project $PROJECT

BigQuery データセット・テーブル作成

推論結果を書き込むために利用します。

bq mk \
    --dataset \
    $PROJECT:$BQ_DATASET
bq mk \
    --table \
    $PROJECT:$BQ_DATASET.$BQ_TABLE \
    img_filename:STRING,target:INTEGER,prediction:INTEGER,created_at:TIMESTAMP

GCS Bucket 作成

Dataflow の Flex Template や staging, temp ディレクトリ作成先として利用します。

gsutil mb \
    -p $PROJECT \
    -l $LOCATION \
    gs://$BUCKET

pyproject.toml 作成

依存関係は Poetry で管理します。

docker run \
    --rm \
    -it \
    -v $(pwd):/work \
    -w /work \
    --entrypoint /bin/bash \
    pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime
# この Docker Image は Dataflow で利用するベース Docker Image と同一としています。
# 以下は Docker Container 内で実行
pip install poetry==1.8.5
poetry init --name dataflow-example-ml-inference --python ">=3.9,<3.12.0" -n
poetry add apache-beam[gcp]==2.54.0 pandas==2.1.3 scikit-learn==1.2.2

推論時に利用するアーティファクトを GCS へアップロード

この記事で利用する推論用の画像・学習済みモデル重みは以下の私個人の GitHub プロジェクトで作成したものです。

https://github.com/st81/pytorch-lightning-tutorial-mnist

画像ファイルの準備方法やモデルの学習方法は同プロジェクト内の README を参照ください。

ここでは、画像データセットとして MNIST、モデルとして ResNet を使用しています。

推論用の画像ファイルを GCS へアップロード

gsutil mb \
    -p $PROJECT \
    -l $LOCATION \
    gs://$IMAGES_BUCKET
gsutil -m cp </path/to/img/dir/*> gs://$IMAGES_BUCKET
gsutil -m cp </path/to/test.csv> gs://$IMAGES_BUCKET

推論用の学習済みモデル重みを GCS へアップロード

gsutil mb \
    -p $PROJECT \
    -l $LOCATION \
    gs://$MODELS_BUCKET
gsutil cp </path/to/model/checkpoint> gs://$MODELS_BUCKET

開発フローで利用するコマンド

Compute Engine インスタンス準備

  • インスタンス 作成
gcloud compute instances create $INSTANCE_NAME \
    --project=$PROJECT \
    --zone=$ZONE \
    --machine-type=n1-standard-1 \
    --network-interface=network-tier=PREMIUM,stack-type=IPV4_ONLY,subnet=default \
    --no-restart-on-failure \
    --maintenance-policy=TERMINATE \
    --provisioning-model=SPOT \
    --instance-termination-action=STOP \
    --service-account=$INSTANCE_SERVICE_ACCOUNT \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --create-disk=auto-delete=yes,boot=yes,device-name=$INSTANCE_NAME,image=projects/cos-cloud/global/images/cos-105-17412-495-69,mode=rw,size=100,type=pd-balanced \
    --no-shielded-secure-boot \
    --shielded-vtpm \
    --shielded-integrity-monitoring \
    --labels=goog-ec-src=vm_add-gcloud \
    --reservation-affinity=any
TODO: GPU の場合のコマンドフラグ追加
--accelerator=count=1,type=nvidia-tesla-t4 \
  • 開発に必要なファイル群を インスタンス に送信
gcloud compute scp --zone $ZONE --project $PROJECT --recurse ./* $INSTANCE_NAME:~
gcloud compute scp --zone $ZONE --project $PROJECT $HOME/.config/gcloud/application_default_credentials.json $INSTANCE_NAME:~
  • ssh 接続
gcloud compute ssh --zone $ZONE $INSTANCE_NAME --project $PROJECT
TODO: GPU の場合のコマンド追加
sudo cos-extensions install gpu
sudo mount --bind /var/lib/nvidia /var/lib/nvidia
sudo mount -o remount,exec /var/lib/nvidia
/var/lib/nvidia/bin/nvidia-smi
sudo docker run \
    --rm \
    -it \
    --volume /var/lib/nvidia/lib64:/usr/local/nvidia/lib64 \
    --volume /var/lib/nvidia/bin:/usr/local/nvidia/bin \
    --device /dev/nvidia0:/dev/nvidia0 \
    --device /dev/nvidia-uvm:/dev/nvidia-uvm \
    --device /dev/nvidiactl:/dev/nvidiactl \
    --entrypoint /bin/bash \
    pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime

/usr/local/nvidia/bin/nvidia-smi
python -c "import torch; print(torch.cuda.is_available());"

ソースコード変更

私の場合は、エディタとして VSCode を利用し、 VSCode 上でソースコード変更後、インスタンス上で vim エディタで変更を反映していました。この部分は2重での変更になっており、より良い方法を模索したい点です。

Docker Image を build

  • 「GCP 環境変数設定」に記載のコマンドを実行しインスタンス上でも環境変数設定

  • Docker Image をビルド

docker build -t $SDK_CONTAINER_IMAGE -f Dockerfile .

Apache Beam パイプライン動作確認

  • 以下コマンドを実行
docker run \
    --rm \
    --volume $HOME/application_default_credentials.json:/root/.config/application_default_credentials.json \
    --entrypoint python \
    $SDK_CONTAINER_IMAGE \
    main.py \
    --dataset_csv_path "gs://$IMAGES_BUCKET/test.csv" \
    --dataset_img_root "gs://$IMAGES_BUCKET" \
    --model_checkpoint_path "gs://$MODELS_BUCKET/<fill-this>" \
    --device="cpu" \
    --output_table_id $PROJECT:$BQ_DATASET.$BQ_TABLE \
    --project $PROJECT \
    --temp_location "gs://$BUCKET/temp"
    # temp_location は BigQuery へのデータ書き込み時に必要なため指定
TODO: GPU ありの場合のコマンド追記
docker run \
    --rm \
    -it \
    --volume /var/lib/nvidia/lib64:/usr/local/nvidia/lib64 \
    --volume /var/lib/nvidia/bin:/usr/local/nvidia/bin \
    --volume $HOME/application_default_credentials.json:/root/.config/application_default_credentials.json \
    --device /dev/nvidia0:/dev/nvidia0 \
    --device /dev/nvidia-uvm:/dev/nvidia-uvm \
    --device /dev/nvidiactl:/dev/nvidiactl \
    --entrypoint /bin/bash \
    $SDK_CONTAINER_IMAGE

Artifact Registry へ push

  • Artifact Registry 認証に必要なパスワード取得(ローカルで実行)
gcloud auth print-access-token
  • 上記パスワードをコピー(ローカルで実行)

  • Artifact Registry 認証(インスタンス上で実行)

docker login -u oauth2accesstoken $LOCATION-docker.pkg.dev
  • パスワードを求められるため、コピーしたパスワードをペーストし Enter 押下(インスタンス上で実行)

  • 以下コマンドを実行(インスタンス上で実行)

docker push $SDK_CONTAINER_IMAGE

このセクションに記載の手順はやや煩雑かつセキュリティ的にあまり良くないように感じるため、より良い方法を模索したい点です。

Flex Template を build

gcloud dataflow flex-template build $TEMPLATE_FILE  \
    --image $SDK_CONTAINER_IMAGE \
    --sdk-language "PYTHON" \
    --metadata-file=metadata.json \
    --project $PROJECT

推論フローで利用するコマンド

Flex Template をもとに Dataflow Job 発行

gcloud dataflow flex-template run "$DATAFLOW_JOB_NAME-`date +%Y%m%d-%H%M%S`" \
    --template-file-gcs-location $TEMPLATE_FILE \
    --project $PROJECT \
    --region $LOCATION \
    --max-workers 7 \
    --num-workers 7 \
    --worker-machine-type n1-standard-1 \
    --staging-location "gs://$BUCKET/staging" \
    --temp-location "gs://$BUCKET/temp" \
    --service-account-email "$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com" \
    --parameters sdk_container_image=$SDK_CONTAINER_IMAGE \
    --parameters dataset_csv_path="gs://$IMAGES_BUCKET/test.csv" \
    --parameters dataset_img_root="gs://$IMAGES_BUCKET" \
    --parameters model_checkpoint_path="gs://$MODELS_BUCKET/<fill-this>" \
    --parameters device="cpu" \
    --parameters output_table_id="$PROJECT:$BQ_DATASET.$BQ_TABLE" \
    --parameters autoscaling_algorithm="NONE" \
    --parameters disk_size_gb="50"
TODO: GPU 利用の場合のコマンドフラグ追記
    --parameters dataflow_service_option="worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver" \

推論実行例

実行例を以下に示します。

7ワーカーでの実行例

少し分かりづらいかもですが、画像の F45 というステージが GCS から画像ダウンロード>推論>BigQuery への書き込み を行う部分であり、ワーカー数7の場合3分26秒かかっています。

ワーカー数1として同じ処理を実行すると以下の結果となり、17分6秒かかっていることから 約6倍高速化出来ていることが分かります。

1ワーカーでの実行例

BigQuery に書き込まれた推論結果の例は以下であり、

select
  img_filename,
  target,
  prediction,
from
  `<project_id>.dataflow_example_ml_inference.predictions`
order by img_filename
limit 10

推論結果例

Accuracy を計算してみると以下の通り 0.9152 と高い値であり学習済みモデルで推論されたことも確認できました。

select
  count(case when target = prediction then 1 end) / count(*) as accuracy
from
  `<project_id>.dataflow_example_ml_inference.predictions`

Accuracy結果

まとめ

Dataflow を利用して機械学習モデルの推論を並列分散に行う方法と、その開発フローの一例を紹介しました。ワーカー数を変更することにより高速化が実現できることや、開発サイクルとしてどんなものが考えられるかを示しました。

ここではかなり小さなサイズの画像と機械学習モデル、1万件という少量のデータしか扱っていないですが、より大きなサイズの大量のデータを大きめのモデルで推論する場合は、かなりの高速化インパクトがあります。

この記事で述べたような課題をお持ちの方々のお役に立てれば幸いです。

Discussion