Airflowで異なるDAGとの間に依存関係を設定する
はじめに
環境
- WSL2: Ubuntu 20
- Helm: v3.6.1
- Airflow: ver2
前回の記事
前回の記事では、Airflowでのジョブの作成方法の紹介を行いました。今回は、DAG間での依存関係の設定方法について紹介していきます。
記事の内容
前回の紹介の記事では、ある処理が成功したのを確認した後に、次の処理を実施するようなユースケースにおいて依存関係を設定することでタスクの同期処理を定義できました。(例: DBからデータを取得して、csv化してAWS S3に保存するなど。)
APIからデータを取ってくるジョブを定義したいときなどに、DAGを細かく保ちたいので、データソースごとにDAGを複数作成するケースが出てきます。このようにDAGが分割された場合では、別のDAGで定義したタスクが成功したのを確認してから、DAGを実行したいケースには上記手法は適応できません。そこで、今回紹介するのはExternalTaskSensor
を用いて、DAGの依存関係を別DAGにまで広げる手法を紹介します。
今回の記事で紹介したコードは以下のレポジトリにあります。
TL;DR
DAG間でのタスクの依存関係を設定するには、ExternalTaskSensor
を使用する。
DAGの設定
今回設定するDAGとして、DAG_Aのタスク(alpha)が成功するのを確認してからDAG_Bのタスク(beta)を実行する。とします。
DAG_A
実行するタスクとしては非常にシンプルで、ただprintしているだけです。
from logging import INFO, basicConfig, getLogger
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from kubernetes.client import models as k8s
basicConfig(level=INFO)
logger = getLogger(__name__)
default_args = {"owner": "admin", "retries": 1}
# dag configuration
@dag(
default_args=default_args,
schedule_interval=None,
start_date=days_ago(1),
tags=["sample-dag"],
)
# DAG name
def DAG_A():
@task(
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="airflow-worker",
resources=k8s.V1ResourceRequirements(
limits={"cpu": "1000m", "memory": "1Gi"},
requests={"cpu": "250m", "memory": "256Mi"},
),
)
]
)
)
}
)
def alpha():
print("test")
alpha()
dag = DAG_A()
DAG_B
TaskFlowSensorの設定
@provide_session
def _get_execution_date_of_task_a(exec_date, session=None, **kwargs):
dag_last_run = get_last_dagrun("DAG_A", session)
return dag_last_run.execution_date
task_a_sensor = ExternalTaskSensor(
task_id = "alpha_sensor",
external_dag_id="DAG_A",
external_task_id="alpha",
allowed_states=["success"],
execution_date_fn=_get_execution_date_of_task_a
)
ExternalTaskSensorの設定する項目について、説明していきます。
- task_id: ExternalTaskSensorの実際のタスク名
- external_dag_id: 依存関係を見るDAG名
- external_task_id: 上記のDAGの中で状態を監視したいタスクを指定する
- allowed_states: 指定したステータスのときのみ後続のタスクを実行する
- execution_date_fn: 今回は最新に実行されたDAGの状態を監視するための関数を指定しました。独自に設定できます。関数を作成せず、指定しない場合は、
execution_delta
で、datetime.timedelta
で指定した差分の時間に実行されたDAGを見ます。
TaskFlowAPIにおける依存関係の設定方法
beta_task = beta()
task_a_sensor >> beta_task
TaskFlowAPIを使用すれば、簡単にタスク定義を作成することはできますが、今回のように、ExternalTaskSensorを組み合わせる際には、定義の仕方に注意が必要です。
一旦タスクを変数として持って、従来のAirflowであったような>>
を用いて依存関係を設定します。
DAGの全体像
上記のものを組み合わせて、DAG_AのTaskalpha
がsuccess
の場合のみタスクを実行するDAGは以下のとおりです。
from logging import INFO, basicConfig, getLogger
from airflow.decorators import dag, task
from airflow.models.dag import get_last_dagrun
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago
from airflow.utils.session import provide_session
from kubernetes.client import models as k8s
basicConfig(level=INFO)
logger = getLogger(__name__)
default_args = {"owner": "admin", "retries": 1}
# dag configuration
@dag(
default_args=default_args,
schedule_interval=None,
start_date=days_ago(1),
tags=["sample-dag"],
)
# DAG name
def DAG_B():
@provide_session
def _get_execution_date_of_task_a(exec_date, session=None, **kwargs):
dag_last_run = get_last_dagrun(
"DAG_A", session, include_externally_triggered=True
)
return dag_last_run.execution_date
task_a_sensor = ExternalTaskSensor(
task_id="alpha_sensor",
external_dag_id="DAG_A",
external_task_id="alpha",
allowed_states=["success"],
execution_date_fn=_get_execution_date_of_task_a,
)
@task(
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="airflow-worker",
resources=k8s.V1ResourceRequirements(
limits={"cpu": "1000m", "memory": "1Gi"},
requests={"cpu": "250m", "memory": "256Mi"},
),
)
]
)
)
}
)
def beta():
print("test")
beta_task = beta()
task_a_sensor >> beta_task
dag = DAG_B()
DAG_Bの詳細
上記図は、DAG_Bの結果となっており、alpha_sensor
という名前のタスクでDAG_Aのalpha
のタスクの結果を見ています。
alpha_sensorのログ
[2022-01-30 14:30:44,994] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG_B.alpha_sensor 2022-01-30T14:30:21.760391+00:00 [queued]>
[2022-01-30 14:30:45,014] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG_B.alpha_sensor 2022-01-30T14:30:21.760391+00:00 [queued]>
[2022-01-30 14:30:45,015] {taskinstance.py:1068} INFO -
--------------------------------------------------------------------------------
[2022-01-30 14:30:45,015] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2022-01-30 14:30:45,015] {taskinstance.py:1070} INFO -
--------------------------------------------------------------------------------
[2022-01-30 14:30:45,024] {taskinstance.py:1089} INFO - Executing <Task(ExternalTaskSensor): alpha_sensor> on 2022-01-30T14:30:21.760391+00:00
[2022-01-30 14:30:45,029] {standard_task_runner.py:52} INFO - Started process 32 to run task
[2022-01-30 14:30:45,035] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'DAG_B', 'alpha_sensor', '2022-01-30T14:30:21.760391+00:00', '--job-id', '13', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/git_workflow/task_dependency/dag_b.py', '--cfg-path', '/tmp/tmpe_gfo6vn', '--error-file', '/tmp/tmpk3_bt8g7']
[2022-01-30 14:30:45,036] {standard_task_runner.py:77} INFO - Job 13: Subtask alpha_sensor
[2022-01-30 14:30:45,111] {logging_mixin.py:104} INFO - Running <TaskInstance: DAG_B.alpha_sensor 2022-01-30T14:30:21.760391+00:00 [running]> on host dagbalphasensor.97320d86172b4f978611e80f5565750c
[2022-01-30 14:30:45,191] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=admin
AIRFLOW_CTX_DAG_ID=DAG_B
AIRFLOW_CTX_TASK_ID=alpha_sensor
AIRFLOW_CTX_EXECUTION_DATE=2022-01-30T14:30:21.760391+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-01-30T14:30:21.760391+00:00
[2022-01-30 14:30:45,205] {external_task.py:153} INFO - Poking for DAG_A.alpha on 2022-01-30T14:21:50.631744+00:00 ...
[2022-01-30 14:30:45,216] {base.py:245} INFO - Success criteria met. Exiting.
[2022-01-30 14:30:45,234] {taskinstance.py:1185} INFO - Marking task as SUCCESS. dag_id=DAG_B, task_id=alpha_sensor, execution_date=20220130T143021, start_date=20220130T143044, end_date=20220130T143045
[2022-01-30 14:30:45,287] {taskinstance.py:1246} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2022-01-30 14:30:45,328] {local_task_job.py:146} INFO - Task exited with return code 0
ログからわかるように、DAG_Aのタスクの状態を見て、後続のbeta
のタスクを実行しています。
おわりに
今回の記事で紹介する方法を使用すれば、DAG内でのタスク間の依存関係だけでなく、DAG間でのタスクの依存関係を構築することができました。これらを駆使することでAirflowにおいて、自由に依存関係を作成でき、DAG自体を小さくシンプルに保つこともできます。
また、Airflowの提供するSensorには、複数種類があり、Cloud Pub/Subを待つPubSubPullSensor
だったり、S3, GCSなどのオブジェクトストレージの変更を待つS3KeySensor
やCoogleCloudStorageObjectSensor
などがあり、ETLジョブで想定する様々なユースケースには対応できそうです。
Discussion