💬

Microsoft Agent Framework (C#) を見てみよう その4 ワークフローの Executor を掘り下げる

に公開

シリーズ記事

はじめに

前回は、Microsoft Agent Framework の中核的な機能であるワークフローについて、基本的なワークフローの定義方法や WorkflowBuilder を使った簡単なワークフローの構築方法を見てきました。今回は、もう少しワークフローの API を掘り下げて、より複雑なワークフローを構築するための基礎的なところを見ていこうと思います。

ワークフローの構成要素

Microsoft Agent Framework のワークフローは以下の要素で構成されています。

  • Executor: ワークフロー内で実行される処理の単位。具体的には、メッセージを受け取り、処理を行い、結果を返す役割を持つ。
  • Edge: ワークフロー内の Executor 同士を繋ぐ接続。メッセージの流れを定義し、どの Executor がどのように連携するかを決定する。
  • Workflow: 複数の Executor と Edge から構成される全体の処理フロー。特定の入力に対して、どのような処理を行うかを定義する。

今回は、この中の Executor を中心に掘り下げてみます。

Executor の詳細

この Executor は、ワークフローの中で実際に処理を行う部分です。
この Executor には名前の通り Executor というクラスのインスタンスを指定できる他に、Microsoft Agent Framework の AIAgent を指定したり、InputPort という外部とのリクエスト/レスポンスを指定するクラスを指定したり、純粋なデリゲートを指定したり、単に string 型で Executor の ID を指定することができます。
デリゲートだけは少し特殊でFunc<TInput, IWorkflowContext, CancellationToken, ValueTask<TOutput>> などの形の所定の引数と戻り値を持ったデリゲートを AsExecutor という拡張メソッドで ID を指定してラップすることで Executor として利用できます。

そのため、クラスを定義しなくてもワークフローを構築できる柔軟性があります。実際にデリゲートを使ったワークフローを定義してみましょう。

using Microsoft.Agents.AI.Workflows;

// デリゲートでワークフローの Edge の処理を定義
Func<string, IWorkflowContext, CancellationToken, ValueTask<string>> step1 = 
    (input, context, cancellationToken) => ValueTask.FromResult($"Step1 received: {input}");
Func<string, IWorkflowContext, CancellationToken, ValueTask<string>> step2 = 
    (input, context, cancellationToken) => ValueTask.FromResult($"Step2 received: {input}");

// ワークフローの構築
// 処理の開始を表す Edge としてデリゲート step1 を AsExecutor で Executor として扱えるようにする
// その際に ID を nameof(step1) で指定
var workflow = await new WorkflowBuilder(step1.AsExecutor(nameof(step1)))
    // step1 の出力を step2 の入力に接続。step1 は文字列指定で、step2 はデリゲートを AsExecutor で Executor に変換して指定
    .AddEdge(nameof(step1), step2.AsExecutor(nameof(step2)))
    // ワークフローの出力として step1, step2 を文字列で指定
    .WithOutputFrom(nameof(step1), nameof(step2))
    // ワークフローのビルド
    .BuildAsync<string>();

// 実行して出力結果を受け取る処理は今までと同じ
var run = await InProcessExecution.StreamAsync(workflow, "Hello, World!");
await foreach (var evt in run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent outputEvent)
    {
        Console.WriteLine($"{outputEvent.SourceId}: {outputEvent.As<string>()}");
    }
}

この例では、step1step2 という2つのデリゲートを定義し、それぞれが入力を受け取って処理を行い、結果を返すようになっています。AsExecutor 拡張メソッドを使ってこれらのデリゲートを Executor として扱えるようにし、WorkflowBuilder を使ってワークフローを構築しています。また、途中で Edge の指定を文字列で行っています。
色々なワークフローの Edge の指定方法を組み合わせて使うことができるので、柔軟にワークフローを定義できます。

Executor っぽいもの?

この Executor には、前述のとおり様々なものを指定できます。
こういうことをする時には素直に実装すると共通のインターフェースや基底クラスを定義してそれを継承したり実装したりすることが多いと思いますが、Microsoft Agent Framework のワークフローでは、そういった共通のインターフェースや基底クラスは特に定義されていません。

かといって object 型で受け取るわけでもなく ExecutorIsh というレコード型で受け取っています。
ExecutorIshExecutor っぽいもの、という意味で付けられた名前のようです。ExecutorIsh クラス内には以下のような Type 列挙型が定義されていて、この列挙型で指定された種類に応じて、実際にどのようなものが指定されたかを判別しています。

/// <summary>
/// The type of the <see cref="ExecutorIsh"/>.
/// </summary>
public enum Type
{
    /// <summary>
    /// An unbound executor reference, identified only by ID.
    /// </summary>
    Unbound,
    /// <summary>
    /// An actual <see cref="Executor"/> instance.
    /// </summary>
    Executor,
    /// <summary>
    /// A function delegate to be wrapped as an executor.
    /// </summary>
    Function,
    /// <summary>
    /// An <see cref="InputPort"/> for servicing external requests.
    /// </summary>
    InputPort,
    /// <summary>
    /// An <see cref="AIAgent"/> instance.
    /// </summary>
    Agent,
}

文字列で指定した場合が UnboundExecutor クラスの派生型を指定した場合は Executor といったように、指定されたものに応じて Type 列挙型の値が変わります。このようにすることで、共通のインターフェースや基底クラスを定義せずに、様々な種類の Executor っぽいものを受け取ることができるようになっています。

Executor の派生クラスや AIAgent クラスや InputPort クラスや string から ExecutorIsh への変換は暗黙的な型変換演算子が定義されているので、特に意識せずに指定できます。デリゲートだけは、デリゲート単体で ID を指定できないため、AsExecutor 拡張メソッドを使って ID を指定してラップする必要があります。

このようになっているため、ワークフローの定義自体に文字列だけを使って Edge を定義することもできます。まぁ、実際には WorkflowBuilderBuildAsync メソッドで ID だけ指定されたけど実行するものの実態がない Executor があると例外になるので、実際には文字列だけで完結するワークフローは作れません。処理の実態がないワークフローをビルドしようとすると以下のような例外が発生します。

System.InvalidOperationException
  HResult=0x80131509
  Message=Workflow cannot be built because there are unbound executors: step1, step2.
  Source=Microsoft.Agents.AI.Workflows
  スタック トレース:
   場所 Microsoft.Agents.AI.Workflows.WorkflowBuilder.Validate()
   場所 Microsoft.Agents.AI.Workflows.WorkflowBuilder.BuildInternal(Activity activity)
   場所 Microsoft.Agents.AI.Workflows.WorkflowBuilder.<BuildAsync>d__28`1.MoveNext()
   場所 System.Runtime.CompilerServices.ValueTaskAwaiter`1.GetResult()
   場所 Program.<<Main>$>d__0.MoveNext() (D:\source\WorkflowHelloWorldApp\WorkflowHelloWorldApp\Program.cs):行 12

実際にビルドする前に Executor と ID の紐づけをする必要があります。紐づけを行うには WorkflowBuilderBindExecutor メソッドを使います。このメソッドは実際に実行する処理の実態を持っていないとダメなので ExecutorIsh ではなく Executor クラスの派生型を指定する必要があります。

実際に文字列で指定した Executor を BindExecutor メソッドで紐づけてワークフローをビルドする例を見てみましょう。まずは、Executor を作成します。処理の中身は先ほど作成したデリゲートと同じように、入力を受け取って処理を行い、結果を返すようにします。初出のものとして Executor クラスには Executor<TInput, TOutput> という単純な Executor を定義するためのベースクラスがあるのでそれを使ってみます。これは HandleAsync メソッドをオーバーライドして処理を実装します。

class Step1Executor(string id) : Executor<string, string>(id)
{
    public override ValueTask<string> HandleAsync(string message, IWorkflowContext context) =>
        ValueTask.FromResult($"Step1 received: {message}");
}

class Step2Executor(string id) : Executor<string, string>(id)
{
    public override ValueTask<string> HandleAsync(string message, IWorkflowContext context) =>
        ValueTask.FromResult($"Step2 received: {message}");
}

今までの Executor の実装は id を型ごとに固定にしていましたが、今回はコンストラクタで id を受け取るようにして、インスタンス生成時に指定できるようにしています。では、これを使ってワークフローを定義してみましょう。

using Microsoft.Agents.AI.Workflows;

// ワークフローの定義
// ここでは、まだビルドを行わずに ID 指定でワークフローの定義を行う
var builder = new WorkflowBuilder("step1")
    .AddEdge("step1", "step2")
    .WithOutputFrom("step1", "step2");

// BindExecutor で ID に対応する Executor を紐づける
builder.BindExecutor(new Step1Executor("step1"));
builder.BindExecutor(new Step2Executor("step2"));

// ID に対応する Executor が全て紐づけられたのでビルド可能
var workflow = await builder.BuildAsync<string>();

//// 実行して出力結果を受け取る処理は今までと同じ
var run = await InProcessExecution.StreamAsync(workflow, "Hello, World!");
await foreach (var evt in run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent outputEvent)
    {
        Console.WriteLine($"{outputEvent.SourceId}: {outputEvent.As<string>()}");
    }
}

実行すると最初の例と同じように以下の出力が得られます。

step1: Step1 received: Hello, World!
step2: Step2 received: Step1 received: Hello, World!

面白いですね。まだ、その部分のコードは見ていないのですが YAML でワークフローを定義する機能は、ここらへんの柔軟性を活かして実装されているのかもしれません。

まとめ

今回は、Microsoft Agent Framework のワークフローの中核的な要素である Executor について掘り下げてみました。Executor はワークフロー内で実際に処理を行う部分であり、様々な形で指定できる柔軟性があります。デリゲートを使った簡単なワークフローの定義方法や、文字列で指定した Executor を実際の処理と紐づける方法などを試してみました。

なかなか、面白い設計になっていると思います。次回は、今まで Edge を素直に AddEdge メソッドで追加していましたが、もう少し複雑なワークフローを構築するための方法を見ていこうと思います。

Microsoft (有志)

Discussion