Microsoft Agent Framework (C#) を見てみよう その8 Human in the loop を試してみよう
シリーズ記事
- 「雑感」とハローワールド
- ざっとリポジトリを見てみる
- ワークフローを見てみよう
- ワークフローの Executor を掘り下げる
- ワークフローで条件分岐とループを扱う
- Executor のステータス管理
- チェックポイントの永続化
- Human in the loop を試してみよう
はじめに
前回は、Microsoft Agent Framework のワークフローの永続化の仕組みについて見てきました。ワークフローを任意の場所で止めて、途中から再開することができるようになりました。ワークフローを途中で止めるということは、例えば人間の判断を挟みたい場合などに有効です。そのため今回は、Human in the loop の仕組みを試してみようと思います。
Microsoft Agent Framework の Human in the loop
Human in the loop は、ワークフローの中で人間の判断を挟みたい場合に利用します。例えば、AI が生成した文章を人間が確認して修正したり、AI が提案した選択肢から人間が選んだりする場合などです。 Microsoft Agent Framework ではワークフローに RequestPort
を組み込むことで Human in the loop を実現できます。
RequestPort
はワークフローから外部に対して追加情報を要求するためのポートです。RequestPort
をワークフローに組み込むと、ワークフローの実行が一時停止し、外部からの応答を待つ状態になります。外部から応答が送られると、ワークフローの実行が再開されます。RequestPort
にワークフローの実行順が来ると RequestInfoEvent
というイベントがワークフローから発行されます。このイベントの Request
プロパティの DataAs<T>
メソッドでワークフローから渡されたデータが取得できます。DataAs<T>
メソッドで取得できたデータをもとにユーザーや外部システムなどから応答を受け取り Run
の SendMessageAsync
メソッドで応答をワークフローに送ります。SendMessageAsync
メソッドの引数には RequestInfoEvent
の Request.CreateResponse(ワークフローに返す値)
メソッドで生成したレスポンスを渡します。
実際に Human in the loop を試してみましょう。
今までもずっと使ってきたカウントアップをするワークフローに RequestPort
を組み込んで、カウントアップの途中で人間の判断を挟むようにしてみます。具体的には、現在の値を表示してユーザーに、まだカウントアップを続けるかどうかを尋ねるようにします。カウントアップを続ける場合は、カウントアップを行いユーザーに確認を再度求めます。カウントアップを続けない場合は、最終的なメッセージを作成してワークフローを終了します。
処理の流れを図にすると以下のようになります。
ではコードで書いていきましょう。まずは、Executor
を定義します。といっても今までとほとんど変わりはありません。唯一の違いは CurrentCountMessage
に Value
以外に Continue
プロパティを追加しているところです。Continue
プロパティはユーザーがカウントアップを続けるかどうかの選択肢を表します。
// 初期メッセージ: カウントアップの増加量を指定
record SetAmountMessage(int Amount);
// カウントアップの現在値と継続フラグを含むメッセージ
record CurrentCountMessage(int Value, bool Continue = true);
// カウントアップを行うExecutor
class CountUpExecutor() : ReflectingExecutor<CountUpExecutor>(nameof(CountUpExecutor)),
IMessageHandler<SetAmountMessage, CurrentCountMessage>,
IMessageHandler<CurrentCountMessage, CurrentCountMessage>
{
// カウントアップの増加量
private int _amount;
// 現在のカウント値
private int _currentCount;
// 初期メッセージを処理し、増加量を設定。戻り値は現在の値を返す。
public ValueTask<CurrentCountMessage> HandleAsync(SetAmountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// 増加量を設定
_amount = message.Amount;
// 初期カウント値を返す
return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
}
// カウントアップ後のメッセージを処理し、さらにカウントアップ
public ValueTask<CurrentCountMessage> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// カウント値を増加量分増やす
_currentCount += _amount;
// 更新後のカウント値を返す
return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
}
// チェックポイント保存時に呼ばれるメソッド
protected override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
// 増加量を状態として保存
await context.QueueStateUpdateAsync(nameof(_amount), _amount, cancellationToken);
// 現在のカウント値を状態として保存
await context.QueueStateUpdateAsync(nameof(_currentCount), _currentCount, cancellationToken);
}
// チェックポイント復元時に呼ばれるメソッド
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
// 保存された増加量を読み込む
_amount = await context.ReadStateAsync<int>(nameof(_amount));
// 保存された現在のカウント値を読み込む
_currentCount = await context.ReadStateAsync<int>(nameof(_currentCount));
}
}
// 最終的な出力メッセージを生成する Executor
class GenerateOutputMessageExecutor() : Executor<CurrentCountMessage, string>(nameof(GenerateOutputMessageExecutor))
{
// 最終的なカウント値からメッセージを生成
public override ValueTask<string> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
ValueTask.FromResult($"The final count is {message.Value}");
}
次にワークフローを組み立てます。ポイントは RequestPort
を CountUpExecutor
と GenerateOutputMessageExecutor
の間に挟んで、RequestPort
から返される CurrentCountMessage
の Continue
プロパティの値を見て行先の Executor
を AddSwitch
で決めています。
// ワークフローを構築するメソッド
static async ValueTask<Workflow<SetAmountMessage>> BuildWorkflowAsync()
{
// 各Executorのインスタンスを作成
var countUpExecutor = new CountUpExecutor();
var generateOutputMessageExecutor = new GenerateOutputMessageExecutor();
// 人間による確認を行うためのRequestPortを作成
var requestPort = RequestPort.Create<CurrentCountMessage, CurrentCountMessage>("HumanInTheLoop");
// ワークフローの構築
return await new WorkflowBuilder(countUpExecutor)
// CountUpExecutorからRequestPortへのエッジを追加
.AddEdge(countUpExecutor, requestPort)
// RequestPortの後の分岐処理を追加
.AddSwitch(requestPort, switchBuilder =>
{
// 継続フラグがtrueの場合はCountUpExecutorに戻る(ループ)
switchBuilder.AddCase((CurrentCountMessage? m) => m?.Continue ?? false, countUpExecutor)
// 継続フラグがfalseの場合はGenerateOutputMessageExecutorへ遷移
.WithDefault(generateOutputMessageExecutor);
})
// ワークフローの最終出力をGenerateOutputMessageExecutorから取得
.WithOutputFrom(generateOutputMessageExecutor)
.BuildAsync<SetAmountMessage>();
}
では、ワークフローを実行してみましょう。ワークフローを実行するコードで新しい所は RequestInfoEvent
のハンドリングの箇所です。ここでユーザーからの入力を受け取り Continue
プロパティの値を設定してワークフローにメッセージを返しています。
// ワークフローを構築
var workflow = await BuildWorkflowAsync();
// ワークフローをストリーミング実行で開始し、初期メッセージとして増加量4を設定
await using var run = await InProcessExecution.StreamAsync(workflow, new SetAmountMessage(4));
// ワークフローから発生するイベントを順次処理
await foreach (var evt in run.WatchStreamAsync())
{
// 人間による入力が必要なイベントの場合
if (evt is RequestInfoEvent requestInfoEvent)
{
// リクエストから現在のカウント値を取得
var currentCountMessage = requestInfoEvent.Request.DataAs<CurrentCountMessage>();
if (currentCountMessage is null)
{
throw new InvalidOperationException("Expected CurrentCountMessage");
}
// 現在のカウント値を表示し、ユーザーに継続確認を求める
Console.WriteLine($"Current count is {currentCountMessage.Value}, please type 'continue' to proceed.");
var input = Console.ReadLine();
// ユーザーが "continue" 以外を入力した場合は継続フラグをfalseに設定
if (input != "continue")
{
currentCountMessage = currentCountMessage with { Continue = false };
}
// ユーザーの応答をワークフローに送信
await run.SendResponseAsync(requestInfoEvent.Request.CreateResponse(currentCountMessage));
}
// ワークフローの最終出力イベントの場合
if (evt is WorkflowOutputEvent outputEvent)
{
// 完了メッセージを表示
Console.WriteLine($"Workflow completed with output: {outputEvent.As<string>()}");
}
}
実行すると以下のような結果になります。
Current count is 0, please type 'continue' to proceed.
continue
Current count is 4, please type 'continue' to proceed.
continue
Current count is 8, please type 'continue' to proceed.
continue
Current count is 12, please type 'continue' to proceed.
continue
Current count is 16, please type 'continue' to proceed.
NO!!!!
Workflow completed with output: The final count is 16
continue
と入力するとカウントアップが行われて、それ以外を入力すると最終的なメッセージが生成されてワークフローが終わっていることが確認できます。
CheckpointManager との連携
先ほどの例では、イベントを処理する await foreach
の中でユーザーからの入力を処理していました。実際のシステムではループ内で処理するのではなく、一旦ワークフローを止めてユーザーに確認をとることになると思います。ユーザーからの応答もいつ来るかわからないと思うのでワークフローは状態の保存をして、ユーザーからの応答が返ってきた段階で状態を復元して続きを実行するという形になります。
このような動作を実現するには、前回やった CheckpointManager
を使ってワークフローのチェックポイントを保存するのが効果的です。
やってみましょう。まずはワークフローを細切れに実行できるようにします。
細切れに実行するために以下のように WorkflowResult
というレコードを定義します。WorkflowResult
にはワークフローの実行結果として、RequestInfoEvent
、ワークフローの出力イベント、エラーイベント、チェックポイント情報を含めます。
// ワークフロー実行結果を保持するレコード
record WorkflowResult(
RequestInfoEvent? RequestInfoEvent,
IReadOnlyCollection<WorkflowOutputEvent> Outputs,
IReadOnlyCollection<WorkflowErrorEvent> Errors,
IReadOnlyCollection<CheckpointInfo> Checkpoints);
RequestInfoEvent
が null
でない場合は Human in the loop の入力待ち状態、Outputs
に値が含まれる場合はワークフローの出力が完了した状態、Errors
に値が含まれる場合はワークフローの実行中にエラーが発生した状態を表します。Checkpoints
にはワークフローのチェックポイント情報が含まれます。
次にワークフローを開始して、RequestInfoEvent
が返ってくるまで実行する StartWorkflowAsync
メソッドを定義します。
// ワークフローを新規開始するメソッド
static async ValueTask<WorkflowResult> StartWorkflowAsync(SetAmountMessage message, CheckpointManager checkpointManager)
{
// ワークフローの定義を構築
var workflow = await BuildWorkflowAsync();
// ワークフローを開始し、ストリーミング実行を取得
await using var checkpointedRun = await InProcessExecution.StreamAsync(workflow, message, checkpointManager);
// ワークフローイベントを収集するためのコレクションを初期化
RequestInfoEvent? lastRequestInfoEvent = null;
List<WorkflowOutputEvent> outputs = [];
List<WorkflowErrorEvent> errors = [];
List<CheckpointInfo> checkpoints = [];
// ワークフローイベントをストリームから監視
await foreach (var workflowEvent in checkpointedRun.Run.WatchStreamAsync())
{
switch (workflowEvent)
{
// RequestInfoEventを受信したら、即座に結果を返す(Human-in-the-Loop地点)
case RequestInfoEvent requestInfoEvent:
lastRequestInfoEvent = requestInfoEvent;
return new(lastRequestInfoEvent, outputs, errors, checkpoints);
// 出力イベントを収集
case WorkflowOutputEvent outputEvent:
outputs.Add(outputEvent);
break;
// エラーイベントを収集
case WorkflowErrorEvent errorEvent:
errors.Add(errorEvent);
break;
// チェックポイント情報を収集
case SuperStepCompletedEvent superStepCompletedEvent when superStepCompletedEvent.CompletionInfo?.Checkpoint is not null:
checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
break;
}
}
// ストリームが終了したら結果を返す
return new(null, outputs, errors, checkpoints);
}
次に続きからワークフローを実行する ResumeWorkflowAsync
メソッドを定義します。
このメソッドはチェックポイント、CheckpointManager
、ユーザーからの応答メッセージを受け取ります。ワークフローの定義を再構築して、チェックポイントからワークフローを復元し、ユーザーからの応答メッセージをワークフローに送信します。RequestInfoEvent
が返ってくるまでワークフローを実行し、イベントを収集して結果を返します。
// チェックポイントからワークフローを復元して再開するメソッド
static async ValueTask<WorkflowResult> ResumeWorkflowAsync(
CheckpointInfo checkpoint,
CheckpointManager checkpointManager,
CurrentCountMessage message)
{
CurrentCountMessage? responseMessage = message;
// ワークフローの定義を再構築(バッチ処理シナリオでは別プロセスで実行される想定)
var workflow = await BuildWorkflowAsync();
// チェックポイントからワークフローを復元
await using var checkpointedRun = await InProcessExecution.ResumeStreamAsync(workflow, checkpoint, checkpointManager, checkpoint.RunId);
// ワークフローイベントを収集するためのコレクションを初期化
RequestInfoEvent? lastRequestInfoEvent = null;
List<WorkflowOutputEvent> outputs = [];
List<WorkflowErrorEvent> errors = [];
List<CheckpointInfo> checkpoints = [];
// ワークフローイベントをストリームから監視
await foreach (var workflowEvent in checkpointedRun.Run.WatchStreamAsync())
{
switch (workflowEvent)
{
case RequestInfoEvent requestInfoEvent:
lastRequestInfoEvent = requestInfoEvent;
// 応答メッセージがある場合、ワークフローに送信
if (responseMessage is not null)
{
await checkpointedRun.Run.SendResponseAsync(requestInfoEvent.Request.CreateResponse(responseMessage));
responseMessage = null;
}
else
{
// 応答済みの場合は、次のHuman-in-the-Loop地点として結果を返す
return new(lastRequestInfoEvent, outputs, errors, checkpoints);
}
break;
// 出力イベントを収集
case WorkflowOutputEvent outputEvent:
outputs.Add(outputEvent);
break;
// エラーイベントを収集
case WorkflowErrorEvent errorEvent:
errors.Add(errorEvent);
break;
// チェックポイント情報を収集
case SuperStepCompletedEvent superStepCompletedEvent when superStepCompletedEvent.CompletionInfo?.Checkpoint is not null:
checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
break;
}
}
// ストリームが終了したら結果を返す
return new(null, outputs, errors, checkpoints);
}
これを組み合わせると以下のようにして Human in the loop を実現できます。
// チェックポイントを管理するマネージャーを取得(デフォルト実装を使用)
var checkpointManager = CheckpointManager.Default;
// ワークフローを開始し、初期メッセージを送信
var result = await StartWorkflowAsync(new SetAmountMessage(4), checkpointManager);
// RequestInfoEventが存在する限りループ(Human-in-the-Loopの処理)
while(result.RequestInfoEvent != null)
{
// リクエストからCurrentCountMessageを取得
var currentCountMessage = result.RequestInfoEvent.Request.DataAs<CurrentCountMessage>();
if (currentCountMessage == null) throw new InvalidOperationException("Received null CurrentCountMessage.");
// 現在のカウント値をユーザーに表示し、入力を待つ
Console.WriteLine($"Current count is {currentCountMessage.Value}, please type 'continue' to proceed.");
var input = Console.ReadLine();
var continueFlag = string.Equals(input, "continue", StringComparison.OrdinalIgnoreCase);
// ユーザーの入力を元に応答メッセージを作成
var responseMessage = currentCountMessage with { Continue = continueFlag };
// NOTE: チェックポイントは SuperStepCompletedEvent 発生時に追加される想定。
// 実運用で空コレクションの可能性を考慮するなら result.Checkpoints.Any() でガードしてから Last() を呼ぶと安全。
var latestCheckpoint = result.Checkpoints.Last();
// チェックポイントから復元して、ワークフローを再開
result = await ResumeWorkflowAsync(latestCheckpoint, checkpointManager, responseMessage);
}
// NOTE: このサンプルでは最終出力が1つ得られる前提で Last() を使用。
// 実運用で出力が存在しない可能性を考慮するなら result.Outputs.Any() で確認してから参照する。
Console.WriteLine(result.Outputs.Last().As<string>());
実行すると以下のような結果になります。
Current count is 0, please type 'continue' to proceed.
continue
Current count is 4, please type 'continue' to proceed.
continue
Current count is 8, please type 'continue' to proceed.
continue
Current count is 12, please type 'continue' to proceed.
continue
Current count is 16, please type 'continue' to proceed.
no
The final count is 16
ちゃんと動いてますね。
この例では 1 回のプログラムの実行でワークフローを完了させていますが、実際のシステムではユーザーからの応答を受け取ったタイミングでプログラムを終了し、次回ユーザーからの応答があったタイミングでプログラムを再度起動して ResumeWorkflowAsync
を呼び出す形になると思います。こうすることで、ユーザーからの応答を待つ間にシステムのリソースを消費し続けることを防げます。
例えば以下のようにコードを書き換えるとプロセスの終了を跨いでワークフローを継続できます。
// === 永続化付き Human-in-the-Loop ワークフロー サンプル ===
// プロセスを再起動しても Checkpoint + 直前の Request 情報を元に続きから再開する
// 1. FileSystemJsonCheckpointStore により Executor 内部状態などをチェックポイント永続化
// 2. 人間への問い合わせ直後 (RequestInfoEvent 受信時) にシリアライズした WorkflowResult を state.json に保存
// 3. 次回プロセス起動時に state.json を読み込み、ユーザー入力を受けて Resume 実行
// 4. 追加の Request が出るまで、または最終 Output まで進める
// ---------------------------------------------------------
// チェックポイントをファイルシステムに JSON 形式で保存するストアを初期化
using var store = new FileSystemJsonCheckpointStore(
new DirectoryInfo(Directory.GetCurrentDirectory()));
// 上記ストアを使って CheckpointManager を生成(Default ではなくカスタム JSON 永続化)
var checkpointManager = CheckpointManager.CreateJson(store);
// 前回の Human 待ち状態を表すシリアライズ済み WorkflowResult を読み込む(存在しなければ新規開始)
WorkflowResult? workflowResult = null;
if (File.Exists("state.json"))
{
// state.json には直前の RequestInfoEvent / 収集済み出力 / チェックポイント ID 等が保存されている
using (var fs = File.OpenRead("state.json"))
{
workflowResult = await JsonSerializer.DeserializeAsync<WorkflowResult>(fs);
}
// 再利用後は毎回削除(再入力失敗時にロールバックしたい場合は削除タイミングを調整)
File.Delete("state.json");
}
if (workflowResult == null)
{
// 初回起動: 新しくワークフローを開始し最初の Human 介入ポイント (または完了) まで進める
workflowResult = await StartWorkflowAsync(new SetAmountMessage(4), checkpointManager);
}
else
{
// 再起動: 前回 RequestInfoEvent のペイロード(JSON)を CurrentCountMessage として復元
// プロセスが変わると型情報が失われるため JsonElement 経由で復元する
var json = workflowResult.RequestInfoEvent?.Request.DataAs<JsonElement>();
var currentCountMessage = json?.Deserialize<CurrentCountMessage>();
if (currentCountMessage == null) throw new InvalidOperationException("Received null CurrentCountMessage.");
// ユーザーに続行可否を質問("continue" 以外は false 扱い)
Console.WriteLine($"Current count is {currentCountMessage.Value}, please type 'continue' to proceed.");
var input = Console.ReadLine();
var continueFlag = string.Equals(input, "continue", StringComparison.OrdinalIgnoreCase);
// 保存していた最新チェックポイントから再開
workflowResult = await ResumeWorkflowAsync(
workflowResult.Checkpoints.Last(),
checkpointManager,
currentCountMessage with { Continue = continueFlag });
}
if (workflowResult.RequestInfoEvent == null)
{
// 人間の待ちイベントが無い = ワークフローが完了なので結果を出力
Console.WriteLine(workflowResult.Outputs.Last().As<string>());
}
else
{
// 人間の介入待ちで再度プロセスを終了するケース
// 状態を state.json に保存
using var fs = File.Create("state.json");
await JsonSerializer.SerializeAsync(fs, workflowResult);
Console.WriteLine("State saved. Please run the program again to continue.");
}
このプログラムを実行すると、ユーザーの入力を待つ状態で state.json
に状態が保存されてプログラムが終了します。再度プログラムを実行すると state.json
から状態が復元されて、ユーザーに続行するかどうかの確認が再度表示されます。ユーザーが continue
と入力するとカウントアップが続行され、state.json
が作成されてプロセスが終了します。continue
以外を入力すると最終的なメッセージが表示されてワークフローが終了します。
処理の流れを図にすると以下のようになります。
実行すると以下のような結果になります。初回実行では Human in the loop の所まで進んで state.json
が作成されてプロセスが終了します。
State saved. Please run the program again to continue.
次に実行をすると state.json
から状態が復元されて、前回の続きからワークフローが再開されます。
Current count is 0, please type 'continue' to proceed.
continue
State saved. Please run the program again to continue.
さらに実行を続けると、カウントアップが進んでいきます。
Current count is 4, please type 'continue' to proceed.
continue
State saved. Please run the program again to continue.
continue
以外を入力するとワークフローが終了して最終的な出力が表示されます。
Current count is 8, please type 'continue' to proceed.
no
The final count is 8
いい感じに動いてますね。
まとめ
今回は Microsoft Agent Framework の Human in the loop の仕組みを試してみました。RequestPort
をワークフローに組み込むことで、ワークフローの途中で人間の判断を挟むことができるようになりました。また、CheckpointManager
と組み合わせることで、ワークフローの状態を保存して、ユーザーからの応答があったタイミングでワークフローを復元して再開することもできました。 これにより、より柔軟でインタラクティブなワークフローの構築が可能になります。
これまでのシリーズでワークフローの基本的な使い方から、条件分岐、ループ、チェックポイントの永続化、Human in the loop まで見てきました。次回からは Agent の機能を見ていこうと思います。
Discussion