AirflowのTaskGroupはいくつまでネストできるのか
概要
AirflowのTaskGroup機能を使ったネタ記事です
何の役にも立ちません
TaskGroupのネスト
あまり使う機会はありませんが、TaskGroupはネストさせることができます
以前書いたTaskGroupを継承するやり方を使う場合、以下のようなコードを書くことでネストさせることができます
from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
class ParentTaskGroup(TaskGroup):
def __init__(self, group_id, **kwargs):
super().__init__(group_id=group_id, **kwargs)
t1 = EmptyOperator(task_id="task1", task_group=self)
child = ChildTaskGroup(group_id="child", parent_group=self)
t2 = EmptyOperator(task_id="task2", task_group=self)
t1 >> child >> t2
class ChildTaskGroup(TaskGroup):
def __init__(self, group_id, **kwargs):
super().__init__(group_id=group_id, **kwargs)
t1 = EmptyOperator(task_id="child_task1", task_group=self)
t2 = EmptyOperator(task_id="child_task2", task_group=self)
t1 >> t2
with DAG(
dag_id="nest_example",
start_date=datetime(2024, 11, 22),
schedule="@once",
catchup=False,
) as dag:
ParentTaskGroup(group_id="parent")
このコードをAirflowに反映させた結果が以下の画像です
parent
というグループの下にchild
というグループが表示されており、ネストされていることがわかります
ネストの限界を探す
検証の準備
ネストが可能と聞くとエンジニアとしては「じゃあいくつまでネストできるのか?」というのが気になります
そこで以下のようなコードで検証します
# import文は先ほどと同じなので省略
MAX_DEPTH = 3
class NestTaskGroup(TaskGroup):
def __init__(self, group_id, depth, **kwargs):
super().__init__(group_id=group_id, **kwargs)
if depth == MAX_DEPTH:
EmptyOperator(task_id="task", task_group=self)
else:
NestTaskGroup(group_id=f"g{depth + 1}", depth=depth + 1, parent_group=self)
with DAG(
dag_id="nest_max_test",
start_date=datetime(2024, 7, 26),
schedule="@once",
catchup=False,
) as dag:
NestTaskGroup(group_id="g1", depth=1)
NestTaskGroup
というクラスを作り、コンストラクタの引数としてdepth
を受け取るようにします
コンストラクタではdepth
を増やしながら再帰的にNestTaskGroup
を呼び出し、depth
がMAX_DEPTH
に到達した際は再帰呼び出しを終了する形にしています
このMAX_DEPTH
の値を変えながら、いくつまでTaskGroupをネストすることができるのか検証します
検証環境
Pythonのバージョン: 3.10.13
Airflowのバージョン: 2.9.3
検証結果
MAX_DEPTH=64
にしたところ、以下のエラーが出てDAGが表示できなくなってしまいました
webserver | airflow.exceptions.AirflowException: The key has to be less than 250 characters
TaskGroupを使用した場合、各taskのtask_idは{group_id}.{task_id}
という形になります
ネストした場合は、ネストした分だけgroup_idが長くなっていきます
今回だとgroup_id=f"g{depth + 1}"
としているため、MAX_DEPTH=64
の際のtask_idは以下のようになっていました
g1.g2.g3.g4.g5.g6.g7.g8.g9.g10.g11.g12.g13.g14.g15.g16.g17.g18.g19.g20.g21.g22.g23.g24.g25.g26.g27.g28.g29.g30.g31.g32.g33.g34.g35.g36.g37.g38.g39.g40.g41.g42.g43.g44.g45.g46.g47.g48.g49.g50.g51.g52.g53.g54.g55.g56.g57.g58.g59.g60.g61.g62.g63.g64.task
このtask_idがAirflow内部で設定されている上限(250文字)に到達したため、The key has to be less than 250 characters
というエラーが出てしまったようです
普通にAirflowを使っていればtask_idが250文字に到達することはないので、妥当な上限ですね
最後に、エラーが起きないギリギリであるMAX_DEPTH=63
の際のDAGの画像を貼っておしまいにしようと思います
Discussion