🐈

Microsoft Agent Framework (C#) を見てみよう その7 チェックポイントの永続化

に公開

シリーズ記事

はじめに

前回は、Microsoft Agent Framework の Executor のステータス管理について掘り下げてきました。その中でワークフローのチェックポイントについても触れましたが、前回の段階ではチェックポイントのデータの管理を行うための CheckpointManager として CheckpointManager.Default を使用していました。CheckpointManager.Default はインメモリでデータを管理しているためプロセスの再起動などでデータが飛んでしまいます。
本格的にワークフローを実行する場合には、チェックポイントのデータを永続化しておく必要があります。今回は、チェックポイントの永続化について掘り下げてみます。

チェックポイントの永続化

Microsoft Agent Framework では、チェックポイントの永続化のために ICheckpointStore<TStoreObject> というインターフェースが用意されています。このインターフェースを実装することで、チェックポイントのデータを任意のストレージに保存することができます。現時点では、ICheckpointStore<TStoreObject> を実装したクラスは以下の 3 つが提供されています。

  • JsonCheckpointStore: TStoreObjectJsonElement を指定した抽象クラス。提供する機能は KeyTypeInfo という CheckpointInfo のための JsonTypeInfo だけ。
  • InMemoryCheckpointStore: デフォルトで使用されるストア。JsonCheckpointStore を継承しており、インメモリでデータを管理する。
  • FileSystemJsonCheckpointStore: JsonCheckpointStore を継承しており、ファイルシステムに JSON ファイルとしてデータを保存する。コンストラクタで同期メソッドを使ってファイルを読んでいたりするのでデスクトップアプリとかバッチ処理とかで使う感じかな?Web アプリで使うには心もとない。

ICheckpointStore<TStoreObject> はジェネリックインターフェースなので、TStoreObject に任意の型を指定して実装することができます。ですが、基本的には JsonCheckpointStore を継承して JSON 形式でシリアライズ/デシリアライズするのが簡単で良いと思います。

FileSystemJsonCheckpointStore の利用

では、FileSystemJsonCheckpointStore を使ってチェックポイントの永続化を試してみましょう。前回の記事のコードの CheckpointManager のインスタンスを作成している部分を以下のように変更します。

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Reflection;

// ワークフローを構築
var workflow = await BuildWorkflowAsync();

// ファイルシステムに永続化するチェックポイントマネージャーを作成
using var fs = new FileSystemJsonCheckpointStore(new(Directory.GetCurrentDirectory()));
var checkpointManager = CheckpointManager.CreateJson(fs);
// チェックポイント情報を格納するリスト
List<CheckpointInfo> checkpoints = [];

// ワークフローを実行し、初期値4を渡す
await using Checkpointed<StreamingRun> run = await InProcessExecution.StreamAsync(
    workflow,
    new SetAmountMessage(4),
    checkpointManager);

// ワークフローの最終出力を格納する変数
string? output = null;
// イベントストリームを監視
await foreach (var evt in run.Run.WatchStreamAsync())
{
    // スーパーステップ完了時にチェックポイントを保存
    if (evt is SuperStepCompletedEvent { CompletionInfo: { Checkpoint: not null } } superStepCompletedEvent)
    {
        Console.WriteLine($"Checkpoint was stored: {superStepCompletedEvent.CompletionInfo.Checkpoint.CheckpointId}");
        checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
    }

    // CountUpExecutor の実行完了時にログ出力
    if (evt is ExecutorCompletedEvent { ExecutorId: nameof(CountUpExecutor) } executorCompletedEvent)
    {
        Console.WriteLine($"CountUpExecutor result: {executorCompletedEvent.Data}");
    }

    // 最終出力を取得
    if (evt is WorkflowOutputEvent { Data: string } outputEvent)
    {
        Console.WriteLine($"WorkflowOutputEvent was raised: {outputEvent.As<string>()}");
        output = outputEvent.As<string>();
    }
}

// 結果を出力
Console.WriteLine(output);

// 全チェックポイントを表示
foreach (var checkpoint in checkpoints)
{
    Console.WriteLine($"Checkpoint at run id {checkpoint.RunId} with checkpoint {checkpoint.CheckpointId}");
}

Console.WriteLine("--------------------------------------");
// 指定したチェックポイントの位置に戻す
await run.RestoreCheckpointAsync(checkpoints[2]);
await foreach (var evt in run.Run.WatchStreamAsync())
{
    Console.WriteLine(evt);
}

// ワークフローを構築するメソッド
async ValueTask<Workflow<SetAmountMessage>> BuildWorkflowAsync()
{
    // 各Executorのインスタンスを作成
    var countUpExecutor = new CountUpExecutor();
    var generateOutputMessageExecutor = new GenerateOutputMessageExecutor();

    // ワークフロービルダーを初期化
    // 開始地点は CounterExecutor
    return await new WorkflowBuilder(countUpExecutor)
        .AddSwitch(countUpExecutor, switchBuilder =>
        {
            // 10 より小さい場合は countupExecutor 戻してループ
            switchBuilder.AddCase(
                (CurrentCountMessage? message) => message is { Value: < 10 },
                countUpExecutor);
            // 10 以上の場合は generateOutputMessageExecutor に行く
            switchBuilder.WithDefault(generateOutputMessageExecutor);
        })
        // 出力は GenerateOutputMessageExecutor
        .WithOutputFrom(generateOutputMessageExecutor)
        .BuildAsync<SetAmountMessage>();
}

// 初期メッセージ: カウントアップの増加量を指定
record SetAmountMessage(int Amount);
// 現在のカウントの値
record CurrentCountMessage(int Value);

// カウントアップを行うExecutor
class CountUpExecutor() : ReflectingExecutor<CountUpExecutor>(nameof(CountUpExecutor)),
    IMessageHandler<SetAmountMessage, CurrentCountMessage>,
    IMessageHandler<CurrentCountMessage, CurrentCountMessage>
{
    private int _amount;
    private int _currentCount;

    // 初期メッセージを処理し、増加量を設定。戻り値は現在の値を返す。
    public ValueTask<CurrentCountMessage> HandleAsync(SetAmountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        _amount = message.Amount;
        return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
    }

    // カウントアップ後のメッセージを処理し、さらにカウントアップ
    public ValueTask<CurrentCountMessage> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        _currentCount += _amount;
        return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
    }

    protected override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // 必要なステータスを保存
        await context.QueueStateUpdateAsync(nameof(_amount), _amount, cancellationToken);
        await context.QueueStateUpdateAsync(nameof(_currentCount), _currentCount, cancellationToken);
    }

    protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // 必要なステータスを読み込む
        _amount = await context.ReadStateAsync<int>(nameof(_amount));
        _currentCount = await context.ReadStateAsync<int>(nameof(_currentCount));
    }
}

// 最終的な出力メッセージを生成するExecutor
class GenerateOutputMessageExecutor() : Executor<CurrentCountMessage, string>(nameof(GenerateOutputMessageExecutor))
{
    public override ValueTask<string> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        return ValueTask.FromResult($"The final count is {message.Value}");
    }
}

:::

実行すると前回の記事の実行結果と同じように以下の出力が得られます。

CountUpExecutor result: CurrentCountMessage { Value = 0 }
Checkpoint was stored: ed3cd8ee19904a9587dae9e58eadb30f
CountUpExecutor result: CurrentCountMessage { Value = 4 }
Checkpoint was stored: 693bf3b144cc475abec57ea6d7cffcb4
CountUpExecutor result: CurrentCountMessage { Value = 8 }
Checkpoint was stored: 84e744f245074312aa64060350327014
CountUpExecutor result: CurrentCountMessage { Value = 12 }
Checkpoint was stored: af83e8591429451d9c0046b79d00b6b3
WorkflowOutputEvent was raised: The final count is 12
Checkpoint was stored: ff5ff9b3e2e544d9ac42d96cf667e659
The final count is 12
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint ed3cd8ee19904a9587dae9e58eadb30f
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint 693bf3b144cc475abec57ea6d7cffcb4
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint 84e744f245074312aa64060350327014
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint af83e8591429451d9c0046b79d00b6b3
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint ff5ff9b3e2e544d9ac42d96cf667e659
--------------------------------------
SuperStepStartedEvent(Step = 5, Data: Microsoft.Agents.AI.Workflows.SuperStepStartInfo = Microsoft.Agents.AI.Workflows.SuperStepStartInfo)
ExecutorInvokedEvent(Executor = CountUpExecutor, Data: Microsoft.Agents.AI.Workflows.PortableValue = Microsoft.Agents.AI.Workflows.PortableValue)
ExecutorCompletedEvent(Executor = CountUpExecutor, Data: CurrentCountMessage = CurrentCountMessage { Value = 12 })
SuperStepCompletedEvent(Step = 5, Data: Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo = Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo)
SuperStepStartedEvent(Step = 6, Data: Microsoft.Agents.AI.Workflows.SuperStepStartInfo = Microsoft.Agents.AI.Workflows.SuperStepStartInfo)
ExecutorInvokedEvent(Executor = GenerateOutputMessageExecutor, Data: CurrentCountMessage = CurrentCountMessage { Value = 12 })
ExecutorCompletedEvent(Executor = GenerateOutputMessageExecutor, Data: System.String = The final count is 12)
WorkflowOutputEvent(Data: System.String = The final count is 12)
SuperStepCompletedEvent(Step = 6, Data: Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo = Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo)

---- で区切られているところから下がチェックポイントを復元した後の出力です。ちゃんと復元できていることがわかります。

FileSystemJsonCheckpointStore を使うと、チェックポイントのデータが実行ディレクトリに index.jsonl というファイル名で保存されます。中身は JSON Lines 形式で、1 行が 1 つのチェックポイントのデータになっています。例えば、上記の実行結果の場合、以下のような内容になります。

{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"ed3cd8ee19904a9587dae9e58eadb30f"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"693bf3b144cc475abec57ea6d7cffcb4"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"84e744f245074312aa64060350327014"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"af83e8591429451d9c0046b79d00b6b3"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"ff5ff9b3e2e544d9ac42d96cf667e659"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"a40f736db5344545bdaefcccb482bc11"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"9e391f45551b48368953391ac0828eb7"}

全部で 7 行ありますが、これは 1 回目の実行で 5 回のスーパーステップが実行によるものです。実際に以下の実行結果にある run id と checkpoint id が対応しています。

Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint ed3cd8ee19904a9587dae9e58eadb30f
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint 693bf3b144cc475abec57ea6d7cffcb4
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint 84e744f245074312aa64060350327014
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint af83e8591429451d9c0046b79d00b6b3
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint ff5ff9b3e2e544d9ac42d96cf667e659

2 回目の実行は 3 回目のチェックポイントの後から実行しているので 2 行追加されて合計 7 行になっています。でも、このファイルだけだとワークフローの状態が含まれていません。実際の各チェックポイントの情報は run id_checkpoint id.json というファイル名で保存されます。例えば、3 回目のチェックポイントの場合、d032fbdaecf74d80a1ca5c4a7ae5a6f9_84e744f245074312aa64060350327014.json というファイル名になります。このファイルの中身は以下のようになっています。

{
  "stepNumber": 2,
  "workflow": {
    "executors": {
      "CountUpExecutor": {
        "executorType": {
          "assemblyName": "WorkflowHelloWorldApp, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
          "typeName": "CountUpExecutor"
        },
        "executorId": "CountUpExecutor"
      },
      "GenerateOutputMessageExecutor": {
        "executorType": {
          "assemblyName": "WorkflowHelloWorldApp, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
          "typeName": "GenerateOutputMessageExecutor"
        },
        "executorId": "GenerateOutputMessageExecutor"
      }
    },
    "edges": {
      "CountUpExecutor": [
        {
          "$type": 1,
          "hasAssigner": true,
          "kind": 1,
          "connection": {
            "sourceIds": [
              "CountUpExecutor"
            ],
            "sinkIds": [
              "CountUpExecutor",
              "GenerateOutputMessageExecutor"
            ]
          }
        }
      ]
    },
    "requestPorts": [
    ],
    "startExecutorId": "CountUpExecutor",
    "outputExecutorIds": [
      "GenerateOutputMessageExecutor"
    ]
  },
  "runnerData": {
    "instantiatedExecutors": [
      "CountUpExecutor"
    ],
    "queuedMessages": {
      "CountUpExecutor": [
        {
          "messageType": {
            "assemblyName": "WorkflowHelloWorldApp, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
            "typeName": "CurrentCountMessage"
          },
          "message": {
            "typeId": {
              "assemblyName": "WorkflowHelloWorldApp, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
              "typeName": "CurrentCountMessage"
            },
            "value": {
              "value": 8
            }
          },
          "source": {
          }
        }
      ]
    },
    "outstandingRequests": [
    ]
  },
  "stateData": {
    "CountUpExecutor||_amount": {
      "typeId": {
        "assemblyName": "System.Private.CoreLib, Version=9.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e",
        "typeName": "System.Int32"
      },
      "value": 4
    },
    "CountUpExecutor||_currentCount": {
      "typeId": {
        "assemblyName": "System.Private.CoreLib, Version=9.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e",
        "typeName": "System.Int32"
      },
      "value": 8
    }
  },
  "edgeStateData": {
  }
}

JSON にはワークフローの Executor の情報や Edge の情報、実行中のメッセージキューの情報、Executor の状態データなどが含まれています。これらの情報を使ってワークフローの状態を復元しています。

コードを以下のように書き換えて保存されたファイルを読み込むようにしてみましょう。以下のコードでハードコードしている run id は実行するたびに変わるので、実際にはファイルを列挙して最新のものを取得するなどの処理が必要です。

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Reflection;

// ワークフローを構築
var workflow = await BuildWorkflowAsync();

// ファイルシステムに永続化するチェックポイントマネージャーを作成
using var fs = new FileSystemJsonCheckpointStore(new(Directory.GetCurrentDirectory()));
var checkpointManager = CheckpointManager.CreateJson(fs);
// ファイルシステムに保存されたデータから指定した run id のチェックポイントを読み込む
List<CheckpointInfo> checkpoints = [.. await fs.RetrieveIndexAsync("d032fbdaecf74d80a1ca5c4a7ae5a6f9")];

// 読み込んだチェックポイントの 3 番目の状態を復元して実行
await using Checkpointed<StreamingRun> run = await InProcessExecution.ResumeStreamAsync(
    workflow,
    checkpoints[2],
    checkpointManager,
    checkpoints[2].RunId);

// ワークフローの最終出力を格納する変数
string? output = null;
// イベントストリームを監視
await foreach (var evt in run.Run.WatchStreamAsync())
{
    // スーパーステップ完了時にチェックポイントを保存
    if (evt is SuperStepCompletedEvent { CompletionInfo: { Checkpoint: not null } } superStepCompletedEvent)
    {
        Console.WriteLine($"Checkpoint was stored: {superStepCompletedEvent.CompletionInfo.Checkpoint.CheckpointId}");
        checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
    }

    // CountUpExecutor の実行完了時にログ出力
    if (evt is ExecutorCompletedEvent { ExecutorId: nameof(CountUpExecutor) } executorCompletedEvent)
    {
        Console.WriteLine($"CountUpExecutor result: {executorCompletedEvent.Data}");
    }

    // 最終出力を取得
    if (evt is WorkflowOutputEvent { Data: string } outputEvent)
    {
        Console.WriteLine($"WorkflowOutputEvent was raised: {outputEvent.As<string>()}");
        output = outputEvent.As<string>();
    }
}

// 結果を出力
Console.WriteLine(output);

Executor の定義と BuildWorkflowAsync メソッドは前述のコードと同じです。
ファイルから読み込んだチェックポイントの 3 番目を指定して InProcessExecution.ResumeStreamAsync メソッドでワークフローを復元しています。実行結果は以下のようになります。

CountUpExecutor result: CurrentCountMessage { Value = 12 }
Checkpoint was stored: 81a9770cd56e4ce69b147d5002373c2f
WorkflowOutputEvent was raised: The final count is 12
Checkpoint was stored: c8f25adaf74c46678a3a8d5c330509d0
The final count is 12

ちゃんと 3 回目のチェックポイントの状態から復元されていることがわかります。

永続化を自作してみよう

実際のアプリでは、ファイルシステムではなくデータベースやクラウドストレージに保存したいことが多いと思います。そういう場合には JsonCheckpointStore を継承して特定のストレージに保存するクラスを実装すれば良いです。オーバーライドして実装するメソッドは以下の 3 つです。

  • CreateCheckpointAsync: 新しいチェックポイントを作成して保存する。ここで checkpoint ID を生成して CheckpointInfo を返す
  • RetrieveCheckpointAsync: 指定された run id とチェックポイント ID からチェックポイントデータを取得する
  • RetrieveIndexAsync: 指定された run id のすべてのチェックポイント情報のリストを返す。

やることさえわかれば、そんなに難しくないです。試しにインメモリで保存するだけの簡単な実装を作ってみましょう。

MyInMemoryJsonCheckpointStore.cs
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using System.Text.Json;

namespace WorkflowHelloWorldApp;

// インメモリでチェックポイントを JSON 形式で保存するストア実装
public class MyInMemoryJsonCheckpointStore : JsonCheckpointStore
{
    // run id をキーとして、チェックポイントのリストを保持する辞書
    private readonly Dictionary<string, List<ItemRecord>> _store = new();
    
    // 新しいチェックポイントを作成してストアに保存する
    public override ValueTask<CheckpointInfo> CreateCheckpointAsync(string runId, JsonElement value, CheckpointInfo? parent = null)
    {
        Console.WriteLine($"## CreateCheckpointAsync({runId}, {value.ToString()[0..30]})");
        
        // run id に対応するチェックポイントリストを取得、なければ新規作成
        if (!_store.TryGetValue(runId, out var checkpoints))
        {
            checkpoints = _store[runId] = [];
        }

        // チェックポイント ID はリストのインデックスを使用
        var item = new ItemRecord(checkpoints.Count, value);
        checkpoints.Add(item);
        
        // CheckpointInfo を返す(runId とチェックポイント ID を含む)
        return ValueTask.FromResult(new CheckpointInfo(runId, item.CheckpointId.ToString()));
    }

    // 指定された run id とチェックポイント ID からチェックポイントデータを取得する
    public override ValueTask<JsonElement> RetrieveCheckpointAsync(string runId, CheckpointInfo key)
    {
        Console.WriteLine($"## RetrieveCheckpointAsync({runId}, {key})");
        
        // run id に対応するチェックポイントリストを取得
        if (!_store.TryGetValue(runId, out var checkpoints))
        {
            throw new InvalidOperationException($"Not found: {runId}");
        }

        // チェックポイント ID をインデックスとして使用してデータを返す
        return ValueTask.FromResult(checkpoints[int.Parse(key.CheckpointId)].Value);
    }

    // 指定された run id のすべてのチェックポイント情報のリストを返す
    public override ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null)
    {
        Console.WriteLine($"## RetrieveIndexAsync({runId})");
        
        // run id に対応するチェックポイントがあればそのリストを、なければ空のリストを返す
        return ValueTask.FromResult(
            _store.GetValueOrDefault(runId)
                ?.Select(x => x.CheckpointId)
                .Select(checkpointId => new CheckpointInfo(runId, checkpointId.ToString())) ?? []);
    }

    // デバッグ用:すべてのチェックポイントの内容を出力する
    public void Dump()
    {
        Console.WriteLine("Dump all checkpoints");
        foreach (var checkpoints in _store.Values)
        {
            foreach (var item in checkpoints)
            {
                Console.WriteLine(item.ToString()[0..60]);
            }
        }
    }
}

// チェックポイントのレコード(ID と JSON データを保持)
record ItemRecord(int CheckpointId, JsonElement Value);

次に、上記の MyInMemoryJsonCheckpointStore を使うようにコードを書き換えます。カウントアップと結果の出力を生成するワークフローを完走させたあとに 3 番目のチェックポイントに戻して再実行するようにしてみます。

Program.cs
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
using WorkflowHelloWorldApp;

// ワークフローを構築
var workflow = await BuildWorkflowAsync();

// ファイルシステムに永続化するチェックポイントマネージャーを作成
var myStore = new MyInMemoryJsonCheckpointStore();
var checkpointManager = CheckpointManager.CreateJson(myStore);
// ファイルシステムに保存されたデータから指定した run id のチェックポイントを読み込む

// 読み込んだチェックポイントの 3 番目の状態を復元して実行
await using Checkpointed<StreamingRun> run = await InProcessExecution.StreamAsync(
    workflow,
    new SetAmountMessage(4),
    checkpointManager);

List<CheckpointInfo> checkpoints = [];
// ワークフローの最終出力を格納する変数
string? output = null;
// イベントストリームを監視
await foreach (var evt in run.Run.WatchStreamAsync())
{
    // スーパーステップ完了時にチェックポイントを保存
    if (evt is SuperStepCompletedEvent { CompletionInfo: { Checkpoint: not null } } superStepCompletedEvent)
    {
        Console.WriteLine($"Checkpoint was stored: {superStepCompletedEvent.CompletionInfo.Checkpoint.CheckpointId}");
        checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
    }

    // CountUpExecutor の実行完了時にログ出力
    if (evt is ExecutorCompletedEvent { ExecutorId: nameof(CountUpExecutor) } executorCompletedEvent)
    {
        Console.WriteLine($"CountUpExecutor result: {executorCompletedEvent.Data}");
    }

    // 最終出力を取得
    if (evt is WorkflowOutputEvent { Data: string } outputEvent)
    {
        Console.WriteLine($"WorkflowOutputEvent was raised: {outputEvent.As<string>()}");
        output = outputEvent.As<string>();
    }
}

// 結果を出力
Console.WriteLine(output);

// ストアの内容をダンプ
myStore.Dump();

// チェックポイントの 3 番目の状態を復元して再実行
Console.WriteLine("----------------------------------");
await run.RestoreCheckpointAsync(checkpoints[2]);
await foreach (var evt in run.Run.WatchStreamAsync())
{
    // スーパーステップ完了時にチェックポイントを保存
    if (evt is SuperStepCompletedEvent { CompletionInfo: { Checkpoint: not null } } superStepCompletedEvent)
    {
        Console.WriteLine($"Checkpoint was stored: {superStepCompletedEvent.CompletionInfo.Checkpoint.CheckpointId}");
        checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
    }

    // CountUpExecutor の実行完了時にログ出力
    if (evt is ExecutorCompletedEvent { ExecutorId: nameof(CountUpExecutor) } executorCompletedEvent)
    {
        Console.WriteLine($"CountUpExecutor result: {executorCompletedEvent.Data}");
    }

    // 最終出力を取得
    if (evt is WorkflowOutputEvent { Data: string } outputEvent)
    {
        Console.WriteLine($"WorkflowOutputEvent was raised: {outputEvent.As<string>()}");
        output = outputEvent.As<string>();
    }
}

// 結果を出力
Console.WriteLine(output);

// ストアの内容をダンプ
myStore.Dump();

実行すると以下のような結果になります。

CountUpExecutor result: CurrentCountMessage { Value = 0 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":0,"workflow":{"e)
Checkpoint was stored: 0
CountUpExecutor result: CurrentCountMessage { Value = 4 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":1,"workflow":{"e)
Checkpoint was stored: 1
CountUpExecutor result: CurrentCountMessage { Value = 8 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":2,"workflow":{"e)
Checkpoint was stored: 2
CountUpExecutor result: CurrentCountMessage { Value = 12 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":3,"workflow":{"e)
Checkpoint was stored: 3
WorkflowOutputEvent was raised: The final count is 12
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":4,"workflow":{"e)
Checkpoint was stored: 4
The final count is 12
Dump all checkpoints
ItemRecord { CheckpointId = 0, Value = {"stepNumber":0,"work
ItemRecord { CheckpointId = 1, Value = {"stepNumber":1,"work
ItemRecord { CheckpointId = 2, Value = {"stepNumber":2,"work
ItemRecord { CheckpointId = 3, Value = {"stepNumber":3,"work
ItemRecord { CheckpointId = 4, Value = {"stepNumber":4,"work
----------------------------------
## RetrieveCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, CheckpointInfo(RunId: d704fbd5b1be4c058a17cf101d02ee8f, CheckpointId: 2))
CountUpExecutor result: CurrentCountMessage { Value = 12 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":5,"workflow":{"e)
Checkpoint was stored: 5
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":6,"workflow":{"e)
WorkflowOutputEvent was raised: The final count is 12
Checkpoint was stored: 6
The final count is 12
Dump all checkpoints
ItemRecord { CheckpointId = 0, Value = {"stepNumber":0,"work
ItemRecord { CheckpointId = 1, Value = {"stepNumber":1,"work
ItemRecord { CheckpointId = 2, Value = {"stepNumber":2,"work
ItemRecord { CheckpointId = 3, Value = {"stepNumber":3,"work
ItemRecord { CheckpointId = 4, Value = {"stepNumber":4,"work
ItemRecord { CheckpointId = 5, Value = {"stepNumber":5,"work
ItemRecord { CheckpointId = 6, Value = {"stepNumber":6,"work

## CreateCheckpointAsync(...)## RetrieveCheckpointAsync(...) といったログが追加されているのがわかります。チェックポイントの作成と取得がちゃんと呼ばれていることがわかります。また、Dump all checkpoints の出力で、チェックポイントの内容がすべて保存されていることも確認できます。最初の実行で 5 つ、2 回目の実行で 2 つ、合計 7 つのチェックポイントが保存されています。

まとめ

今回は、Microsoft Agent Framework のチェックポイントの永続化について掘り下げてみました。
ICheckpointStore<TStoreObject> インターフェースを実装することで、チェックポイントのデータを任意のストレージに保存することができます。JsonCheckpointStore を継承して JSON 形式でシリアライズ/デシリアライズするのが簡単で便利です。FileSystemJsonCheckpointStore を使うとファイルシステムに保存できますが、実際のアプリではデータベースやクラウドストレージに保存したいことが多いと思います。その場合には JsonCheckpointStore を継承して特定のストレージに保存するクラスを実装すれば良いです1 つのスーパーステップ実装のたびにちゃんと履歴を保存していれば途中で失敗していたり、ワークフローを一時的に停止しなければいけないようなシナリオにも柔軟に対応可能です。なかなかいい感じですね。

今回の記事でワークフローを途中で止めて、途中から再開する方法がわかったので次は Human in the loop の機能を試してみようと思います。

Microsoft (有志)

Discussion