🦔

Airflowでのジョブ作成方法の紹介

2021/09/13に公開

はじめに

環境

  • WSL2: Ver20
  • Helm: v3.6.1
  • Airflow: ver2

前回の記事

前回の記事では、Node.js以外でのジョブ実行の方法を紹介しました。今回はその続きで、一般的なジョブの作成方法の紹介をしようと思います。

記事の内容

Airflowでは、ジョブをpythonで記述でき、依存関係をDAGとしてかけます。今回は以下の二種類の紹介をしようと思います。

  • Taskflow APIを用いたDAGの書き方
  • 動的にDAGを作成する方法

今回の記事で紹介したコードは以下のレポジトリにあります。
https://github.com/shohta-tera/workflows

TL;DR

Taskflow APIを用いたジョブの書き方は、従来のAirflowでのDAGより記述は楽になった。

Taskflow APIについて

Taskflow APIとはAirflow 2.0から導入された新しいDAGの記述方法です。 @dag@taskを用いてDAGやTaskを記述できます。ただ、この記述方式はPythonOperatorを使用する場合のみで、ほかのBashOperatorなどでは記載方法は従来のままです。

DAGの書き方

DAGの初期化

import json

import boto3
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {"owner": "admin"}


# dag configuration
@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["sample-dag"],
)
# DAG name
def sample_job():

まず必要モジュールがあれば、インストールとインポートしておきます。
前回までの記事のようにKubernetesのHelmでAirflowをインストールした場合は、Helm Chartのところで、requirements.txtを更新して、再デプロイする必要があります。
DAGの設定項目は@dagで記述します。以下の項目はよく使用するものです。

  • default_args: リトライ回数などを設定できます。
  • schedule_interval: DAGの実行間隔です。Noneなら手動実行で、datetime.timedelta等で定義できます。
  • start_date: catchupと組み合わせて、前日から今までの分のタスク実行などできます。
  • tags: UIに表示して、選択できるようになるTagです。
  • catchup: Trueにすると、start_dateから今までのDAGをschedule_intervalにしたがって実行します。

関数名をDAGの名前としてAirflowは認識します。

Taskの定義

@task(
    executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="airflow-worker",
                                resources=k8s.V1ResourceRequirements(
                                    limits={"cpu": "1000m", "memory": "1Gi"},
                                    requests={"cpu": "250m", "memory": "256Mi"},
                                ),
                            )
                        ]
                    )
                )
            }
)
def create_large_data():
    data = {}
    data_string = "{'1001': 301, '10027': 201, '1003': 502.22}"
    for i in range(1000000):
        data[i] = json.loads(data_string)

    return data

TaskのDecorator内で、KubernetesのRequest, Limitの値をそれぞれ設定します。AirflowのConfigurationファイル内では、上記で指定しなかった場合のデフォルト値を設定できますが、ここではDAG内のTaskごとに設定できます。
Taskで返り値として値を返すことができ、後続のTaskで使用できます。

DAGの依存関係に記載

data = create_large_data()
save_to_storage(data)

Task間の依存性の設定はここで行います。また、個別のタスクはKubernetesのPodが起動され別個の環境で実行されます。

DAGを動的に生成するDAG

REST APIの取得結果からテンプレートを使ってDAGを自動で生成したいなどに使えます。こうすることで、DAGを手動で作成することなくREST APIのレスポンスの変化に追従してDAGを作成することができます。

動的にDAGを生成するDAG

create_dag.py
from generating_dag_dynamically.template_job import templated

response = get_all_response(headers, 0)
"""
response = [
    {
        "id": uuid,
        "name": "sample_name",
        "frequency": 600
    }
]
"""
for record in response:
    res_id = record["id"]
    schedule = timedelta(record["frequency"])
    name = record["name"]
    dag_id = f"Sample_dag_{id}_{name}"
    globals()[dag_id] = templated.sample_job(
        dag_id, schedule, default_args, res_id, name
    )

必要な箇所だけ抜き取って来ました。ここで必要となるのが、まずテンプレートとなるDAGをImportしています。このテンプレートでは、返り値としてdagオブジェクトを返すものになっています。
get_all_response部分で、動的に生成するためのレスポンスを取得しています。ここでは、REST APIのResponseとしていますが、基本的にはGraphQLのResponseやSQL, NoSQLからのデータ取得などなんでも大丈夫です。
for文以下で、各レコードごとの処理をしています。
globals()[dag_id]部分で実際のDAG作成処理をしています。さらにここで、レコードの内容で分岐して別のテンプレートを用いてDAGを生成するってことも可能です。

テンプレートとなるDAG

テンプレートとなるDAGは基本的には、上記で紹介した部分で可能です。
ただ、筆者の環境では、DAGの定義部分だけ、TaskflowAPIでの記載がうまく行かなかったので、従来の記述方法で行っています。

def sample_job(dag_id, schedule, default_args, res_id, name):
    with DAG(
        dag_id=dag_id,
        default_args={"owner": "admin"},
        schedule_interval=schedule,
        start_date=days_ago(1),
        tags=[name, "sample", "tempalted"],
        catchup=True,
    ) as dag:

書き方は基本的には変わらないです。最終的にこの関数の最後にreturn dagでDAGを返してください。

注意点など

あとはAirflowのSchedulerが定期的(デフォルト:30秒)ごとに上記のDAGを実行し、動的にDAGを生成します。
注意点としては、以下の点があげられます。

  • DAGの生成の処理時間がしきい値を超えると、Errorとなる。
    • 環境変数AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVALをデフォルトの5分より長くすることで対処可能。ただし、新しいDAGファイルの検出のリアルタイム性が低下する。
  • 例のデータ取得の部分でサーバーの不具合などで正常なレスポンスが得られず、DAGの生成がうまく行かないと生成されたDAGすべてが動作しなくなる。
    • データ取得の部分が再度成功するとDAGも実行可能になる。

おわりに

以上でTaskflow APIと動的にDAGを生成するDAGの紹介でした。
ローカルで用意する環境だけでなく、AWSやGCPなどで展開されているManagedのAirflowに関してもVer2になっており、Taskflow APIはDAGの記述方法が以前と比べて格段に簡単になったので、今後学習する際には、Taskflow API前提で進めるのがいいと思います。
DAGを動的に生成するDAGについては、なにかしらのレスポンス、レコードを元にDAGを動的に生成できるので、非常に便利で同じテンプレートのジョブを使用するけど、ジョブの実行間隔がレコードごとに異なる場合、あまり選択肢はなく手動で作成するとかになってくるので、こうした手法は有用だと思います。
現状のプロジェクトで動的にDAGを生成するDAGを使ったりしているのですが、前述の注意する点に対しての、いい対処法があまり見つかっていないので、どなたか知見のある方がいたらコメント等で指摘してほしいです。

GitHubで編集を提案

Discussion