[MWAA]airflow.exceptions.AirflowTaskTimeoutエラー
概要
Amazon Managed Workflows for Apache Airflow(以下、MWAA)サービスにて、環境構築時に以下エラーが発生しました。
当該エラーを解消することができたので、その方法をまとめます。
Broken DAG: [/usr/local/airflow/dags/XXXX] Traceback (most recent call last):
File "usr/local/airflow/.local/lib/python3.10/site-packages/airflow/models/dags.py", line 692, in __hash__
val = tuple(self.task_dict.keys())
File "usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /usr/local/airflow/dags/XXXX after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.5.1/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.5.1/best-practices.html#reducing-dag-complexity, PID: 13262
前提
当該エラーが発生したMWAA環境は、Airflowバージョンが v2.5.1
の環境でした。
対応1
エラー内容で調べたところ、類似エラーの事象に遭遇した方の記事を発見しました!
こちらの記事を参考に、Airflow 設定オプションに以下設定を追加しました。
設定オプション | カスタム値 |
---|---|
core.dagbag_import_timeout |
60 |
結果1
残念ながら、 対応1 を実施して環境を再起動しましたが、エラー事象は解消しませんでした。
DAG Processingのログ(Airflow DAG 処理ログ)を確認したところ、未だにタイムアウトエラーが発生していることを確認しました。
対応2
以下のAWSのユーザーガイドを改めて確認しました。
そこで、 core.dagbag_import_timeout
に関する以下の記載を見つけます。(一部抜粋)
このオプションはスケジューラーの「ループ」の一部として処理され、
core.dag_file_processor_timeout
で指定されている値よりも小さい値が含まれている必要があります。
core.dag_file_processor_timeout
の値は変更していなかったため、デフォルトの 50 となっており、上記記載に反する設定状態となっていたことが分かりました。
そのため、Airflow 設定オプションに以下設定も追加しました。
設定オプション | カスタム値 |
---|---|
core.dag_file_processor_timeout |
300 |
結果2
エラーが無事に解消され、環境を構築することができました!!
まとめ
私の場合は、Airflow 設定オプションに対して、以下の2つの設定を追加(デフォルト値から変更)することで、当該エラーは解消することができました。
設定オプション | カスタム値 |
---|---|
core.dag_file_processor_timeout |
300 |
core.dagbag_import_timeout |
60 |
また、今回のDAGのトラブルに関連して、GoogleCloudの分かりやすいドキュメントもあったため、貼り付けておきます。
Discussion