🌊

Airflowの新機能Airflow Datasetsによる新しいデータリネージ

2023/12/13に公開

この記事は、Money Forward Engineering 2 Advent Calendar 2023 13日目の投稿です。

12日目は acha さんで PG BATTLE 2023に参加しました でした。

本日は私が「AirflowのDatasets機能」について書いていきたいと思います。

はじめに

初めまして、CTO室分析基盤部のnakamoriです。

私は22新卒でこの分析基盤部に配属されてから1半年、データ分析基盤の開発と運用を行っています。本部署のETL基盤にもAirflow(Cloud Composer)を使っており、使っているバージョンのEOLに怯えながら日々を過ごしています。

今回は、Airflow2.4.0で新しく登場したDatasets機能によるData-aware schedulingについて紹介していきます。

想定する読者

この記事が想定する読者は以下の通りです。

  • Airflowで多くのテーブルを処理しているが、依存関係が複雑になってしまっている方
  • マートテーブルをAirflowで作成しており、DAGをシンプルにしたい方

Datasets機能の基本

Datasetとは何か?

Datasets機能を通じて、Airflowはデータのロジカルなグルーピングを「Dataset」という変数で管理できるようになりました。この変更により、データの依存関係をより明確にし、スケジューリングを効率的に行うことが可能です。

公式ドキュメントを載せておきます。詳しくはそちらをご覧ください。
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html

使い方

Dataset機能を使用するには最初にDatasetを定義してください。

example_dataset = Dataset("s3://dataset/example.csv")

Datasetの中には任意の文字列を入力でき、これが識別子となります。一部入力できない文字があるので詳しくは公式のぺージを閲覧してください。

定義できたらETLタスクにDatasetを仕込んでいきます。

上流の方(producer)は、タスクにoutletsを定義します。

with DAG(dag_id="producer", ...):
    BashOperator(task_id="producer", outlets=[example_dataset], ...)

そうすることで、このタスクで指定されたDatasetが更新されたことをairflowに通知することができます。

下流の方(consumer)では、DAG自体にscheduleを定義します。これによってどのDatasetが更新された時にDAGをtriggerするか指定することができます。

with DAG(dag_id="consumer", schedule=[example_dataset], ...):
    ...

outletsとscheduleはそれぞれ配列型になっており、複数指定することができます。複数指定した場合は、outletsは一度に複数の通知を配信し、scheduleは全てのDatasetを監視します。

Datasets機能のポイント

ポイント1: 任意の文字列が使用可能

Datasetは内部でのみ使用される単なるフラグであり、外部のデータソースに直接作用するわけではありません。例えば、先ほどの例でDataset("s3://dataset-bucket/example.csv")と指定しても、これはS3のファイルに何かをしているわけではないのです。

with DAG(...):
    MyOperator(
        # this task updates example.csv
        outlets=[Dataset("s3://dataset-bucket/example.csv")],
        ...,
    )

このコードは、単にexample.csvファイルが更新されたときに関連するタスクがトリガーされることを意味しています。つまり、Datasetはデータソースの種類に関わらずどんなことでも記述することができます。
Bigqueryだとこんな感じです。

with DAG(...):
    MyOperator(
        # this task updates a table in bigquery
        outlets=[Dataset("BigQuery:project.dataset.sample_table")],
        ...,
    )

ポイント2: 全てのDatasetが更新されるまでトリガーされない

scheduleに複数のDatasetが登録されている場合、全てが更新されるまでDAGの更新は行われません。これにより、上流のエラーが全ての下流DAGの停止を引き起こし、上流の修正が下流にも伝播するため、データの鮮度が明確になります。

cf. multiple-datasets

従来の依存関係設定との比較

「従来の">>"やTaskFlowを使った依存関係の設定で事足りる」と考える方もいるでしょう。しかし、依存関係が張り巡らされるようになってくるとコードが煩雑になりやすいということは一つの問題でした。Datasets機能を活用すると一つのファイルにまとめる必要がなくなり、このような複雑な依存関係も容易に管理できるようになります。

従来の形式


テーブルが増えていくごとにコードがどんどん肥大していく

Dataset機能による新しい形式


依存関係を気にせずファイルをどんどん増やすことができる

レイクとマートの依存関係の簡素化

dbtの登場により、マートテーブル間のリネージは容易になりましたが、レイクとマートの間の依存関係は依然として複雑でした。Datasets機能を使用することで、レイクとマートの間のリネージを簡単に作成できるようになります。これにより、データエンジニアとアナリティクスエンジニアの間の分業がより効果的に行えるようになります。

最後に

いかがでしたでしょうか?新しいDatasets機能により、AirflowでのETLプロセスがより効率的かつ簡単になりました。ノーコードツールの台頭にもかかわらず、Airflowは依然として重要な役割を果たしています。これからもAirflowの発展に期待しております。

--------

今日は親友の誕生日なのでここで祝わせてください。
いつもありがとう。エンジニアとしてお互い頑張ろうぜ。

Discussion