😸

Airflow Deferrable Operators

2021/12/25に公開

クリスマスも年の瀬も近づいていますが、新年来てほしくないですよね?明日も来てほしくないですよね?時の進みをDeferしたいですよね?
Airflowのドキュメント見ていたら、"Defer"という魅力的なキーワードを見つけたので調べてみました。

tl;dr

  • 2.2でDeferrable Operatorというのが出たよ
    • 2.0で登場したSmart Sensorを置き換えるよ
  • Pythonの非同期IOの仕組みを使い、Workerではない場所(Triggerer)で処理を待つよ
  • 使えるOperatorはまだ少ないよ
  • AWS/GCPのmanaged Airflowでは使えないよ。Astrnomerか自前の環境か必要だよ

登場の背景

Deferrable Operatorですが、基本的にはSensorを改善するものです。
(ただし、外部リソースにアクセスするOperator全般に使えるらしい)

そのため、一旦Sensorを簡単にご説明した後、Deferrable Operatorの登場、そしてその仕組みについて説明します。

普通のSensor

なにかが起きるまで待機するOperatorを、AirflowではSensorと呼びます。
「なにか」とは、

などです。

Qiitaの記事以前書きました)

Deferrable Operator

Andrew Godwinさん(※)のAIP-400が提案したのが始まりです。
※ AirflowのSaaS Astrnomerの中の人です

何を気にしてDeferrable Operatorを提案したかというと

It solves the problem where both Sensors and externally-dependent Operators spend most of their time idle, but occupying a worker slot.

要するに、待つためだけにWorker Slot(※※)を専有するの無駄じゃね?という理由らしいです。

※※ Poolparallelism等

同じモチベーションで、Sensorにはrescheduleモード(※※※)、また、Airflow 2.0ではSmart Sensorと言う仕組みも導入されましたが、

  • rescheduleモードは、time-drivenなので低レイテンシーの処理には向かない点
  • Smart Sensorは、Sensor毎に特殊な実装が必要な点と、助長構成できない点

を解消するために、Deferrable Operatorを提案したようです。

※※※一定間隔毎にSchedulerでSensorのTaskを実行しチェックする仕組み

(余談)Smart Sensor

SmartSensorはAirflow2.0で登場した、Deferrable Operatorと同じ問題を解決する仕組みです。
ただし、現在はearly accessで将来的にはなくなる予定です。

Smart sensors, an "early access" feature added in Airflow 2, are now deprecated and will be removed in Airflow 2.4.0. They have been superseded by Deferable Operators, added in Airflow 2.2.0.

Deprecatedになりますが、Deferrable Operatorの仕組みと似ていますので、その存在も意義があったのかもしれません。

smart-sensors.htmlより
(AirflowのSmartSensorドキュメントより)

Deferrable Operatorの仕組み

「Triggerer」プロセスが新しく追加され、Taskから登録されたTriggerの監視・実行を行います。

  • Task/Operatorの処理のなかでdeferメソッドを呼ぶと、処理が一旦中止され、Triggerの情報がDBに登録されます
  • 定期的にTriggererがDBをチェックし、登録されたTriggerをasyncioで実行(イベントループに登録)します
  • Triggerの処理が終了すると、TriggererはTaskInstanceの状態を書き換え、TaskInstanceが再度実行されるようにします


(エラー処理や後処理など一部省略しています)

Task/Operator側の話

  • deferメソッドを呼び出すと、workerからTaskは除外されます
  • Triggeerが発火すると、deferのmethod_nameで指定されたメソッドが呼び出されます
defer(trigger, method_name, kwargs, timeout)
  • trigger: Deferが使うインスタンスです
  • method_nameとkwargs: Deferからの復帰後呼び出されるOperator内のメソッドです
  • timeout: この期間Deferから復帰しなければ、Task Instanceがfailします

ちなみに、method_nameで指定されたメソッドは、eventというキーワード引数を受けつける必要があります。

Triggerの話

BaseTriggerを継承したクラスで、以下のメソッドを実装します。

  • __init__: Operatorから呼び出し
  • run: TriggerEventを呼ぶ(呼ぶタイミングまでawait)、asyncメソッド
  • serialize

(余談)内側の話

Operator使う・作る分には、おそらく意識する必要がないと思いますが、せっかく調べたので供養です。

使ってみる

  • DAGのコードは通常と同じように書けます
  • TriggererのAirflow基盤へのインストールが必要です
  • 12/22現在、GCPのCloud Composer・AWSのMWAAともにDeferrable Operator使えません
    • そもそも2.2未対応
    • Astrnomerは2.2対応しています(が私は未確認です)

今回はAirflowのDocker環境で試してみます。

なお、リンク先の注意書きにも書いていますが、このDocker環境は開発用です

AirflowのDocker Composer環境の準備

インストールガイドに従い、インストールします。

  • それなりにvCPU、メモリが必要です
  • Deferrable Operatorが2.2からの機能なので、新しいバージョンのdocker-composer.yamlが必要です
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.3/docker-compose.yaml'
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker-compose up airflow-init
docker-compose up

localhost:8080/homeにアクセス(ユーザ名・パスワードairflowです)して、Airflow UIを開くことができたら成功です。
ログを眺めると、Triggererがいることもわかります。

(ちなみにTriggererの起動の設定はこちらです)

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

例のDAGを動かす

TimeDeltaSensorAsyncというDefferable Operatorを使うexample_time_delta_sensor_asyncという例のDAGが一緒にインストールされるので、pauseを解除して、手動でTriggerしてみましょう。処理は数秒で終わると思います。

TimeDeltaSensorAsyncのTaskInstanceのログを下に示します。
このDAG・Taskは10秒待つだけの処理ですが、それでも以下のことがわかります:

  • TaskInstanceのステータスがDEFERREDに一度なっている([2021-12-22, 23:16:07 UTC] {taskinstance.py:1342} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20211222T231602, start_date=20211222T231607
  • TaskInstanceがWorkerに再度配置されている([2021-12-22, 23:16:15 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>の部分)
*** Reading local file: /opt/airflow/logs/example_time_delta_sensor_async/wait/2021-12-22T23:16:02.320809+00:00/1.log
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1238} INFO - 
--------------------------------------------------------------------------------
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 1
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1240} INFO - 
--------------------------------------------------------------------------------
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1259} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2021-12-22 23:16:02.320809+00:00
[2021-12-22, 23:16:07 UTC] {standard_task_runner.py:52} INFO - Started process 28384 to run task
[2021-12-22, 23:16:07 UTC] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2021-12-22T23:16:02.320809+00:00', '--job-id', '3', '--raw', '--subdir', '/home/***/.local/lib/python3.7/site-packages/***/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/tmp/tmpm0bdr7hd', '--error-file', '/tmp/tmp995i2mo1']
[2021-12-22, 23:16:07 UTC] {standard_task_runner.py:77} INFO - Job 3: Subtask wait
[2021-12-22, 23:16:07 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [running]> on host 58da9bd73377
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_time_delta_sensor_async
AIRFLOW_CTX_TASK_ID=wait
AIRFLOW_CTX_EXECUTION_DATE=2021-12-22T23:16:02.320809+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-22T23:16:02.320809+00:00
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1342} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20211222T231602, start_date=20211222T231607
[2021-12-22, 23:16:07 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2021-12-22, 23:16:08 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1238} INFO - 
--------------------------------------------------------------------------------
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 1
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1240} INFO - 
--------------------------------------------------------------------------------
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1259} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2021-12-22 23:16:02.320809+00:00
[2021-12-22, 23:16:15 UTC] {standard_task_runner.py:52} INFO - Started process 28389 to run task
[2021-12-22, 23:16:15 UTC] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2021-12-22T23:16:02.320809+00:00', '--job-id', '4', '--raw', '--subdir', '/home/***/.local/lib/python3.7/site-packages/***/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/tmp/tmpt4bjoeid', '--error-file', '/tmp/tmpravez__z']
[2021-12-22, 23:16:15 UTC] {standard_task_runner.py:77} INFO - Job 4: Subtask wait
[2021-12-22, 23:16:15 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [running]> on host 58da9bd73377
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_time_delta_sensor_async
AIRFLOW_CTX_TASK_ID=wait
AIRFLOW_CTX_EXECUTION_DATE=2021-12-22T23:16:02.320809+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-22T23:16:02.320809+00:00
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1277} INFO - Marking task as SUCCESS. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20211222T231602, start_date=20211222T231615, end_date=20211222T231615
[2021-12-22, 23:16:15 UTC] {local_task_job.py:154} INFO - Task exited with return code 0

何ができんの?

最新バージョンでも、使えるDeferrableなOperatorは少ないです。
Databricksが対応したり、そのうち増えると思います。

名前(とコード)見ていただければ、それぞれのOperatorの働きは理解できると思います。

対応するTrigger

Discussion