🗂

Microsoft Agent Framework (C#) を見てみよう その5 ワークフローで条件分岐とループを扱う

に公開

シリーズ記事

はじめに

前回は、Microsoft Agent Framework のワークフローの中核的な要素である Executor について掘り下げてみました。今回は、ワークフローで条件分岐やループをどのように扱うかについて見ていきたいと思います。条件分岐やループを表現するにはワークフローの Executor と Executor を繋ぐ Edge がポイントになると思います。

そのため、ここでは Edge について掘り下げてみます。

Edge の役割

Edge はワークフロー内で Executor 同士を繋ぐ役割を持っています。Edge はメッセージの流れを定義し、どの Executor がどのように連携するかを決定します。Edge のつなぎ方には、以下の 3 種類があります。

  • Direct: ある Executor の出力を別の Executor の入力に直接接続する。
  • FanOut: ある Executor の出力を複数の Executor の入力に接続する。
  • FanIn: 複数の Executor の出力を 1 つの Executor の入力に接続する。

この定義は EdgeKind という列挙型で表現されています。

Edges.cs
/// <summary>
/// Specified the edge type.
/// </summary>
public enum EdgeKind
{
    /// <summary>
    /// A direct connection from one node to another.
    /// </summary>
    Direct,
    /// <summary>
    /// A connection from one node to a set of nodes.
    /// </summary>
    FanOut,
    /// <summary>
    /// A connection from a set of nodes to a single node.
    /// </summary>
    FanIn
}

つまり、この 3 つのつなぎ方を組み合わせることで、様々なワークフローを構築するのが Agent Framework のワークフローの基本的な考え方になります。

条件分岐の表現

条件分岐がないように見えますが、これは AddEdge メソッドの condition 引数を使うことで実現できます。これは以前に書いた「ワークフローを見てみよう」で触れました。これを使うと入力が偶数か奇数かで分岐するワークフローを定義できます。

試してみましょう。まず以下のような Executor を定義します。

class PassThroughExecutor() : ReflectingExecutor<PassThroughExecutor>(nameof(PassThroughExecutor)),
    IMessageHandler<int, int>
{
    public ValueTask<int> HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        ValueTask.FromResult(message);
}

// 偶数という文字列を返す
class EvenNumberExecutor() : ReflectingExecutor<EvenNumberExecutor>(nameof(EvenNumberExecutor)),
    IMessageHandler<int, string>
{
    public ValueTask<string> HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        ValueTask.FromResult("偶数");
}

// 奇数という文字列を返す
class OddNumberExecutor() : ReflectingExecutor<OddNumberExecutor>(nameof(OddNumberExecutor)),
    IMessageHandler<int, string>
{
    public ValueTask<string> HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
        ValueTask.FromResult("奇数");
}

// 文字列をコンソールに出力する
class PrintExecutor() : ReflectingExecutor<PrintExecutor>(nameof(PrintExecutor)),
    IMessageHandler<string>
{
    public ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine(message);
        return ValueTask.CompletedTask;
    }
}
  • PassThroughExecutor は入力をそのまま返すだけの Executor です。
  • EvenNumberExecutor は入力が偶数である場合に "偶数" という文字列を返します。
  • OddNumberExecutor は入力が奇数である場合に "奇数" という文字列を返します。
  • PrintExecutor は入力された文字列をコンソールに出力します。

これらを使ってワークフローを組んで偶数と奇数で分岐させてみます。

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

var passThrough = new PassThroughExecutor();
var evenNumber = new EvenNumberExecutor();
var oddNumber = new OddNumberExecutor();
var print = new PrintExecutor();

var builder = new WorkflowBuilder(passThrough)
    .AddEdge(passThrough, evenNumber, (int number) => number % 2 == 0)
    .AddEdge(passThrough, oddNumber, (int number) => number % 2 != 0)
    .AddEdge(evenNumber, print)
    .AddEdge(oddNumber, print);

{
    var workflow = await builder.BuildAsync<int>();
    await using var run = await InProcessExecution.StreamAsync(workflow, 10);
    await run.WatchStreamAsync().ToArrayAsync();
}

Console.WriteLine("===========================");

{
    var workflow = await builder.BuildAsync<int>();
    await using var run = await InProcessExecution.StreamAsync(workflow, 15);
    await run.WatchStreamAsync().ToArrayAsync();
}

このコードでは、PassThroughExecutor の出力を EvenNumberExecutorOddNumberExecutor に分岐させています。条件はラムダ式で指定しており、偶数か奇数かで分岐しています。最後に PrintExecutor に接続して結果を出力します。

以下のようなワークフローのイメージです。

実行すると、以下のような出力が得られます。

偶数
===========================
奇数

ちゃんと条件分岐が機能していることが確認できました。

ループの表現

ループも Edge の条件を使って表現できます。例えば、1 から 5 までの数字を順に処理するループを作成してみましょう。これは以前の記事の「ワークフローの Executor を掘り下げる」でもやったのでさくっと流します。

まず、値の表示、インクリメント、最終出力の Executor を定義します。

// 数字を表示
public class PrintExecutor() : ReflectingExecutor<PrintExecutor>(nameof(PrintExecutor)),
    IMessageHandler<int, int>
{
    public ValueTask<int> HandleAsync(int message, IWorkflowContext context)
    {
        Console.WriteLine($"Current number: {message}");
        return ValueTask.FromResult(message);
    }
}

// インクリメント
class CounterExecutor() : ReflectingExecutor<CounterExecutor>(nameof(CounterExecutor)),
    IMessageHandler<int, int>
{
    public ValueTask<int> HandleAsync(int message, IWorkflowContext context) =>
        ValueTask.FromResult(message + 1);
}

// 最終出力
class FinalOutputExecutor() : ReflectingExecutor<FinalOutputExecutor>(nameof(FinalOutputExecutor)),
    IMessageHandler<int, int>
{
    public ValueTask<int> HandleAsync(int message, IWorkflowContext context) =>
        ValueTask.FromResult(message);
}

そして、これらを使ってループを表現するワークフローを構築します。

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

var counter = new CounterExecutor();
var printer = new PrintExecutor();
var finalOutput = new FinalOutputExecutor();

var workflow = await new WorkflowBuilder(printer)
    .AddEdge(printer, counter)
    .AddEdge(counter, printer, (int message) => message < 5)
    .AddEdge(counter, finalOutput, (int message) => message >= 5)
    .WithOutputFrom(finalOutput)
    .BuildAsync<int>();
var run = await InProcessExecution.StreamAsync(workflow, 0);
await foreach (var evt in run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent outputEvent)
    {
        Console.WriteLine($"Final output: {outputEvent.As<int>()}");
    }
}

これは以下のような流れになります。

実行すると以下のような出力が得られます。

Current number: 0
Current number: 1
Current number: 2
Current number: 3
Current number: 4
Final output: 5

これで順次実行と分岐とループができることが確認できました。これだけあればなんだってできそうですね。

FanOut・FanIn

ファンアウトとファンインも Edge の種類としてサポートされています。これを使うと、1 つの Executor の出力を複数の Executor に送信したり、複数の Executor の出力を 1 つの Executor に集約したりできます。

これの書き方は少しめんどくさくて、いくつかのポイントがあります。

FanOut の場合

FanOut は AddFanOutEdge メソッドで設定をします。AddFanOutEdge メソッドは第一引数に元の Executor、第二引数に FanOut 先の Executor の配列 (targets 引数) を指定します。他のオーバーロードもありますが、ここではこの形を使います。

FanOut を使う場合、元の Executor の出力が全ての FanOut 先の Executor に同じ値が送信されます。つまり、1つの出力を複数の Executor で並列に処理したい場合に使用します。各 Executor は同じ入力を受け取るため、それぞれが独自の処理を行うことができます。

FanIn の場合

FanIn は AddFanInEdge メソッドで設定をします。AddFanInEdge メソッドは第一引数に集約先の Executor、第二引数に FanIn 元の Executor の配列 (sources 引数) を指定します。

FanIn を使う場合、集約先の Executor は複数の入力を受け取ることになるため、内部で状態を管理して全ての入力を受け取ったかどうかを判定する必要があります。全ての入力を受け取ったら、IWorkflowContext.YieldOutputAsync メソッドを使って結果を出力します。もしくは、適切なタイミングで後続の処理に SendMessageAsync メソッドを使ってメッセージを送信します。

FanOut と FanIn を組み合わせた例

ここでは、入力文字列を単語に分割し、複数の Executor で並列に文字数をカウントし、最後に合計を集約する MapReduce パターンのワークフローを作成してみます。

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
using System.Collections.ObjectModel;

string[] mapperIds = Enumerable.Range(0, 4).Select(x => $"mapper_{x}").ToArray();
var split = new SplitExecutor(mapperIds.Length);
var mappers = mapperIds.Select((id, chunkIndex) => new CountCharsExecutor(id, chunkIndex)).ToArray();
var sum = new SumCountsExecutor(mapperIds.Length);

var workflow = await new WorkflowBuilder(split)
    .AddFanOutEdge(split, targets: [.. mappers])
    .AddFanInEdge(sum, sources: [.. mappers])
    .WithOutputFrom(sum)
    .BuildAsync<string>();

var input = """
    Workflows enable you to build intelligent automation systems that 
    seamlessly blend artificial intelligence agents with business processes
    """;
var run = await InProcessExecution.StreamAsync(workflow, input);
await foreach (var evt in run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent outputEvent)
    {
        Console.WriteLine($"{outputEvent.SourceId}: {outputEvent.As<int>()}");
    }
}

class SplitExecutor(int numberOfMappers) : ReflectingExecutor<SplitExecutor>(nameof(SplitExecutor)),
    IMessageHandler<string, ReadOnlyCollection<string[]>>
{
    public ValueTask<ReadOnlyCollection<string[]>> HandleAsync(string message, IWorkflowContext context)
    {
        Console.WriteLine($"[SplitExecutor:{Id}] メッセージ受信、分割処理を開始します: '{message}'");
        // データを分割して mapperIds で指定された各 Executor に送信
        var words = message.Split(' ').Select(x => x.Trim()).ToArray();
        var chunkSize = words.Length / numberOfMappers;
        var mod = words.Length % numberOfMappers;
        List<string[]> chunkItems = [];
        for (int i = 0, wordIndex = 0; i < numberOfMappers; i++)
        {
            var chunk = words[wordIndex..(wordIndex + chunkSize)];
            wordIndex += chunkSize;

            if (i == numberOfMappers - 1 && mod != 0)
            {
                // 最後の人にしわ寄せ
                chunk = [.. chunk, .. words[wordIndex..]];
            }

            // メッセージをターゲットの mapper に送信
            chunkItems.Add(chunk);
        }

        return ValueTask.FromResult(chunkItems.AsReadOnly());
    }
}

public class CountCharsExecutor(string id, int chunkIndex) : ReflectingExecutor<CountCharsExecutor>(id),
    IMessageHandler<ReadOnlyCollection<string[]>, int>
{
    public ValueTask<int> HandleAsync(ReadOnlyCollection<string[]> message, IWorkflowContext context)
    {
        Console.WriteLine($"[CountCharsExecutor:{Id}] メッセージ受信、文字数カウント処理を開始します: [{string.Join(", ", message[chunkIndex])}]");
        // 各単語の文字数をカウントして合計を返す
        return ValueTask.FromResult(message[chunkIndex].Sum(w => w.Length));
    }
}

class SumCountsExecutor(int numberOfMappers) : ReflectingExecutor<SumCountsExecutor>(nameof(SumCountsExecutor)), IMessageHandler<int>
{
    private int _count = 0;
    private int _sum = 0;
    public async ValueTask HandleAsync(int message, IWorkflowContext context)
    {
        Console.WriteLine($"[SumCountsExecutor:{Id}] メッセージ受信、合計集計処理を開始します: {message}");
        // 各 mapper からのメッセージを集計
        _count++;
        _sum += message;
        if (_count == numberOfMappers)
        {
            // 全ての mapper からのメッセージを受け取ったら結果を出力
            await context.YieldOutputAsync(_sum);
        }
    }
}

このワークフローは、以下のような流れで処理を行います。

  1. SplitExecutor が入力文字列を受け取り、単語に分割して4つのチャンクに分ける
  2. FanOut により、4つの CountCharsExecutor に並列に各チャンクを送信
  3. CountCharsExecutor が担当するチャンクの文字数をカウント
  4. FanIn により、4つの CountCharsExecutor の結果を SumCountsExecutor に集約
  5. SumCountsExecutor が全ての結果を受け取ったら合計を出力

これを図で表すと以下のようになります。

このように、FanOut と FanIn を組み合わせることで、MapReduce のような並列処理パターンを実現できます。実行してみましょう。実行すると以下のような出力が得られます。

[SplitExecutor:SplitExecutor] メッセージ受信、分割処理を開始します: 'Workflows enable you to build intelligent automation systems that
seamlessly blend artificial intelligence agents with business processes'
[CountCharsExecutor:mapper_0] メッセージ受信、文字数カウント処理を開始します: [Workflows, enable, you, to]
[CountCharsExecutor:mapper_1] メッセージ受信、文字数カウント処理を開始します: [build, intelligent, automation, systems]
[CountCharsExecutor:mapper_2] メッセージ受信、文字数カウント処理を開始します: [that, seamlessly, blend, artificial]
[CountCharsExecutor:mapper_3] メッセージ受信、文字数カウント処理を開始します: [intelligence, agents, with, business, processes]
[SumCountsExecutor:SumCountsExecutor] メッセージ受信、合計集計処理を開始します: 20
[SumCountsExecutor:SumCountsExecutor] メッセージ受信、合計集計処理を開始します: 33
[SumCountsExecutor:SumCountsExecutor] メッセージ受信、合計集計処理を開始します: 29
[SumCountsExecutor:SumCountsExecutor] メッセージ受信、合計集計処理を開始します: 39
SumCountsExecutor: 121

ちゃんと全ての単語の文字数が合計されていることが確認できました。

この例では FanOut 先の CountCharsExecutor が自分が何番目のデータを処理するかを知っている状態で実装しています。他には、SplitExecutor が各チャンクを個別に送信するようにしても良いでしょう。FanOut 先の Executor が自分が何番目のデータを処理するかを知らない場合は、メッセージにその情報を含めるなどの工夫が必要です。

この他に AddFanOutEdge には partitioner 引数があり、メッセージを何番目の Executor に送信するかを決定する関数を指定できます。これを使うと、FanOut を行う Executor が、どこにメッセージを送るかを知らなくても partitioner 関数で決定できるようになります。今回の例では FanOut や FanIn で同じ種類の Executor を複数使っていますが、異なる種類の Executor を組み合わせることも可能です。これにより、より複雑なワークフローを構築できます。

まとめ

今回は、Microsoft Agent Framework のワークフローで条件分岐とループをどのように扱うかについて見てきました。Edge の条件を使うことで、柔軟に分岐やループを表現できることがわかりました。また、FanOut と FanIn を使うことで、並列処理や集約処理も実現できることが確認できました。これらの機能を組み合わせることで、複雑なワークフローを構築できそうです。

ただ、所感としては index で指定したりと少し面倒なところもあるので、FanOut や FanIn の部分はもう少しわかりやすく書けるといいなぁ…と思うのが正直なところです。

次回は、ワークフローの Executor でステータスを扱う方法やワークフローの実行を途中で止めたり、途中から再開する機能についてみてみようと思います。

Microsoft (有志)

Discussion