📚

AirflowのTaskGroupはいくつまでネストできるのか

2024/11/23に公開

概要

AirflowのTaskGroup機能を使ったネタ記事です
何の役にも立ちません

TaskGroupのネスト

あまり使う機会はありませんが、TaskGroupはネストさせることができます
以前書いたTaskGroupを継承するやり方を使う場合、以下のようなコードを書くことでネストさせることができます

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup


class ParentTaskGroup(TaskGroup):
    def __init__(self, group_id, **kwargs):
        super().__init__(group_id=group_id, **kwargs)
        t1 = EmptyOperator(task_id="task1", task_group=self)
        child = ChildTaskGroup(group_id="child", parent_group=self)
        t2 = EmptyOperator(task_id="task2", task_group=self)
        t1 >> child >> t2
        
class ChildTaskGroup(TaskGroup):
    def __init__(self, group_id, **kwargs):
        super().__init__(group_id=group_id, **kwargs)
        t1 = EmptyOperator(task_id="child_task1", task_group=self)
        t2 = EmptyOperator(task_id="child_task2", task_group=self)
        t1 >> t2

with DAG(
    dag_id="nest_example",
    start_date=datetime(2024, 11, 22),
    schedule="@once",
    catchup=False,
) as dag:
    ParentTaskGroup(group_id="parent")

このコードをAirflowに反映させた結果が以下の画像です
parentというグループの下にchildというグループが表示されており、ネストされていることがわかります

ネストの限界を探す

検証の準備

ネストが可能と聞くとエンジニアとしては「じゃあいくつまでネストできるのか?」というのが気になります
そこで以下のようなコードで検証します

# import文は先ほどと同じなので省略

MAX_DEPTH = 3

class NestTaskGroup(TaskGroup):
    def __init__(self, group_id, depth, **kwargs):
        super().__init__(group_id=group_id, **kwargs)
        if depth == MAX_DEPTH:
            EmptyOperator(task_id="task", task_group=self)
        else:
            NestTaskGroup(group_id=f"g{depth + 1}", depth=depth + 1, parent_group=self)
        
with DAG(
    dag_id="nest_max_test",
    start_date=datetime(2024, 7, 26),
    schedule="@once",
    catchup=False,
) as dag:
    NestTaskGroup(group_id="g1", depth=1)

NestTaskGroupというクラスを作り、コンストラクタの引数としてdepthを受け取るようにします
コンストラクタではdepthを増やしながら再帰的にNestTaskGroupを呼び出し、depthMAX_DEPTHに到達した際は再帰呼び出しを終了する形にしています
このMAX_DEPTHの値を変えながら、いくつまでTaskGroupをネストすることができるのか検証します

検証環境

Pythonのバージョン: 3.10.13
Airflowのバージョン: 2.9.3

検証結果

MAX_DEPTH=64にしたところ、以下のエラーが出てDAGが表示できなくなってしまいました

webserver  | airflow.exceptions.AirflowException: The key has to be less than 250 characters

TaskGroupを使用した場合、各taskのtask_idは{group_id}.{task_id}という形になります
ネストした場合は、ネストした分だけgroup_idが長くなっていきます
今回だとgroup_id=f"g{depth + 1}"としているため、MAX_DEPTH=64の際のtask_idは以下のようになっていました

g1.g2.g3.g4.g5.g6.g7.g8.g9.g10.g11.g12.g13.g14.g15.g16.g17.g18.g19.g20.g21.g22.g23.g24.g25.g26.g27.g28.g29.g30.g31.g32.g33.g34.g35.g36.g37.g38.g39.g40.g41.g42.g43.g44.g45.g46.g47.g48.g49.g50.g51.g52.g53.g54.g55.g56.g57.g58.g59.g60.g61.g62.g63.g64.task

このtask_idがAirflow内部で設定されている上限(250文字)に到達したため、The key has to be less than 250 charactersというエラーが出てしまったようです
普通にAirflowを使っていればtask_idが250文字に到達することはないので、妥当な上限ですね

最後に、エラーが起きないギリギリであるMAX_DEPTH=63の際のDAGの画像を貼っておしまいにしようと思います

Discussion