🐕

Apache Airflow使ってみた

に公開

今回はApache Airflow(以下、Airflow)について調べてみました。調べようと思った理由としては

  • 以前から名前は知っていたが調べる機会がなかった
  • Google Cloudの勉強をしている中でCloud ComposerがAirflowを使っているということでどんなものか気になっていた

からです。今回は本格的な調査というよりは、まずはどんなものかについて調べてみようと思います。

Apache Airflowとは?

一言で言ってしまえばワークフロー管理ツールということみたいです。Pythonを使ってワークフローを構成し、スケジューリングを行ったり動作のモニタリングができるツールとなります。

https://airflow.apache.org/

AirflowはWorkflows as codeとして開発されており、この思想はいくつかの利点があるということです(詳細はこちら)。

  • Dynamic:パイプラインはコードとして実装され、動的にDAGを生成しパラメータ化することができます
  • Extensible:Airflowフレームワークは多数のビルトインオペレータを内包しているため、さまざまなユーザのニーズに拡張することができます
  • Flexible:Jinjaテンプレートエンジンを利用しており、豊富なカスタマイズをすることができます

ここで出てきたDAGという用語は重要な概念になります。DAGとはDirected Acyclic Graphの頭文字をとったものであり、簡単に言えばプロセスには全て順序があり、開始と終了が異なっているため巡回することのないグラフのことです。DAGにはいくつかの属性がありそれらは以下のようです。

  • Schedule:ワークフローがいつ実行されるか
  • Tasks:ワーカーにて実行される個別の動作単位
  • Task Dependencies:どのタスクが実行されるかに関する順序と条件
  • Callbacks:ワークフロー全体が完了した後に取られるアクション
  • Additional Parameters:そのほかのオペレーションに関する詳細

DAGを簡単に構築できるようにし、かつ事前に用意されたいくつものオペレーションを利用することで、Airflowを使えばワークフローを簡単に構成できるということです。

実際に使ってみました

Apache Airflowのインストール

今回はPyPiを使う方法を利用します。具体的な内容は公式ドキュメントを参照してください。

https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html

Airflowをインストールするために、こちらのコマンドを実行します。

pip install "apache-airflow[celery]==3.0.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.9.txt"

こちらの方法でインストールは成功しますが、公式ドキュメントではAirflowのインストールについて細かな説明がされているので、インストール時は確認することをお勧めします。

チュートリアルの実施

今回はこちらに用意されているAirflow 101: Building Your First Workflowを試してみようと思います。

まずはコードの詳細は置いておくとして、チュートリアルの内容を動かしてみようと思います。コードはドキュメントにもある通り以下になります。


import textwrap
from datetime import datetime, timedelta

# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

それぞれの部分について何をしているか確認してみましょう。

  • 必要なモジュールのインポート
    • 今回のチュートリアルでは時刻を扱うものがあるため、datetimeを利用しています
    • BashOperatorやDAGはAirflowでワークフローを作成するために利用されます。
import textwrap
from datetime import datetime, timedelta

# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
  • DAGの作成
    • DAGの第一引数にてこのワークフローの名前を定義しています
    • default_argsではワークフローに関するデフォルトの設定を指定します。今回指定しているパラメータの設定内容は以下になります。
      • depends_on_past:Trueに設定すると、直前のタスクが成功しない場合にそのタスクは実行されません
      • retries:何回リトライするか
      • retry_delay:リトライするまでの間隔
    • schedulestart_dateを指定することにより、実行スケジュールを制御することができます。
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
  • タスクの定義
    • 今回はBashコマンドを実行するようなタスクを利用するためBashOperatorを使っています
    • t1タスクは実行時点の日時を表示、t2タスクは実行時に5秒間のsleepを実行します。
t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    retries=3,
)
  • Jinjaテンプレートを利用したタスクの実装
    • Jinjaテンプレートを利用することでタスクを定義することができます
    • 今回の例では、ds変数に格納された日付とその7日後の日付を表示します。なお、ds変数はAIrflowで現在の日時を保持するものであり、macros.ds_add関数も組み込みユーティリティです
templated_command = textwrap.dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id="templated",
    depends_on_past=False,
    bash_command=templated_command,
)
  • タスクの実行順序を定義
    • このように書くことで、t1が実行された後にt2とt3がそれぞれ実行されるワークフローが完成します。
t1 >> [t2, t3]

ワークフローを実行してみる

まずは先ほど作ったワークフローをairflowの管理対象に追加しましょう。以下のコマンドを実行することで登録できます。ここでは、ソースコードが~/airflow/dags/tutorial.pyとして保存されている前提となっています。

python ~/airflow/dags/tutorial.py

次に実際にワークフローが登録されたか確認してみましょう。試しに以下のコマンドを実行してみてください。

airflow tasks list tutorial

tutorialという名前でDAGを生成しており、うまく登録できていれば以下のように3つのタスクの名前が表示されるはずです。

print_date
sleep
templated

また、以下のコマンドを実行するとDAGがgraphvizを利用して表示されます。

airflow dags show tutorial

結果は私の環境では以下のようになりました。

[2025-04-26T18:59:25.956+0900] {providers_manager.py:946} INFO - The hook_class 'airflow.providers.standard.hooks.filesystem.FSHook' is not fully initialized (UI widgets will be missing), because the 'flask_appbuilder' package is not installed, however it is not required for Airflow components to work
[2025-04-26T18:59:25.957+0900] {providers_manager.py:946} INFO - The hook_class 'airflow.providers.standard.hooks.package_index.PackageIndexHook' is not fully initialized (UI widgets will be missing), because the 'flask_appbuilder' package is not installed, however it is not required for Airflow components to work
[2025-04-26T18:59:26.079+0900] {workday.py:41} WARNING - Could not import pandas. Holidays will not be considered.
digraph tutorial {
	graph [label=tutorial labelloc=t rankdir=LR]
	print_date [color="#000000" fillcolor="#f0ede4" label=print_date shape=rectangle style="filled,rounded"]
	sleep [color="#000000" fillcolor="#f0ede4" label=sleep shape=rectangle style="filled,rounded"]
	templated [color="#000000" fillcolor="#f0ede4" label=templated shape=rectangle style="filled,rounded"]
	print_date -> sleep
	print_date -> templated
}

この結果を見ると、print_date(t1)の実行の後にsleep(t2)とtemplated(t3)がそれぞれ実行されることがわかります。

それでは最後にタスクを個別に呼び出してみましょう。まずは以下のコマンドを実行してprint_dateを呼び出してみます。

airflow tasks test tutorial print_date 2015-06-01

その結果、私の環境では以下のようになりました。

[2025-04-26T19:01:30.788+0900] {taskinstance.py:1312} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: tutorial.print_date __airflow_temporary_run_2025-04-26T10:01:30.769487+00:00__ [None]>
[2025-04-26T19:01:30.790+0900] {taskinstance.py:1312} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance:torial.print_date __airflow_temporary_run_2025-04-26T10:01:30.769487+00:00__ [None]>
[2025-04-26T19:01:30.790+0900] {taskinstance.py:1549} INFO - Starting attempt 0 of 2
[2025-04-26T19:01:30.790+0900] {taskinstance.py:1628} WARNING - cannot record queued_duration for task print_date because previous state change time has not been saved
[2025-04-26T19:01:30.791+0900] {taskinstance.py:1572} INFO - Executing <Task(BashOperator): print_date> on 2015-06-01 00:00:00+00:00
[2025-04-26T19:01:30.818+0900] {taskinstance.py:2045} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='tutorial' AIRFLOW_CTX_TASK_ID='print_date' AIRFLOW_CTX_LOGICAL_DATE='2015-06-01T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2025-04-26T10:01:30.769487+00:00__'
Task instance is in running state
 Previous state of the Task instance: queued
Current task name:print_date
Dag name:tutorial
[2025-04-26T19:01:30.821+0900] {taskinstance.py:2159} INFO - ::endgroup::
[2025-04-26T19:01:30.821+0900] {subprocess.py:78} INFO - Tmp dir root location: /var/folders/np/flfw4vdj7x32nw9dw1_2t6x40000gn/T
[2025-04-26T19:01:30.821+0900] {subprocess.py:88} INFO - Running command: ['/bin/bash', '-c', 'date']
[2025-04-26T19:01:30.826+0900] {subprocess.py:99} INFO - Output:
[2025-04-26T19:01:30.829+0900] {subprocess.py:106} INFO - 20254月26日 土曜日 19時01分30秒 JST
[2025-04-26T19:01:30.829+0900] {subprocess.py:110} INFO - Command exited with return code 0
[2025-04-26T19:01:30.835+0900] {taskinstance.py:1796} INFO - ::group::Post task execution logs
[2025-04-26T19:01:30.835+0900] {taskinstance.py:1808} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=print_date, run_id=__airflow_temporary_run_2025-04-26T10:01:30.769487+00:00__, logical_date=20150601T000000, start_date=, end_date=20250426T100130

[2025-04-26T19:01:30.829+0900] {subprocess.py:106} INFO - 2025年 4月26日 土曜日 19時01分30秒 JST

のように日時が表示されることがわかりました。

次は以下のコマンドでsleepさせてみましょう

airflow tasks test tutorial sleep 2015-06-01

実行結果としてはログが表示された後、5秒間sleepが実行されることが確認できると思います。

最後にtemplatedを実行してみましょう。

airflow tasks test tutorial templated 2015-06-01

私の環境では以下のような結果となりました。

[2025-04-26T19:02:59.851+0900] {taskinstance.py:1312} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: tutorial.templated __airflow_temporary_run_2025-04-26T10:02:59.831192+00:00__ [None]>
[2025-04-26T19:02:59.853+0900] {taskinstance.py:1312} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: tutorial.templated __airflow_temporary_run_2025-04-26T10:02:59.831192+00:00__ [None]>
[2025-04-26T19:02:59.853+0900] {taskinstance.py:1549} INFO - Starting attempt 0 of 2
[2025-04-26T19:02:59.853+0900] {taskinstance.py:1628} WARNING - cannot record queued_duration for task templated because previous state change time has not been saved
[2025-04-26T19:02:59.854+0900] {taskinstance.py:1572} INFO - Executing <Task(BashOperator): templated> on 2015-06-01 00:00:00+00:00
[2025-04-26T19:02:59.881+0900] {taskinstance.py:2045} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='tutorial' AIRFLOW_CTX_TASK_ID='templated' AIRFLOW_CTX_LOGICAL_DATE='2015-06-01T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2025-04-26T10:02:59.831192+00:00__'
Task instance is in running state
 Previous state of the Task instance: queued
Current task name:templated
Dag name:tutorial
[2025-04-26T19:02:59.883+0900] {taskinstance.py:2159} INFO - ::endgroup::
[2025-04-26T19:02:59.884+0900] {subprocess.py:78} INFO - Tmp dir root location: /var/folders/np/flfw4vdj7x32nw9dw1_2t6x40000gn/T
[2025-04-26T19:02:59.884+0900] {subprocess.py:88} INFO - Running command: ['/bin/bash', '-c', '\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n']
[2025-04-26T19:02:59.889+0900] {subprocess.py:99} INFO - Output:
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08
[2025-04-26T19:02:59.891+0900] {subprocess.py:110} INFO - Command exited with return code 0
[2025-04-26T19:02:59.898+0900] {taskinstance.py:1796} INFO - ::group::Post task execution logs
[2025-04-26T19:02:59.898+0900] {taskinstance.py:1808} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=templated, run_id=__airflow_temporary_run_2025-04-26T10:02:59.831192+00:00__, logical_date=20150601T000000, start_date=, end_date=20250426T100259

[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-01
[2025-04-26T19:02:59.891+0900] {subprocess.py:106} INFO - 2015-06-08

のように指定日時の後にその7日後の日付が表示されることがわかりました。

まとめ

今回はApache Airflowについて簡単なまとめと、チュートリアルを実施してみました。まだまだ使い方がわからないことだらけですが、今後もっと調べて詳細な使い方や実際に使うにあたり便利な方法などあればシェアしていこうとおもいます。リクエストなどあればコメントお願いします。

Discussion