Open1

Google Cloud Composer (Airflow) の知見をためる場所

kumewatakumewata

タイムアウトエラーでの対処

背景

DAG内のタスクとしてCloud Functionsの関数を同期的に呼び出す処理があった。
関数内で実行する処理はある程度長いため、timeoutを最大の540secに設定していた。
またタスクを実行するPythonOperator側でもexecution_timeoutを540secに指定していた。

# `PythonOperator` を使用してタスクを定義する
task = PythonOperator(
   task_id='my_task',
   python_callable=my_task_function,
   execution_timeout=timedelta(minutes=540),  # 実行タイムアウトを 540 秒に設定
   dag=dag,
)

参考:Airflow DAG の改良による Cloud Composer の最適化

しかしDAGを実行してみるとタイムアウトエラーが起きてしまった。

Failed to execute job 14451 for task invoke_cloud_function_update_table_schema (HTTPSConnectionPool(host='us-central1-google-project-id.cloudfunctions.net', port=443): Read timed out. (read timeout=120); 1243711)

結論

リクエスト実行に利用したライブラリ(google.auth.transport.requests)のメソッドでもtimeout指定する必要があった。
ソースコードを確認したところ、
デフォルト120sec設定になっていて、これはエラーのtimeout表示と整合が取れていた。

def invoke_cloud_function():
    url = f"https://{REGION}-{PROJECT_ID}.cloudfunctions.net/function-name"
    data = {"google_project_id": PROJECT_ID}
    request = google.auth.transport.requests.Request()
    id_token_credentials = id_token_credential_utils.get_default_id_token_credentials(
        url, request=request
    )

    resp = AuthorizedSession(id_token_credentials).post(
        url,
        json=data,
        timeout=CLOUD_FUNCTION_TIMEOUT # タイムアウト指定を追記
    )

    return resp.status_code

参考

AirFlowの処理では今回利用したパラメータ以外にもいくつかタイムアウトがある。
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts

  • timeout
    • センサーオペレータを使う場合にはこのパラメータで最大待ち時間を指定できる(最大3600sec)
  • dagrun_timeout
    • DAG単位でのタイムアウト指定に利用する