😶🌫️
AirflowでDAGに渡すパラメータにバリデーションを適用する方法
概要
- AirflowではDAGのトリガー時にパラメータを指定することができ、渡されたパラメータはDAG内で参照できます
- DAG宣言時の引数としてparamsを指定すると、パラメータの仕様を定義できます
- これによりトリガー時にJSON Schemaのバリデーションを適用できます
- 詳細は公式ドキュメントを参照してください:https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html#
Paramsで指定できる項目
Paramsで指定可能な代表的な項目は以下の通りです。
- default
- 初期値
- DAGがスケジュール実行されたときやパラメータを指定せずに手動実行されたときはこの初期値が使用される
- title
- Airflow UI上で表示されるタイトルラベル
- description
- Airflow UI上で表示される説明文
- type
- 型
-
string
,number
,integer
,boolean
,array
,object
,null
の7種類 - パラメータを任意項目とする場合は
["null", "string"]
のように記述する
使用例
例としてDAGを実行する時に以下のようなパラメータを指定できるとします。
パラメータ | 説明 |
---|---|
targetProductIds | バッチジョブで処理対象となるプロダクトIDのリスト |
startAt | 処理対象期間の開始日時 |
endAt | 処理対象期間の終了日時 |
dryRun | データの書き込みなどを行わず結果を出力するにとどめるフラグ |
retryCount | 処理に失敗したときの最大リトライ回数 |
これらのパラメータは例えば以下のように定義できます。
with DAG(
dag_id="the_dag",
params={
"targetProductIds": Param(
default=None,
type=["null", "array"],
items={"type": "string", "format": "uuid"}
),
"startAt": Param(
default=None,
type=["null", "string"],
format="date-time",
title="startAt",
),
"endAt": Param(
default=None,
type=["null", "string"],
format="date-time",
title="endAt",
),
"dryRun": Param(
default=False,
type="boolean",
),
"retryCount": Param(
default=5,
type="integer",
minimum=0,
maximum=10,
),
}
):
Airflow UI上で見ると以下のように表示されます。
これにより、DAGをトリガーするときに不正な値を入力できなくなります。
DAG内でパラメータを参照するには以下のようにcontextを使用します。
from airflow.operators.python import get_current_context
context = get_current_context()
trigger_parameters = context["params"]
print(trigger_parameters["targetProductIds"])
# ["ad0973d4-f611-4f26-8d32-0265ccd13c9c"]
ハマったところ
nullもしくはEnum値を指定可能なパラメータを定義したい場合、以下のように定義できます。
Params(
default=None,
type=["null", "string"],
enum=[None, "A", "B", "C"]
)
ただしこれだと手動トリガー時にNoneを選択してもなぜか文字列の"None"と認識され、nullを渡すことができません。
正しい定義方法をご存知の方は教えて下さい。
Discussion