Dynamic Task mappingで動的な Airflow DAGを作る
Airflow2.3で「Dynamic Task Mapping」なる面白い機能が追加されたので、その紹介です。
tl;dr
- Task数が実行時に決まる、動的なDAGが欲しい時もあるよ
- Airflow2.3でDynamic Task Mappingという機能がリリースされたよ。動的に変わるDAGを定義できるよ
- expand・XComArgの組み合わせで簡単に実装できるよ
背景
DAGを動的に定義したい
Task数が実行時に決まる、同じような処理(Task)を複数行いたい場合があります。例えば、以下のようなケースです
- マルチテナントの基盤で、各クライアント事に処理を行いたい
- 大まかな流れが同じデータの連携が複数種類ある
- 広告のAPIからクリック・コンバージョンの情報をそれぞれ取りたいとか
- 違うハイパーパラメータ・モデルで機械学習の学習・評価したい
(このような、ある瞬間と別の時間で構成するTaskの数が違うDAGを、以下「動的なDAG」と呼びます)
このような場合の需要はあるようで、インターネット上にも「Airflowでどうすんの」質問・記事が複数あります。
- Airflow公式
- Astronomer
- Stackoverflow 1, 2
DAGの動的な定義はこれまでも可能
Airflowはタスク・タスクの依存関係(DAG)の両方をPythonのコードとして記述します。
この特徴を使うと、これまで(2.3以前)でも動的なDAGを定義することが技術的には可能です。例えば、以下のようなコードです。
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
with DAG(dag_id="conventional_dynamic_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
# Variableが変わると、DAGの構造も変わる
for i in range(int(Variable.get('conventional_dynamic_mapping'))):
consumer = BashOperator(
task_id=f"consumer_{i}",
do_xcom_push=False,
bash_command=f"echo {i}"
)
Graph Viewで見てもTask Instanceが複数作成されています。
Airflow Variables(conventional_dynamic_mapping)を変えると、DAGに含まれるTaskも変わります(2->3に変更)。
DAGの動的な定義は取り扱い注意
先ほどは「技術的には可能」とニュアンスを置きました。それは以下のような問題があり、取り扱いに注意が必要なためです。
- DAGのTaskでない部分は、Taskの実行場所(worker)とは別の場所(scheduler,webserver)で、DAGの評価時(not 実行時)に評価される可能性があります。そのため、以下のような点に気を付ける必要があります
- 外部サービスにアクセスする場合、DAGの評価の頻度でアクセスしても大丈夫か
- 重たい処理を行っていないか(workerと違ってschedulerは一つ)
- 動的な部分で外部リソースを使う時はworker以外からもアクセスできるか(IPアドレス許可や環境変数)
- DAGが変化してTaskが削除した時の挙動に、特別な注意が必要です
- 依存関係(dependency_on_pastなど)
- Airflow UIから見えなくなる
- 下図は、conventional_dynamic_mappingの繰り返しを一つにした場合のGraph Viewです。consumer_2が消えています(メタデータデータベースには残ってます)。
Dynamic Task Mapping
言うなれば「可能だが自己責任で」感じだった動的なDAGを、Airflowでサポートしようとする試みが、2022年4月にAirflow 2.3で追加されたDynamic Task Mapping(AIP-42)です。
概要
上述な動的なDAGの問題は、Taskでない部分で処理を行っていることが原因です。(Taskでない部分で動くから頻繁に動く、Task管理の仕組みに乗りにくい)。
そこで、Dynamic Task Mappingでは、
- 上流(upstream)のTaskで、動的なデータ(複数)を作成
- 下流(downstream)のTaskで、作成されたデータをループ
することで、Taskでない部分の処理に依存せず、DAG Run毎にTaskの数を変化させます。
DAGの実装としては、以下の二つの設定を行います。
- 上流のTaskで繰り返したい内容(リスト等)を返す(XComに入れる)
- 下流のTaskで繰り返す要素(リストの中身)を受け取るパラメータをexpandで指定
なお、expandは静的な値を受け取ることもできます
例
(Airflow 2.3.3rc3で試しました)
シンプルな例
上流(make_list)で後続のTaskが処理するリストを設定し、下流(consumer)でそのリストの要素をそれぞれ受け取り処理します。
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
@task
def make_list():
# 下流のためにIterable(list)を返すTask
# このTaskの結果はXCom経由に保存されます
return [1, 2, 3]
@task
def consumer(elm):
# 上流のTaskの結果リストの要素がそれぞれ受け取るTask
print(elm)
@task
def reducer(lst):
# 上流のTaskの結果リストの要素をまとめて受け取るTask
print(sum(lst))
with DAG(dag_id="taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
# 1. make_list Taskを実行
# 2. make_listの結果をXComに保存
# 3. make_listの結果で下流のTaskを実行
# 3.1 要素それぞれで、consumerを実行
# 3.2 要素(リスト全体)でreducerを実行
lst = make_list()
consumer.expand(elm=lst)
reducer(lst)
Graph Viewで見るとconsumer Taskの表示に、繰り返し回数([3])が付与されてます。
動的に変化するTask(consumer)をクリックすると、「Map Index」が表示され、情報を表示する実行を選択することができます。
(Task全体でclear等することも可能)
consumerの1を選び、ログを見ると、上流(make_list)で設定されたリスト([1,2,3])の要素が表示されています。
[2022-07-17, 08:41:42 UTC] {logging_mixin.py:115} INFO - 1
consumerの2は、また別の要素が表示されています(3も同様)。
[2022-07-17, 08:41:42 UTC] {logging_mixin.py:115} INFO - 2
make_listが返す値は単なるXComの値ですので、通常のTaskflow APIと同様に、後続のTaskで処理することもできます。上記のコード例では、reducerは(consumerとは異なり)make_listの値全体を受け取りその総和を計算します。
[2022-07-17, 08:41:42 UTC] {logging_mixin.py:115} INFO - 6
繰り返し件数を変えるとどうなる
Dynamic Task Mapping以前の実装で問題だった、Taskの数が減った時の挙動を試してみます。
make_listが返すリストの内容を変えます。
@task
def make_list():
# もともと[1,2,3]
return [1]
新しく実行したDAG Runでは、consumerの表記が1に変わっています(元は3)。
変更前のDAG RunのGraph Viewは3のままです。期待通りの挙動ですね。
繰り返しTaskがカスケードする場合
下図のような、Dynamic Task Mappingで変化させるTaskをカスケードさせたDAGも可能です。
コード的には、
- expandしたTask(consumer)を、別のTask(consumer2)のexpandで指定
- expandしたTask(consumer)を、別のTask(reducer2)で(expandで指定せずに)受け取る
だけです。前者は上流のTaskが返した値をそれぞれ、後者は上流のTaskが返した価をまとめて受け取ります。
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
@task
def make_list():
# 下流のためにIterable(list)を返すTask
# このTaskの結果はXCom経由に保存され、下流でループに使われる
return [1, 2, 3]
@task
def consumer(elm):
# 上流のTaskの結果リストの要素がそれぞれ受け取るTask
print(elm)
return elm
@task
def consumer2(elm):
# 上流のTaskの結果リストの要素がそれぞれ受け取るTask
print(f'Hey {elm}')
return elm
@task
def reducer2(lst):
# 上流のTaskの結果リストの要素をまとめて受け取るTask
print(sum(lst))
@task
def reducer(lst):
# 上流のTaskの結果リストの要素をまとめて受け取るTask
print(sum(lst))
with DAG(dag_id="taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
lst = make_list()
consumer = consumer.expand(elm=lst)
consumed = consumer2.expand(elm=consumer)
reducer2(consumed)
reducer(lst)
余談ですが、
- xcom_pullにmap_indexesという引数が追加され、どの繰り返しのTaskからXComを取るかを区別できるようになっています
- reduce2のTaskが受け取る値はlistではなく、「_LazyXComAccess」という別の型です
(TaskFlowでない)他のOperatorの例
先ほどの例では、Airflow2系で導入されたTaskflow APIです。従来の記法(Operatorの指定)でも、
- 上流のデータで変化させないパラメータをpartialで指定
- 上流のデータで変化させるパラメータをexpandとXComArgで指定
することで、Dynamic Task Mappingを使うことが可能です。
ただし、PythonOperatorでop_argsを指定する場合は注意が必要です(後述)・
from datetime import datetime
from airflow import DAG
from airflow import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator
with DAG(dag_id="non_taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
lst = PythonOperator(
task_id="make_list",
python_callable=lambda: ['echo 1', 'echo 2']
)
consumer = BashOperator. \
partial(task_id="consumer", do_xcom_push=False). \
expand(bash_command=XComArg(lst))
reducer = PythonOperator(
task_id="reducer",
python_callable=lambda ti: print("commands=" + str(ti.xcom_pull(task_ids='make_list'))),
)
lst >> reducer
TaskFlow API同様にGraph Viewに件数が表示されます。
consumer(1)のログを見ても、上流(make_list)の結果の一つが渡されていることがわかります。
[2022-07-17, 22:29:18 UTC] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'echo 1']
[2022-07-17, 22:29:18 UTC] {subprocess.py:85} INFO - Output:
[2022-07-17, 22:29:18 UTC] {subprocess.py:92} INFO - 1
注意点
従来の動的なDAGに比べて、Airflowの仕組みに載せることが出来るDynamic Task Mappingですが、完璧な上位互換ではなく、いくつか気にした方が良い点もあります。
従来の動的なDAGでの実装が必要な場合もあるかもしれません。
XCom
XComでやり取りするという仕組みになっている関係で、XComの制限に影響されます。例えば、
- サイズ制限
- Serializableなデータか否か
などに注意です。
前述のmake_listを少し変え、大きなリストを保存してみます。
# (他の部分は省略)
@task
def make_list():
# 下流のためにIterable(list)を返すTask
# このTaskの結果はXCom経由に保存され、下流でループに使われる
return [i for i in range(1000000000)]
DAG Runは失敗し、ログにはSIGKILLが表示されます。
[2022-07-18, 03:39:09 UTC] {local_task_job.py:156} INFO - Task exited with return code Negsignal.SIGKILL
単純なループ以外の動的なDAG
「Mapping」という名前の通り、単純なループ的(Map-Reduce的)な処理のみ実装できます。
従来の動的なDAGでは、
- ループ以外の変化(本番だけあるTaskに依存させるとか)
- 複雑なループ(多重ループ等)
も可能ですが、Dynamic Task Mappingでは(現状)対応できません。
expandで指定するパラメータ
Dynamic Task Mappingではexpandで変化を受け取るパラメータを指定しますが、expandで指定できないパラメータもあります。(「For example, task_id, pool, and many BaseOperator arguments are not mappable.」)
例えば、paramsパラメータは指定できません(「schedulerでも使っている部分があるため」とのこと)。
iterableを受け取るパラメータをexpandで指定
expand対象のパラメータがiterable(listとか)の場合も注意が必要です。
例えば、(Taskflow APIではない)PythonOperatorを使いたい時、直感系には下のようにすると思います。
from datetime import datetime
from airflow import DAG
from airflow import XComArg
from airflow.operators.python_operator import PythonOperator
with DAG(dag_id="non_taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
lst = PythonOperator(
task_id="make_list",
python_callable=lambda: [10, 20],
)
consumer_prepare_cmd = PythonOperator.partial(
task_id="do_something",
python_callable=lambda x: print(x),
).expand(
op_args=XComArg(lst)
)
これは、
- op_argsは引数のiterableを受け取る
- XComArgはmake_listの結果をそれぞれ[10, 20]渡す
- 10や20はiterableでない
ため、エラーになります。
TypeError: object of type 'int' has no len()
PythonOperatorに限れば下のような回避策があると思いますが、ちょっとしっくりしていないので(特にPythonOperator以外だとどうするのか)、何か情報ご存じの方はコメントいただけると嬉しいです。
- TaskFlow APIを使う
- op_kwargsを使う(下の例)
-
[10, 20]
ではなく['10', '20']
で渡し(op_args=['1', '0']になります)、python_callable側で数値にする
from datetime import datetime
from airflow import DAG
from airflow import XComArg
from airflow.operators.python_operator import PythonOperator
with DAG(dag_id="non_taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
lst = PythonOperator(
task_id="make_list",
python_callable=lambda: [{'x': 10}, {'x': 20}],
)
consumer_prepare_cmd = PythonOperator.partial(
task_id="do_something",
python_callable=lambda x: print(x),
).expand(
op_kwargs=XComArg(lst)
)
(余談)他のワークフローエンジン
ちなみに、Prefectというワークフローエンジンには似た機能があります。
from prefect import Flow, task
@task
def add_ten(x):
return x + 10
with Flow('iterated map') as flow:
mapped_result = add_ten.map([1, 2, 3])
mapped_result_2 = add_ten.map(mapped_result)
Discussion