🛠️

Microsoft Agent Framework (C#) を見てみよう その10 Durable Functions でワークフロー

に公開

シリーズ記事

はじめに

今回は、これまで色々やってきた Microsoft Agent Framework のワークフローを Durable Functions で動かしてみようと思います。
方針としてはワークフローの実行単位の1つであるスーパーステップ1回進めるという処理をアクティビティ関数で行い、オーケストレーター関数でワークフローの終了までループするという形でやっていこうと思います。

下準備

ということで、まずは Azure Functions のプロジェクトと Durable Task Scheduler のローカルエミュレーターのセットアップを行います。
これは以前書いた記事の「Durable Task Scheduler を .NET Aspire で起動する」に書いてある方法で行います。

簡単に説明すると .NET Aspire の空のアプリを新規作成して、そこに Azure Functions のプロジェクト(関数のテンプレートは Durable Functions Orchestrations を選択します) を追加します。

host.json に以下のような extensions の定義を追加します。

  "extensions": {
    "durableTask": {
      "hubName": "%TASKHUB_NAME%",
      "storageProvider": {
        "type": "azureManaged",
        "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
      }
    }
  }

さらに Durable Task Scheduler 用の以下のパッケージもインストールします。

  • Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged

最後に AppHost プロジェクトの AppHost.cs を以下のようにして Durable Task Scheduelr のコンテナが起動するようにします。

AppHost.cs
var builder = DistributedApplication.CreateBuilder(args);

var functionapp1 = builder.AddAzureFunctionsProject<Projects.FunctionApp1>("functionapp1");

// この if 文の中が Durable Task Scheduler の設定
if (builder.ExecutionContext.IsRunMode)
{
    var durableTaskScheduler = builder.AddContainer("durable-task-scheduler",
        "mcr.microsoft.com/dts/dts-emulator",
        "latest")
        .WithLifetime(ContainerLifetime.Persistent)
        .WithEndpoint(8080, 8080)
        .WithEndpoint(8082, 8082, scheme: "http", name: "Dashboard");
    var durableTaskSchedulerConnectionString = builder.AddParameter("durable-task-scheduler-connectionstring",
        "Endpoint=http://localhost:8080;Authentication=None");
    functionapp1.WithEnvironment("DURABLE_TASK_SCHEDULER_CONNECTION_STRING", durableTaskSchedulerConnectionString)
        .WithEnvironment("TASKHUB_NAME", "default")
        .WaitFor(durableTaskScheduler);
}

builder.Build().Run();

最後に .NET Aspire で起動したときのポート番号の不一致を解消するために launchSettings.json のプロファイル名を https にします。

launchSettings.json
{
  "profiles": {
    "https": {
      "commandName": "Project",
      "commandLineArgs": "--port 7072",
      "launchBrowser": false
    }
  }
}

後は、リリース時には Durable Task Scheduler を使わないようにしたい場合とかは追加の手順が必要ですが、そこは元記事を参照してください。今回はローカルで動きが見れるだけでいいのでこのままいきます。

ワークフローの作成

では、ワークフローを使えるようにしていきましょう。
Microsoft Agent Framework のワークフローを使うためには以下の NuGet パッケージをインストールします。

  • Microsoft.Agents.AI
  • Microsoft.Agents.AI.Workflows

次にワークフローを定義します。ここでは、これまでも使ってきたカウントアップをするワークフローを使います。まずは Executor を定義します。

Executors.cs
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

namespace FunctionApp1;

// 入力設定メッセージ
public record SetAmountMessage(int Amount);
// 現在のカウント値メッセージ
public record CurrentCountMessage(int Value);

// カウントアップ Executor(内部状態 _amount / _currentCount をチェックポイント永続化)
public class CountUpExecutor() : ReflectingExecutor<CountUpExecutor>(nameof(CountUpExecutor)),
    IMessageHandler<SetAmountMessage, CurrentCountMessage>,
    IMessageHandler<CurrentCountMessage, CurrentCountMessage>
{
    private int _amount;
    private int _currentCount;

    public ValueTask<CurrentCountMessage> HandleAsync(SetAmountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        _amount = message.Amount;
        return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
    }

    public ValueTask<CurrentCountMessage> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        _currentCount += _amount;
        return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
    }

    protected override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        await context.QueueStateUpdateAsync(nameof(_amount), _amount, cancellationToken);
        await context.QueueStateUpdateAsync(nameof(_currentCount), _currentCount, cancellationToken);
    }

    protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        _amount = await context.ReadStateAsync<int>(nameof(_amount));
        _currentCount = await context.ReadStateAsync<int>(nameof(_currentCount));
    }
}

// 終了メッセージ生成 Executor
public class GenerateOutputMessageExecutor() : Executor<CurrentCountMessage, string>(nameof(GenerateOutputMessageExecutor))
{
    public override ValueTask<string> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        ValueTask.FromResult($"The final count is {message.Value}");
}

次に Durable Functions で使うための CheckpointStore を実装します。今回は直近のチェックポイントだけ覚えていればいいのでメモリ上に直近のものだけを保持するシンプルなもので行こうと思います。run id も checkpoint id も固定にしてしまいます。

AzureFunctionsCheckpointStore.cs
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using System.Text.Json;

namespace FunctionApp1;

public class AzureFunctionsCheckpointStore : JsonCheckpointStore
{
    public CheckpointInfo CheckpointInfo { get; } = new(nameof(AzureFunctionsCheckpointStore), nameof(LatestCheckpoint));
    public JsonElement? LatestCheckpoint { get; set; }
    public override ValueTask<CheckpointInfo> CreateCheckpointAsync(string runId, JsonElement value, CheckpointInfo? parent = null)
    {
        if (CheckpointInfo.RunId != runId)
        {
            throw new InvalidOperationException();
        }

        LatestCheckpoint = value;
        return ValueTask.FromResult(CheckpointInfo));
    }

    public override ValueTask<JsonElement> RetrieveCheckpointAsync(string runId, CheckpointInfo key)
    {
        if (CheckpointInfo.RunId != runId)
        {
            throw new InvalidOperationException();
        }

        return ValueTask.FromResult(LatestCheckpoint ?? default);
    }

    public override ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null)
    {
        return ValueTask.FromResult<IEnumerable<CheckpointInfo>>([CheckpointInfo]);
    }
}

続けてワークフローと先ほどの Store を DI コンテナに登録します。

Program.cs
builder.Services.AddTransient<AzureFunctionsCheckpointStore>();
builder.Services.AddTransient(sp =>
{
    var countUp = new CountUpExecutor();
    var generateOutputMessage = new GenerateOutputMessageExecutor();
    return new WorkflowBuilder(countUp)
        .AddSwitch(countUp, switchBuilder =>
        {
            switchBuilder.AddCase((CurrentCountMessage? m) => m?.Value < 10, countUp)
                .WithDefault(generateOutputMessage);
        })
        .WithOutputFrom(generateOutputMessage)
        .Build();
});

次に 1 ターンだけ進めるアクティビティ関数を作成します。

ProgressWorkflowActivity.cs
using Microsoft.Agents.AI.Workflows;
using Microsoft.Azure.Functions.Worker;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace FunctionApp1;

public class ProgressWorkflowActivity(AzureFunctionsCheckpointStore store,
    Workflow workflow)
{
    [Function(nameof(ProgressWorkflowActivity))]
    public async Task<WorkflowStepResult> InvokeAsync([ActivityTrigger] WorkflowStepRequest request)
    {
        // request に応じて最初から開始するか途中から開始するか
        static async Task<Checkpointed<StreamingRun>> createRun(WorkflowStepRequest request, 
            AzureFunctionsCheckpointStore store,
            Workflow workflow)
        {
            store.LatestCheckpoint = request.Store;
            var checkpointManager = CheckpointManager.CreateJson(store);
            return request switch
            {
                (SetAmountMessage message, null) => await InProcessExecution.StreamAsync(
                    workflow, message, checkpointManager, store.CheckpointInfo.RunId),
                (null, JsonElement _) => await InProcessExecution.ResumeStreamAsync(
                    workflow, store.CheckpointInfo, checkpointManager, store.CheckpointInfo.RunId),
                _ => throw new InvalidOperationException()
            };
        }

        store.LatestCheckpoint = request.Store;
        await using var run = await createRun(request, store, workflow);

        // ワークフローを実行
        List<WorkflowOutputEvent> workflowOutputEvents = [];
        await foreach (var evt in run.Run.WatchStreamAsync())
        {
            if (evt is SuperStepCompletedEvent superStepCompletedEvent)
            {
                // 1 スーパーステップで止める
                break;
            }

            if (evt is WorkflowOutputEvent workflowOutputEvent)
            {
                workflowOutputEvents.Add(workflowOutputEvent);
            }

            if (evt is WorkflowErrorEvent workflowErrorEvent)
            {
                throw new InvalidOperationException();
            }
        }

        // 現在のチェックポイントの状態とアウトプットを返す
        return new(store.LatestCheckpoint, workflowOutputEvents.Select(x => x.As<string>() ?? "").ToList());
    }
}

// ワークフローの1ステップを実行するパラメーター
[method: JsonConstructor]
public record WorkflowStepRequest(
    // 初回実行時のみ値が入る
    SetAmountMessage? SetAmountMessage,
    // 2回目以降はここにステートが入る
    JsonElement? Store)
{
    public WorkflowStepRequest(SetAmountMessage setAmountMessage) : this(setAmountMessage, null) { }
    public WorkflowStepRequest(JsonElement store) : this(null, store) { }
}

// ワークフローの1ステップの実行結果
public record WorkflowStepResult(
    // ワークフローのステート
    JsonElement? State,
    // ワークフローの出力
    IReadOnlyCollection<string> WorkflowOutputs);

最後にオーケストレーター関数を作成します。

WorkflowOrchestrator.cs
using Microsoft.Agents.AI.Workflows;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;

namespace FunctionApp1;

public static class Function1
{
    [Function(nameof(Function1))]
    public static async Task<string> RunOrchestrator(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        ILogger logger = context.CreateReplaySafeLogger(nameof(Function1));
        var input = context.GetInput<SetAmountMessage>();
        if (input == null) throw new InvalidOperationException();

        // ワークフローから戻り値が返ってくるまで繰り返す
        var request = new WorkflowStepRequest(input);
        IReadOnlyCollection<string>? workflowOutputs;
        while(true)
        {
            var result = await context.CallActivityAsync<WorkflowStepResult>(
                nameof(ProgressWorkflowActivity),
                request);
            if (result.WorkflowOutputs.Any())
            {
                workflowOutputs = result.WorkflowOutputs;
                break;
            }

            request = new(result.State ?? default);
        }

        // ひとまず最初の出力を結果として返す
        return workflowOutputs.First();
    }

    [Function("Function1_HttpStart")]
    public static async Task<HttpResponseData> HttpStart(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
        [DurableClient] DurableTaskClient client,
        FunctionContext executionContext)
    {
        ILogger logger = executionContext.GetLogger("Function1_HttpStart");

        // ひとまずスターター関数で SetAmountMessage(4) を決め打ちで渡す
        string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
            nameof(Function1),
            new SetAmountMessage(4));

        logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);

        // Returns an HTTP 202 response with an instance management payload.
        // See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
        return await client.CreateCheckStatusResponseAsync(req, instanceId);
    }
}

これで Durable Functions でワークフローを動かす準備ができました。動かしてみましょう。起動して適当にスターター関数を呼び出すと Durable Task Scheduler のダッシュボードでワークフローの進行状況が見れます。

いい感じですね。amount に 4 を指定しているので最初に SetAmountMessage を処理して、そこからインクリメントが 3 回行われて、最後にメッセージを生成する Executor が実行されてるので合計で 5 回 ProgressWorkflowActivity が呼ばれているのがわかります。 オーケストレーターの入力と出力を見てみても、ちゃんと意図した通りになっています。

最後の ProgressWorkflowActivity の出力が The final count is 12 になっているのも確認できます。

いい感じに 1 ステップごとのワークフローの状態が確認できています。

まとめ

今回は、Microsoft Agent Framework のワークフローを Durable Functions で動かしてみました。
まだワークフローの終了条件などを詰めないと汎用的なワークフローの実行ができるものではありませんが、ワークフローの1ステップをアクティビティ関数で実行するという形で Durable Functions と組み合わせることができました。
そのうち、Durable Functions や Azure AI Foundry などでいい感じに動かせるようになると思うのですが、自分でもとりあえず軽く動かせたので満足です。

早くここら辺こなれてほしいですね。

Microsoft (有志)

Discussion