💻

Apache Airflow で DAG ファイルを登録する

2021/01/06に公開

この記事について

この記事は、以下の前回記事の続きになります。

今回は、Apache Airflow で DAG ファイル (Python スクリプト) を認識させて、実行させるまでを行います。

DAG とは

DAG とは、Direct acyclic graph の略で、日本語にすると、有向無閉路グラフとなります。
DAG は有向グラフなので、一方向のみ、前方向に情報が流れることができ、無閉路グラフなので、開始ノードに戻る逆の道は存在しません。

DAG には、次のような利点があります。

  • 動的フレームワーク: コードとして構成できる
  • 拡張性: さまざまな種類のタスク実行 (Python/Bash/SQL/Docker/AWS/Azure/GCP etc.) をサポート
  • スケーラビリティ: 無限の数のタスク (ワーカーノード) を実行可能

Airflow と DAG

Airflow のジョブの全タスクは、DAG で定義する必要があります。つまり、処理の実行の順序を DAG 形式で定義しなければならないということです。
DAG に関連するすべての構成は、Python 拡張機能である DAG の定義ファイルで定義します。
DAG の定義ファイルには、障害発生時のメール送信や、ジョブの開始・終了時刻・リトライ回数など、すべての依存関係や設定パラメータが含まれます。また、タスクの依存関係やシーケンスなど、すべてのタスクも定義する必要があります。

演算子

DAG の定義ファイルには複数のタスクを含むことができますが、各タスクは異なる性質を持たせることも可能です。例えば、1 つのタスクは Python スクリプト、もう一つはシェルスクリプトや SQL、Spark ジョブなどを含めることができます。
これらのタスクは、operator (演算子) を使用して、DAG 定義ファイルないで定義することになり、Airflow ではさまざまな種類のタスクにさまざまな演算子を提供しています。
Airflow はあらゆる種類のタスクを実行できる柔軟性を持っているため、他のスケジューラと比較した際、この面で強力な優位性があるということになります。

スケジューラーの起動

タスクを実行するには、Airflow のスケジューラーを起動する必要があります。
前回記事で環境を構築した際は、スケジューラーを実行させるまでは行っていないので、今回ここでスケジューラーを起動します。

docker ps でコンテナー ID を確認し、Airflow のコンテナーの Bash セッションを開始します。
Bash セッションを開始したら、CLI を使用してスケジューラーを起動します。
次のコマンドを実行して、バックグラウンドでスケジューラーを起動させます。

airflow scheduler -D

DAG ファイルの作成

それでは、DAG ファイルを作成していきます。DAG ファイルは Python 拡張機能というとおり、Python スクリプトで作成します。
DAG には、タスクに関連するすべての詳細を含む定義を記述する必要があり、依存関係も定義します。一般的に、DAG を実行するためにスクリプト内で行う必要がある手順は以下の通りです。

  1. 必要なライブラリのインポート
  2. デフォルト引数の定義
  3. DAG の作成
  4. タスクの宣言
  5. 依存関係の記載

まず、任意の場所に DAG ファイルを作成します。今回は、sample_dag.py という Python スクリプトのファイルを作成します。
このファイルに、上記の流れでコードを記述していきます。

必要なライブラリのインポート

Airflow を実行するために必要なすべてのライブラリをインポートします。
一般的には、datetime演算子 (Python/Bash など)、Airflow 自体などです。

sample_dag.py
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator

デフォルト引数の定義

Airflow でタスクを実行するために必要となる、DAG のデフォルト引数を定義します。

sample_dag.py(続き)
args = {
    'owner': 'Pramod', # DAG の所有者  
    'start_date': airflow.utils.dates.days_ago(3), # タスクの開始日時
    'depends_on_past': False,
    'email': ['airflow@example.com'], # 障害発生時などにメール送信を行う宛先
    'email_on_failure': False, # タスク失敗時にメールを送信するか否か
    'email_on_retry': False, # タスクのリトライが発生した際にメールを送信するか否か
    'retries': 1, # タスク失敗時のリトライ回数
    'retry_delay': timedelta(minutes=5), # タスクが失敗してからリトライが行われるまでの待ち時間
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2021, 12, 31), # タスクの終了日時
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

DAG の作成

デフォルト引数を定義したら、DAG 自体を作成します。これには、DAG 本体の名前や説明、どの間隔でタスクを実行するかを定義することになります。

sample_dag.py(続き)
dag = DAG(
    'sample_airflow_dag', # DAG の名前
    default_args=args, # DAG のデフォルト引数
    description='簡単な DAG テスト', # DAG の説明
    schedule_interval=timedelta(days=1), # タスクの実行間隔
    # start_date=airflow.utils.dates.days_ago(3), # ここでも指定可能
    tags=['example']
)

タスクの宣言

DAG 自体を作成したら、次に、実際に実行するタスク (ジョブ) を宣言します。
実行するすべてのタスクを宣言して、前の手順で作成した DAG の一部とします。

sample_dag.py(続き)
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

依存関係の記載

最後に、タスクの実行順序 (依存関係) を定義します。タスクは、並行タスク・順次タスクのいずれかで定義することになります。
タスクの定義方法はいくつか存在しますが、ここでは簡単に、t1 タスク完了後に t2 および t3 タスクを実行することにします。

sample_dag.py(続き)
t1 >> [t2, t3]

DAG ファイルの配置

DAG ファイルを作成したので、Airflow 環境に配置します。
Airflow では、$AIRFLOW_HOME/airflow.cfg ファイルにて、DAG のファイル配置先のディレクトリを設定しています。

airflow.cfg
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /opt/airflow/dags

デフォルト値では、$AIRFLOW_HOME/dags が設定されているはずです。このパスは任意で変更可能です。

dags_folder で指定されているディレクトリに、先ほど作成した DAG ファイルである sample_dag.py ファイルを配置します。

ファイルを配置後、次回の Airflow の SchedulerJob が完了すると、Airflow の画面から DAG 情報を確認できるようになります。
大体ですが、デフォルト設定のままなら、ファイルを配置してから長くても 5 分ぐらいすれば、画面に表示されるようになると思います。

これで、指定した時刻、間隔で DAG のジョブが実行されるようになります。

タスクの手動実行

実際にタスクが正常に終了するかどうか、その場で DAG のタスクを手動実行することも可能です。
DAGs 画面に表示されている DAG の右側にある Action より、Trigger DAG ボタンを選択します。

DAG トリガーの実行に伴う Configuration JSON の画面が表示されます。
今回の sample_dag.py の場合は、特に入力は必要ないので、そのまま Trigger を選択します。
これで、DAG が手動で実行できます。

DAGs の画面に戻った際、Runs 欄の表示が runnning に変わっていることを確認してください。
実際に DAG のジョブが実行されていることが確認できます。

まとめ

今回は、Apache Airflow で DAG ジョブを実行するのに必要となるDAG ファイルの作成DAG ファイルの配置スケジューラーの起動について解説しました。
Apache Airflow では、さまざまなタスクを柔軟に実行することが可能となっているので、是非皆さんも試してみてください。

参考情報

参考書籍

Discussion