📚

Airflowで遊ぶ:環境構築編

2025/02/02に公開

こんにちは!kirigayaです
今回はAirflowで遊んでいきます!
さっそく環境作っていきます
コンテナ環境をダウンロードした方が早いかもしれない...

Airflowとは?

データ処理(コードやコマンド)を自動化、管理できるツールです。
今回は順番にpythonコードを実行していくシステムを作って遊びます!

用語集:

  • DAG(Directed Acyclic Graph)
    ワークフロー全体を表す単位で、複数のタスク(Task)が有向非巡回グラフとして定義される
  • Task(タスク)
    DAG内で実行される個々の処理単位。データ処理やスクリプト実行などを担当する
  • Operator(オペレーター)
    タスクの実行内容を定義するクラス。例:PythonOperator(Pythonスクリプト実行)、BashOperator(シェルコマンド実行)など
  • TaskInstance(タスクインスタンス)
    特定のDAG実行時における、各タスクの具体的な実行インスタンス。状態(成功、失敗、実行中など)を持つ
  • Scheduler(スケジューラー)
    DAGのスケジュール管理を行い、実行すべきタスクを決定するコンポーネント
  • Executor(エグゼキューター)
    タスクを実行する仕組み。LocalExecutor(ローカル実行)、CeleryExecutor(分散実行)などがある
  • Trigger Rule(トリガールール)
    タスクの実行条件を決定するルール。例:all_success(前のタスクがすべて成功したら実行)など。
  • XCom(Cross Communication)
    タスク間でデータをやり取りするための仕組み
  • Connection(接続設定)
    データベースやクラウドサービスとの連携設定を保存する機能
  • Hook(フック)
    APIやデータベースと接続するためのインターフェース。例:PostgresHook(PostgreSQL接続用)など
  • Sensor(センサー)
    特定の条件が満たされるまで待機する特殊なタスク。例:FileSensor(ファイルの存在を確認)など
  • Airflow Web UI
    DAGの実行状況を確認し、手動実行やデバッグを行える管理画面

環境構築はpyenv + poetryでやっていきます

pyenv install 3.11.0
cd airflow_work
pyenv local 3.11.0
poetry env use $(pyenv which python)
poetry install

poetry add apache-airflow

Airflowの初期設定
airflow.cfgを生成しデフォルトのSQLiteデータベースが作成される

export AIRFLOW_HOME=$(pwd)/airflow
poetry run airflow db init

実行すると以下のようにdbの場所が出力されます

DB: sqlite:////Users/test/work/airflow_work/airflow/airflow.db

ログイン用のユーザーを作成
UIでログインする時に使います

poetry run airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

成功すると以下のように表示されます

INFO - Added user admin
User "admin" created with role "Admin"

DAGを作成

mkdir -p airflow/dags

sample_dag.pyを作成

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_hello():
    print("こんにちは!, Airflow!")

with DAG(
    "sample_dag",
    start_date=datetime(2024, 1, 1),  # 過去の日付に設定
    schedule=None,  # スケジュールなし(手動トリガー専用)
    catchup=False
) as dag:
    task = PythonOperator(
        task_id="hello_task",
        python_callable=print_hello
    )

Web UIを起動して手動トリガーで確認する場合
いきなり実行してしまうとAirflow側が用意したサンプルのdagが生成されてしまうので無効にします
airflow.cfg ファイルを編集し、load_examples を False に変更します
Airflowの設定ファイルairflow.cfg

[core]
load_examples = False

スケジューラーとUIを同時に実行

poetry run airflow scheduler & poetry run airflow webserver --port 8080

本番環境は非推奨だがstandaloneもある...
デフォルトでSQLiteを使用している場合のみ
本番環境でのSQLiteは推奨されていない

poetry run airflow standalone

UIにアクセスしてみる

どうですか?ちゃんとsample_dagがDAGsに表示されていれば成功です!
右側にある▶️ボタンから実際にトリガーすることができます
お疲れ様でした...次の記事で色々なtaskを作成してあそんでいきます

岩田組

Discussion