🥑

Dynamic Task mappingで動的な Airflow DAGを作る

2022/07/18に公開

Airflow2.3で「Dynamic Task Mapping」なる面白い機能が追加されたので、その紹介です。

tl;dr

  • Task数が実行時に決まる、動的なDAGが欲しい時もあるよ
  • Airflow2.3でDynamic Task Mappingという機能がリリースされたよ。動的に変わるDAGを定義できるよ
  • expand・XComArgの組み合わせで簡単に実装できるよ

背景

DAGを動的に定義したい

Task数が実行時に決まる、同じような処理(Task)を複数行いたい場合があります。例えば、以下のようなケースです

  • マルチテナントの基盤で、各クライアント事に処理を行いたい
  • 大まかな流れが同じデータの連携が複数種類ある
    • 広告のAPIからクリック・コンバージョンの情報をそれぞれ取りたいとか
  • 違うハイパーパラメータ・モデルで機械学習の学習・評価したい

(このような、ある瞬間と別の時間で構成するTaskの数が違うDAGを、以下「動的なDAG」と呼びます)

このような場合の需要はあるようで、インターネット上にも「Airflowでどうすんの」質問・記事が複数あります。

DAGの動的な定義はこれまでも可能

Airflowはタスク・タスクの依存関係(DAG)の両方をPythonのコードとして記述します。

この特徴を使うと、これまで(2.3以前)でも動的なDAGを定義することが技術的には可能です。例えば、以下のようなコードです。

from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator


with DAG(dag_id="conventional_dynamic_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
    # Variableが変わると、DAGの構造も変わる
    for i in range(int(Variable.get('conventional_dynamic_mapping'))):
        consumer = BashOperator(
            task_id=f"consumer_{i}",
            do_xcom_push=False,
            bash_command=f"echo {i}"
        )

Graph Viewで見てもTask Instanceが複数作成されています。

Airflow Variables(conventional_dynamic_mapping)を変えると、DAGに含まれるTaskも変わります(2->3に変更)。

DAGの動的な定義は取り扱い注意

先ほどは「技術的には可能」とニュアンスを置きました。それは以下のような問題があり、取り扱いに注意が必要なためです。

  • DAGのTaskでない部分は、Taskの実行場所(worker)とは別の場所(scheduler,webserver)で、DAGの評価時(not 実行時)に評価される可能性があります。そのため、以下のような点に気を付ける必要があります
    • 外部サービスにアクセスする場合、DAGの評価の頻度でアクセスしても大丈夫か
    • 重たい処理を行っていないか(workerと違ってschedulerは一つ)
    • 動的な部分で外部リソースを使う時はworker以外からもアクセスできるか(IPアドレス許可や環境変数)
  • DAGが変化してTaskが削除した時の挙動に、特別な注意が必要です
    • 依存関係(dependency_on_pastなど)
    • Airflow UIから見えなくなる
      • 下図は、conventional_dynamic_mappingの繰り返しを一つにした場合のGraph Viewです。consumer_2が消えています(メタデータデータベースには残ってます)。

Dynamic Task Mapping

言うなれば「可能だが自己責任で」感じだった動的なDAGを、Airflowでサポートしようとする試みが、2022年4月にAirflow 2.3で追加されたDynamic Task Mapping(AIP-42)です。

概要

上述な動的なDAGの問題は、Taskでない部分で処理を行っていることが原因です。(Taskでない部分で動くから頻繁に動く、Task管理の仕組みに乗りにくい)。

そこで、Dynamic Task Mappingでは、

  • 上流(upstream)のTaskで、動的なデータ(複数)を作成
  • 下流(downstream)のTaskで、作成されたデータをループ

することで、Taskでない部分の処理に依存せず、DAG Run毎にTaskの数を変化させます。

DAGの実装としては、以下の二つの設定を行います。

  • 上流のTaskで繰り返したい内容(リスト等)を返す(XComに入れる)
  • 下流のTaskで繰り返す要素(リストの中身)を受け取るパラメータをexpandで指定

なお、expandは静的な値を受け取ることもできます

(Airflow 2.3.3rc3で試しました)

シンプルな例

上流(make_list)で後続のTaskが処理するリストを設定し、下流(consumer)でそのリストの要素をそれぞれ受け取り処理します。

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

@task
def make_list():
    # 下流のためにIterable(list)を返すTask
    # このTaskの結果はXCom経由に保存されます
    return [1, 2, 3]

@task
def consumer(elm):
    # 上流のTaskの結果リストの要素がそれぞれ受け取るTask
    print(elm)

@task
def reducer(lst):
    # 上流のTaskの結果リストの要素をまとめて受け取るTask
    print(sum(lst))


with DAG(dag_id="taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
    # 1. make_list Taskを実行
    # 2. make_listの結果をXComに保存
    # 3. make_listの結果で下流のTaskを実行
    # 3.1 要素それぞれで、consumerを実行
    # 3.2 要素(リスト全体)でreducerを実行
    lst = make_list()
    consumer.expand(elm=lst)
    reducer(lst)

Graph Viewで見るとconsumer Taskの表示に、繰り返し回数([3])が付与されてます。

動的に変化するTask(consumer)をクリックすると、「Map Index」が表示され、情報を表示する実行を選択することができます。
(Task全体でclear等することも可能)

consumerの1を選び、ログを見ると、上流(make_list)で設定されたリスト([1,2,3])の要素が表示されています。

[2022-07-17, 08:41:42 UTC] {logging_mixin.py:115} INFO - 1

consumerの2は、また別の要素が表示されています(3も同様)。

[2022-07-17, 08:41:42 UTC] {logging_mixin.py:115} INFO - 2

make_listが返す値は単なるXComの値ですので、通常のTaskflow APIと同様に、後続のTaskで処理することもできます。上記のコード例では、reducerは(consumerとは異なり)make_listの値全体を受け取りその総和を計算します。

[2022-07-17, 08:41:42 UTC] {logging_mixin.py:115} INFO - 6

繰り返し件数を変えるとどうなる

Dynamic Task Mapping以前の実装で問題だった、Taskの数が減った時の挙動を試してみます。

make_listが返すリストの内容を変えます。

@task
def make_list():
    # もともと[1,2,3]
    return [1]

新しく実行したDAG Runでは、consumerの表記が1に変わっています(元は3)。

変更前のDAG RunのGraph Viewは3のままです。期待通りの挙動ですね。

繰り返しTaskがカスケードする場合

下図のような、Dynamic Task Mappingで変化させるTaskをカスケードさせたDAGも可能です。

コード的には、

  • expandしたTask(consumer)を、別のTask(consumer2)のexpandで指定
  • expandしたTask(consumer)を、別のTask(reducer2)で(expandで指定せずに)受け取る

だけです。前者は上流のTaskが返した値をそれぞれ、後者は上流のTaskが返した価をまとめて受け取ります。

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

@task
def make_list():
    # 下流のためにIterable(list)を返すTask
    # このTaskの結果はXCom経由に保存され、下流でループに使われる
    return [1, 2, 3]

@task
def consumer(elm):
    # 上流のTaskの結果リストの要素がそれぞれ受け取るTask
    print(elm)
    return elm

@task
def consumer2(elm):
    # 上流のTaskの結果リストの要素がそれぞれ受け取るTask
    print(f'Hey {elm}')
    return elm

@task
def reducer2(lst):
    # 上流のTaskの結果リストの要素をまとめて受け取るTask
    print(sum(lst))

@task
def reducer(lst):
    # 上流のTaskの結果リストの要素をまとめて受け取るTask
    print(sum(lst))

with DAG(dag_id="taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
    lst = make_list()
    consumer = consumer.expand(elm=lst)
    consumed = consumer2.expand(elm=consumer)
    reducer2(consumed)
    reducer(lst)

余談ですが、

  • xcom_pullにmap_indexesという引数が追加され、どの繰り返しのTaskからXComを取るかを区別できるようになっています
  • reduce2のTaskが受け取る値はlistではなく、「_LazyXComAccess」という別の型です

(TaskFlowでない)他のOperatorの例

先ほどの例では、Airflow2系で導入されたTaskflow APIです。従来の記法(Operatorの指定)でも、

  • 上流のデータで変化させないパラメータをpartialで指定
  • 上流のデータで変化させるパラメータをexpandとXComArgで指定

することで、Dynamic Task Mappingを使うことが可能です。

ただし、PythonOperatorでop_argsを指定する場合は注意が必要です(後述)・

from datetime import datetime
from airflow import DAG
from airflow import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator

with DAG(dag_id="non_taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
    lst = PythonOperator(
        task_id="make_list",
        python_callable=lambda: ['echo 1', 'echo 2']
    )
    consumer = BashOperator. \
        partial(task_id="consumer", do_xcom_push=False). \
        expand(bash_command=XComArg(lst))

    reducer = PythonOperator(
        task_id="reducer",
        python_callable=lambda ti: print("commands=" + str(ti.xcom_pull(task_ids='make_list'))),
    )

    lst >> reducer

TaskFlow API同様にGraph Viewに件数が表示されます。

consumer(1)のログを見ても、上流(make_list)の結果の一つが渡されていることがわかります。

[2022-07-17, 22:29:18 UTC] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'echo 1']
[2022-07-17, 22:29:18 UTC] {subprocess.py:85} INFO - Output:
[2022-07-17, 22:29:18 UTC] {subprocess.py:92} INFO - 1

注意点

従来の動的なDAGに比べて、Airflowの仕組みに載せることが出来るDynamic Task Mappingですが、完璧な上位互換ではなく、いくつか気にした方が良い点もあります。
従来の動的なDAGでの実装が必要な場合もあるかもしれません。

XCom

XComでやり取りするという仕組みになっている関係で、XComの制限に影響されます。例えば、

  • サイズ制限
  • Serializableなデータか否か

などに注意です。

前述のmake_listを少し変え、大きなリストを保存してみます。

# (他の部分は省略)
@task
def make_list():
    # 下流のためにIterable(list)を返すTask
    # このTaskの結果はXCom経由に保存され、下流でループに使われる
    return [i for i in range(1000000000)]

DAG Runは失敗し、ログにはSIGKILLが表示されます。

[2022-07-18, 03:39:09 UTC] {local_task_job.py:156} INFO - Task exited with return code Negsignal.SIGKILL

単純なループ以外の動的なDAG

「Mapping」という名前の通り、単純なループ的(Map-Reduce的)な処理のみ実装できます。

従来の動的なDAGでは、

  • ループ以外の変化(本番だけあるTaskに依存させるとか)
  • 複雑なループ(多重ループ等)

も可能ですが、Dynamic Task Mappingでは(現状)対応できません。

expandで指定するパラメータ

Dynamic Task Mappingではexpandで変化を受け取るパラメータを指定しますが、expandで指定できないパラメータもあります。(「For example, task_id, pool, and many BaseOperator arguments are not mappable.」)

例えば、paramsパラメータは指定できません(「schedulerでも使っている部分があるため」とのこと)。

iterableを受け取るパラメータをexpandで指定

expand対象のパラメータがiterable(listとか)の場合も注意が必要です。
例えば、(Taskflow APIではない)PythonOperatorを使いたい時、直感系には下のようにすると思います。

from datetime import datetime

from airflow import DAG
from airflow import XComArg
from airflow.operators.python_operator import PythonOperator

with DAG(dag_id="non_taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
    lst = PythonOperator(
        task_id="make_list",
        python_callable=lambda: [10, 20],
    )

    consumer_prepare_cmd = PythonOperator.partial(
        task_id="do_something",
        python_callable=lambda x: print(x),
    ).expand(
       op_args=XComArg(lst)
    )

これは、

  • op_argsは引数のiterableを受け取る
  • XComArgはmake_listの結果をそれぞれ[10, 20]渡す
  • 10や20はiterableでない

ため、エラーになります。

TypeError: object of type 'int' has no len()

PythonOperatorに限れば下のような回避策があると思いますが、ちょっとしっくりしていないので(特にPythonOperator以外だとどうするのか)、何か情報ご存じの方はコメントいただけると嬉しいです。

  • TaskFlow APIを使う
  • op_kwargsを使う(下の例)
  • [10, 20]ではなく['10', '20']で渡し(op_args=['1', '0']になります)、python_callable側で数値にする
from datetime import datetime

from airflow import DAG
from airflow import XComArg
from airflow.operators.python_operator import PythonOperator

with DAG(dag_id="non_taskflow_mapping", start_date=datetime(2022, 7, 2), catchup=False) as dag:
    lst = PythonOperator(
        task_id="make_list",
        python_callable=lambda: [{'x': 10}, {'x': 20}],
    )

    consumer_prepare_cmd = PythonOperator.partial(
        task_id="do_something",
        python_callable=lambda x: print(x),
    ).expand(
       op_kwargs=XComArg(lst)
    )

(余談)他のワークフローエンジン

ちなみに、Prefectというワークフローエンジンには似た機能があります

from prefect import Flow, task

@task
def add_ten(x):
    return x + 10

with Flow('iterated map') as flow:
    mapped_result = add_ten.map([1, 2, 3])
    mapped_result_2 = add_ten.map(mapped_result)

Discussion