Airflow Deferrable Operators
クリスマスも年の瀬も近づいていますが、新年来てほしくないですよね?明日も来てほしくないですよね?時の進みをDeferしたいですよね?
Airflowのドキュメント見ていたら、"Defer"という魅力的なキーワードを見つけたので調べてみました。
tl;dr
- 2.2でDeferrable Operatorというのが出たよ
- 2.0で登場したSmart Sensorを置き換えるよ
- Pythonの非同期IOの仕組みを使い、Workerではない場所(Triggerer)で処理を待つよ
- 使えるOperatorはまだ少ないよ
- AWS/GCPのmanaged Airflowでは使えないよ。Astrnomerか自前の環境か必要だよ
登場の背景
Deferrable Operatorですが、基本的にはSensorを改善するものです。
(ただし、外部リソースにアクセスするOperator全般に使えるらしい)
そのため、一旦Sensorを簡単にご説明した後、Deferrable Operatorの登場、そしてその仕組みについて説明します。
普通のSensor
なにかが起きるまで待機するOperatorを、AirflowではSensorと呼びます。
「なにか」とは、
- SQLが条件を満たすためだったり
- 時間だったり
- PythonのcallableがTrueを返すまで
などです。
(Qiitaの記事以前書きました)
Deferrable Operator
Andrew Godwinさん(※)のAIP-400が提案したのが始まりです。
※ AirflowのSaaS Astrnomerの中の人です
何を気にしてDeferrable Operatorを提案したかというと
It solves the problem where both Sensors and externally-dependent Operators spend most of their time idle, but occupying a worker slot.
要するに、待つためだけにWorker Slot(※※)を専有するの無駄じゃね?という理由らしいです。
※※ Poolやparallelism等
同じモチベーションで、Sensorにはrescheduleモード(※※※)、また、Airflow 2.0ではSmart Sensorと言う仕組みも導入されましたが、
- rescheduleモードは、time-drivenなので低レイテンシーの処理には向かない点
- Smart Sensorは、Sensor毎に特殊な実装が必要な点と、助長構成できない点
を解消するために、Deferrable Operatorを提案したようです。
※※※一定間隔毎にSchedulerでSensorのTaskを実行しチェックする仕組み
(余談)Smart Sensor
SmartSensorはAirflow2.0で登場した、Deferrable Operatorと同じ問題を解決する仕組みです。
ただし、現在はearly accessで将来的にはなくなる予定です。
Smart sensors, an "early access" feature added in Airflow 2, are now deprecated and will be removed in Airflow 2.4.0. They have been superseded by Deferable Operators, added in Airflow 2.2.0.
Deprecatedになりますが、Deferrable Operatorの仕組みと似ていますので、その存在も意義があったのかもしれません。
Deferrable Operatorの仕組み
「Triggerer」プロセスが新しく追加され、Taskから登録されたTriggerの監視・実行を行います。
- Task/Operatorの処理のなかで
defer
メソッドを呼ぶと、処理が一旦中止され、Triggerの情報がDBに登録されます - 定期的にTriggererがDBをチェックし、登録されたTriggerをasyncioで実行(イベントループに登録)します
- Triggerの処理が終了すると、TriggererはTaskInstanceの状態を書き換え、TaskInstanceが再度実行されるようにします
(エラー処理や後処理など一部省略しています)
Task/Operator側の話
-
defer
メソッドを呼び出すと、workerからTaskは除外されます - Triggeerが発火すると、
defer
のmethod_nameで指定されたメソッドが呼び出されます
defer(trigger, method_name, kwargs, timeout)
- trigger: Deferが使うインスタンスです
- method_nameとkwargs: Deferからの復帰後呼び出されるOperator内のメソッドです
- timeout: この期間Deferから復帰しなければ、Task Instanceがfailします
ちなみに、method_name
で指定されたメソッドは、event
というキーワード引数を受けつける必要があります。
Triggerの話
BaseTriggerを継承したクラスで、以下のメソッドを実装します。
-
__init__
: Operatorから呼び出し -
run
: TriggerEventを呼ぶ(呼ぶタイミングまでawait)、asyncメソッド serialize
(余談)内側の話
Operator使う・作る分には、おそらく意識する必要がないと思いますが、せっかく調べたので供養です。
- BaseOperatorのdeferを呼び出すと、例外がキャッチされ、TaskInstanceのステータスを変更・TriggerのエントリをDBに登録されます
- 登録されたTriggerの方はTriggererで処理されます
- TriggererJobとTriggerRunnerの二つのスレッドがあります
-
TriggererJobはループの中で、以下の処理を行います
- DBに登録されたTriggerの確認
- 登録されたTriggerをTriggerRunnerに渡す
- TriggerRunnerのeventキューをチェックし、発火したTriggerに対応するTaskInstanceのステータスをscheduledに戻し、deferで登録したメソッドを呼び出す準備]
- 実際にTaskInstanceの配置をするのはScheduler・Executorの仕事
-
TriggerRunnerはループの中で、以下の処理を行います
- 各Triggerのrunメソッドを呼び発火した情報をキューに入れる
- 中止されたTriggerの対応
-
書き方の悪いtriggerの有無のチェック
- (どのtriggerが悪いかまではチェックしてくれません)
使ってみる
- DAGのコードは通常と同じように書けます
- TriggererのAirflow基盤へのインストールが必要です
-
12/22現在、GCPのCloud Composer・AWSのMWAAともにDeferrable Operator使えません
- そもそも2.2未対応
- Astrnomerは2.2対応しています(が私は未確認です)
今回はAirflowのDocker環境で試してみます。
なお、リンク先の注意書きにも書いていますが、このDocker環境は開発用です。
AirflowのDocker Composer環境の準備
インストールガイドに従い、インストールします。
- それなりにvCPU、メモリが必要です
- Deferrable Operatorが2.2からの機能なので、新しいバージョンのdocker-composer.yamlが必要です
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.3/docker-compose.yaml'
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker-compose up airflow-init
docker-compose up
localhost:8080/homeにアクセス(ユーザ名・パスワードairflowです)して、Airflow UIを開くことができたら成功です。
ログを眺めると、Triggererがいることもわかります。
(ちなみにTriggererの起動の設定はこちらです)
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
例のDAGを動かす
TimeDeltaSensorAsyncというDefferable Operatorを使うexample_time_delta_sensor_asyncという例のDAGが一緒にインストールされるので、pauseを解除して、手動でTriggerしてみましょう。処理は数秒で終わると思います。
TimeDeltaSensorAsyncのTaskInstanceのログを下に示します。
このDAG・Taskは10秒待つだけの処理ですが、それでも以下のことがわかります:
- TaskInstanceのステータスがDEFERREDに一度なっている(
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1342} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20211222T231602, start_date=20211222T231607
) - TaskInstanceがWorkerに再度配置されている(
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>
の部分)
*** Reading local file: /opt/airflow/logs/example_time_delta_sensor_async/wait/2021-12-22T23:16:02.320809+00:00/1.log
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1238} INFO -
--------------------------------------------------------------------------------
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 1
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1240} INFO -
--------------------------------------------------------------------------------
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1259} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2021-12-22 23:16:02.320809+00:00
[2021-12-22, 23:16:07 UTC] {standard_task_runner.py:52} INFO - Started process 28384 to run task
[2021-12-22, 23:16:07 UTC] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2021-12-22T23:16:02.320809+00:00', '--job-id', '3', '--raw', '--subdir', '/home/***/.local/lib/python3.7/site-packages/***/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/tmp/tmpm0bdr7hd', '--error-file', '/tmp/tmp995i2mo1']
[2021-12-22, 23:16:07 UTC] {standard_task_runner.py:77} INFO - Job 3: Subtask wait
[2021-12-22, 23:16:07 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [running]> on host 58da9bd73377
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_time_delta_sensor_async
AIRFLOW_CTX_TASK_ID=wait
AIRFLOW_CTX_EXECUTION_DATE=2021-12-22T23:16:02.320809+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-22T23:16:02.320809+00:00
[2021-12-22, 23:16:07 UTC] {taskinstance.py:1342} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20211222T231602, start_date=20211222T231607
[2021-12-22, 23:16:07 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2021-12-22, 23:16:08 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [queued]>
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1238} INFO -
--------------------------------------------------------------------------------
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 1
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1240} INFO -
--------------------------------------------------------------------------------
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1259} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2021-12-22 23:16:02.320809+00:00
[2021-12-22, 23:16:15 UTC] {standard_task_runner.py:52} INFO - Started process 28389 to run task
[2021-12-22, 23:16:15 UTC] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2021-12-22T23:16:02.320809+00:00', '--job-id', '4', '--raw', '--subdir', '/home/***/.local/lib/python3.7/site-packages/***/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/tmp/tmpt4bjoeid', '--error-file', '/tmp/tmpravez__z']
[2021-12-22, 23:16:15 UTC] {standard_task_runner.py:77} INFO - Job 4: Subtask wait
[2021-12-22, 23:16:15 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2021-12-22T23:16:02.320809+00:00 [running]> on host 58da9bd73377
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_time_delta_sensor_async
AIRFLOW_CTX_TASK_ID=wait
AIRFLOW_CTX_EXECUTION_DATE=2021-12-22T23:16:02.320809+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-22T23:16:02.320809+00:00
[2021-12-22, 23:16:15 UTC] {taskinstance.py:1277} INFO - Marking task as SUCCESS. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20211222T231602, start_date=20211222T231615, end_date=20211222T231615
[2021-12-22, 23:16:15 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
何ができんの?
最新バージョンでも、使えるDeferrableなOperatorは少ないです。
Databricksが対応したり、そのうち増えると思います。
名前(とコード)見ていただければ、それぞれのOperatorの働きは理解できると思います。
対応するTrigger
Discussion