Cloud ComposerのDAGを最適化するTips

2025/03/08に公開

Cloud Composer 最適化ガイド(公式) を一読したんで、下記に整理します。

はじめに(基礎知識)

AirflowやComposer、DAGについて知らない人向けに、簡単に説明しておきます。

  • Apache Airflowとは?

    • ワークフロー(複数のタスクの流れ)をプログラムで管理・自動化するOSS。
  • Cloud Composerとは?

    • Google Cloud上でApache Airflowを簡単に使えるように提供されているフルマネージドサービス。インフラ管理に煩わされず、ワークフローの作成や実行に集中できます。
  • DAGとは?

    • Directed Acyclic Graphの略で、Airflowで処理を実行する際のタスクの流れを定義したもの。

Tips20選

1. わかりやすいファイル名にする

  • DAGの目的、頻度、担当チームなどがわかるようにする。
daily_sales_report_marketing.py

2. 冪等性(再実行可能性)を意識する

  • 何回実行しても同じ結果が得られるようタスクをつくる。
MERGE INTO table USING source ON table.id = source.id
WHEN MATCHED THEN UPDATE SET...
WHEN NOT MATCHED THEN INSERT...

3. 複雑な処理はDAGの外で定義する

  • DAG定義内に複雑な処理を書かない。

4. Jinjaテンプレート利用する

  • 日付や変数を動的に設定可能
query = "SELECT * FROM table WHERE date = '{{ ds }}'"

5. カスタムOperatorを活用する

  • よく使う処理を再利用可能にする。
class CustomOperator(BaseOperator):
    def execute(self, context):
        # カスタム処理
        pass

6. Pythonパッケージを使って整理する

  • 再利用可能なコードは別パッケージ化する。
dags/
├── common_package/  # 再利用可能な処理を入れるディレクトリ
│   ├── __init__.py
│   ├── data_utils.py
│   └── data_queries.py
└── daily_sales_report.py # common_packageの処理を呼び出す

7. Airflow Variablesで環境変数を管理する

from airflow.models import Variable
db_url = Variable.get("DATABASE_URL")

8. Sensorの間隔調整する

  • ポーリング頻度を調整して無駄な負荷を減らす。
sensor = ExternalTaskSensor(
    poke_interval=300
)

9. DAGのタスクを並列実行可能にする

  • DAGの依存関係を整理して並列化。
task_1 >> [task_2, task_3] >> task_4

10. DAGのキャッチアップを無効化

  • 過去のタスクの自動再実行を防ぐ。
dag = DAG(catchup=False)

11. スケジュールを適切に設定する

  • 必要最低限の頻度にする。
  • https://crontab.guru/ を利用して、人でもわかるようコメントに頻度を明記しておく
schedule_interval="0 8 * * *" # At 08:00

12. 再試行(リトライ)の設定しておく

  • 失敗した時の動きを明確にしておく。
retries=3,
retry_delay=timedelta(minutes=5)

13. DAGを一時停止状態で作成

dag = DAG(is_paused_upon_creation=True)

14. 明確なドキュメントを追加

  • DAGの目的や処理内容をdocstringで記述する。
サンプルDAG
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

"""
DAG名: daily_sales_report
目的: 毎日の売上レポートを作成し、Google Driveにアップロードする。
処理内容:
  1. BigQueryから昨日の売上データを抽出
  2. 売上データをCSVに変換
  3. CSVファイルをGoogle Driveへアップロード
"""

default_args = {
    'owner': 'sales_analytics_team',
    'start_date': datetime(2024, 1, 1)
}

dag = DAG(
    dag_id='daily_sales_report',
    schedule_interval='0 8 * * *',  # 毎日午前8時
    default_args=default_args,
    catchup=False,
)

def extract_sales_data(**kwargs):
    """BigQueryから売上データを抽出する処理"""
    pass

def upload_report_to_drive(**kwargs):
    """生成したレポートをGoogle Driveにアップロードする処理"""
    pass

extract_task = PythonOperator(
    task_id='extract_sales_data',
    python_callable=extract_sales_data,
    dag=dag,
)

upload_task = PythonOperator(
    task_id='upload_report',
    python_callable=upload_report_to_drive
)

extract_sales_data >> upload_report_to_drive

15. タスクのオーナーを明記する

default_args = {"owner": "data_team"}

16. トリガールールを活用する

trigger_rule='all_done'

17. 静的な開始日付を設定

  • datetime.now()など動的日付を避けることで予期しない実行を防ぐ。

18. タイムアウトを設定

  • DAGが長時間動き続けるのを防ぐ。
dagrun_timeout=timedelta(hours=2)

19. ログの標準化

  • トラブルシューティングを楽にするため、ログフォーマット統一する。
import logging
logging.info("Start processing: %s", execution_date)

20. DAGのタグ付け

  • UI上でフィルターや分類を容易にする。
dag = DAG(tags=['sales', 'daily'])

おわりに

Cloud Composerを活用することで、データ処理の自動化を実現できます。
しかし、適切な設定を行わないと、パフォーマンス低下や運用負担の増加につながります。
本記事で紹介したTipsを取り入れることで、パフォーマンス向上や運用負担の軽減していきましょう。

GitHubで編集を提案

Discussion