📚
Airflowで遊ぶ:Task作成編
こんにちはkirigayaです
今回はAirflowで遊ぶ:環境構築編の続きです
さっそく色々作って遊んでいきましょう
執事風タスクを実装していきます
処理の流れはこんな感じです
ポイントとしてはbranch_taskで使用しているBranchPythonOperator、
hawaii_taskで使用しているAirflowException、
trigger_vacation_hawaiiで使用しているTriggerDagRunOperatorです。
解説は後ほどします
コード全体
3ファイルともdags/に置きます
withDAG
で定義した部分はUI上で表示され、DAGの中でOperatorを使って関数を実行します。最後の行でタスクの順序を定義します。
- butler.py
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
from butler_task import greet, next_talk, hawaii, tea_party, end_day
with DAG(
dag_id='one_day',
schedule_interval=None,
start_date=datetime(2025, 2, 1),
catchup=False
) as dag:
greet_task = PythonOperator(
task_id='greet_task',
python_callable=greet
)
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=next_talk
)
hawaii_task = PythonOperator(
task_id='hawaii_task',
python_callable=hawaii
)
tea_party_task = PythonOperator(
task_id='tea_party_task',
python_callable=tea_party
)
end_task = PythonOperator(
task_id='end_task',
python_callable=end_day
)
trigger_vacation_hawaii = TriggerDagRunOperator(
task_id='trigger_vacation_hawaii',
trigger_dag_id='vacation_hawaii'
)
greet_task >> branch_task
branch_task >> [hawaii_task, tea_party_task, end_task]
hawaii_task >> trigger_vacation_hawaii
- butler_task_hawaii.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from butler_task import private_beach
with DAG(
dag_id='vacation_hawaii',
schedule_interval=None,
start_date=datetime(2025, 2, 1),
catchup=False
) as dag:
vacation_hawaii = PythonOperator(
task_id='private_beach_task',
python_callable=private_beach
)
- butler_task.py
import random
import pendulum
from airflow.exceptions import AirflowException
def greet():
# now = pendulum.now("Asia/Tokyo")
now = pendulum.now("Europe/Paris")
if now.hour < 12:
greeting = "おはようございます。ご主人様"
elif now.hour < 18:
greeting = "こんにちは。ご主人様"
else:
greeting = "こんばんは。ご主人様"
print(greeting)
return greeting
def next_talk(**kwargs):
ti = kwargs['ti']
greeting = ti.xcom_pull(task_ids='greet_task')
if greeting == "おはようございます。ご主人様":
return "hawaii_task"
elif greeting == "こんにちは。ご主人様":
return "tea_party_task"
else:
return "end_task"
def hawaii():
print("現在、ハワイへ移動中です。")
airframe_trouble = random.randint(1, 4)
if airframe_trouble > 2:
raise AirflowException("機体トラブルが発生したため引き返します")
else:
print("無事、ハワイに到着しました")
def tea_party():
print("次の予定は〇〇ホテルでのお茶会となっております")
def end_day():
print("本日もご苦労様でした。ゆっくりお休みください")
def private_beach():
weather_chance = random.randint(1, 10)
if weather_chance > 5:
print("晴天でよかったです。プライベートビーチをお楽しみください")
else:
print("雨天のため、室内プールをお楽しみください")
1つ1つのタスク解説
-
greet_task
greet関数を呼び出し現在時刻によって挨拶文を出力
def greet():
now = pendulum.now("Asia/Tokyo")
# now = pendulum.now("Europe/Paris")
if now.hour < 12:
greeting = "おはようございます。ご主人様"
elif now.hour < 18:
greeting = "こんにちは。ご主人様"
else:
greeting = "こんばんは。ご主人様"
print(greeting)
return greeting
-
branch_task
next_talk関数を呼び出しgreet_task
の出力によってBranchPythonOperator
で処理を分岐させる
def next_talk(**kwargs):
ti = kwargs['ti']
greeting = ti.xcom_pull(task_ids='greet_task')
if greeting == "おはようございます。ご主人様":
return "hawaii_task"
elif greeting == "こんにちは。ご主人様":
return "tea_party_task"
else:
return "end_task"
-
tea_party_task
tea_party関数を呼び出しお昼の予定を伝えます
条件はgreet_task
を実行した時刻に依存しています
def tea_party():
print("次の予定は〇〇ホテルでのお茶会となっております")
-
end_task
end_day関数を呼び出し夜の予定を伝えます
条件はgreet_task
を実行した時刻に依存しています
def end_day():
print("本日もご苦労様でした。ゆっくりお休みください")
-
hawaii_task
hawaii関数を呼び出しランダムに数値を決め条件によってハワイへ到着するかが決まります
条件によってraise AirflowException
でタスクの実行を中断します
次のタスクであるtrigger_vacation_hawaii
まで到達することはありません
def hawaii():
print("現在、ハワイへ移動中です...")
airframe_trouble = random.randint(1, 4)
if airframe_trouble > 2:
raise AirflowException("機体トラブルが発生したため引き返します")
else:
print("無事、ハワイに到着しました")
-
trigger_vacation_hawaii
ここでは次のDAGを指定します
指定したDAGは手動でトリガーしなくても実行されます
trigger_vacation_hawaii = TriggerDagRunOperator(
task_id='trigger_vacation_hawaii',
trigger_dag_id='vacation_hawaii'
)
-
private_beach_task
条件によってプライベートビーチで遊べるかどうかを出力します
def private_beach():
weather_chance = random.randint(1, 10)
if weather_chance > 5:
print("晴天でよかったです。プライベートビーチをお楽しみください")
else:
print("雨天のため、室内プールをお楽しみください")
実行してみると...
※vacation_hawaii
DAGは有効にするがトリガーはしない
あら...トラブルでハワイには行けませんでしたwぜひ色々カスタマイズしてみてください!
最後に
たくさん遊んできましたが利用用途としてはクラウドサービスと連携して使っていく場面が多いと思います。今度はクラウドでも遊んでみようかな...
Discussion