🔎

Apache AirflowのExponential Backoffを読み解く

2023/09/10に公開

要約

  • Apache AirflowのDAGには標準でExponential Backoff And Jitterが用意されている
  • 2.5.3以前と2.6.0以降でretry間隔の最大値に変更があった模様

前提

Airflowのversionは?

2.5.3。
一部、2.6.0の話も出てくる。
執筆時の最新バージョンは2.7.1。

Exponential Backoffとは?

しっかりと理解したい人向け: AWSのブログがオススメ

https://aws.amazon.com/jp/blogs/architecture/exponential-backoff-and-jitter/

ざっくり理解したい人向け

  • 指数関数的にretry間隔を伸ばしていくことで、効率的なRetryを目指すアルゴリズム
  • 例(基準となるRetry間隔が10sの場合)
    • retry回数: 1, retry間隔: 10s
    • retry回数: 2, retry間隔: 20s
    • retry回数: 3, retry間隔: 40s
    • retry回数: 4, retry間隔: 80s
    • ...
    • retry回数: n, retry間隔: 10 * 2^(n-1)s
  • これだけだと同時に大量のretryが走った場合、retryタイミングが被ってしまい、システムへの負荷が大きくなってしまう
  • それを避けるため、retry間隔をバラつかせるExponential Backoff And Jitterがある
  • 例(基準となるretry間隔が10sの場合)
    • retry回数: 1, retry間隔: 11s
    • retry回数: 2, retry間隔: 25s
    • retry回数: 3, retry間隔: 43s
    • retry回数: 4, retry間隔: 90s
    • ...
    • retry回数: n, retry間隔: (10 * 2^(n-1) + (乱数))s

本編

DAGでのexponential backoffを試してみる

retry_exponential_backoff (bool) -- allow progressively longer waits between retries by using exponential backoff algorithm on retry delay (delay will be converted into seconds)

https://airflow.apache.org/docs/apache-airflow/2.5.3/_api/airflow/models/baseoperator/index.html

Docsによると、BaseOperatorの retry_exponential_backoff パラメータを True にしてあげることでexponential backoffが有効になるとのこと。
その際、基準値は retry_delay で指定した値になる模様。

サンプルコード

下記のDAGで実際にexponential backoffが実現されていることを確認できる。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta


default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 3,
    'retry_delay': timedelta(seconds=30),
    'retry_exponential_backoff': True,
}

def my_python_function():
    raise Exception("Task failed intentionally.")

with DAG(
    dag_id='sample-exponential-backoff',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=my_python_function,
        dag=dag,
    )

    python_task

実際に確認してみた結果

検証1回目 検証2回目 検証3回目 (参考)jitterを考慮せず単純計算した場合のretry間隔
1回目retry間隔(seconds) 40 54 35 30
2回目retry間隔(seconds) 113 118 94 60
3回目retry間隔(seconds) 214 205 132 120

retry回数を重ねるごとに間隔が伸びている。
秒数にもしっかりとバラツキがある。
良さそう。

jitter考慮しない場合のretry間隔よりもかなり秒数の多い結果も含まれているが、 jitter考慮しない間隔"jitter考慮しない間隔" の2倍 の範囲にretry間隔は収まる仕様のため、OK。
この詳細は、後述の実装の中身除くパートにて。

実装を除いてみる

関数の概要

https://github.com/apache/airflow/blob/cb842dd8aa8749b4d0a474c6e2fda11505d9ba99/airflow/models/taskinstance.py#L1117-L1154

next_retry_datetime がtaskのretry処理を担っている。

Get datetime of the next retry if the task instance fails.
For exponential backoff, retry_delay is used as base and will be converted to seconds.

taskが失敗した際、次に再実行する時間(再実行taskの実行時間)を取得する関数。
exponential backoffの場合、 retry_delay で指定した値をもとに取得する。

exponential backoffではない場合

https://github.com/apache/airflow/blob/cb842dd8aa8749b4d0a474c6e2fda11505d9ba99/airflow/models/taskinstance.py#L1122-L1123

L1123により、 retry_exponential_backoffFalse の場合はL1154まで飛ぶので、単純に retry_delay パラメータで指定した値の間隔でretryが走る。
例えば、 'retry_delay': timedelta(seconds=30) ならば、1回目のretryでも2回目以降のretryでも、毎回retry間隔は30s。

exponential backoff部分

https://github.com/apache/airflow/blob/cb842dd8aa8749b4d0a474c6e2fda11505d9ba99/airflow/models/taskinstance.py#L1124-L1127

L1127でjitterなしのretry間隔を算出している。
(0除算を防ぐために処理順を気をつけている旨もコメントに記載されていて丁寧!)

(jitterなしのretry間隔) = (retry_delayで指定した間隔) * 2^(現在の実行回数 - 2)

例えば、1回目のretryであれば、 現在の実行回数 = 2 なので、サンプルコードに当てはめると、

(jitterなしの1回目retryの間隔) =  30s * 2^(2 - 2) = 30s

上記をretry間隔の最小値 min_backoff としている。

https://github.com/apache/airflow/blob/cb842dd8aa8749b4d0a474c6e2fda11505d9ba99/airflow/models/taskinstance.py#L1129-L1134

ちなみに、 retry_delay0 に指定していると必ず min_backoff = 0 となってしまうが、それを回避するためL1133にて 1 以上の値になるようにしている。
retry_delay のデフォルト値は timedelta(seconds=300) のため、あまりこのケースには遭遇しなそう)

https://github.com/apache/airflow/blob/cb842dd8aa8749b4d0a474c6e2fda11505d9ba99/airflow/models/taskinstance.py#L1136-L1144

L1137からL1144で、retry間隔が min_backoff2 * min_backoff - 1 の間でランダムになるよう処理している。
こちらが、実際に確認してみた結果 にて、実際のretry間隔がjitterなしのretry間隔から大きく離れていた理由。
(ちなみに、ハッシュ値まわりの処理が追いきれなかったが、乱数生成のために利用している模様)

https://github.com/apache/airflow/blob/cb842dd8aa8749b4d0a474c6e2fda11505d9ba99/airflow/models/taskinstance.py#L1145-L1151

L1150で、retry間隔が datetime.timedelta の最大値を超えないように制御している。

Airflow2.6.0以降の変更点

https://github.com/apache/airflow/blob/ab54c63940a99646df974d4bcf2e37415e277e69/airflow/models/taskinstance.py#L1187

ちなみに、 timedelta.max999999999 days, 23:59:59.999999 なのでさすがに大きすぎると考えたのか、v2.6.0以降では MAX_RETRY_DELAY を最大値としている。

https://github.com/apache/airflow/blob/ab54c63940a99646df974d4bcf2e37415e277e69/airflow/models/abstractoperator.py#L63

MAX_RETRY_DELAY は1日。

https://github.com/apache/airflow/blob/cb842dd8aa8749b4d0a474c6e2fda11505d9ba99/airflow/models/taskinstance.py#L1152-L1153

L1152で、もしDAGに max_retry_delay パラメータが設定されていた場合、その最大値を超えないよう制御している。

https://github.com/apache/airflow/blob/cb842dd8aa8749b4d0a474c6e2fda11505d9ba99/airflow/models/taskinstance.py#L1154

最後にL1154で、実行完了時間に最終的なretry間隔を加算し、次に実行される再実行taskの実行時間を返している。

以上。

参考リンク一覧

https://airflow.apache.org/docs/apache-airflow/2.5.3/index.html

https://github.com/apache/airflow/tree/2.5.3

https://aws.amazon.com/jp/blogs/architecture/exponential-backoff-and-jitter/

Discussion