🐈

Airflowで異なるDAGとの間に依存関係を設定する

2022/01/30に公開

はじめに

環境

  • WSL2: Ubuntu 20
  • Helm: v3.6.1
  • Airflow: ver2

前回の記事

前回の記事では、Airflowでのジョブの作成方法の紹介を行いました。今回は、DAG間での依存関係の設定方法について紹介していきます。

記事の内容

前回の紹介の記事では、ある処理が成功したのを確認した後に、次の処理を実施するようなユースケースにおいて依存関係を設定することでタスクの同期処理を定義できました。(例: DBからデータを取得して、csv化してAWS S3に保存するなど。)
APIからデータを取ってくるジョブを定義したいときなどに、DAGを細かく保ちたいので、データソースごとにDAGを複数作成するケースが出てきます。このようにDAGが分割された場合では、別のDAGで定義したタスクが成功したのを確認してから、DAGを実行したいケースには上記手法は適応できません。そこで、今回紹介するのはExternalTaskSensorを用いて、DAGの依存関係を別DAGにまで広げる手法を紹介します。
今回の記事で紹介したコードは以下のレポジトリにあります。
https://github.com/shohta-tera/workflows/tree/main/src/dags/task_dependency

TL;DR

DAG間でのタスクの依存関係を設定するには、ExternalTaskSensorを使用する。

DAGの設定

今回設定するDAGとして、DAG_Aのタスク(alpha)が成功するのを確認してからDAG_Bのタスク(beta)を実行する。とします。

DAG_A

実行するタスクとしては非常にシンプルで、ただprintしているだけです。

from logging import INFO, basicConfig, getLogger

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from kubernetes.client import models as k8s

basicConfig(level=INFO)
logger = getLogger(__name__)
default_args = {"owner": "admin", "retries": 1}


# dag configuration
@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["sample-dag"],
)
# DAG name
def DAG_A():
    @task(
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="airflow-worker",
                            resources=k8s.V1ResourceRequirements(
                                limits={"cpu": "1000m", "memory": "1Gi"},
                                requests={"cpu": "250m", "memory": "256Mi"},
                            ),
                        )
                    ]
                )
            )
        }
    )
    def alpha():
        print("test")

    alpha()


dag = DAG_A()

DAG_B

TaskFlowSensorの設定

    @provide_session
    def _get_execution_date_of_task_a(exec_date, session=None, **kwargs):
        dag_last_run = get_last_dagrun("DAG_A", session)
        return dag_last_run.execution_date
    
    task_a_sensor = ExternalTaskSensor(
        task_id = "alpha_sensor",
        external_dag_id="DAG_A",
        external_task_id="alpha",
        allowed_states=["success"],
        execution_date_fn=_get_execution_date_of_task_a
    )

ExternalTaskSensorの設定する項目について、説明していきます。

  • task_id: ExternalTaskSensorの実際のタスク名
  • external_dag_id: 依存関係を見るDAG名
  • external_task_id: 上記のDAGの中で状態を監視したいタスクを指定する
  • allowed_states: 指定したステータスのときのみ後続のタスクを実行する
  • execution_date_fn: 今回は最新に実行されたDAGの状態を監視するための関数を指定しました。独自に設定できます。関数を作成せず、指定しない場合は、execution_deltaで、datetime.timedeltaで指定した差分の時間に実行されたDAGを見ます。

TaskFlowAPIにおける依存関係の設定方法

    beta_task = beta()
    task_a_sensor >> beta_task

TaskFlowAPIを使用すれば、簡単にタスク定義を作成することはできますが、今回のように、ExternalTaskSensorを組み合わせる際には、定義の仕方に注意が必要です。
一旦タスクを変数として持って、従来のAirflowであったような>>を用いて依存関係を設定します。

DAGの全体像

上記のものを組み合わせて、DAG_AのTaskalphasuccessの場合のみタスクを実行するDAGは以下のとおりです。

from logging import INFO, basicConfig, getLogger

from airflow.decorators import dag, task
from airflow.models.dag import get_last_dagrun
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago
from airflow.utils.session import provide_session
from kubernetes.client import models as k8s

basicConfig(level=INFO)
logger = getLogger(__name__)
default_args = {"owner": "admin", "retries": 1}


# dag configuration
@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["sample-dag"],
)
# DAG name
def DAG_B():
    @provide_session
    def _get_execution_date_of_task_a(exec_date, session=None, **kwargs):
        dag_last_run = get_last_dagrun(
            "DAG_A", session, include_externally_triggered=True
        )
        return dag_last_run.execution_date

    task_a_sensor = ExternalTaskSensor(
        task_id="alpha_sensor",
        external_dag_id="DAG_A",
        external_task_id="alpha",
        allowed_states=["success"],
        execution_date_fn=_get_execution_date_of_task_a,
    )

    @task(
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="airflow-worker",
                            resources=k8s.V1ResourceRequirements(
                                limits={"cpu": "1000m", "memory": "1Gi"},
                                requests={"cpu": "250m", "memory": "256Mi"},
                            ),
                        )
                    ]
                )
            )
        }
    )
    def beta():
        print("test")

    beta_task = beta()
    task_a_sensor >> beta_task


dag = DAG_B()

DAG_Bの詳細


上記図は、DAG_Bの結果となっており、alpha_sensorという名前のタスクでDAG_Aのalphaのタスクの結果を見ています。

alpha_sensorのログ

[2022-01-30 14:30:44,994] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG_B.alpha_sensor 2022-01-30T14:30:21.760391+00:00 [queued]>
[2022-01-30 14:30:45,014] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG_B.alpha_sensor 2022-01-30T14:30:21.760391+00:00 [queued]>
[2022-01-30 14:30:45,015] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2022-01-30 14:30:45,015] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2022-01-30 14:30:45,015] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2022-01-30 14:30:45,024] {taskinstance.py:1089} INFO - Executing <Task(ExternalTaskSensor): alpha_sensor> on 2022-01-30T14:30:21.760391+00:00
[2022-01-30 14:30:45,029] {standard_task_runner.py:52} INFO - Started process 32 to run task
[2022-01-30 14:30:45,035] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'DAG_B', 'alpha_sensor', '2022-01-30T14:30:21.760391+00:00', '--job-id', '13', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/git_workflow/task_dependency/dag_b.py', '--cfg-path', '/tmp/tmpe_gfo6vn', '--error-file', '/tmp/tmpk3_bt8g7']
[2022-01-30 14:30:45,036] {standard_task_runner.py:77} INFO - Job 13: Subtask alpha_sensor
[2022-01-30 14:30:45,111] {logging_mixin.py:104} INFO - Running <TaskInstance: DAG_B.alpha_sensor 2022-01-30T14:30:21.760391+00:00 [running]> on host dagbalphasensor.97320d86172b4f978611e80f5565750c
[2022-01-30 14:30:45,191] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=admin
AIRFLOW_CTX_DAG_ID=DAG_B
AIRFLOW_CTX_TASK_ID=alpha_sensor
AIRFLOW_CTX_EXECUTION_DATE=2022-01-30T14:30:21.760391+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-01-30T14:30:21.760391+00:00
[2022-01-30 14:30:45,205] {external_task.py:153} INFO - Poking for DAG_A.alpha on 2022-01-30T14:21:50.631744+00:00 ... 
[2022-01-30 14:30:45,216] {base.py:245} INFO - Success criteria met. Exiting.
[2022-01-30 14:30:45,234] {taskinstance.py:1185} INFO - Marking task as SUCCESS. dag_id=DAG_B, task_id=alpha_sensor, execution_date=20220130T143021, start_date=20220130T143044, end_date=20220130T143045
[2022-01-30 14:30:45,287] {taskinstance.py:1246} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2022-01-30 14:30:45,328] {local_task_job.py:146} INFO - Task exited with return code 0

ログからわかるように、DAG_Aのタスクの状態を見て、後続のbetaのタスクを実行しています。

おわりに

今回の記事で紹介する方法を使用すれば、DAG内でのタスク間の依存関係だけでなく、DAG間でのタスクの依存関係を構築することができました。これらを駆使することでAirflowにおいて、自由に依存関係を作成でき、DAG自体を小さくシンプルに保つこともできます。
また、Airflowの提供するSensorには、複数種類があり、Cloud Pub/Subを待つPubSubPullSensorだったり、S3, GCSなどのオブジェクトストレージの変更を待つS3KeySensorCoogleCloudStorageObjectSensorなどがあり、ETLジョブで想定する様々なユースケースには対応できそうです。

GitHubで編集を提案

Discussion