Microsoft Agent Framework (C#) を見てみよう その3 ワークフローを見てみよう
シリーズ記事
はじめに
前回は、Microsoft Agent Framework のリポジトリを clone してきてざっと眺めてみました。今回は、Microsoft Agent Framework の中核的な機能であるワークフローについて見ていきたいと思います。
Agent Framework のワークフローのハローワールド
Agent Framework のワークフロー機能ですが、最初の記事で Agent を組み合わせたワークフローを作りましたが、実際のワークフロー機能は AI を使わない処理でもワークフローを定義できるようになっています。そのためコンソールアプリケーションを作って Microsoft.Agents.AI.Workflows
のパッケージをインストールするだけでワークフローを試すことができます。
ワークフローは Edge と呼ばれる 1 つの処理の実行単位を繋げて大きな処理を作っていくイメージです。
API も一般利用者が触ることを想定されているであろう一番低レベルな API は Edge を繋げていくような形になっています。
Edge を作るには ReflectingExecutor<T>
クラスを継承して行います。セットで IMessageHandler<TMessage, TResult>
か IMessageHandler<TMessage>
のどちらかを実装します。IMessageHandler<TMessage, TResult>
は入力を受け取り出力を返す単純な Edge を作る場合に使い、IMessageHandler<TMessage>
は入力を引数に受け取って出力はセットで渡される IWorkflowContext
を使って返すような Edge を作る場合に使います。IWorkflowContext
は、ワークフローの動きを細かく制御したいときに使って IMessageHandler<TMessage, TResult>
ではできないようなことができます。
ひとまず、IMessageHandler<TMessage, TResult>
を使った Edge の例を見てみましょう。文字列を受け取って、文字列を少し加工して返すような Edge を 2 つ作って繋いでみようと思います。
まずは、Edge を担当するクラスを 2 つ作ります。
// ☆ で文字列を囲むエグゼキューター
class StarEnclosingExecutor() : ReflectingExecutor<StarEnclosingExecutor>(nameof(StarEnclosingExecutor)),
IMessageHandler<string, string>
{
public ValueTask<string> HandleAsync(string message, IWorkflowContext context) =>
ValueTask.FromResult($"☆{message}☆");
}
// ← → で文字列を囲むエグゼキューター
class ArrowEnclosingExecutor() : ReflectingExecutor<ArrowEnclosingExecutor>(nameof(ArrowEnclosingExecutor)),
IMessageHandler<string, string>
{
public ValueTask<string> HandleAsync(string message, IWorkflowContext context) =>
ValueTask.FromResult($"←{message}→");
}
ReflectiongExecutor<T>
のコンストラクターには、Edge の ID を渡します。IMessageHandler<TMessage, TResult>
を実装して HandleAsync
メソッドを実装します。引数の message
が入力で、戻り値が出力になります。
次に、これらの Edge を繋げてワークフローを作ります。
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
// ステップ1: ☆で囲む処理
var starEnclosing = new StarEnclosingExecutor();
// ステップ2: ← → で囲む処理
var arrowEnclosing = new ArrowEnclosingExecutor();
// ワークフローを構築: starEnclosing → arrowEnclosing の順で実行
var builder = new WorkflowBuilder(starEnclosing);
builder.AddEdge(starEnclosing, arrowEnclosing)
// ワークフローの出力はarrowEnclosingの結果を使用
.WithOutputFrom(arrowEnclosing);
var workflow = await builder.BuildAsync<string>();
// ワークフローを実行して結果をストリーム処理
var run = await InProcessExecution.StreamAsync(workflow, "Kazuki Ota");
// WatchStreamAsync でワークフローが返すイベントを受け取る非同期ストリームが返ってくる
List<WorkflowOutputEvent> outputEvents = await run.WatchStreamAsync()
// WorkflowOutputEvent がワークフローの出力のイベントなので、それを収集する
.OfType<WorkflowOutputEvent>()
.ToListAsync();
// 結果を出力 (期待値: ArrowEnclosingExecutor: ←☆Kazuki Ota☆→)
foreach (var outputEvent in outputEvents)
{
Console.WriteLine($"{outputEvent.SourceId}: {outputEvent.As<string>()}");
}
WorkflowBuilder
クラスを使ってワークフローを構築します。最初にワークフローの最初の Edge を渡して WorkflowBuilder
のインスタンスを作ります。次に AddEdge
メソッドで Edge を繋げていきます。最後に WithOutputFrom
メソッドでワークフローの出力をどの Edge の出力にするかを指定します。最後に BuildAsync<TOutput>
メソッドでワークフローをビルドします。ジェネリック引数にはワークフローの入力の型を指定します。
ワークフローの実行は InProcessExecution.StreamAsync
メソッドを使います。第一引数にビルドしたワークフロー、第二引数にワークフローの入力を渡します。戻り値は StreamingRun
というクラスのインスタンスが返ってきます。StreamingRun
クラスの WatchStreamAsync
メソッドを使うと、ワークフローの実行中に発生するイベントを非同期ストリームで受け取ることができます。ワークフローの出力は WorkflowOutputEvent
という型で渡されるので、LINQ の OfType<WorkflowOutputEvent>
メソッドでフィルターしてから ToListAsync
メソッドでリストに変換しています。
☆で囲ってから ← → で囲む処理を実行するワークフローに "Kazuki Ota" という文字列を渡して実行すると、"←☆Kazuki Ota☆→" という結果が得られるはずです。実際に実行してみましょう。
結果は以下のようになりました。
ArrowEnclosingExecutor: ←☆Kazuki Ota☆→
無事に期待した結果が得られました。個人的に面白いなと思ったのは WithOutputFrom
メソッドは params
引数なので複数の Edge を指定できることです。複数の Edge を指定すると WorkflowOutputEvent
が複数回発生するので、ワークフローの出力を複数回受け取ることができます。例えば、以下のように WithOutputFrom
メソッドで 2 つの Edge を指定すると、2 回ワークフローの出力を受け取ることができます。
builder.AddEdge(starEnclosing, arrowEnclosing)
// ワークフローの出力はarrowEnclosingの結果を使用
.WithOutputFrom(starEnclosing, arrowEnclosing);
この変更を加えて実行すると、以下のような結果が得られます。
StarEnclosingExecutor: ☆Kazuki Ota☆
ArrowEnclosingExecutor: ←☆Kazuki Ota☆→
終端が複数あるワークフローを作ることができるのはいいですね。今回のワークフローは以下のようなイメージになると思います。
ループしたい!
チューリング完全を目指すなら分岐やループができないとですよね。単純にループをするには双方向に Edge をつなげばいいです。例えば、*
を文字列の先頭と末尾に追加する Edge をそれぞれ作ります。
// 文字列の末尾に "*" を追加する Executor
class AppendStarExecutor() : ReflectingExecutor<AppendStarExecutor>(nameof(AppendStarExecutor)),
IMessageHandler<string, string>
{
public ValueTask<string> HandleAsync(string message, IWorkflowContext context) =>
ValueTask.FromResult(message + "*");
}
// 文字列の先頭に "*" を追加する Executor
class PrependStarExecutor() : ReflectingExecutor<PrependStarExecutor>(nameof(PrependStarExecutor)),
IMessageHandler<string, string>
{
public ValueTask<string> HandleAsync(string message, IWorkflowContext context) =>
ValueTask.FromResult("*" + message);
}
そして、これらの Edge を双方向に繋げてループを作ります。WithOutputFrom
メソッドで両方の Edge を指定して、両方の Edge の出力をワークフローの出力にします。
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
// カスタム Executor のインスタンスを作成
var appendStar = new AppendStarExecutor();
var prependStart = new PrependStarExecutor();
// WorkflowBuilder を使用してワークフローグラフを構築
// ループして両方の Edge の結果をアウトプットにするワークフロー
var workflow = await new WorkflowBuilder(appendStar) // appendStar を開始 Executor として設定
.AddEdge(appendStar, prependStart) // appendStar から prependStart へのエッジ(接続)を追加
.AddEdge(prependStart, appendStar) // prependStart から appendStar へのエッジを追加(ループ構造を形成)
.WithOutputFrom(appendStar, prependStart) // 両方の Executor からの出力をワークフローの出力として指定
.BuildAsync<string>(); // 入力メッセージタイプ string を指定してワークフローをビルド
// InProcessExecution.StreamAsync でワークフローをストリーミングモードで実行
var run = await InProcessExecution.StreamAsync(workflow, "( *´艸`)");
int count = 0;
// ワークフローの実行イベントを非同期で列挙
await foreach (var evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
// 出力イベントの結果をコンソールに表示
Console.WriteLine($"{outputEvent.SourceId}: {outputEvent.As<string>()}");
if (count++ > 3) break; // 5 回の出力で停止
}
}
このプログラムを実行すると、以下のような結果が得られます。
AppendStarExecutor: ( *´艸`)*
PrependStarExecutor: *( *´艸`)*
AppendStarExecutor: *( *´艸`)**
PrependStarExecutor: **( *´艸`)**
AppendStarExecutor: **( *´艸`)***
想定通りですね。このワークフロー自体には終わりがなく、WatchStreamAsync
メソッドで受け取るイベントも無限に続きます。なので、上記のコードでは count
変数を使って 5 回出力を受け取ったらループを抜けるようにしています。その他にワークフロー自体に終了条件を設ける方法もあります。
例えば AddEdge
メソッドの第二引数には condition
という引数があり Func<T, bool>
で条件を指定できます。この条件が true
を返すと Edge に渡した Executor が実行されます。これを使うと、例えば以下のようにしてループを 5 回で終了するようなワークフローを作ることもできます。
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
// カスタム Executor のインスタンスを作成
var appendStar = new AppendStarExecutor();
var prependStart = new PrependStarExecutor();
// 条件付きエッジでループを制御し、両方の Executor の出力をワークフローの出力とする
var workflow = await new WorkflowBuilder(appendStar)
.AddEdge(appendStar,
prependStart,
(string? m) => m?.Where(x => x == '*').Count() <= 4) // "*" が 4 個以下の場合のみperpendStarへ進む(ループ終了条件)
.AddEdge(prependStart, appendStar) // ループ構造を形成
.WithOutputFrom(appendStar, prependStart) // 両方の Executor からの出力を指定
.BuildAsync<string>();
// ワークフローをストリーミングモードで実行
var run = await InProcessExecution.StreamAsync(workflow, "( *´艸`)");
int count = 0;
// ワークフローイベントのストリームを監視
await foreach (var evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
// ワークフローの出力イベントを受信したら、出力内容を表示
Console.WriteLine($"{outputEvent.SourceId}: {outputEvent.As<string>()}");
}
}
foreach
から if
文が消えて AddEdge
メソッドの第二引数に条件が追加されているのがわかると思います。*
の数が 4 個以下のときだけ prependStart
に進むようにしています。これでループが 5 回で終了するワークフローになります。
ワークフローの実行フローを見てみよう
このワークフローがどのように実行されて、Executor が順次実行されて、条件の評価が行われてワークフローが終了するまでの流れを視覚的に見てみましょう。以下のシーケンス図で、ワークフローの実行フロー全体を表現しています。
各ステップをもう少し詳しく見てみましょう。
-
初回実行: ユーザーコードから
"( *´艸`)"
という入力でワークフローが開始されます -
AppendStarExecutor 実行: 最初の Executor として
AppendStarExecutor
が実行され、末尾に*
を追加して"( *´艸`)*"
を出力します -
条件評価 (1回目):
*
の数が 1 個なので条件<= 4
を満たし、次の Executor へ進みます -
PrependStarExecutor 実行: 先頭に
*
を追加して"*( *´艸`)*"
を出力します -
ループ継続:
PrependStarExecutor
からAppendStarExecutor
へ戻り、再度末尾に*
を追加 -
条件評価の繰り返し: このサイクルが繰り返され、
*
の数が増えていきます -
終了条件:
*
の数が 5 個になった時点で条件<= 4
を満たさなくなり、PrependStarExecutor
への Edge が実行されずワークフローが終了します
このように、AddEdge
の条件を使うことで、ループの終了条件をワークフロー自体に組み込むことができます。無限ループにならずに済むので、安心してループを使ったワークフローを作ることができますね。
Executor 内で判断する方法
その他に IWorkflowContext
を使って細かくワークフローの動きを制御する方法もあります。例えば、IMessageHandler<TMessage>
を実装した Edge の場合、HandleAsync
メソッドには戻り値がなく第二引数の IWorkflowContext
を使ってワークフローにイベントを送ったり、メッセージを次の Edge に渡したりすることができます。例えば、以下のように IMessageHandler<TMessage>
を使ってループを制御することもできます。
IWorkflowContext
の SendMessageAsync
メソッドを使うと、次の Edge にメッセージを送ることができます。YieldOutputAsync
メソッドを使うと、ワークフローの出力としてメッセージを送ることができます。これらを使って条件を満たす限りループするような Edge を作ることができます。
// 文字列の先頭に "*" を追加する Executor
class PrependStarExecutor() : ReflectingExecutor<PrependStarExecutor>(nameof(PrependStarExecutor)),
IMessageHandler<string>
{
public async ValueTask HandleAsync(string message, IWorkflowContext context)
{
if (message.Count(c => c == '*') > 4)
{
// "*" が 4 個より大きい場合は処理を終了(ループ終了条件)
return;
}
var output = "*" + message;
await context.SendMessageAsync(output); // 次の Executor へメッセージを送信
await context.YieldOutputAsync(output); // ワークフローの出力として送信
}
}
この PrependStarExecutor
は、*
の数が 4 個を超えたら何もせずに終了します。そうでなければ、先頭に *
を追加した文字列を次の Edge に送って、ワークフローの出力としても送ります。PrependStarExecutor
に終了条件を持たせたので WorkflowBuilder
でループの条件を指定する必要はなくなるので、以下のようにシンプルにワークフローを作ることができます。
// Executor 内でループを制御し、両方の Executor の出力をワークフローの出力とする
var workflow = await new WorkflowBuilder(appendStar)
.AddEdge(appendStar, prependStart)
.AddEdge(prependStart, appendStar) // ループ構造を形成
.WithOutputFrom(appendStar, prependStart) // 両方の Executor からの出力を指定
.BuildAsync<string>();
この方法でも同じようにループを制御できます。IWorkflowContext
を使うと、ワークフローの動きを細かく制御できるので、複雑なワークフローを作る場合には便利です。
まとめ
今回は、Microsoft Agent Framework のワークフロー機能について見てきました。Edge を作って繋げてワークフローを作る方法、ループの作り方、条件付きの Edge の使い方、IWorkflowContext
を使った細かい制御についてみてきました。
今までは雰囲気でワークフローを使ってきたので、次回はもう少しワークフローの API を掘り下げてみたいと思います。
Discussion