Microsoft Agent Framework (C#) を見てみよう その6 Executor のステータス管理
シリーズ記事
- 「雑感」とハローワールド
- ざっとリポジトリを見てみる
- ワークフローを見てみよう
- ワークフローの Executor を掘り下げる
- ワークフローで条件分岐とループを扱う
- Executor のステータス管理
- チェックポイントの永続化
- Human in the loop を試してみよう
はじめに
前回は、Microsoft Agent Framework のワークフローで条件分岐やループを扱う方法を見てきました。今回は、Executor のステータス管理について掘り下げてみたいと思います。
今まで、Executor の間でデータをやり取りするときは、単純に入力と出力を繋げる形でワークフローを構築してきました。しかし、その他に IWorkflowContext
を使って状態を管理する方法もあります。
IWorkflowContext
は、ワークフローの実行中に共有されるコンテキスト情報を保持するためのインターフェースです。これを使うことで、Executor 間で状態を共有したり、特定の Executor の実行結果を保存したりすることができます。
状態管理
Executor の途中でワークフロー全体で共有できる状態を保存するには IWorkflowContext
の QueueStateUpdateAsync
メソッドを使います。引数には key
と value
と scope
(オプション) を指定します。
多分、scope
を指定しないとデフォルトのスコープが使われるのですが、恐らく別の Executor からは参照できないのではないかと思います。なので scope
に適当な名前を指定しておくのが良いと思います。保存した状態を読み込むには ReadStateAsync
メソッドを使います。こちらも引数には key
と scope
(オプション) を指定します。
実際に試してみましょう。以下のような 2 つの Executor を定義します。
class Executor1() : Executor<string, string>(nameof(Executor1))
{
public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// QueueStateUpdateAsync でステートを保存
await context.QueueStateUpdateAsync("State", "Value from Executor1", "SampleScope");
return message;
}
}
class Executor2() : Executor<string, string>(nameof(Executor2))
{
public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// ReadStateAsync でステートを読み込み
var state = await context.ReadStateAsync<string>("State", "SampleScope");
return $"{message}:{state}";
}
}
Executor1
は QueueStateUpdateAsync
を使って "SampleScope" というスコープに "State" というキーで状態を保存します。Executor2
は ReadStateAsync
を使って同じスコープとキーで状態を読み込み、メッセージに付加して返します。これをワークフローに組み込んで実行してみます。
using Microsoft.Agents.AI.Workflows;
var executor1 = new Executor1();
var executor2 = new Executor2();
var workflow = await new WorkflowBuilder(executor1)
.AddEdge(executor1, executor2)
.WithOutputFrom(executor2)
.BuildAsync<string>();
var run = await InProcessExecution.StreamAsync(workflow, "Hello, world!");
await foreach (var evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent.SourceId}: {outputEvent.As<string>()}");
}
}
Executor1
で Value from Executor1
という状態が保存され、Executor2
でそれを読み込んで Hello, world!:Value from Executor1
という結果が得られるはずです。
実行すると以下のような出力が得られます。
Executor2: Hello, world!:Value from Executor1
これで、Executor 間で状態を共有する方法が分かりました。IWorkflowContext
を使うことで、より複雑なワークフローを構築する際に役立ちますね。
ワークフロー全体の状態の保存
今までは、ワークフローを実行したら最後まで走りきる形で試してきましたが、Agent Framework ではワークフローの途中で状態を保存して、後でその状態から再開することも可能です。ただ、何処のタイミングでも止められるのではなくスーパーステップと呼ばれる特定のポイントでのみ状態を保存できます。
Agent Framework のワークフローは「スーパーステップ」と呼ばれるものがワークフロー内の 1 つの実行単位となっています。1 つのスーパーステップでは、その時点で実行可能な Executor が全て実行されます。全ての実行可能な Executor が実行されるとスーパーステップが完了します。タイミングがワークフローのチェックポイントとなり、その時点でワークフローの状態がチェックポイントとして保存されます。
すこしチェックポイントの動きを見てみましょう。まずは以下のようにカウントアップをしていく Executor と値を文字列にする Executor を定義します。
// 初期メッセージ: カウントアップの増加量を指定
record SetAmountMessage(int Amount);
// 現在のカウントの値
record CurrentCountMessage(int Value);
// カウントアップを行うExecutor
class CountUpExecutor() : ReflectingExecutor<CountUpExecutor>(nameof(CountUpExecutor)),
IMessageHandler<SetAmountMessage, CurrentCountMessage>,
IMessageHandler<CurrentCountMessage, CurrentCountMessage>
{
private int _amount;
private int _currentCount;
// 初期メッセージを処理し、増加量を設定。戻り値は現在の値を返す。
public ValueTask<CurrentCountMessage> HandleAsync(SetAmountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
_amount = message.Amount;
return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
}
// カウントアップ後のメッセージを処理し、さらにカウントアップ
public ValueTask<CurrentCountMessage> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
_currentCount += _amount;
return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
}
}
// 最終的な出力メッセージを生成するExecutor
class GenerateOutputMessageExecutor() : Executor<CurrentCountMessage, string>(nameof(GenerateOutputMessageExecutor))
{
public override ValueTask<string> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
return ValueTask.FromResult($"The final count is {message.Value}");
}
}
初めて使う機能として ReflectingExecutor<T>
に複数の IMessageHandler<TInput, TOutput>
インターフェースを実装することで、異なる型のメッセージを処理できる Executor を定義しています。CountUpExecutor
は SetAmountMessage
を受け取って増加量を設定し、CurrentCountMessage
を受け取ってカウントアップを行います。GenerateOutputMessageExecutor
は最終的なカウント結果を文字列に変換します。
これらの Executor を使って以下のようなワークフローを組み立てるメソッドを定義します。
// ワークフローを構築するメソッド
async ValueTask<Workflow<SetAmountMessage>> BuildWorkflowAsync()
{
// 各Executorのインスタンスを作成
var countUpExecutor = new CountUpExecutor();
var generateOutputMessageExecutor = new GenerateOutputMessageExecutor();
// ワークフロービルダーを初期化
// 開始地点は CountUpExecutor
return await new WorkflowBuilder(countUpExecutor)
.AddSwitch(countUpExecutor, switchBuilder =>
{
// 10 より小さい場合は countUpExecutor 戻してループ
switchBuilder.AddCase(
(CurrentCountMessage? message) => message is { Value: < 10 },
countUpExecutor);
// 10 以上の場合は generateOutputMessageExecutor に行く
switchBuilder.WithDefault(generateOutputMessageExecutor);
})
// 出力は GenerateOutputMessageExecutor
.WithOutputFrom(generateOutputMessageExecutor)
.BuildAsync<SetAmountMessage>();
}
このワークフローでは、初期値としてカウントアップの増加量を受け取って、それを使って CountUpExecutor
が 10 未満の間は自身にループし、10 以上になったら GenerateOutputMessageExecutor
に進むようになっています。つまり 4
を入力すると CountUpExecutor
が 0, 4, 8, 12
とカウントアップし、最終的に GenerateOutputMessageExecutor
が The final count is 12
というメッセージを生成します。
ちなみに、ここでも初出の機能として AddSwitch
メソッドを使って条件分岐を定義しています。これは AddCase
メソッドで条件と遷移先を指定し、WithDefault
メソッドでデフォルトの遷移先を指定します。今回は 2 つへの分岐ですが AddCase
を複数回呼び出すことで 3 つ以上の分岐も可能です。前回紹介すべきでしたが、純粋に忘れていました。すいません。
ということで実際に動かしてみましょう。チェックポイントの機能を使うには CheckpointManager
を用意して InProcessExecution.StreamAsync
に渡します。以下のコードはワークフローを実行し、スーパーステップ完了時にチェックポイントを保存しながら最終出力を取得する例です。ワークフローの実行が完了した後に最終出力と保存された全てのチェックポイントの情報を表示します。
// ワークフローを構築
var workflow = await BuildWorkflowAsync();
// チェックポイント管理の準備
var checkpointManager = CheckpointManager.Default;
// チェックポイント情報を格納するリスト
List<CheckpointInfo> checkpoints = [];
// ワークフローを実行し、初期値4を渡す
await using Checkpointed<StreamingRun> run = await InProcessExecution.StreamAsync(
workflow,
new SetAmountMessage(4),
checkpointManager);
// ワークフローの最終出力を格納する変数
string? output = null;
// イベントストリームを監視
await foreach (var evt in run.Run.WatchStreamAsync())
{
// スーパーステップ完了時にチェックポイントを保存
if (evt is SuperStepCompletedEvent { CompletionInfo: { Checkpoint: not null } } superStepCompletedEvent)
{
Console.WriteLine($"Checkpoint was stored: {superStepCompletedEvent.CompletionInfo.Checkpoint.CheckpointId}");
checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
}
// CountUpExecutor の実行完了時にログ出力
if (evt is ExecutorCompletedEvent { ExecutorId: nameof(CountUpExecutor) } executorCompletedEvent)
{
Console.WriteLine($"CountUpExecutor result: {executorCompletedEvent.Data}");
}
// 最終出力を取得
if (evt is WorkflowOutputEvent { Data: string } outputEvent)
{
Console.WriteLine($"WorkflowOutputEvent was raised: {outputEvent.As<string>()}");
output = outputEvent.As<string>();
}
}
// 結果を出力
Console.WriteLine(output);
// 全チェックポイントを表示
foreach (var checkpoint in checkpoints)
{
Console.WriteLine($"Checkpoint at run id {checkpoint.RunId} with checkpoint {checkpoint.CheckpointId}");
}
ワークフローの状態を表す Checkpoint
は SuperStepCompletedEvent
の CompletionInfo
プロパティに含まれています。スーパーステップが完了するたびにこのイベントが発行されるので、これを監視してチェックポイント情報を収集しています。SuperStepCompletedEvent
イベント以外にも上記コードでは ExecutorCompletedEvent
と WorkflowOutputEvent
を監視して、各 Executor の実行結果や最終出力を取得しています。
このコードを実行すると、以下のような出力が得られます。
CountUpExecutor result: CurrentCountMessage { Value = 0 }
Checkpoint was stored: 85975cdb9aa041c6a4d4d63912cce279
CountUpExecutor result: CurrentCountMessage { Value = 4 }
Checkpoint was stored: 813d0077797c46e6a27c8f41d6288dd3
CountUpExecutor result: CurrentCountMessage { Value = 8 }
Checkpoint was stored: 5e5c24d43780401497cf4b86d4c6d90a
CountUpExecutor result: CurrentCountMessage { Value = 12 }
Checkpoint was stored: e4320c537d0f473783c1d494f2f0261b
WorkflowOutputEvent was raised: The final count is 12
Checkpoint was stored: 9c6942e22b744babb07b2f5ac3302f13
The final count is 12
Checkpoint at run id 10c04281b12946e4aacc8caa5fd466d5 with checkpoint 85975cdb9aa041c6a4d4d63912cce279
Checkpoint at run id 10c04281b12946e4aacc8caa5fd466d5 with checkpoint 813d0077797c46e6a27c8f41d6288dd3
Checkpoint at run id 10c04281b12946e4aacc8caa5fd466d5 with checkpoint 5e5c24d43780401497cf4b86d4c6d90a
Checkpoint at run id 10c04281b12946e4aacc8caa5fd466d5 with checkpoint e4320c537d0f473783c1d494f2f0261b
Checkpoint at run id 10c04281b12946e4aacc8caa5fd466d5 with checkpoint 9c6942e22b744babb07b2f5ac3302f13
run id と checkpoint id は実行ごとに変わるので、実際に動かしたときとは異なる値になりますが、The final count is 12
という最終出力が得られ、4 回の CountUpExecutor
の実行のためのスーパーステップと 1 回の GenerateOutputMessageExecutor
完了時にそれぞれチェックポイントが保存されていることが分かります。図で表すと以下のようになります。
Checkpoint
を使ってワークフローを途中から再開することも可能です。例えば、3 番目のチェックポイントから再開したい場合は、以下のようにします。ポイントは InProcessExecution.ResumeStreamAsync
メソッドを使うことです。引数には新しいワークフロー、再開するチェックポイント、チェックポイントマネージャ、そして再開する実行の run id を指定します。このとき指定するチェックポイントマネージャーは、最初の実行時に使ったものと同じインスタンスを使う必要があります。これは、大事なデータはチェックポイントマネージャーが管理しているためです。
Console.WriteLine("--------------------------------------");
var workflow2 = await BuildWorkflowAsync();
await using var run2 = await InProcessExecution.ResumeStreamAsync(
workflow2,
checkpoints[2],
checkpointManager,
checkpoints[2].RunId);
await foreach (var evt in run2.Run.WatchStreamAsync())
{
Console.WriteLine(evt);
}
これを実行すると無限ループになってしまいます。これは、チェックポイントが保存しているのは IWorkflowContext
に保存されたステートとワークフローの進捗なので Executor
のフィールドなどは保存されません。そのため今回の場合は CountUpExecutor
の _currentCount
や _amount
は保存されず、再開時にはどちらも 0
のままになってしまいます。なので、CountUpExecutor
は再開後に CurrentCountMessage
を受け取っても _amount
が 0
のままなのでカウントアップが行われず、ずっと 0
のままループしてしまいます。
この問題を解決するには、チェックポイントの前後で Executor
の状態を IWorkflowContext
に保存し、再開時にそれを読み込んで Executor
のフィールドに反映させる必要があります。Executor
には、そのためのメソッドとして OnCheckpointingAsync
と OnCheckpointRestoredAsync
が用意されています。OnCheckpointingAsync
はチェックポイントが保存される直前に呼び出され、OnCheckpointRestoredAsync
はチェックポイントから復元された直後に呼び出されます。これらをオーバーライドして、チェックポイントの保存と復元の際に必要な状態を IWorkflowContext
に保存・読み込みするように実装します。
この機能を実装した CountUpExecutor
は以下のようになります。
// カウントアップを行うExecutor
class CountUpExecutor() : ReflectingExecutor<CountUpExecutor>(nameof(CountUpExecutor)),
IMessageHandler<SetAmountMessage, CurrentCountMessage>,
IMessageHandler<CurrentCountMessage, CurrentCountMessage>
{
private int _amount;
private int _currentCount;
// 以前と同じなので HandleExecuteAsync メソッドは省略
protected override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
// 必要なステータスを保存
await context.QueueStateUpdateAsync(nameof(_amount), _amount, cancellationToken);
await context.QueueStateUpdateAsync(nameof(_currentCount), _currentCount, cancellationToken);
}
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
// 必要なステータスを読み込む
_amount = await context.ReadStateAsync<int>(nameof(_amount));
_currentCount = await context.ReadStateAsync<int>(nameof(_currentCount));
}
}
この状態で実行すると以下のように、途中から再開しても正しく動作することが確認できます。
CountUpExecutor result: CurrentCountMessage { Value = 0 }
Checkpoint was stored: a317212f981d42ebb88f183d8640d7fd
CountUpExecutor result: CurrentCountMessage { Value = 4 }
Checkpoint was stored: 8ecddebaf6f7412b9232597f0c5ef56d
CountUpExecutor result: CurrentCountMessage { Value = 8 }
Checkpoint was stored: 6cd2d9ba401e4824b010c725fb70ce98
CountUpExecutor result: CurrentCountMessage { Value = 12 }
Checkpoint was stored: 56ea76158ae8499fbedf30d7476f292f
WorkflowOutputEvent was raised: The final count is 12
Checkpoint was stored: 1f5b03c1233d40d09d1233fae401af70
The final count is 12
Checkpoint at run id 6a29eb7235d54ce0b1ce4a8eed92a1a7 with checkpoint a317212f981d42ebb88f183d8640d7fd
Checkpoint at run id 6a29eb7235d54ce0b1ce4a8eed92a1a7 with checkpoint 8ecddebaf6f7412b9232597f0c5ef56d
Checkpoint at run id 6a29eb7235d54ce0b1ce4a8eed92a1a7 with checkpoint 6cd2d9ba401e4824b010c725fb70ce98
Checkpoint at run id 6a29eb7235d54ce0b1ce4a8eed92a1a7 with checkpoint 56ea76158ae8499fbedf30d7476f292f
Checkpoint at run id 6a29eb7235d54ce0b1ce4a8eed92a1a7 with checkpoint 1f5b03c1233d40d09d1233fae401af70
--------------------------------------
SuperStepStartedEvent(Step = 0, Data: Microsoft.Agents.AI.Workflows.SuperStepStartInfo = Microsoft.Agents.AI.Workflows.SuperStepStartInfo)
ExecutorInvokedEvent(Executor = CountUpExecutor, Data: Microsoft.Agents.AI.Workflows.PortableValue = Microsoft.Agents.AI.Workflows.PortableValue)
ExecutorCompletedEvent(Executor = CountUpExecutor, Data: CurrentCountMessage = CurrentCountMessage { Value = 12 })
SuperStepCompletedEvent(Step = 0, Data: Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo = Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo)
SuperStepStartedEvent(Step = 1, Data: Microsoft.Agents.AI.Workflows.SuperStepStartInfo = Microsoft.Agents.AI.Workflows.SuperStepStartInfo)
ExecutorInvokedEvent(Executor = GenerateOutputMessageExecutor, Data: CurrentCountMessage = CurrentCountMessage { Value = 12 })
ExecutorCompletedEvent(Executor = GenerateOutputMessageExecutor, Data: System.String = The final count is 12)
WorkflowOutputEvent(Data: System.String = The final count is 12)
SuperStepCompletedEvent(Step = 1, Data: Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo = Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo)
ちゃんと途中から実行されて The final count is 12
という結果が得られています。
この例では新しいワークフローを作成して再開しましたが、同じワークフローを使い回すことも可能です。厳密には同じワークフローを使いまわすのではなく、ワークフローの実行を巻き戻すというイメージに近いです。やり方は Checkpointed<StreamingRun>
(今回の例では run
変数) の RestoreCheckpointAsync
メソッドを使います。引数には再開するチェックポイントを指定します。以下のように書きます。
Console.WriteLine("--------------------------------------");
// 指定したチェックポイントの位置に戻す
await run.RestoreCheckpointAsync(checkpoints[2]);
await foreach (var evt in run.Run.WatchStreamAsync())
{
Console.WriteLine(evt);
}
これで、元々の実行状態に戻すことができました。再開後は、チェックポイントを基にした状態から処理が再開されます。これにより、ワークフローの実行を柔軟に管理することが可能になります。実行結果は先ほどと同じなので省略します。
まとめ
今回は、Microsoft Agent Framework の Executor のステータス管理について掘り下げてみました。IWorkflowContext
を使って Executor 間で状態を共有する方法や、ワークフロー全体の状態をチェックポイントとして保存し、途中から再開する方法を紹介しました。これらの機能を活用することで、一時的にワークフローを止めたり、再開したりすることが自由にできるようになります。
しかし、現状ではインメモリにチェックポイントの状態が保存されるため永続化されません。次回は、チェックポイントの永続化と、余裕があれば Human in the Loop (HITL) の機能について見てみたいと思います。
Discussion