📚

Airflowで遊ぶ:Task作成編

2025/02/02に公開

こんにちはkirigayaです
今回はAirflowで遊ぶ:環境構築編の続きです
さっそく色々作って遊んでいきましょう
執事風タスクを実装していきます

処理の流れはこんな感じです


ポイントとしてはbranch_taskで使用しているBranchPythonOperator
hawaii_taskで使用しているAirflowException
trigger_vacation_hawaiiで使用しているTriggerDagRunOperatorです。
解説は後ほどします

コード全体

3ファイルともdags/に置きます
withDAGで定義した部分はUI上で表示され、DAGの中でOperatorを使って関数を実行します。最後の行でタスクの順序を定義します。

  • butler.py
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

from butler_task import greet, next_talk, hawaii, tea_party, end_day

with DAG(
    dag_id='one_day',
    schedule_interval=None,
    start_date=datetime(2025, 2, 1),
    catchup=False
) as dag:
    
    greet_task = PythonOperator(
        task_id='greet_task',
        python_callable=greet
    )
    
    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=next_talk
    )
    
    hawaii_task = PythonOperator(
        task_id='hawaii_task',
        python_callable=hawaii
    )
    
    tea_party_task = PythonOperator(
        task_id='tea_party_task',
        python_callable=tea_party
    )
    
    end_task = PythonOperator(
        task_id='end_task',
        python_callable=end_day
    )
    
    trigger_vacation_hawaii = TriggerDagRunOperator(
        task_id='trigger_vacation_hawaii',
        trigger_dag_id='vacation_hawaii'
    )
    
    greet_task >> branch_task
    branch_task >> [hawaii_task, tea_party_task, end_task]
    hawaii_task >> trigger_vacation_hawaii
  • butler_task_hawaii.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

from butler_task import private_beach

with DAG(
    dag_id='vacation_hawaii',
    schedule_interval=None,
    start_date=datetime(2025, 2, 1),
    catchup=False
) as dag:
    vacation_hawaii = PythonOperator(
        task_id='private_beach_task',
        python_callable=private_beach
    )
  • butler_task.py
import random
import pendulum
from airflow.exceptions import AirflowException

def greet():
    # now = pendulum.now("Asia/Tokyo")
    now = pendulum.now("Europe/Paris")
    if now.hour < 12:
        greeting = "おはようございます。ご主人様"
    elif now.hour < 18:
        greeting = "こんにちは。ご主人様"
    else:
        greeting = "こんばんは。ご主人様"
    print(greeting)
    return greeting

def next_talk(**kwargs):
    ti = kwargs['ti']
    greeting = ti.xcom_pull(task_ids='greet_task')
    if greeting == "おはようございます。ご主人様":
        return "hawaii_task"
    elif greeting == "こんにちは。ご主人様":
        return "tea_party_task"
    else:
        return "end_task"

def hawaii():
    print("現在、ハワイへ移動中です。")
    airframe_trouble = random.randint(1, 4)
    if airframe_trouble > 2:
        raise AirflowException("機体トラブルが発生したため引き返します")
    else:
        print("無事、ハワイに到着しました")


def tea_party():
    print("次の予定は〇〇ホテルでのお茶会となっております")

def end_day():
    print("本日もご苦労様でした。ゆっくりお休みください")

def private_beach():
    weather_chance = random.randint(1, 10)
    if weather_chance > 5:
        print("晴天でよかったです。プライベートビーチをお楽しみください")
    else:
        print("雨天のため、室内プールをお楽しみください")

1つ1つのタスク解説

  • greet_task
    greet関数を呼び出し現在時刻によって挨拶文を出力
def greet():
    now = pendulum.now("Asia/Tokyo")
    # now = pendulum.now("Europe/Paris")
    if now.hour < 12:
        greeting = "おはようございます。ご主人様"
    elif now.hour < 18:
        greeting = "こんにちは。ご主人様"
    else:
        greeting = "こんばんは。ご主人様"
    print(greeting)
    return greeting
  • branch_task
    next_talk関数を呼び出しgreet_taskの出力によってBranchPythonOperatorで処理を分岐させる
def next_talk(**kwargs):
    ti = kwargs['ti']
    greeting = ti.xcom_pull(task_ids='greet_task')
    if greeting == "おはようございます。ご主人様":
        return "hawaii_task"
    elif greeting == "こんにちは。ご主人様":
        return "tea_party_task"
    else:
        return "end_task"
  • tea_party_task
    tea_party関数を呼び出しお昼の予定を伝えます
    条件はgreet_taskを実行した時刻に依存しています
def tea_party():
    print("次の予定は〇〇ホテルでのお茶会となっております")
  • end_task
    end_day関数を呼び出し夜の予定を伝えます
    条件はgreet_taskを実行した時刻に依存しています
def end_day():
    print("本日もご苦労様でした。ゆっくりお休みください")
  • hawaii_task
    hawaii関数を呼び出しランダムに数値を決め条件によってハワイへ到着するかが決まります
    条件によってraise AirflowExceptionでタスクの実行を中断します
    次のタスクであるtrigger_vacation_hawaiiまで到達することはありません
def hawaii():
    print("現在、ハワイへ移動中です...")
    airframe_trouble = random.randint(1, 4)
    if airframe_trouble > 2:
        raise AirflowException("機体トラブルが発生したため引き返します")
    else:
        print("無事、ハワイに到着しました")
  • trigger_vacation_hawaii
    ここでは次のDAGを指定します
    指定したDAGは手動でトリガーしなくても実行されます
trigger_vacation_hawaii = TriggerDagRunOperator(
    task_id='trigger_vacation_hawaii',
    trigger_dag_id='vacation_hawaii'
)
  • private_beach_task
    条件によってプライベートビーチで遊べるかどうかを出力します
def private_beach():
    weather_chance = random.randint(1, 10)
    if weather_chance > 5:
        print("晴天でよかったです。プライベートビーチをお楽しみください")
    else:
        print("雨天のため、室内プールをお楽しみください")

実行してみると...

vacation_hawaiiDAGは有効にするがトリガーはしない
あら...トラブルでハワイには行けませんでしたwぜひ色々カスタマイズしてみてください!

最後に

たくさん遊んできましたが利用用途としてはクラウドサービスと連携して使っていく場面が多いと思います。今度はクラウドでも遊んでみようかな...

岩田組

Discussion