Microsoft Agent Framework (C#) を見てみよう その5 ワークフローで条件分岐とループを扱う
シリーズ記事
- 「雑感」とハローワールド
- ざっとリポジトリを見てみる
- ワークフローを見てみよう
- ワークフローの Executor を掘り下げる
- ワークフローで条件分岐とループを扱う
- Executor のステータス管理
- チェックポイントの永続化
- Human in the loop を試してみよう
はじめに
前回は、Microsoft Agent Framework のワークフローの中核的な要素である Executor について掘り下げてみました。今回は、ワークフローで条件分岐やループをどのように扱うかについて見ていきたいと思います。条件分岐やループを表現するにはワークフローの Executor と Executor を繋ぐ Edge がポイントになると思います。
そのため、ここでは Edge について掘り下げてみます。
Edge の役割
Edge はワークフロー内で Executor 同士を繋ぐ役割を持っています。Edge はメッセージの流れを定義し、どの Executor がどのように連携するかを決定します。Edge のつなぎ方には、以下の 3 種類があります。
- Direct: ある Executor の出力を別の Executor の入力に直接接続する。
- FanOut: ある Executor の出力を複数の Executor の入力に接続する。
- FanIn: 複数の Executor の出力を 1 つの Executor の入力に接続する。
この定義は EdgeKind
という列挙型で表現されています。
/// <summary>
/// Specified the edge type.
/// </summary>
public enum EdgeKind
{
/// <summary>
/// A direct connection from one node to another.
/// </summary>
Direct,
/// <summary>
/// A connection from one node to a set of nodes.
/// </summary>
FanOut,
/// <summary>
/// A connection from a set of nodes to a single node.
/// </summary>
FanIn
}
つまり、この 3 つのつなぎ方を組み合わせることで、様々なワークフローを構築するのが Agent Framework のワークフローの基本的な考え方になります。
条件分岐の表現
条件分岐がないように見えますが、これは AddEdge
メソッドの condition
引数を使うことで実現できます。これは以前に書いた「ワークフローを見てみよう」で触れました。これを使うと入力が偶数か奇数かで分岐するワークフローを定義できます。
試してみましょう。まず以下のような Executor を定義します。
class PassThroughExecutor() : ReflectingExecutor<PassThroughExecutor>(nameof(PassThroughExecutor)),
IMessageHandler<int, int>
{
public ValueTask<int> HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
ValueTask.FromResult(message);
}
// 偶数という文字列を返す
class EvenNumberExecutor() : ReflectingExecutor<EvenNumberExecutor>(nameof(EvenNumberExecutor)),
IMessageHandler<int, string>
{
public ValueTask<string> HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
ValueTask.FromResult("偶数");
}
// 奇数という文字列を返す
class OddNumberExecutor() : ReflectingExecutor<OddNumberExecutor>(nameof(OddNumberExecutor)),
IMessageHandler<int, string>
{
public ValueTask<string> HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
ValueTask.FromResult("奇数");
}
// 文字列をコンソールに出力する
class PrintExecutor() : ReflectingExecutor<PrintExecutor>(nameof(PrintExecutor)),
IMessageHandler<string>
{
public ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine(message);
return ValueTask.CompletedTask;
}
}
-
PassThroughExecutor
は入力をそのまま返すだけの Executor です。 -
EvenNumberExecutor
は入力が偶数である場合に "偶数" という文字列を返します。 -
OddNumberExecutor
は入力が奇数である場合に "奇数" という文字列を返します。 -
PrintExecutor
は入力された文字列をコンソールに出力します。
これらを使ってワークフローを組んで偶数と奇数で分岐させてみます。
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
var passThrough = new PassThroughExecutor();
var evenNumber = new EvenNumberExecutor();
var oddNumber = new OddNumberExecutor();
var print = new PrintExecutor();
var builder = new WorkflowBuilder(passThrough)
.AddEdge(passThrough, evenNumber, (int number) => number % 2 == 0)
.AddEdge(passThrough, oddNumber, (int number) => number % 2 != 0)
.AddEdge(evenNumber, print)
.AddEdge(oddNumber, print);
{
var workflow = await builder.BuildAsync<int>();
await using var run = await InProcessExecution.StreamAsync(workflow, 10);
await run.WatchStreamAsync().ToArrayAsync();
}
Console.WriteLine("===========================");
{
var workflow = await builder.BuildAsync<int>();
await using var run = await InProcessExecution.StreamAsync(workflow, 15);
await run.WatchStreamAsync().ToArrayAsync();
}
このコードでは、PassThroughExecutor
の出力を EvenNumberExecutor
と OddNumberExecutor
に分岐させています。条件はラムダ式で指定しており、偶数か奇数かで分岐しています。最後に PrintExecutor
に接続して結果を出力します。
以下のようなワークフローのイメージです。
実行すると、以下のような出力が得られます。
偶数
===========================
奇数
ちゃんと条件分岐が機能していることが確認できました。
ループの表現
ループも Edge の条件を使って表現できます。例えば、1 から 5 までの数字を順に処理するループを作成してみましょう。これは以前の記事の「ワークフローの Executor を掘り下げる」でもやったのでさくっと流します。
まず、値の表示、インクリメント、最終出力の Executor を定義します。
// 数字を表示
public class PrintExecutor() : ReflectingExecutor<PrintExecutor>(nameof(PrintExecutor)),
IMessageHandler<int, int>
{
public ValueTask<int> HandleAsync(int message, IWorkflowContext context)
{
Console.WriteLine($"Current number: {message}");
return ValueTask.FromResult(message);
}
}
// インクリメント
class CounterExecutor() : ReflectingExecutor<CounterExecutor>(nameof(CounterExecutor)),
IMessageHandler<int, int>
{
public ValueTask<int> HandleAsync(int message, IWorkflowContext context) =>
ValueTask.FromResult(message + 1);
}
// 最終出力
class FinalOutputExecutor() : ReflectingExecutor<FinalOutputExecutor>(nameof(FinalOutputExecutor)),
IMessageHandler<int, int>
{
public ValueTask<int> HandleAsync(int message, IWorkflowContext context) =>
ValueTask.FromResult(message);
}
そして、これらを使ってループを表現するワークフローを構築します。
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
var counter = new CounterExecutor();
var printer = new PrintExecutor();
var finalOutput = new FinalOutputExecutor();
var workflow = await new WorkflowBuilder(printer)
.AddEdge(printer, counter)
.AddEdge(counter, printer, (int message) => message < 5)
.AddEdge(counter, finalOutput, (int message) => message >= 5)
.WithOutputFrom(finalOutput)
.BuildAsync<int>();
var run = await InProcessExecution.StreamAsync(workflow, 0);
await foreach (var evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"Final output: {outputEvent.As<int>()}");
}
}
これは以下のような流れになります。
実行すると以下のような出力が得られます。
Current number: 0
Current number: 1
Current number: 2
Current number: 3
Current number: 4
Final output: 5
これで順次実行と分岐とループができることが確認できました。これだけあればなんだってできそうですね。
FanOut・FanIn
ファンアウトとファンインも Edge の種類としてサポートされています。これを使うと、1 つの Executor の出力を複数の Executor に送信したり、複数の Executor の出力を 1 つの Executor に集約したりできます。
これの書き方は少しめんどくさくて、いくつかのポイントがあります。
FanOut の場合
FanOut は AddFanOutEdge
メソッドで設定をします。AddFanOutEdge
メソッドは第一引数に元の Executor、第二引数に FanOut 先の Executor の配列 (targets
引数) を指定します。他のオーバーロードもありますが、ここではこの形を使います。
FanOut を使う場合、元の Executor の出力が全ての FanOut 先の Executor に同じ値が送信されます。つまり、1つの出力を複数の Executor で並列に処理したい場合に使用します。各 Executor は同じ入力を受け取るため、それぞれが独自の処理を行うことができます。
FanIn の場合
FanIn は AddFanInEdge
メソッドで設定をします。AddFanInEdge
メソッドは第一引数に集約先の Executor、第二引数に FanIn 元の Executor の配列 (sources
引数) を指定します。
FanIn を使う場合、集約先の Executor は複数の入力を受け取ることになるため、内部で状態を管理して全ての入力を受け取ったかどうかを判定する必要があります。全ての入力を受け取ったら、IWorkflowContext.YieldOutputAsync
メソッドを使って結果を出力します。もしくは、適切なタイミングで後続の処理に SendMessageAsync
メソッドを使ってメッセージを送信します。
FanOut と FanIn を組み合わせた例
ここでは、入力文字列を単語に分割し、複数の Executor で並列に文字数をカウントし、最後に合計を集約する MapReduce パターンのワークフローを作成してみます。
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
using System.Collections.ObjectModel;
string[] mapperIds = Enumerable.Range(0, 4).Select(x => $"mapper_{x}").ToArray();
var split = new SplitExecutor(mapperIds.Length);
var mappers = mapperIds.Select((id, chunkIndex) => new CountCharsExecutor(id, chunkIndex)).ToArray();
var sum = new SumCountsExecutor(mapperIds.Length);
var workflow = await new WorkflowBuilder(split)
.AddFanOutEdge(split, targets: [.. mappers])
.AddFanInEdge(sum, sources: [.. mappers])
.WithOutputFrom(sum)
.BuildAsync<string>();
var input = """
Workflows enable you to build intelligent automation systems that
seamlessly blend artificial intelligence agents with business processes
""";
var run = await InProcessExecution.StreamAsync(workflow, input);
await foreach (var evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"{outputEvent.SourceId}: {outputEvent.As<int>()}");
}
}
class SplitExecutor(int numberOfMappers) : ReflectingExecutor<SplitExecutor>(nameof(SplitExecutor)),
IMessageHandler<string, ReadOnlyCollection<string[]>>
{
public ValueTask<ReadOnlyCollection<string[]>> HandleAsync(string message, IWorkflowContext context)
{
Console.WriteLine($"[SplitExecutor:{Id}] メッセージ受信、分割処理を開始します: '{message}'");
// データを分割して mapperIds で指定された各 Executor に送信
var words = message.Split(' ').Select(x => x.Trim()).ToArray();
var chunkSize = words.Length / numberOfMappers;
var mod = words.Length % numberOfMappers;
List<string[]> chunkItems = [];
for (int i = 0, wordIndex = 0; i < numberOfMappers; i++)
{
var chunk = words[wordIndex..(wordIndex + chunkSize)];
wordIndex += chunkSize;
if (i == numberOfMappers - 1 && mod != 0)
{
// 最後の人にしわ寄せ
chunk = [.. chunk, .. words[wordIndex..]];
}
// メッセージをターゲットの mapper に送信
chunkItems.Add(chunk);
}
return ValueTask.FromResult(chunkItems.AsReadOnly());
}
}
public class CountCharsExecutor(string id, int chunkIndex) : ReflectingExecutor<CountCharsExecutor>(id),
IMessageHandler<ReadOnlyCollection<string[]>, int>
{
public ValueTask<int> HandleAsync(ReadOnlyCollection<string[]> message, IWorkflowContext context)
{
Console.WriteLine($"[CountCharsExecutor:{Id}] メッセージ受信、文字数カウント処理を開始します: [{string.Join(", ", message[chunkIndex])}]");
// 各単語の文字数をカウントして合計を返す
return ValueTask.FromResult(message[chunkIndex].Sum(w => w.Length));
}
}
class SumCountsExecutor(int numberOfMappers) : ReflectingExecutor<SumCountsExecutor>(nameof(SumCountsExecutor)), IMessageHandler<int>
{
private int _count = 0;
private int _sum = 0;
public async ValueTask HandleAsync(int message, IWorkflowContext context)
{
Console.WriteLine($"[SumCountsExecutor:{Id}] メッセージ受信、合計集計処理を開始します: {message}");
// 各 mapper からのメッセージを集計
_count++;
_sum += message;
if (_count == numberOfMappers)
{
// 全ての mapper からのメッセージを受け取ったら結果を出力
await context.YieldOutputAsync(_sum);
}
}
}
このワークフローは、以下のような流れで処理を行います。
-
SplitExecutor
が入力文字列を受け取り、単語に分割して4つのチャンクに分ける - FanOut により、4つの
CountCharsExecutor
に並列に各チャンクを送信 - 各
CountCharsExecutor
が担当するチャンクの文字数をカウント - FanIn により、4つの
CountCharsExecutor
の結果をSumCountsExecutor
に集約 -
SumCountsExecutor
が全ての結果を受け取ったら合計を出力
これを図で表すと以下のようになります。
このように、FanOut と FanIn を組み合わせることで、MapReduce のような並列処理パターンを実現できます。実行してみましょう。実行すると以下のような出力が得られます。
[SplitExecutor:SplitExecutor] メッセージ受信、分割処理を開始します: 'Workflows enable you to build intelligent automation systems that
seamlessly blend artificial intelligence agents with business processes'
[CountCharsExecutor:mapper_0] メッセージ受信、文字数カウント処理を開始します: [Workflows, enable, you, to]
[CountCharsExecutor:mapper_1] メッセージ受信、文字数カウント処理を開始します: [build, intelligent, automation, systems]
[CountCharsExecutor:mapper_2] メッセージ受信、文字数カウント処理を開始します: [that, seamlessly, blend, artificial]
[CountCharsExecutor:mapper_3] メッセージ受信、文字数カウント処理を開始します: [intelligence, agents, with, business, processes]
[SumCountsExecutor:SumCountsExecutor] メッセージ受信、合計集計処理を開始します: 20
[SumCountsExecutor:SumCountsExecutor] メッセージ受信、合計集計処理を開始します: 33
[SumCountsExecutor:SumCountsExecutor] メッセージ受信、合計集計処理を開始します: 29
[SumCountsExecutor:SumCountsExecutor] メッセージ受信、合計集計処理を開始します: 39
SumCountsExecutor: 121
ちゃんと全ての単語の文字数が合計されていることが確認できました。
この例では FanOut 先の CountCharsExecutor
が自分が何番目のデータを処理するかを知っている状態で実装しています。他には、SplitExecutor
が各チャンクを個別に送信するようにしても良いでしょう。FanOut 先の Executor が自分が何番目のデータを処理するかを知らない場合は、メッセージにその情報を含めるなどの工夫が必要です。
この他に AddFanOutEdge
には partitioner
引数があり、メッセージを何番目の Executor に送信するかを決定する関数を指定できます。これを使うと、FanOut を行う Executor が、どこにメッセージを送るかを知らなくても partitioner
関数で決定できるようになります。今回の例では FanOut や FanIn で同じ種類の Executor を複数使っていますが、異なる種類の Executor を組み合わせることも可能です。これにより、より複雑なワークフローを構築できます。
まとめ
今回は、Microsoft Agent Framework のワークフローで条件分岐とループをどのように扱うかについて見てきました。Edge の条件を使うことで、柔軟に分岐やループを表現できることがわかりました。また、FanOut と FanIn を使うことで、並列処理や集約処理も実現できることが確認できました。これらの機能を組み合わせることで、複雑なワークフローを構築できそうです。
ただ、所感としては index で指定したりと少し面倒なところもあるので、FanOut や FanIn の部分はもう少しわかりやすく書けるといいなぁ…と思うのが正直なところです。
次回は、ワークフローの Executor でステータスを扱う方法やワークフローの実行を途中で止めたり、途中から再開する機能についてみてみようと思います。
Discussion