Mercari Dataflow Templateの紹介
テラーノベルで機械学習を担当している川尻です。テラーノベルで定期実行タスクの管理には、Google CloudのマネージドサービスであるCloud Composerを活用しています。以前にもテラーノベルのテックブログで他のサービスと比較して紹介しています。
定期実行タスクの中で典型的な処理の一つとして、BigQueryで処理した結果をGoogle Cloud StorageやCloud Spannerに書き出すというものが挙げられます。そういった処理のとき、Mercari Dataflow Templateが便利なので紹介します。また、最後にComposerから使うときのTipsをまとめました。
Dataflowのつらいところ
Dataflow[1] というのは、Google Cloudのフルマネージドでサーバーレスなデータ処理サービスです。処理のパイプラインは、Apache Beam[2] というオープンソースのSDKを使って作成して、その実行環境の一つとしてDataflowがあります。パイプラインは、Java, Python, Goなどで記述できます。
ComposerからDataflowを呼び出すのは簡単にできます。しかし、自分でDataflowのコードを書くのは結構手間で、Apache Beamの実装・テストやデバッグ方法を覚える以外にも、別のvirtualenvを作ってパッケージ管理、DockerイメージをArtifact Registryに配置するなど管理するものが増えてしまいます。また、Composerと同じプロジェクトで管理しようとすると、ComposerではPythonのバージョンが固定されており、同じPythonを使う場合は古いものに合わせることになってしまいます。[3][4]
Mercari Dataflow Templateによる解決
上記のような問題に対してMercari Dataflow Templateを使うとコードを書くことなく、JSONファイルを作成するだけで非常に多くのユースケースをカバーすることができます。データの入出力先として、Google Cloudの様々なサービスやS3、RDBなどに対応しています。必要な設定を記入してコマンドやコードからFlex Templateを起動するだけで実行できます。定期実行するタスクだけでなく、単発で実行する処理にもとても便利です。
例えば、BigQueryのクエリ結果をSpannerのテーブルに保存するための設定ファイルは、以下のように記述します。
{
"sources": [
{
"name": "bigquery",
"module": "bigquery",
"parameters": {
"query": "SELECT * FROM `myproject.mydataset.mytable`"
}
}
],
"sinks": [
{
"name": "spanner",
"module": "spanner",
"input": "bigquery",
"parameters": {
"projectId": "myproject",
"instanceId": "myinstance",
"databaseId": "mydatabase",
"table": "mytable"
}
}
]
}
コマンドから実行する場合は以下のように、事前にデプロイしておいたDataflow Flex Templateと上記JSONファイルのGCSのパスを指定するだけで可能です。
gcloud dataflow flex-template run bigquery-to-spanner \
--template-file-gcs-location=gs://example/template \
--parameters=config=gs://example/config.json
広いユースケースがカバーできるように、各入出力モジュールに様々な便利なオプションが用意されています。例えばstorage
というモジュールは、Google Cloud StorageやAWS S3へ書き出すことができるもので、dynamicSplitField
というオプションで指定したフィールドの値ごとにファイル名を変えることができます。その他にも、変換処理を追加できたり、実行時に設定ファイルのパラメータを渡せたり、色々な使い方ができるので、詳細は公式ドキュメントを参照ください。
Composerと一緒に使うときのTips
ComposerとMercari Dataflow Templateを一緒に使うときのtipsを紹介します。
設定ファイルの配置
Mercari Dataflow Templateを呼び出すには、JSONの設定ファイルをGCSにおく必要があります。そのためのパスをそれぞれ環境変数で設定し、配置する手順を管理すると手間が増えてしまいます。一方Composerではすでに、タスクを記述するPythonファイルを決められたGCS配置してデプロイしています。そこで、その中に一緒に設定ファイルを置くことにすれば、Composerと同じデプロイ手順にまとめることが可能です。以下のようなコマンドで、ComposerがデプロイしているGCS上のパスを取得することができます。これを環境変数にセットすることで、JSONファイルのパスを指定するのに用いることができます。
COMPOSER_DAG_GCS_PREFIX=$(gcloud --project $PROJECT composer environments describe ${COMPOSER_NAME} --location=asia-northeast1 --format=json | jq -r ".config.dagGcsPrefix")
Flex Templateを起動するOperatorを使う
ComposerからFlex Templateを起動するときには、DataflowStartFlexTemplateOperator を使いましょう。通常のコマンドにはない、Dataflowの実行完了まで待つなどのオプションが用意されています。以下のようなコードでOperatorを作成できます。
from airflow.providers.google.cloud.operators.dataflow import (
DataflowStartFlexTemplateOperator,
)
...
body = {
"launchParameter": {
"jobName": "mercari-dataflow-task"
"containerSpecGcsPath": "gs://example-bucket/template/mercari.json",
"parameters": {
"config": "gs://asia-northeast1-private-com-xxxxx-bucket/dags/xxxx/mercari.json", # 上記のコマンドでプリフィックスを取得しておく
},
"environment": {
"tempLocation": f"gs://example-bucket/dataflow-tmp/",
},
}
}
operator = DataflowStartFlexTemplateOperator(
task_id=task_id,
body=body,
location="asia-northeast1",
wait_until_finished=True,
)
デバッグのためにローカルから実行できるコードも用意しておく
JSONの設定ファイルとはいえ一発でうまく動くことはまずないため、Composerを通さずに直接実行しておくとデバッグがしやすいです。例えばBigQueryのクエリをPythonで動的に生成する場合はPythonのスクリプトで以下のように記述しておくと便利です。
from google.cloud import dataflow
from some_module import generate_query
query = generate_query(...)
request = dataflow.LaunchFlexTemplateRequest(
project_id="your-project-id",
location="asia-northeast1",
launch_parameter=dataflow.LaunchFlexTemplateParameter(
job_name="test-config-file",
container_spec_gcs_path="gs://example-bucket/template/mercari.json",
parameters={
"config": "gs://asia-northeast1-private-com-xxxxx-bucket/dags/xxxx/mercari.json", # 上記のコマンドでプリフィックスを取得しておく
"bigquery.query": query,
},
environment={
"temp_location": "gs://example-bucket/dataflow-tmp/",
},
),
)
client = dataflow.FlexTemplatesServiceClient()
resp = client.launch_flex_template(request=request)
print(resp.job.id) # コンソールからこのJob IDのものを探す
まとめ
Mercari Dataflow Templateの紹介とComposerと使うときのTipsを紹介しました。ほとんどGoogle Cloudを使うときに限られてしまいますが、とても便利なツールです。Google Cloudの新しい機能を取り込んだり、バグ修正されたりメンテナンスもされています。以前に不具合を見つけてイシューを上げたときにも、すぐに対応していただきました。もっと使う人が増えるといいなと思っています。
-
それぞれで違うPythonを使うようにモノレポ構成を組むこともできますが、その場合も更に複雑な管理が必要となってしまいます。また、割と密結合な実装になるため、別々のレポジトリで管理するの大変です。 ↩︎
-
Python3.11にアップデートされる予定でかなり改善されますが、今後は3.11のサポートが切れるまで続くのでまた古くなっていきます。 https://cloud.google.com/composer/docs/release-notes#January_31_2024 ↩︎
Discussion