Cloud Dataflow を触ってみる

公式の Go言語による Quickstart があるのでこれに従って進める

Compute Engine のデフォルトサービスアカウントにjob実行の権限を付与するために、まずは自分のアカウントに IAM role を付与する
Grant roles to your Google Account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

どうやらdataflowでjobを実行するためには次の権限が必要らしい。この二つの権限の違いについては後で調べる
- roles/dataflow.admin
- roles/dataflow.worker
今回のサンプルでは GCS に対するファイルを読み書きが存在するため次の権限も付与
- roles/storage.objectAdmin

go module を作成して、パイプラインの中身を少し編集して再実行する。ローカルで実行すると数十分待っても終了しなかった。
Dataflowで zone を asia-northeast1 で指定して jobを実行すると、最初は次のようなエラーが出た。時間を空けて再実行か、別の zone を指定する必要があるらしい。
Startup of the worker pool in zone asia-northeast1-b failed to bring up any of the desired 1 workers. Please refer to https://cloud.google.com/dataflow/docs/guides/common-errors#worker-pool-failure for help troubleshooting. ZONE_RESOURCE_POOL_EXHAUSTED:
asia-northeast2 を指定して再実行したところ成功した。

Dataflow のモニタリング周りが気になったので取得できるメトリクスを少し調査
Dataflow Job の Fafiled というメトリクスを通して、job が失敗したかどうかを取得できるようだ。この値を用いて実行が失敗したかどうかモニタリングができそう

Dataflow templates という機能があるので、それも使ってみる
ここに従えばサンプルのtemplateを実行することができる
➜ gcloud dataflow jobs run words-count-1 \
--gcs-location gs://dataflow-templates/latest/Word_Count \
--region asia-northeast2 \
--parameters=inputFile=gs://dataflow-samples/shakespeare/kinglear.txt \
--parameters=output=gs://hiramekun-dataflow-sample/results/outputs

service account を別途指定して実行するためには、このドキュメント通りに行えば良いらしい
まずは service account を作成する
➜ gcloud iam service-accounts create dataflow-worker \
--description="worker service account for dataflow job" \
--display-name="dataflow-worker"
Created service account [dataflow-worker].
作成した service account に必要な権限を付与する
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:dataflow-worker@hiramekun-dataflow-sample.iam.gserviceaccount.com" \
--role="roles/dataflow.worker"
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:dataflow-worker@hiramekun-dataflow-sample.iam.gserviceaccount.com" \
--role="roles/dataflow.admin"
例では Cloud Storage に対する読み書きが必要なので、それも付与
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:dataflow-worker@hiramekun-dataflow-sample.iam.gserviceaccount.com" \
--role="roles/storage.objectAdmin"
その他に必要な権限も付与していく
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:service-<PROJECT_NUMBER>@dataflow-service-producer-prod.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountTokenCreator"
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:service-<PROJECT_NUMBER>@dataflow-service-producer-prod.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountUser"
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:service-<PROJECT_NUMBER>@compute-system.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountTokenCreator"
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:service-<PROJECT_NUMBER>@compute-system.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountUser"
ようやく実行。 --service-accunt-email
でサービスアカウントを指定する。
➜ gcloud dataflow jobs run words-count-1 \
--gcs-location gs://dataflow-templates/latest/Word_Count \
--region asia-northeast2 \
--parameters=inputFile=gs://dataflow-samples/shakespeare/kinglear.txt \
--parameters=output=gs://hiramekun-dataflow-sample/results/outputs \
--service-account-email=dataflow-worker@hiramekun-dataflow-sample.iam.gserviceaccount.com

この辺で実行中のjobを確認できる
→ gcloud dataflow jobs list --region asia-northeast2
Dataflow job詳細からserviceAccountEmail
を確認すると、無事に新しく作成した service account で実行されていることがわかる

dataflow 周りの権限を削除してから実行しても実行できるようにみえる。本当に新しいservice account を指定できているのか?
roles/dataflow.worker を削除してから実行すると、きちんとwarningが出てjobの実行ができていないようだった
Permissions verification for controller service account failed. All permissions in IAM role roles/dataflow.worker should be granted to controller service account dataflow-worker@hiramekun-dataflow-sample.iam.gserviceaccount.com.
ただ、20分ほど待ってもjobはfailせず、常に実行中のステータスになっていた。タイムアウトなどはあるのだろうか、後で調べたい

あと、dataflowのコンソールから確認するとjobが見えたり見えなかったりする。時間をおいて確認するのがよさそう

他のテンプレートも触ってみたい。公式が提供しているテンプレートは次の通り
メルカリ社が提供しているテンプレートもある

では次に、BigQueryからPubSubへの連携を試したい。
まずはこのQuickStartに従って簡単にCloud PubSubのTopic, Subscription, Publisher, Subscriberあたりを準備する。
ついでに少し手を加えて自分のリポジトリを作っておく

メルカリのDataflow Templateを使用して、BigQueryからPubSub Topicへのデータ送信を試してみる。
まずこのテンプレートを利用するためにはArtifact Registryを用意する必要があるので、こちらのQuick Startに従って簡単にレポジトリを用意する。
続いて、BigQueryのデータを簡単に準備。これに従ってサンプルデータを自分のプロジェクト内に作成する
この状態になれば、事前準備としては大丈夫。Dataflow Template のREADMEに沿ってソースをビルドし、Templateをデプロイする。

Protobuf形式でPubSubにPublishしたい。Dataflow Temlate中の実装では、自動的にBigQueryのスキーマ中に含まれるフィールドと、Protobufのフィールドで同じものを変換するため、全く同じフィールド名である必要がある。

dataflowで用いているサービスアカウントに、BigQueryに対するデータ読み取り権限とPubSub Topicに対するPublish権限を付与する
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:dataflow-worker@hiramekun-dataflow-sample.iam.gserviceaccount.com" \
--role="roles/bigquery.dataViewer"
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:dataflow-worker@hiramekun-dataflow-sample.iam.gserviceaccount.com" \
--role="roles/pubsub.publisher"
また、これだけだと動かなかったので、追加で readsession に関連する権限を追加する必要があった
➜ gcloud projects add-iam-policy-binding hiramekun-dataflow-sample \
--member="serviceAccount:dataflow-worker@hiramekun-dataflow-sample.iam.gserviceaccount.com" \
--role="roles/bigquery.readSessionUser"

他にも色々やったので、こっちに手順をまとめなおした