📚
Airflowで遊ぶ:環境構築編
こんにちは!kirigayaです
今回はAirflowで遊んでいきます!
さっそく環境作っていきます
コンテナ環境をダウンロードした方が早いかもしれない...
Airflowとは?
データ処理(コードやコマンド)を自動化、管理できるツールです。
今回は順番にpythonコードを実行していくシステムを作って遊びます!
用語集:
- DAG(Directed Acyclic Graph)
ワークフロー全体を表す単位で、複数のタスク(Task)が有向非巡回グラフとして定義される - Task(タスク)
DAG内で実行される個々の処理単位。データ処理やスクリプト実行などを担当する - Operator(オペレーター)
タスクの実行内容を定義するクラス。例:PythonOperator(Pythonスクリプト実行)、BashOperator(シェルコマンド実行)など - TaskInstance(タスクインスタンス)
特定のDAG実行時における、各タスクの具体的な実行インスタンス。状態(成功、失敗、実行中など)を持つ - Scheduler(スケジューラー)
DAGのスケジュール管理を行い、実行すべきタスクを決定するコンポーネント - Executor(エグゼキューター)
タスクを実行する仕組み。LocalExecutor(ローカル実行)、CeleryExecutor(分散実行)などがある - Trigger Rule(トリガールール)
タスクの実行条件を決定するルール。例:all_success(前のタスクがすべて成功したら実行)など。 - XCom(Cross Communication)
タスク間でデータをやり取りするための仕組み - Connection(接続設定)
データベースやクラウドサービスとの連携設定を保存する機能 - Hook(フック)
APIやデータベースと接続するためのインターフェース。例:PostgresHook(PostgreSQL接続用)など - Sensor(センサー)
特定の条件が満たされるまで待機する特殊なタスク。例:FileSensor(ファイルの存在を確認)など - Airflow Web UI
DAGの実行状況を確認し、手動実行やデバッグを行える管理画面
環境構築はpyenv + poetryでやっていきます
pyenv install 3.11.0
cd airflow_work
pyenv local 3.11.0
poetry env use $(pyenv which python)
poetry install
poetry add apache-airflow
Airflowの初期設定
airflow.cfgを生成しデフォルトのSQLiteデータベースが作成される
export AIRFLOW_HOME=$(pwd)/airflow
poetry run airflow db init
実行すると以下のようにdbの場所が出力されます
DB: sqlite:////Users/test/work/airflow_work/airflow/airflow.db
ログイン用のユーザーを作成
UIでログインする時に使います
poetry run airflow users create \
--username admin \
--password admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
成功すると以下のように表示されます
INFO - Added user admin
User "admin" created with role "Admin"
DAGを作成
mkdir -p airflow/dags
sample_dag.pyを作成
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_hello():
print("こんにちは!, Airflow!")
with DAG(
"sample_dag",
start_date=datetime(2024, 1, 1), # 過去の日付に設定
schedule=None, # スケジュールなし(手動トリガー専用)
catchup=False
) as dag:
task = PythonOperator(
task_id="hello_task",
python_callable=print_hello
)
Web UIを起動して手動トリガーで確認する場合
いきなり実行してしまうとAirflow側が用意したサンプルのdagが生成されてしまうので無効にします
airflow.cfg ファイルを編集し、load_examples を False に変更します
Airflowの設定ファイルairflow.cfg
[core]
load_examples = False
スケジューラーとUIを同時に実行
poetry run airflow scheduler & poetry run airflow webserver --port 8080
本番環境は非推奨だがstandaloneもある...
デフォルトでSQLiteを使用している場合のみ
本番環境でのSQLiteは推奨されていない
poetry run airflow standalone
UIにアクセスしてみる
どうですか?ちゃんとsample_dagがDAGsに表示されていれば成功です!
右側にある▶️ボタンから実際にトリガーすることができます
お疲れ様でした...次の記事で色々なtaskを作成してあそんでいきます
Discussion