🎻

Azure Function の永続的オーケストレーションをやってみた

2022/01/18に公開

以前、Azure Function のタイマートリガーについてまとめました。
https://zenn.dev/megane_otoko/articles/069_azure_function

軽い処理であればタイマートリガーで問題ないのですが、重い処理だと前の処理が終わる前に次の処理が始まってしまうことがあります。

これに対応したものが 永続的オーケストレーション です。

"永続的オーケストレーション" は、終了しないオーケストレーター関数です。
1 時間ごとに関数を実行する CRON スケジュールでは、1:00、2:00、3:00 といったタイミングで関数が実行され、重複の問題が発生する可能性があります。
この例では、クリーンアップ所要時間が 30 分の場合は、1:00、2:30、4:00 にスケジュールされるため、重複することはありません。
https://docs.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-eternal-orchestrations

オーケストレーションの詳細については以下の記事をご参照ください。
https://tech-lab.sios.jp/archives/12991

オーケストレーションのコードの仕組みについては以下の記事をご参照ください。
https://qiita.com/KcMichael/items/176a94e4e13ecd33f7fe

今回使ったコードはこちら

なお、下記の環境で動作を確認しています。

Python 3.9.5
Windows 10
VSCode 1.63.2

ローカル実行時は Azurite を使っています、そのあたりや仮想環境等の環境構築については手前味噌ですが以前 Azure Function についてまとめたブログをご参照ください。
https://zenn.dev/megane_otoko/articles/069_azure_function

ファイル構成

|--durable-orchestrator # Durable Functions orchestrator 関数  
|     |--- __init__.py # ループ処理を実装するファイル
|     |--- function.json # 特にいじらない
|
|--durable-activity # Durable Functions activity 関数  
|     |--- __init__.py # 自作関数を入れ込むファイル
|     |--- function.json # 特にいじらない
|
|--durable-client # Durable Functions HTTP starter 関数  
|     |--- __init__.py # 特にいじらない
|     |--- function.json # 特にいじらない
|
|--display_time
|     |--- display_time.py # 自作のライブラリ
|     |--- __init__.py # 特にいじらない
|
|(以降は自動作成されるファイルのため省略)  

(参考) Azure Funtion で作成できる関数

Azure Function 関数のうち、Durable Functions activity、Durable Functions HTTP starter、Durable Functions orchestrator の3つを使用しています。

Azure Blob Storage trigger  
Azure Cosmos DB trigger  
Durable Functions activity      # これを使用
Durable Functions entity  
Durable Functions HTTP starter  # これも使用
Durable Functions orchestrator  # これも使用  
Azure Event Grid trigger  
Azure Event Hub trigger  
HTTP trigger  
Kafka output  
Kafka trigger  
Azure Queue Storage trigger  
RabbitMQ trigger  
Azure Service Bus Queue trigger  
Azure Service Bus Topic trigger  
Timer trigger  

Durable Functions orchestrator 関数

init_.py を以下のように変更します。

next_cleanup で指定している timedelta() でループ処理の間隔を、
deadline で指定している timedelta() でタイムアウトする時間を調整できます。

なお、timedelta() の引数については以下記事を参照ください。
https://note.nkmk.me/python-datetime-timedelta-conversion/

durable-orchestrator/__init__.py
import azure.functions as func
import azure.durable_functions as df
from datetime import datetime, timedelta

def orchestrator_function(context: df.DurableOrchestrationContext):
  
    activity_task = context.call_activity("durable-activity") # Durable Functions activity 関数のフォルダ名と揃えた方がよさそう。  

    # ループ処理
    yield activity_task
    td = timedelta(minutes=1)
    next_cleanup = context.current_utc_datetime + timedelta(minutes=1) # ループ感覚
    yield context.create_timer(next_cleanup)
    context.continue_as_new(None)
    
    # タイムアウト処理
    deadline = context.current_utc_datetime + timedelta(minutes=10) # タイムアウトする時間
    timeout_task = context.create_timer(deadline)
    winner = yield context.task_any([activity_task, timeout_task])
    if winner == activity_task:
        timeout_task.cancel()
        return True
    elif winner == timeout_task:
        return False

main = df.Orchestrator.create(orchestrator_function)

https://docs.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-eternal-orchestrations?tabs=python

Durable Functions activity 関数

init_.py を以下のように変更します。

durable-activity/__init__.py
import logging
import datetime
import time
from display_time import display_time # 追加した自作ライブラリ

def main(name: str) -> str:

    dtime = display_time.display_time_now() # 自作関数
    now_time = dtime.time_now()
    logging.warning(f"Activity {now_time}")
    print("テスト中")
    time.sleep(5)
    return f'{name}!'

https://docs.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-eternal-orchestrations?tabs=python

Durable Functions HTTP starter 関数

init.py を以下のように変更します。

durable-client/__init__.py
import logging

from azure.durable_functions import DurableOrchestrationClient
import azure.functions as func

async def main(req: func.HttpRequest, starter: str, message):

    logging.info(starter)
    client = DurableOrchestrationClient(starter)

    # Orchestratorの開始
    instance_id = await client.start_new('durable-orchestrator') # Durable Functions orchestrator 関数のフォルダ名に合わせる
    response = client.create_check_status_response(req, instance_id)
    message.set(response)

https://qiita.com/KcMichael/items/176a94e4e13ecd33f7fe

ローカル実行するとこのようにログが出力されます。

[2022-01-18T11:09:15.747Z] Executing 'Functions.durable-activity' (Reason='(null)', Id=d808009c-9f05-4e5c-9f49-db8e9f89df1d)[2022-01-18T11:09:15.754Z] Activity 2022-01-18 20:09:15.753335

1分間隔で実行するはずが一度にたくさんログが出てくるのが気がかりですが・・・。

以上になります、最後までお読みいただきありがとうございました。

Discussion