😶‍🌫️

AirflowでDAGに渡すパラメータにバリデーションを適用する方法

2024/09/30に公開

概要

  • AirflowではDAGのトリガー時にパラメータを指定することができ、渡されたパラメータはDAG内で参照できます
  • DAG宣言時の引数としてparamsを指定すると、パラメータの仕様を定義できます

Paramsで指定できる項目

Paramsで指定可能な代表的な項目は以下の通りです。

  • default
    • 初期値
    • DAGがスケジュール実行されたときやパラメータを指定せずに手動実行されたときはこの初期値が使用される
  • title
    • Airflow UI上で表示されるタイトルラベル
  • description
    • Airflow UI上で表示される説明文
  • type
    • string, number, integer, boolean, array, object, nullの7種類
    • パラメータを任意項目とする場合は["null", "string"]のように記述する

使用例

例としてDAGを実行する時に以下のようなパラメータを指定できるとします。

パラメータ 説明
targetProductIds バッチジョブで処理対象となるプロダクトIDのリスト
startAt 処理対象期間の開始日時
endAt 処理対象期間の終了日時
dryRun データの書き込みなどを行わず結果を出力するにとどめるフラグ
retryCount 処理に失敗したときの最大リトライ回数

これらのパラメータは例えば以下のように定義できます。

with DAG(
    dag_id="the_dag",
    params={
        "targetProductIds": Param(
            default=None,
            type=["null", "array"],
            items={"type": "string", "format": "uuid"}
        ),
        "startAt": Param(
            default=None,
            type=["null", "string"],
            format="date-time",
            title="startAt",
        ),
        "endAt": Param(
            default=None,
            type=["null", "string"],
            format="date-time",
            title="endAt",
        ),
        "dryRun": Param(
            default=False,
            type="boolean",
        ),
        "retryCount": Param(
            default=5,
            type="integer",
            minimum=0,
            maximum=10,
        ),
    }
):

Airflow UI上で見ると以下のように表示されます。

これにより、DAGをトリガーするときに不正な値を入力できなくなります。

DAG内でパラメータを参照するには以下のようにcontextを使用します。

from airflow.operators.python import get_current_context

context = get_current_context()
trigger_parameters = context["params"]

print(trigger_parameters["targetProductIds"])
# ["ad0973d4-f611-4f26-8d32-0265ccd13c9c"]

ハマったところ

nullもしくはEnum値を指定可能なパラメータを定義したい場合、以下のように定義できます。

Params(
    default=None,
    type=["null", "string"],
    enum=[None, "A", "B", "C"]
)

ただしこれだと手動トリガー時にNoneを選択してもなぜか文字列の"None"と認識され、nullを渡すことができません。
正しい定義方法をご存知の方は教えて下さい。

Discussion