💠

Airflowで複数Taskを1つにまとめて管理する(TaskGroup)

2024/07/28に公開

概要

Airflowを使っていると、いくつかのTaskを1つのまとまりとして管理し、再利用したい場面が出てきます
例えば、Sensorで入力元のテーブルにデータが存在することを確認してから、Operatorで処理を開始するパターンなどです
この場合、TaskGroup機能を使うとコードがきれいになり、Web UIでの見た目も分かりやすくなります
ここでは、TaskGroupを使ったDAGの書き方について紹介します

TaskGroupの使い方

TaskGroupはドキュメントにあるように、関数の前に@task_groupデコレータを付け、関数の中にTaskを書くと実現できます

以下の例では、task_group_example関数に@task_groupを付け、その中にBashOperatorを2つ書いています

from datetime import datetime

from airflow.decorators import task_group
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

@task_group
def task_group_example(parameter):
    first_task = BashOperator(
        task_id="first_task",
        bash_command=f"echo 'first task with {parameter}'"
    )
    second_task = BashOperator(
        task_id="second_task",
        bash_command=f"echo 'second task with {parameter}'"
    )
    first_task >> second_task

with DAG(
    dag_id="task_group_example",
    start_date=datetime(2024, 7, 26),
    schedule="@once",
    catchup=False,
) as dag:
    start_task = EmptyOperator(task_id="start")
    group1 = task_group_example("parameter_A")
    group2 = task_group_example("parameter_B")
    end_task = EmptyOperator(task_id="end")

    start_task >> [group1, group2] >> end_task

DAGの定義の部分を見ると、task_group_exampleをパラメーターを変えて2回呼んでいます
このようにすることで、同じTaskGroupをパラメーターを変えて再利用することが実現できています

このコードをAirflowに反映させると、Web UI上では以下のように表示されます

first_tasksecond_taskが1つのまとまりであることが分かりやすくなっています

Web UI上のgroup名を分かりやすくする

先ほどの例で複数のTaskを1つにまとめることができましたが、UI上で1つ気になる点があります
UI上で表示されるgroup名を確認すると、片方はtask_group_exampleなのに対し、もう片方はtask_group_example__1となっています

これは、両方とも同じgroup名にしてしまうと名前が衝突してしまうため、衝突を回避するためにAirflow側で自動的に名前が変更されています

このままでも問題はないのですが、UIだけを見たときにどちらのgroupにどのパラメーターが使われているのか分かりにくいです
そこで、UI上で表示されるgroup名を分かりやすい名前に変更します

group名を変更するためには、@task_groupgroup_idという引数を渡します
このgroup_idに渡した値が、UI上にそのままgroup名として表示されます
先ほどのコードを以下のように書き換えます

# import文は同じなので省略

def create_task_group_func(group_id, parameter):
    @task_group(group_id=group_id)
    def task_group_example():
        first_task = BashOperator(
            task_id="first_task",
            bash_command=f"echo 'first task with {parameter}'"
        )
        second_task = BashOperator(
            task_id="second_task",
            bash_command=f"echo 'second task with {parameter}'"
        )
        first_task >> second_task
    
    return task_group_example

with DAG(
    dag_id="task_group_example",
    start_date=datetime(2024, 7, 26),
    schedule="@once",
    catchup=False,
) as dag:
    start_task = EmptyOperator(task_id="start")
    group1 = create_task_group_func(group_id="task_group_with_parameter_A", parameter="parameter_A")()
    group2 = create_task_group_func(group_id="task_group_with_parameter_B", parameter="parameter_B")()
    end_task = EmptyOperator(task_id="end")

    start_task >> [group1, group2] >> end_task

先ほどとの主な差分は、以下の通りです

  • @task_groupgroup_idを渡すようにした
  • group_idを外から渡せるようにするために、task_group_example関数をcreate_task_group_funcでラップした

このコードをAirflowに反映させると、group名がtask_group_with_parameter_Atask_group_with_parameter_Bに変わり、それぞれのgroupの判別がつきやすくなりました

TaskGroupを定義する別のやり方(TaskGroupクラスの継承)

先ほどは、関数に@task_groupデコレータを付与することでTaskGroupを実現しました
一方、デコレータではなくクラスでTaskGroupを実現する方法も存在します
クラスで実現する場合は、TaskGroupというクラスがairflowに存在するので、これを継承したクラスを作成します
コードの例を以下に示します

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup


class TaskGroupExample(TaskGroup):
    def __init__(self, group_id, parameter, **kwargs):
        super().__init__(group_id=group_id, **kwargs)

        first_task = BashOperator(
            task_id="first_task",
            task_group=self,
            bash_command=f"echo 'first task with {parameter}'"
        )
        second_task = BashOperator(
            task_id="second_task",
            task_group=self,
            bash_command=f"echo 'second task with {parameter}'"
        )
        first_task >> second_task      

with DAG(
    dag_id="task_group_example",
    start_date=datetime(2024, 7, 26),
    schedule="@once",
    catchup=False,
) as dag:
    start_task = EmptyOperator(task_id="start")
    group1 = TaskGroupExample(group_id="task_group_with_parameter_A", parameter="paramter_A")
    group2 = TaskGroupExample(group_id="task_group_with_parameter_B", parameter="paramter_B")
    end_task = EmptyOperator(task_id="end")

    start_task >> [group1, group2] >> end_task

TaskGroupを継承したクラスを作成し、コンストラクタに普段と同じようにTaskを書くことでTaskGroupをクラスで実現できます
注意点としては、この書き方をする際はOperatorの引数にtask_group=selfを追加してください

このコードをAirflowに反映させると、先ほどと全く同じ結果が得られます

どちらの書き方が良いのか

TaskGroupを定義する際にデコレータを使う方法と、クラスを使う方法の2種類を紹介しました
どちらを使うのが良いのでしょうか?

@task_groupのコードを見てみると、内部ではTaskGroupクラスを使用しているという旨のコメントが見つかります
https://github.com/apache/airflow/blob/2.9.3/airflow/decorators/task_group.py#L200-L202
そのため、この2つのやり方で実現できることや性能に差はありません
強いて言うなら、TaskGroupクラスはAirflow公式ドキュメントには載っていない方法なので、今後仕様変更がある可能性があります
とはいえ、AirflowのマネージドサービスであるAstronomerのドキュメントには載っているので、大幅な変更が起きる可能性は低いと考えられます

なので、どちらを使うかは使う人の好み次第です
個人的には、Operatorがクラスなので、TaskGroupもクラスで定義しておくと、OperatorとTaskGroupが似た使い方で使用できるので好きです

Discussion