Airflowでのジョブ作成方法の紹介
はじめに
環境
- WSL2: Ver20
- Helm: v3.6.1
- Airflow: ver2
前回の記事
前回の記事では、Node.js以外でのジョブ実行の方法を紹介しました。今回はその続きで、一般的なジョブの作成方法の紹介をしようと思います。
記事の内容
Airflowでは、ジョブをpythonで記述でき、依存関係をDAGとしてかけます。今回は以下の二種類の紹介をしようと思います。
- Taskflow APIを用いたDAGの書き方
- 動的にDAGを作成する方法
今回の記事で紹介したコードは以下のレポジトリにあります。
TL;DR
Taskflow APIを用いたジョブの書き方は、従来のAirflowでのDAGより記述は楽になった。
Taskflow APIについて
Taskflow APIとはAirflow 2.0から導入された新しいDAGの記述方法です。 @dag
や@task
を用いてDAGやTaskを記述できます。ただ、この記述方式はPythonOperator
を使用する場合のみで、ほかのBashOperator
などでは記載方法は従来のままです。
DAGの書き方
DAGの初期化
import json
import boto3
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {"owner": "admin"}
# dag configuration
@dag(
default_args=default_args,
schedule_interval=None,
start_date=days_ago(1),
tags=["sample-dag"],
)
# DAG name
def sample_job():
まず必要モジュールがあれば、インストールとインポートしておきます。
前回までの記事のようにKubernetesのHelmでAirflowをインストールした場合は、Helm Chartのところで、requirements.txtを更新して、再デプロイする必要があります。
DAGの設定項目は@dag
で記述します。以下の項目はよく使用するものです。
- default_args: リトライ回数などを設定できます。
- schedule_interval: DAGの実行間隔です。
None
なら手動実行で、datetime.timedelta等で定義できます。 - start_date: catchupと組み合わせて、前日から今までの分のタスク実行などできます。
- tags: UIに表示して、選択できるようになるTagです。
- catchup: Trueにすると、start_dateから今までのDAGを
schedule_interval
にしたがって実行します。
関数名をDAGの名前としてAirflowは認識します。
Taskの定義
@task(
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="airflow-worker",
resources=k8s.V1ResourceRequirements(
limits={"cpu": "1000m", "memory": "1Gi"},
requests={"cpu": "250m", "memory": "256Mi"},
),
)
]
)
)
}
)
def create_large_data():
data = {}
data_string = "{'1001': 301, '10027': 201, '1003': 502.22}"
for i in range(1000000):
data[i] = json.loads(data_string)
return data
TaskのDecorator内で、KubernetesのRequest, Limitの値をそれぞれ設定します。AirflowのConfigurationファイル内では、上記で指定しなかった場合のデフォルト値を設定できますが、ここではDAG内のTaskごとに設定できます。
Taskで返り値として値を返すことができ、後続のTaskで使用できます。
DAGの依存関係に記載
data = create_large_data()
save_to_storage(data)
Task間の依存性の設定はここで行います。また、個別のタスクはKubernetesのPodが起動され別個の環境で実行されます。
DAGを動的に生成するDAG
REST APIの取得結果からテンプレートを使ってDAGを自動で生成したいなどに使えます。こうすることで、DAGを手動で作成することなくREST APIのレスポンスの変化に追従してDAGを作成することができます。
動的にDAGを生成するDAG
from generating_dag_dynamically.template_job import templated
response = get_all_response(headers, 0)
"""
response = [
{
"id": uuid,
"name": "sample_name",
"frequency": 600
}
]
"""
for record in response:
res_id = record["id"]
schedule = timedelta(record["frequency"])
name = record["name"]
dag_id = f"Sample_dag_{id}_{name}"
globals()[dag_id] = templated.sample_job(
dag_id, schedule, default_args, res_id, name
)
必要な箇所だけ抜き取って来ました。ここで必要となるのが、まずテンプレートとなるDAGをImportしています。このテンプレートでは、返り値としてdag
オブジェクトを返すものになっています。
get_all_response
部分で、動的に生成するためのレスポンスを取得しています。ここでは、REST APIのResponseとしていますが、基本的にはGraphQLのResponseやSQL, NoSQLからのデータ取得などなんでも大丈夫です。
for
文以下で、各レコードごとの処理をしています。
globals()[dag_id]
部分で実際のDAG作成処理をしています。さらにここで、レコードの内容で分岐して別のテンプレートを用いてDAGを生成するってことも可能です。
テンプレートとなるDAG
テンプレートとなるDAGは基本的には、上記で紹介した部分で可能です。
ただ、筆者の環境では、DAGの定義部分だけ、TaskflowAPIでの記載がうまく行かなかったので、従来の記述方法で行っています。
def sample_job(dag_id, schedule, default_args, res_id, name):
with DAG(
dag_id=dag_id,
default_args={"owner": "admin"},
schedule_interval=schedule,
start_date=days_ago(1),
tags=[name, "sample", "tempalted"],
catchup=True,
) as dag:
書き方は基本的には変わらないです。最終的にこの関数の最後にreturn dag
でDAGを返してください。
注意点など
あとはAirflowのSchedulerが定期的(デフォルト:30秒)ごとに上記のDAGを実行し、動的にDAGを生成します。
注意点としては、以下の点があげられます。
- DAGの生成の処理時間がしきい値を超えると、Errorとなる。
- 環境変数
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
をデフォルトの5分より長くすることで対処可能。ただし、新しいDAGファイルの検出のリアルタイム性が低下する。
- 環境変数
- 例のデータ取得の部分でサーバーの不具合などで正常なレスポンスが得られず、DAGの生成がうまく行かないと生成されたDAGすべてが動作しなくなる。
- データ取得の部分が再度成功するとDAGも実行可能になる。
おわりに
以上でTaskflow APIと動的にDAGを生成するDAGの紹介でした。
ローカルで用意する環境だけでなく、AWSやGCPなどで展開されているManagedのAirflowに関してもVer2になっており、Taskflow APIはDAGの記述方法が以前と比べて格段に簡単になったので、今後学習する際には、Taskflow API前提で進めるのがいいと思います。
DAGを動的に生成するDAGについては、なにかしらのレスポンス、レコードを元にDAGを動的に生成できるので、非常に便利で同じテンプレートのジョブを使用するけど、ジョブの実行間隔がレコードごとに異なる場合、あまり選択肢はなく手動で作成するとかになってくるので、こうした手法は有用だと思います。
現状のプロジェクトで動的にDAGを生成するDAGを使ったりしているのですが、前述の注意する点に対しての、いい対処法があまり見つかっていないので、どなたか知見のある方がいたらコメント等で指摘してほしいです。
Discussion