🪲

Cloud ComposerのDAGからデータをバケットに書き出すには

2023/02/08に公開

https://zenn.dev/jcc/articles/c8482049249914#bigquery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない
↑のような「DAGによって生成された設定ファイルで他のDAGを作成する」ユースケースだと、DAGからデータ(設定ファイル)をバケットに書き出す処理が必要となります。

Apache Airflowの公式イメージ(例えばapache/airflow:2.2.5-python3.8)を利用し、ボリュームのマウントが正しく設定されていれば、データはちゃんと同期されるのでそもそも何の問題もありません。

volumes:
    - ./dags:/opt/airflow/dags
    - ./config:/opt/airflow/config

ただし、Cloud Composerの場合は挙動がやや異なります。

  • マウントされているのはディレクトリgs://{COMPOSER_BUKET_NAME}/dataのみ
    • docker-compose.yml的な書き方で書くと、 /home/airflow/gcs/data:gs://{COMPOSER_BUKET_NAME}/dataになる
    • つまり全DAGが格納されている/home/airflow/gcs/dags/配下に書き出されたデータは同期されず
  • また、/home/airflow/gcs/dataにアクセスできるのはworkerのみ ドキュメント
    • schedulerあるいはweb serverからはアクセスできず

そのため、ローカルのAirflowでDAGが問題なく実行できたのに、Cloud Composerに上げると設定ファイルが存在しないというバグが発生し、下流のDAGに影響が出ます。

調べた解決法を共有します。

  1. 設定ファイルを/home/airflow/gcs/dataに書き出す
  2. GCSHookgs://{COMPOSER_BUKET_NAME}/dataに書き出された設定ファイルをgs://{COMPOSER_BUKET_NAME}/dagsにコピーする

例:

import json
from airflow.providers.google.cloud.hooks.gcs import GCSHook


def write_setting_file():
    gcs_hook = GCSHook()
    path = "/home/airflow/gcs/data/{SETTING_FILE}.yml"
    source_object = "data/{SETTING_FILE}.yml"
    destination_object = "dags/{SETTING_FILE}.yml"

    with open(path, "w") as f:
        f.write(json.dumps('{"item1":"hogehoge", "item2":"bar"}'))

    gcs_hook.copy(
        source_bucket="{COMPOSER_BUKET_NAME}",
        source_object=source_object,
        destination_object=destination_object,
    )

gs://{COMPOSER_BUKET_NAME}/dagsに設定ファイルがコピーされているので、下流のDAGも問題なく生成されます。

AirflowのDAGを開発する際にコストを抑えるため、開発環境ではCloud Composerを使わないのが一般的ですが、Cloud Composerは上記のような固有設定があり、開発と本番環境が一部異なる構成になってしまうため、バグがたまに発生します。
幸い2023年1月にComposerのローカルAirflow環境がリリースされたので、コストを抑えつつ開発と本番環境の構成を一本化することができるようになりました。
https://cloud.google.com/composer/docs/composer-2/run-local-airflow-environments
実際に試してみて今度紹介したいと思います。

Discussion