🦁

Apache Airflowとは?徹底解説

2025/03/11に公開

1. はじめに

Apache Airflowは、ワークフローの自動化・スケジューリング・監視を行うためのオープンソースのプラットフォームです。データパイプラインの構築に広く利用され、企業のデータエンジニアリングやETL(Extract, Transform, Load)プロセスにおいて不可欠なツールとなっています。

本記事では、Airflowの基本概念、アーキテクチャ、使い方、活用例まで詳しく解説します。


2. Apache Airflowの概要

(1) Airflowとは?

Apache Airflowは、ワークフローの定義とスケジューリングをPythonコードで記述できるツールです。これにより、柔軟なパイプライン設計が可能になります。

(2) Airflowの主な特徴

Pythonベース: ワークフローをPythonスクリプトとして記述できる。
スケジューリング機能: タスクの順序や依存関係を定義し、自動実行可能。
監視・管理: Web UIを通じて、実行状況の可視化やエラーの確認が可能。
モジュール化: 各タスクを分割して、再利用しやすく設計できる。
スケーラビリティ: CeleryExecutorやKubernetesExecutorを利用して、分散処理が可能。


3. Apache Airflowのアーキテクチャ

Airflowはマイクロサービスアーキテクチャを採用しており、以下の主要コンポーネントで構成されます。

(1) DAG(Directed Acyclic Graph)

DAGは、タスクの依存関係を定義するワークフローの設計図です。

例: データの取得 → 変換 → 保存 の処理を順番に実行するワークフロー

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def print_hello():
    return 'Hello Airflow'

def print_world():
    return 'World!'

with DAG('simple_dag', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
    task1 = PythonOperator(task_id='hello_task', python_callable=print_hello)
    task2 = PythonOperator(task_id='world_task', python_callable=print_world)
    task1 >> task2  # タスクの順序定義

(2) Web Server(Web UI)

  • DAGの実行状況やタスクの状態を可視化できる管理画面。
  • DAGのオン/オフ切り替え、手動実行、ログ確認などが可能。

(3) Scheduler(スケジューラ)

  • DAGのスケジュールを管理し、適切なタイミングでタスクを実行する。

(4) Executor(実行エンジン)

  • LocalExecutor: 単一のマシンでタスクを実行(小規模向け)。
  • CeleryExecutor: 分散処理を実現し、スケーラブルな実行環境を構築(中〜大規模向け)。
  • KubernetesExecutor: Kubernetes上でタスクをコンテナ化し、スケールアップ(クラウド向け)。

(5) Metadata Database(メタデータDB)

  • DAGやタスクの実行履歴、設定情報を保存するデータベース(通常はPostgreSQLやMySQL)。

4. Apache Airflowの基本的な使い方

(1) インストール

pip install apache-airflow

または、特定のバージョンを指定してインストール:

pip install apache-airflow==2.5.0

(2) 初期設定

Airflowを初期化し、データベースを作成。

airflow db init

(3) Web UIの起動

airflow webserver --port 8080

ブラウザで http://localhost:8080 にアクセスすると、管理画面が開きます。

(4) Schedulerの起動

airflow scheduler

これにより、スケジュールに従ってDAGが自動実行されます。

(5) DAGの登録と実行

  • dags/ ディレクトリにPythonファイルを配置すると、自動的にAirflowに認識される。
  • Web UIまたはCLIで手動実行可能。
airflow dags trigger simple_dag

5. Apache Airflowの活用例

(1) ETL処理の自動化

  • データを定期的に取得し、変換、データウェアハウスに格納。
  • 例: SQLクエリの実行、データクリーニング、S3/GCSへの保存。

(2) 機械学習ワークフローの管理

  • モデルの学習・評価・デプロイを自動化。
  • 例: 学習データの前処理 → モデル学習 → モデルの検証 → デプロイ。

(3) DevOpsタスクの自動化

  • 定期的なジョブ実行(バックアップ、ログ解析)。

6. Apache Airflowの注意点とベストプラクティス

(1) DAGの適切な設計

  • タスクを適切なサイズに分割し、長時間の処理を避ける。
  • 例: 1つのタスクで複雑なSQLクエリを実行するのではなく、複数のステップに分ける。

(2) エラーハンドリングの実装

  • on_failure_callback を設定し、エラー発生時の通知を行う。
  • retry オプションを活用し、一時的なエラー時に再試行可能にする。

(3) ロギングと監視

  • ログを保存し、異常時にすぐ対応できるようにする。
  • Prometheus + Grafana でメトリクスを監視。

7. まとめ

Apache Airflowは、データ処理やワークフローの自動化に最適なツールです。

Pythonベースで柔軟にDAGを設計
スケジューリング・監視・エラーハンドリングが可能
ETL・機械学習・DevOpsタスクの自動化に活用

適切な設計とベストプラクティスを取り入れ、Airflowを最大限活用しましょう!

Discussion