Airflowで複数Taskを1つにまとめて管理する(TaskGroup)
概要
Airflowを使っていると、いくつかのTaskを1つのまとまりとして管理し、再利用したい場面が出てきます
例えば、Sensorで入力元のテーブルにデータが存在することを確認してから、Operatorで処理を開始するパターンなどです
この場合、TaskGroup機能を使うとコードがきれいになり、Web UIでの見た目も分かりやすくなります
ここでは、TaskGroupを使ったDAGの書き方について紹介します
TaskGroupの使い方
TaskGroupはドキュメントにあるように、関数の前に@task_group
デコレータを付け、関数の中にTaskを書くと実現できます
以下の例では、task_group_example
関数に@task_group
を付け、その中にBashOperator
を2つ書いています
from datetime import datetime
from airflow.decorators import task_group
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
@task_group
def task_group_example(parameter):
first_task = BashOperator(
task_id="first_task",
bash_command=f"echo 'first task with {parameter}'"
)
second_task = BashOperator(
task_id="second_task",
bash_command=f"echo 'second task with {parameter}'"
)
first_task >> second_task
with DAG(
dag_id="task_group_example",
start_date=datetime(2024, 7, 26),
schedule="@once",
catchup=False,
) as dag:
start_task = EmptyOperator(task_id="start")
group1 = task_group_example("parameter_A")
group2 = task_group_example("parameter_B")
end_task = EmptyOperator(task_id="end")
start_task >> [group1, group2] >> end_task
DAGの定義の部分を見ると、task_group_example
をパラメーターを変えて2回呼んでいます
このようにすることで、同じTaskGroupをパラメーターを変えて再利用することが実現できています
このコードをAirflowに反映させると、Web UI上では以下のように表示されます
first_task
とsecond_task
が1つのまとまりであることが分かりやすくなっています
Web UI上のgroup名を分かりやすくする
先ほどの例で複数のTaskを1つにまとめることができましたが、UI上で1つ気になる点があります
UI上で表示されるgroup名を確認すると、片方はtask_group_example
なのに対し、もう片方はtask_group_example__1
となっています
これは、両方とも同じgroup名にしてしまうと名前が衝突してしまうため、衝突を回避するためにAirflow側で自動的に名前が変更されています
このままでも問題はないのですが、UIだけを見たときにどちらのgroupにどのパラメーターが使われているのか分かりにくいです
そこで、UI上で表示されるgroup名を分かりやすい名前に変更します
group名を変更するためには、@task_group
にgroup_id
という引数を渡します
このgroup_id
に渡した値が、UI上にそのままgroup名として表示されます
先ほどのコードを以下のように書き換えます
# import文は同じなので省略
def create_task_group_func(group_id, parameter):
@task_group(group_id=group_id)
def task_group_example():
first_task = BashOperator(
task_id="first_task",
bash_command=f"echo 'first task with {parameter}'"
)
second_task = BashOperator(
task_id="second_task",
bash_command=f"echo 'second task with {parameter}'"
)
first_task >> second_task
return task_group_example
with DAG(
dag_id="task_group_example",
start_date=datetime(2024, 7, 26),
schedule="@once",
catchup=False,
) as dag:
start_task = EmptyOperator(task_id="start")
group1 = create_task_group_func(group_id="task_group_with_parameter_A", parameter="parameter_A")()
group2 = create_task_group_func(group_id="task_group_with_parameter_B", parameter="parameter_B")()
end_task = EmptyOperator(task_id="end")
start_task >> [group1, group2] >> end_task
先ほどとの主な差分は、以下の通りです
-
@task_group
にgroup_id
を渡すようにした -
group_id
を外から渡せるようにするために、task_group_example
関数をcreate_task_group_func
でラップした
このコードをAirflowに反映させると、group名がtask_group_with_parameter_A
とtask_group_with_parameter_B
に変わり、それぞれのgroupの判別がつきやすくなりました
TaskGroup
クラスの継承)
TaskGroupを定義する別のやり方(先ほどは、関数に@task_group
デコレータを付与することでTaskGroupを実現しました
一方、デコレータではなくクラスでTaskGroupを実現する方法も存在します
クラスで実現する場合は、TaskGroup
というクラスがairflowに存在するので、これを継承したクラスを作成します
コードの例を以下に示します
from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
class TaskGroupExample(TaskGroup):
def __init__(self, group_id, parameter, **kwargs):
super().__init__(group_id=group_id, **kwargs)
first_task = BashOperator(
task_id="first_task",
task_group=self,
bash_command=f"echo 'first task with {parameter}'"
)
second_task = BashOperator(
task_id="second_task",
task_group=self,
bash_command=f"echo 'second task with {parameter}'"
)
first_task >> second_task
with DAG(
dag_id="task_group_example",
start_date=datetime(2024, 7, 26),
schedule="@once",
catchup=False,
) as dag:
start_task = EmptyOperator(task_id="start")
group1 = TaskGroupExample(group_id="task_group_with_parameter_A", parameter="paramter_A")
group2 = TaskGroupExample(group_id="task_group_with_parameter_B", parameter="paramter_B")
end_task = EmptyOperator(task_id="end")
start_task >> [group1, group2] >> end_task
TaskGroup
を継承したクラスを作成し、コンストラクタに普段と同じようにTaskを書くことでTaskGroupをクラスで実現できます
注意点としては、この書き方をする際はOperatorの引数にtask_group=self
を追加してください
このコードをAirflowに反映させると、先ほどと全く同じ結果が得られます
どちらの書き方が良いのか
TaskGroupを定義する際にデコレータを使う方法と、クラスを使う方法の2種類を紹介しました
どちらを使うのが良いのでしょうか?
@task_group
のコードを見てみると、内部ではTaskGroup
クラスを使用しているという旨のコメントが見つかります
そのため、この2つのやり方で実現できることや性能に差はありません
強いて言うなら、TaskGroup
クラスはAirflow公式ドキュメントには載っていない方法なので、今後仕様変更がある可能性があります
とはいえ、AirflowのマネージドサービスであるAstronomerのドキュメントには載っているので、大幅な変更が起きる可能性は低いと考えられます
なので、どちらを使うかは使う人の好み次第です
個人的には、Operatorがクラスなので、TaskGroupもクラスで定義しておくと、OperatorとTaskGroupが似た使い方で使用できるので好きです
Discussion