Open1
Google Cloud Composer (Airflow) の知見をためる場所
タイムアウトエラーでの対処
背景
DAG内のタスクとしてCloud Functionsの関数を同期的に呼び出す処理があった。
関数内で実行する処理はある程度長いため、timeout
を最大の540secに設定していた。
またタスクを実行するPythonOperator側でもexecution_timeout
を540secに指定していた。
# `PythonOperator` を使用してタスクを定義する
task = PythonOperator(
task_id='my_task',
python_callable=my_task_function,
execution_timeout=timedelta(minutes=540), # 実行タイムアウトを 540 秒に設定
dag=dag,
)
参考:Airflow DAG の改良による Cloud Composer の最適化
しかしDAGを実行してみるとタイムアウトエラーが起きてしまった。
Failed to execute job 14451 for task invoke_cloud_function_update_table_schema (HTTPSConnectionPool(host='us-central1-google-project-id.cloudfunctions.net', port=443): Read timed out. (read timeout=120); 1243711)
結論
リクエスト実行に利用したライブラリ(google.auth.transport.requests
)のメソッドでもtimeout
指定する必要があった。
ソースコードを確認したところ、
デフォルト120sec設定になっていて、これはエラーのtimeout表示と整合が取れていた。
def invoke_cloud_function():
url = f"https://{REGION}-{PROJECT_ID}.cloudfunctions.net/function-name"
data = {"google_project_id": PROJECT_ID}
request = google.auth.transport.requests.Request()
id_token_credentials = id_token_credential_utils.get_default_id_token_credentials(
url, request=request
)
resp = AuthorizedSession(id_token_credentials).post(
url,
json=data,
timeout=CLOUD_FUNCTION_TIMEOUT # タイムアウト指定を追記
)
return resp.status_code
参考
AirFlowの処理では今回利用したパラメータ以外にもいくつかタイムアウトがある。
-
timeout
- センサーオペレータを使う場合にはこのパラメータで最大待ち時間を指定できる(最大3600sec)
-
dagrun_timeout
- DAG単位でのタイムアウト指定に利用する