🐉

Mercari Dataflow Templateの紹介

テラーノベルで機械学習を担当している川尻です。テラーノベルで定期実行タスクの管理には、Google CloudのマネージドサービスであるCloud Composerを活用しています。以前にもテラーノベルのテックブログで他のサービスと比較して紹介しています。
https://zenn.dev/tellernovel_inc/articles/11c211b5a34fb1

定期実行タスクの中で典型的な処理の一つとして、BigQueryで処理した結果をGoogle Cloud StorageやCloud Spannerに書き出すというものが挙げられます。そういった処理のとき、Mercari Dataflow Templateが便利なので紹介します。また、最後にComposerから使うときのTipsをまとめました。
https://github.com/mercari/DataflowTemplate

Dataflowのつらいところ

Dataflow[1] というのは、Google Cloudのフルマネージドでサーバーレスなデータ処理サービスです。処理のパイプラインは、Apache Beam[2] というオープンソースのSDKを使って作成して、その実行環境の一つとしてDataflowがあります。パイプラインは、Java, Python, Goなどで記述できます。

ComposerからDataflowを呼び出すのは簡単にできます。しかし、自分でDataflowのコードを書くのは結構手間で、Apache Beamの実装・テストやデバッグ方法を覚える以外にも、別のvirtualenvを作ってパッケージ管理、DockerイメージをArtifact Registryに配置するなど管理するものが増えてしまいます。また、Composerと同じプロジェクトで管理しようとすると、ComposerではPythonのバージョンが固定されており、同じPythonを使う場合は古いものに合わせることになってしまいます。[3][4]

Mercari Dataflow Templateによる解決

上記のような問題に対してMercari Dataflow Templateを使うとコードを書くことなく、JSONファイルを作成するだけで非常に多くのユースケースをカバーすることができます。データの入出力先として、Google Cloudの様々なサービスやS3、RDBなどに対応しています。必要な設定を記入してコマンドやコードからFlex Templateを起動するだけで実行できます。定期実行するタスクだけでなく、単発で実行する処理にもとても便利です。

例えば、BigQueryのクエリ結果をSpannerのテーブルに保存するための設定ファイルは、以下のように記述します。

{
  "sources": [
    {
      "name": "bigquery",
      "module": "bigquery",
      "parameters": {
        "query": "SELECT * FROM `myproject.mydataset.mytable`"
      }
    }
  ],
  "sinks": [
    {
      "name": "spanner",
      "module": "spanner",
      "input": "bigquery",
      "parameters": {
        "projectId": "myproject",
        "instanceId": "myinstance",
        "databaseId": "mydatabase",
        "table": "mytable"
      }
    }
  ]
}

コマンドから実行する場合は以下のように、事前にデプロイしておいたDataflow Flex Templateと上記JSONファイルのGCSのパスを指定するだけで可能です。

gcloud dataflow flex-template run bigquery-to-spanner \
  --template-file-gcs-location=gs://example/template \
  --parameters=config=gs://example/config.json

広いユースケースがカバーできるように、各入出力モジュールに様々な便利なオプションが用意されています。例えばstorageというモジュールは、Google Cloud StorageやAWS S3へ書き出すことができるもので、dynamicSplitFieldというオプションで指定したフィールドの値ごとにファイル名を変えることができます。その他にも、変換処理を追加できたり、実行時に設定ファイルのパラメータを渡せたり、色々な使い方ができるので、詳細は公式ドキュメントを参照ください。

Composerと一緒に使うときのTips

ComposerとMercari Dataflow Templateを一緒に使うときのtipsを紹介します。

設定ファイルの配置

Mercari Dataflow Templateを呼び出すには、JSONの設定ファイルをGCSにおく必要があります。そのためのパスをそれぞれ環境変数で設定し、配置する手順を管理すると手間が増えてしまいます。一方Composerではすでに、タスクを記述するPythonファイルを決められたGCS配置してデプロイしています。そこで、その中に一緒に設定ファイルを置くことにすれば、Composerと同じデプロイ手順にまとめることが可能です。以下のようなコマンドで、ComposerがデプロイしているGCS上のパスを取得することができます。これを環境変数にセットすることで、JSONファイルのパスを指定するのに用いることができます。

COMPOSER_DAG_GCS_PREFIX=$(gcloud --project $PROJECT composer environments describe ${COMPOSER_NAME} --location=asia-northeast1 --format=json | jq -r ".config.dagGcsPrefix")

Flex Templateを起動するOperatorを使う

ComposerからFlex Templateを起動するときには、DataflowStartFlexTemplateOperator を使いましょう。通常のコマンドにはない、Dataflowの実行完了まで待つなどのオプションが用意されています。以下のようなコードでOperatorを作成できます。

from airflow.providers.google.cloud.operators.dataflow import (
    DataflowStartFlexTemplateOperator,
)
...
body = {
    "launchParameter": {
        "jobName": "mercari-dataflow-task"
        "containerSpecGcsPath": "gs://example-bucket/template/mercari.json",
        "parameters": {
            "config": "gs://asia-northeast1-private-com-xxxxx-bucket/dags/xxxx/mercari.json",  # 上記のコマンドでプリフィックスを取得しておく
        },
        "environment": {
            "tempLocation": f"gs://example-bucket/dataflow-tmp/",
        },
    }
}
operator = DataflowStartFlexTemplateOperator(
    task_id=task_id,
    body=body,
    location="asia-northeast1",
    wait_until_finished=True,
)

デバッグのためにローカルから実行できるコードも用意しておく

JSONの設定ファイルとはいえ一発でうまく動くことはまずないため、Composerを通さずに直接実行しておくとデバッグがしやすいです。例えばBigQueryのクエリをPythonで動的に生成する場合はPythonのスクリプトで以下のように記述しておくと便利です。

dev.py
from google.cloud import dataflow
from some_module import generate_query

query = generate_query(...)
request = dataflow.LaunchFlexTemplateRequest(
    project_id="your-project-id",
    location="asia-northeast1",
    launch_parameter=dataflow.LaunchFlexTemplateParameter(
        job_name="test-config-file",
        container_spec_gcs_path="gs://example-bucket/template/mercari.json",
        parameters={
            "config": "gs://asia-northeast1-private-com-xxxxx-bucket/dags/xxxx/mercari.json",  # 上記のコマンドでプリフィックスを取得しておく
            "bigquery.query": query,
        },
        environment={
            "temp_location": "gs://example-bucket/dataflow-tmp/",
        },
    ),
)
client = dataflow.FlexTemplatesServiceClient()
resp = client.launch_flex_template(request=request)
print(resp.job.id)  # コンソールからこのJob IDのものを探す

まとめ

Mercari Dataflow Templateの紹介とComposerと使うときのTipsを紹介しました。ほとんどGoogle Cloudを使うときに限られてしまいますが、とても便利なツールです。Google Cloudの新しい機能を取り込んだり、バグ修正されたりメンテナンスもされています。以前に不具合を見つけてイシューを上げたときにも、すぐに対応していただきました。もっと使う人が増えるといいなと思っています。

脚注
  1. https://cloud.google.com/dataflow?hl=ja ↩︎

  2. https://beam.apache.org/ ↩︎

  3. それぞれで違うPythonを使うようにモノレポ構成を組むこともできますが、その場合も更に複雑な管理が必要となってしまいます。また、割と密結合な実装になるため、別々のレポジトリで管理するの大変です。 ↩︎

  4. Python3.11にアップデートされる予定でかなり改善されますが、今後は3.11のサポートが切れるまで続くのでまた古くなっていきます。 https://cloud.google.com/composer/docs/release-notes#January_31_2024 ↩︎

テラーノベル テックブログ

Discussion