🪲
Cloud ComposerのDAGからデータをバケットに書き出すには
↑のような「DAGによって生成された設定ファイルで他のDAGを作成する」ユースケースだと、DAGからデータ(設定ファイル)をバケットに書き出す処理が必要となります。
Apache Airflowの公式イメージ(例えばapache/airflow:2.2.5-python3.8
)を利用し、ボリュームのマウントが正しく設定されていれば、データはちゃんと同期されるのでそもそも何の問題もありません。
volumes:
- ./dags:/opt/airflow/dags
- ./config:/opt/airflow/config
ただし、Cloud Composerの場合は挙動がやや異なります。
- マウントされているのはディレクトリ
gs://{COMPOSER_BUKET_NAME}/data
のみ-
docker-compose.yml
的な書き方で書くと、/home/airflow/gcs/data:gs://{COMPOSER_BUKET_NAME}/data
になる - つまり全DAGが格納されている
/home/airflow/gcs/dags/
配下に書き出されたデータは同期されず
-
- また、
/home/airflow/gcs/data
にアクセスできるのはworkerのみ ドキュメント- schedulerあるいはweb serverからはアクセスできず
そのため、ローカルのAirflowでDAGが問題なく実行できたのに、Cloud Composerに上げると設定ファイルが存在しないというバグが発生し、下流のDAGに影響が出ます。
調べた解決法を共有します。
- 設定ファイルを
/home/airflow/gcs/data
に書き出す -
GCSHook
でgs://{COMPOSER_BUKET_NAME}/data
に書き出された設定ファイルをgs://{COMPOSER_BUKET_NAME}/dags
にコピーする
例:
import json
from airflow.providers.google.cloud.hooks.gcs import GCSHook
def write_setting_file():
gcs_hook = GCSHook()
path = "/home/airflow/gcs/data/{SETTING_FILE}.yml"
source_object = "data/{SETTING_FILE}.yml"
destination_object = "dags/{SETTING_FILE}.yml"
with open(path, "w") as f:
f.write(json.dumps('{"item1":"hogehoge", "item2":"bar"}'))
gcs_hook.copy(
source_bucket="{COMPOSER_BUKET_NAME}",
source_object=source_object,
destination_object=destination_object,
)
gs://{COMPOSER_BUKET_NAME}/dags
に設定ファイルがコピーされているので、下流のDAGも問題なく生成されます。
AirflowのDAGを開発する際にコストを抑えるため、開発環境ではCloud Composerを使わないのが一般的ですが、Cloud Composerは上記のような固有設定があり、開発と本番環境が一部異なる構成になってしまうため、バグがたまに発生します。
幸い2023年1月にComposerのローカルAirflow環境がリリースされたので、コストを抑えつつ開発と本番環境の構成を一本化することができるようになりました。
実際に試してみて今度紹介したいと思います。
Discussion