Airflowで複数Taskを1つにまとめて管理する(TaskGroup)
概要
Airflowを使っていると、いくつかのTaskを1つのまとまりとして管理し、再利用したい場面が出てきます
例えば、Sensorで入力元のテーブルにデータが存在することを確認してから、Operatorで処理を開始するパターンなどです
この場合、TaskGroup機能を使うとコードがきれいになり、Web UIでの見た目も分かりやすくなります
ここでは、TaskGroupを使ったDAGの書き方について紹介します
TaskGroupの使い方
TaskGroupはドキュメントにあるように、関数の前に@task_groupデコレータを付け、関数の中にTaskを書くと実現できます
以下の例では、task_group_example関数に@task_groupを付け、その中にBashOperatorを2つ書いています
from datetime import datetime
from airflow.decorators import task_group
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
@task_group
def task_group_example(parameter):
first_task = BashOperator(
task_id="first_task",
bash_command=f"echo 'first task with {parameter}'"
)
second_task = BashOperator(
task_id="second_task",
bash_command=f"echo 'second task with {parameter}'"
)
first_task >> second_task
with DAG(
dag_id="task_group_example",
start_date=datetime(2024, 7, 26),
schedule="@once",
catchup=False,
) as dag:
start_task = EmptyOperator(task_id="start")
group1 = task_group_example("parameter_A")
group2 = task_group_example("parameter_B")
end_task = EmptyOperator(task_id="end")
start_task >> [group1, group2] >> end_task
DAGの定義の部分を見ると、task_group_exampleをパラメーターを変えて2回呼んでいます
このようにすることで、同じTaskGroupをパラメーターを変えて再利用することが実現できています
このコードをAirflowに反映させると、Web UI上では以下のように表示されます

first_taskとsecond_taskが1つのまとまりであることが分かりやすくなっています
Web UI上のgroup名を分かりやすくする
先ほどの例で複数のTaskを1つにまとめることができましたが、UI上で1つ気になる点があります
UI上で表示されるgroup名を確認すると、片方はtask_group_exampleなのに対し、もう片方はtask_group_example__1となっています

これは、両方とも同じgroup名にしてしまうと名前が衝突してしまうため、衝突を回避するためにAirflow側で自動的に名前が変更されています
このままでも問題はないのですが、UIだけを見たときにどちらのgroupにどのパラメーターが使われているのか分かりにくいです
そこで、UI上で表示されるgroup名を分かりやすい名前に変更します
group名を変更するためには、@task_groupにgroup_idという引数を渡します
このgroup_idに渡した値が、UI上にそのままgroup名として表示されます
先ほどのコードを以下のように書き換えます
# import文は同じなので省略
def create_task_group_func(group_id, parameter):
@task_group(group_id=group_id)
def task_group_example():
first_task = BashOperator(
task_id="first_task",
bash_command=f"echo 'first task with {parameter}'"
)
second_task = BashOperator(
task_id="second_task",
bash_command=f"echo 'second task with {parameter}'"
)
first_task >> second_task
return task_group_example
with DAG(
dag_id="task_group_example",
start_date=datetime(2024, 7, 26),
schedule="@once",
catchup=False,
) as dag:
start_task = EmptyOperator(task_id="start")
group1 = create_task_group_func(group_id="task_group_with_parameter_A", parameter="parameter_A")()
group2 = create_task_group_func(group_id="task_group_with_parameter_B", parameter="parameter_B")()
end_task = EmptyOperator(task_id="end")
start_task >> [group1, group2] >> end_task
先ほどとの主な差分は、以下の通りです
-
@task_groupにgroup_idを渡すようにした -
group_idを外から渡せるようにするために、task_group_example関数をcreate_task_group_funcでラップした
このコードをAirflowに反映させると、group名がtask_group_with_parameter_Aとtask_group_with_parameter_Bに変わり、それぞれのgroupの判別がつきやすくなりました

TaskGroupを定義する別のやり方(TaskGroupクラスの継承)
先ほどは、関数に@task_groupデコレータを付与することでTaskGroupを実現しました
一方、デコレータではなくクラスでTaskGroupを実現する方法も存在します
クラスで実現する場合は、TaskGroupというクラスがairflowに存在するので、これを継承したクラスを作成します
コードの例を以下に示します
from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
class TaskGroupExample(TaskGroup):
def __init__(self, group_id, parameter, **kwargs):
super().__init__(group_id=group_id, **kwargs)
first_task = BashOperator(
task_id="first_task",
task_group=self,
bash_command=f"echo 'first task with {parameter}'"
)
second_task = BashOperator(
task_id="second_task",
task_group=self,
bash_command=f"echo 'second task with {parameter}'"
)
first_task >> second_task
with DAG(
dag_id="task_group_example",
start_date=datetime(2024, 7, 26),
schedule="@once",
catchup=False,
) as dag:
start_task = EmptyOperator(task_id="start")
group1 = TaskGroupExample(group_id="task_group_with_parameter_A", parameter="paramter_A")
group2 = TaskGroupExample(group_id="task_group_with_parameter_B", parameter="paramter_B")
end_task = EmptyOperator(task_id="end")
start_task >> [group1, group2] >> end_task
TaskGroupを継承したクラスを作成し、コンストラクタに普段と同じようにTaskを書くことでTaskGroupをクラスで実現できます
注意点としては、この書き方をする際はOperatorの引数にtask_group=selfを追加してください
このコードをAirflowに反映させると、先ほどと全く同じ結果が得られます

どちらの書き方が良いのか
TaskGroupを定義する際にデコレータを使う方法と、クラスを使う方法の2種類を紹介しました
どちらを使うのが良いのでしょうか?
@task_groupのコードを見てみると、内部ではTaskGroupクラスを使用しているという旨のコメントが見つかります
そのため、この2つのやり方で実現できることや性能に差はありません
強いて言うなら、TaskGroupクラスはAirflow公式ドキュメントには載っていない方法なので、今後仕様変更がある可能性があります
とはいえ、AirflowのマネージドサービスであるAstronomerのドキュメントには載っているので、大幅な変更が起きる可能性は低いと考えられます
なので、どちらを使うかは使う人の好み次第です
個人的には、Operatorがクラスなので、TaskGroupもクラスで定義しておくと、OperatorとTaskGroupが似た使い方で使用できるので好きです
Discussion