【Durable Functions】Python で動かしてみて概要を理解する

に公開

はじめに

以前から気になっていた Durable Functions を概要から理解したいです。
C# で開発することが定番だと思いますが、筆者は Python をよく使うので、Python 版で理解していきます。

ターゲット

  • Durable Functions で何ができるかあまりよくわからない方
  • Durable Functions を Python で使ってみたい方

Durable Functions 概要

Azure Functions の拡張機能であり、長時間実行するような、ワークフロー実装に最適な機能です。
https://learn.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-overview?tabs=in-process%2Cnodejs-v3%2Cv1-model&pivots=python

  • 実行状態や処理の途中経過をステートフルで管理できる
  • 長時間タスクを実行することに長けている
    • Durable Functions は実行状態を Azure Storage 等に永続化することで、安定して処理を継続できます。
    • 通常、Azure Functions などのサーバレスアーキテクチャはタイムアウトが設けられていることが多く長時間タスクには不向きです。(Azure Functions の HTTP リクエストでは 230秒でタイムアウトする)
    • ユーザーの入力待ちやタイマーによる遅延等も組み合わせることができます。
  • 自動リトライ
    • エラーが発生した場合の再試行を設定することができます。
    • 複雑なコードを書く必要なく、簡単に実装できる
    • 各 Activity (処理単位)ごとに、リトライ設定ができる
    実装イメージ
    orchestrator.py
    import datetime
    import azure.durable_functions as df
    
    def orchestrator(context: df.DurableOrchestrationContext):
        # ① リトライポリシーを定義
        retry_options = df.RetryOptions(
            first_retry_interval=datetime.timedelta(seconds=10),  # 初回リトライ待ち
            max_number_of_attempts=5                              # 最大試行回数
        )
        retry_options.backoff_coefficient = 2.0                      # 10, 20, 40, … の指数バックオフ
        retry_options.max_retry_interval = datetime.timedelta(minutes=5)
    
        # ② with_retry で Activity 呼び出し
        result = yield context.call_activity_with_retry(
            "DoLongTask",
            retry_opts,
            {"task_id": 42} # 適当
        )
    
        return result
    

構成要素

基本的に、Durable Fucntions は、実行状態を制御する Orchestrator 関数と、Orchestrator から呼び出す Activity 関数で構成されます。

  • Trigger 関数
    • ワークフロー実行トリガー(Azure Functions 同様)
  • Client 関数(Starter)
    • Trigger から呼び出されるエントリポイントで、Orchestrator の起動を行う
  • Orchestrator 関数
    • 処理全体を制御する、実行順序や条件分岐を記述
    • 状態を管理し、各 Activity 関数を呼び出す
  • Activity 関数
    • 処理ロジックを担当。
  • Sub-Orchestrator 関数
    • Orchestrator 関数から再帰的に呼び出せる Orchestrator 関数
  • Entity 関数
    • キーごとに個別の状態を管理する

実装パターン

Durable Functions のアプリケーションパターンです。Fan-out/Fan-in をよく耳にします。

1. Chaining

  • 複数の関数を順番に実行するパターン。
  • 関数間でデータを引き継ぎつつ、順番に処理を進める。
  • 例)直列のパイプライン処理(Agent1 → Agent2 → Agent3 ...)

2. Fan-out/Fan-in

  • 多数の関数を並列に実行し、その結果を集約するパターン。
  • 大量のデータ処理やバッチ処理に適している。
  • 例)複数 Agent を並列に呼び出してすべての実行が完了するまで待つ

3. Async HTTP APIs

  • 長時間かかる処理の進捗を非同期に管理できるHTTPエンドポイントを提供するパターン。
  • 呼び出した側は、進捗状況や処理結果をポーリングする形で取得できる。

4. Monitor

  • 状態を監視し、特定の条件が満たされるまで繰り返し実行されるパターン。
  • 状態変化を継続的に監視し、条件に合致した時点で次の処理に移る。
  • 例)Sharepoint のドキュメントに更新があれば5分置きにチャンク化

5. Human interaction

  • 人間による承認や入力を待つための仕組み。
  • 外部イベントを待つ仕組みを用いて、人間の操作が必要な処理を実装できる。
  • 例)ヒトによる承認を待つ

6. Aggregator

  • 複数の独立したサービスや関数の結果を一箇所に集めて統合するパターン。
  • 実行結果を一つの箱に集約できる(Entity にキーと値として保存できる)

コスト

実行時間と実行回数に基づいて課金されます。
Premium プランの場合は、常時インスタンスが起動することになるため費用が発生します(約20,000円ぐらい)
Flex Consumption プランは比較的新しい料金形態ですが、VNET対応/常時起動(コールドスタート回避)/最大1000インスタンスまでスケールできる など、他プランよりも優れている点が多いため、基本的にはこのプラン選択しておけばいいかな、という感じです。無料利用枠もありますし。

また、一般提供(GA)されており、東日本リージョンでも利用可能です。

課金プラン 特徴・用途のポイント 注意点
従量課金プラン (Consumption) - 実行時間(GB秒)と実行回数に基づいて課金
- 利用が少ないと安価(無料枠あり)
- VNET非対応
- コールドスタート発生(初回起動時遅延)
Flex従量課金プラン(Flex Consumption) - 実行回数と実行時間(GB秒)に応じた柔軟な従量課金
- VNET対応でセキュアな環境でも利用可能
- 一般の従量課金プランより若干高め
Premiumプラン - 常時1台以上のインスタンスが起動(即応性が高い)
- コールドスタート無し
- VNET対応で高性能・高可用性
- 最低1台のインスタンスが常時稼働するため高コスト

Orchestrator の挙動について

実行結果をトレースしてみると、Orchestrator に書いたログが何度も出力されてしまい「?」となると思います。
Orchestrator は、Activity の呼び出しや外部イベント/タイマーの待機操作があると、処理を何度も最初から実行(=リプレイ)するという動きをします。

MS Learn から抜粋してますが、以下のイメージです

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    print("Calling F1.")
    yield context.call_activity("F1")
    print("Calling F2.")
    yield context.call_activity("F2")
    print("Calling F3.")
    yield context.call_activity("F3")
    return None

main = df.Orchestrator.create(orchestrator_function)
実行結果
Calling F1.
-- call_activity("F1") を実行したので、最初に戻る --
Calling F1.
Calling F2.
-- call_activity("F2") を実行したので、最初に戻る --
Calling F1.
Calling F2.
Calling F3.
-- call_activity("F3") を実行したので、最初に戻る --
Calling F1.
Calling F2.
Calling F3.

図にするとこんな感じ

call_activity() 実行毎に、最初から処理が流れていることがわかります。
リプレイ時、Activity は繰り返し実行されるわけではない?と思うかもですが、1度 Activity を実行すると、実行結果が Storage に履歴として永続化されているので、リプレイ以降に呼び出された場合は、履歴を復元します。

リプレイ時に、ログ等を繰り返し出力したくない場合は、以下のように書くことで、重複実行を防ぐことができます。

if not context.is_replaying:
    print("Calling F1")

Durable Task Schedular (DTS)

最近登場した、 Durable Functions のフレームワークに最適化されたストレージプロバイダーです。

ストレージプロバイダーは Durable Functions のの実行状態の保存などを行なってくれます
Durable Functions のオーケストレーションやアクティビティの状態を管理するために使用されます。
ダッシュボードで、オーケストレーターやアクティビティの実行状況を監視できる点が便利です。

過去一度だけ Durable Functions を使ったことがありますが、当時 Application Insights で実行ログを監視しており、処理が追いづらく、、

その点、DTS のダッシュボードでは各タスクの処理時間のボトルネックが一目でわかるので、便利です!

現時点では C# での開発のみ利用できます。

Durable Functions は C# を中心に開発されています、理由は、Durable Functions のコアライブラリである Durable Task Framework という .NET/C# ライブラリが土台になっているから、という理解です。

また別の機会で深堀してみます。

  • ストレージプロバイダー一覧(by GPT)
プロバイダー 特徴 ユースケース
Azure Storage 既定バックエンド。追加設定なしで低コスト運用。Azurite でローカル開発可。 小〜中規模ワークフロー、学習・テスト
MSSQL (SQL Server / Azure SQL) 強い整合性とトランザクション保証。オンプレ/VNet でも利用可能。KEDA スケール対応。 金融・基幹系、既存 DB 資産活用
Netherite Event Hubs+Page Blob+FASTER による高スループット/低レイテンシ。2028-03-31 EOSL 予定。 IoT テレメトリ、大量イベント処理
Durable Task Scheduler (Preview) フルマネージドでストレージレス内部実装。Premium / App Service プラン専用、最高スループット。 超大規模ワークフロー、運用負荷最小化

メモ

個人的に押さえておきたい点をメモします。

  • 1Activity = 1関数として扱う
  • Activity → Activity は直接呼ぶことはできない
    • Activity での処理結果は一度 Orchestratorに戻す必要があるから(Orchestratorが実行状態を管理しているから)
  • 実装の流れとして、以下のように作る
    1. Orchestrator にワークフローの流れを書く
    2. 各処理をActivityに書く

動作確認

サンプルコードを動かします。
https://learn.microsoft.com/ja-jp/azure/azure-functions/durable/quickstart-python-vscode?tabs=windows

実行環境

  • Python プログラミング モデル v2
  • Python 3.12
  • Visual Studio Code
  • azurite 3.34.0
  • func 4.0.7317

起動

Durable Functions は Azure Functions の拡張機能なので、Azure Functions のセットアップと手順は変わりません。

詳細は、MS Learn のクイックスタートを確認してください

  1. VSCode 拡張機能のインストール
  2. Ctrl + Shift + P で検索ウィンドウ開く
  3. azure functions: Create New Project > プロジェクトフォルダ選択 > Python
  4. Python3 3.12.3(推奨ver) > http_trigger > http_trigger > ANONYMOUS
  5. 拡張機能 > 「Azure Functions」を検索し、インストール

パッケージができる

$ tree
.
├── function_app.py
├── host.json
├── local.settings.json
└── requirements.txt

1 directory, 4 files
  • .funcignore: ビルド時、functionsの関係ないファイルを除外できる

  • function_app.py: エントリポイント

  • host.json: Functions ホスト全体の動作設定を行う (例: logging, extensions, functionTimeout

  • local.settings.json: ローカル上で開発するうえでの環境変数を管理。.env相当。

  • 依存関係の更新
    pip install します

    requirements.txt
    azure-functions
    azure-functions-durable
    

azurite

Durable Functions はステートフルであり、ワークフローの状態を保持するためのストレージが必要です。
azurite は Azure Storage をローカルでエミュレートできます。
実際に Azure Storageリソースを作成し、そのまま接続することも問題ないですが、ここでは azurite を使用してローカル実行していきます。

  • インストール
    npm install -g azurite
$ npm install -g azurite
npm warn deprecated glob@7.2.3: Glob versions prior to v9 are no longer supported
npm warn deprecated inflight@1.0.6: This module is not supported, and leaks memory. Do not use it. Check out lru-cache if you want a good and tested way to coalesce async requests by a key value, which is much more comprehensive and powerful.
npm warn deprecated rimraf@3.0.2: Rimraf versions prior to v4 are no longer supported
npm warn deprecated uuid@3.4.0: Please upgrade  to version 7 or higher.  Older versions may use Math.random() in certain circumstances, which is known to be problematic.  See https://v8.dev/blog/math-random for details.

added 378 packages in 14s

89 packages are looking for funding
  run `npm fund` for details
  • local.settings.json を更新
    • AzureWebJobsStorage: "UseDevelopmentStorage=true" は、azurite を使用することを指します。実リソースと繋ぐ場合は、接続文字列を指定します。
local.settings.json
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "node",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true" # 追加
  }
}

  • 起動
    azurite --silent --location ./azurite --debug ./azurite/debug.log
$ azurite --silent --location ./azurite --debug ./azurite/debug.log
Azurite Blob service is starting at http://127.0.0.1:10000
Azurite Blob service is successfully listening at http://127.0.0.1:10000
Azurite Queue service is starting at http://127.0.0.1:10001
Azurite Queue service is successfully listening at http://127.0.0.1:10001
Azurite Table service is starting at http://127.0.0.1:10002
Azurite Table service is successfully listening at http://127.0.0.1:10002

func start

疎通確認します。
生成された function_app.py に以下のサンプルコードを貼り付けます。

function_app.py
import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# An HTTP-triggered function with a Durable Functions client binding
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name)
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    result1 = yield context.call_activity("hello", "Seattle")
    result2 = yield context.call_activity("hello", "Tokyo")
    result3 = yield context.call_activity("hello", "London")

    return [result1, result2, result3]

# Activity
@myApp.activity_trigger(input_name="city")
def hello(city: str):
    return f"Hello {city}"
  • 実行
$ func start
Found Python version 3.12.11 (python3).

Azure Functions Core Tools
Core Tools Version:       4.0.7317 Commit hash: N/A +5ca56d37938824531b691f094d0a77fd6f51af20 (64-bit)
Function Runtime Version: 4.1038.300.25164

[2025-06-11T06:51:49.244Z] Worker process started and initialized.

Functions:

        http_start:  http://localhost:7071/api/orchestrators/{functionName}

        hello: activityTrigger

        hello_orchestrator: orchestrationTrigger

For detailed output, run func with --verbose flag.

http://localhost:7071/api/orchestrators/hello_orchestrator
にアクセスし、下記が返却されたら、起動していることが確認できます。

{
  "id": "8d078fdd2d4241f2b30b285f8a72cb06",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8d078fdd2d4241f2b30b285f8a72cb06?taskHub=TestHubName&connection=Storage&code=7qn2SB-izlKhU0EXy_UzOWEOpQJzs5C6Nr36SslFlEMRAzFuM5NQdQ==",
  "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8d078fdd2d4241f2b30b285f8a72cb06/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=7qn2SB-izlKhU0EXy_UzOWEOpQJzs5C6Nr36SslFlEMRAzFuM5NQdQ==",
  "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8d078fdd2d4241f2b30b285f8a72cb06/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=7qn2SB-izlKhU0EXy_UzOWEOpQJzs5C6Nr36SslFlEMRAzFuM5NQdQ==",
  "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8d078fdd2d4241f2b30b285f8a72cb06/rewind?reason={text}&taskHub=TestHubName&connection=Storage&code=7qn2SB-izlKhU0EXy_UzOWEOpQJzs5C6Nr36SslFlEMRAzFuM5NQdQ==",
  "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8d078fdd2d4241f2b30b285f8a72cb06?taskHub=TestHubName&connection=Storage&code=7qn2SB-izlKhU0EXy_UzOWEOpQJzs5C6Nr36SslFlEMRAzFuM5NQdQ==",
  "restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8d078fdd2d4241f2b30b285f8a72cb06/restart?taskHub=TestHubName&connection=Storage&code=7qn2SB-izlKhU0EXy_UzOWEOpQJzs5C6Nr36SslFlEMRAzFuM5NQdQ==",
  "suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8d078fdd2d4241f2b30b285f8a72cb06/suspend?reason={text}&taskHub=TestHubName&connection=Storage&code=7qn2SB-izlKhU0EXy_UzOWEOpQJzs5C6Nr36SslFlEMRAzFuM5NQdQ==",
  "resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8d078fdd2d4241f2b30b285f8a72cb06/resume?reason={text}&taskHub=TestHubName&connection=Storage&code=7qn2SB-izlKhU0EXy_UzOWEOpQJzs5C6Nr36SslFlEMRAzFuM5NQdQ=="
}

Response

Durable Functions では、各操作用のURIがクライアントに返却されます。

プロパティ HTTP メソッド 説明
id Durable Function インスタンスの一意キー
statusQueryGetUri GET インスタンスの実行ステータス(Running, Completed, Failed)を取得
sendEventPostUri POST {eventName} に対応するカスタムイベントをインスタンスへ送信
terminatePostUri POST インスタンスを強制終了(Terminate)する。reason パラメータで終了理由を指定可能
rewindPostUri POST インスタンスの状態を巻き戻して(Rewind)再実行をトリガーする reason 指定可能
purgeHistoryDeleteUri DELETE インスタンスの履歴(履歴ストレージ内のデータ)を完全に削除
restartPostUri POST 完了済み/失敗済みのインスタンスを再度最初から実行(Restart)する
suspendPostUri POST インスタンスを一時停止(Suspend)する。reason パラメータで停止理由を指定可能
resumePostUri POST 一時停止中のインスタンスを再開(Resume)する。reason パラメータで再開理由を指定可能

たとえば、 statusQueryGetUri にアクセスすると、以下のように、タスクの実行結果が返却されます。結果をポーリングするためのエンドポイントを簡単に作成できます。

{
  "name": "hello_orchestrator",
  "instanceId": "8d078fdd2d4241f2b30b285f8a72cb06",
  "runtimeStatus": "Completed",
  "input": null,
  "customStatus": null,
  "output": [
    "Hello Seattle",
    "Hello Tokyo",
    "Hello London"
  ],
  "createdTime": "2025-06-11T06:52:50Z",
  "lastUpdatedTime": "2025-06-11T06:52:50Z"
}

おわりに

次回は Durable Functions を使って DeepResearch を実装します。

参考

https://blog.cgillum.tech/building-serverless-ai-agents-using-durable-functions-on-azure-e1272882082c
https://learn.microsoft.com/ja-jp/azure/azure-functions/durable/durable-functions-overview?tabs=in-process%2Cnodejs-v3%2Cv1-model&pivots=python

ヘッドウォータース

Discussion