Azure Function の永続的オーケストレーションをやってみた
以前、Azure Function のタイマートリガーについてまとめました。
軽い処理であればタイマートリガーで問題ないのですが、重い処理だと前の処理が終わる前に次の処理が始まってしまうことがあります。
これに対応したものが 永続的オーケストレーション です。
"永続的オーケストレーション" は、終了しないオーケストレーター関数です。
1 時間ごとに関数を実行する CRON スケジュールでは、1:00、2:00、3:00 といったタイミングで関数が実行され、重複の問題が発生する可能性があります。
この例では、クリーンアップ所要時間が 30 分の場合は、1:00、2:30、4:00 にスケジュールされるため、重複することはありません。
https://docs.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-eternal-orchestrations
オーケストレーションの詳細については以下の記事をご参照ください。
オーケストレーションのコードの仕組みについては以下の記事をご参照ください。
今回使ったコードはこちら
なお、下記の環境で動作を確認しています。
Python 3.9.5
Windows 10
VSCode 1.63.2
ローカル実行時は Azurite を使っています、そのあたりや仮想環境等の環境構築については手前味噌ですが以前 Azure Function についてまとめたブログをご参照ください。
ファイル構成
|--durable-orchestrator # Durable Functions orchestrator 関数
| |--- __init__.py # ループ処理を実装するファイル
| |--- function.json # 特にいじらない
|
|--durable-activity # Durable Functions activity 関数
| |--- __init__.py # 自作関数を入れ込むファイル
| |--- function.json # 特にいじらない
|
|--durable-client # Durable Functions HTTP starter 関数
| |--- __init__.py # 特にいじらない
| |--- function.json # 特にいじらない
|
|--display_time
| |--- display_time.py # 自作のライブラリ
| |--- __init__.py # 特にいじらない
|
|(以降は自動作成されるファイルのため省略)
(参考) Azure Funtion で作成できる関数
Azure Function 関数のうち、Durable Functions activity、Durable Functions HTTP starter、Durable Functions orchestrator の3つを使用しています。
Azure Blob Storage trigger
Azure Cosmos DB trigger
Durable Functions activity # これを使用
Durable Functions entity
Durable Functions HTTP starter # これも使用
Durable Functions orchestrator # これも使用
Azure Event Grid trigger
Azure Event Hub trigger
HTTP trigger
Kafka output
Kafka trigger
Azure Queue Storage trigger
RabbitMQ trigger
Azure Service Bus Queue trigger
Azure Service Bus Topic trigger
Timer trigger
Durable Functions orchestrator 関数
init_.py を以下のように変更します。
next_cleanup で指定している timedelta() でループ処理の間隔を、
deadline で指定している timedelta() でタイムアウトする時間を調整できます。
なお、timedelta() の引数については以下記事を参照ください。
import azure.functions as func
import azure.durable_functions as df
from datetime import datetime, timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
activity_task = context.call_activity("durable-activity") # Durable Functions activity 関数のフォルダ名と揃えた方がよさそう。
# ループ処理
yield activity_task
td = timedelta(minutes=1)
next_cleanup = context.current_utc_datetime + timedelta(minutes=1) # ループ感覚
yield context.create_timer(next_cleanup)
context.continue_as_new(None)
# タイムアウト処理
deadline = context.current_utc_datetime + timedelta(minutes=10) # タイムアウトする時間
timeout_task = context.create_timer(deadline)
winner = yield context.task_any([activity_task, timeout_task])
if winner == activity_task:
timeout_task.cancel()
return True
elif winner == timeout_task:
return False
main = df.Orchestrator.create(orchestrator_function)
Durable Functions activity 関数
init_.py を以下のように変更します。
import logging
import datetime
import time
from display_time import display_time # 追加した自作ライブラリ
def main(name: str) -> str:
dtime = display_time.display_time_now() # 自作関数
now_time = dtime.time_now()
logging.warning(f"Activity {now_time}")
print("テスト中")
time.sleep(5)
return f'{name}!'
Durable Functions HTTP starter 関数
init.py を以下のように変更します。
import logging
from azure.durable_functions import DurableOrchestrationClient
import azure.functions as func
async def main(req: func.HttpRequest, starter: str, message):
logging.info(starter)
client = DurableOrchestrationClient(starter)
# Orchestratorの開始
instance_id = await client.start_new('durable-orchestrator') # Durable Functions orchestrator 関数のフォルダ名に合わせる
response = client.create_check_status_response(req, instance_id)
message.set(response)
ローカル実行するとこのようにログが出力されます。
[2022-01-18T11:09:15.747Z] Executing 'Functions.durable-activity' (Reason='(null)', Id=d808009c-9f05-4e5c-9f49-db8e9f89df1d)[2022-01-18T11:09:15.754Z] Activity 2022-01-18 20:09:15.753335
1分間隔で実行するはずが一度にたくさんログが出てくるのが気がかりですが・・・。
以上になります、最後までお読みいただきありがとうございました。
Discussion