💬

Semantic Kernel のマルチエージェント AI 機能入門してみよう その 2

2024/09/15に公開

はじめに

今回は、Agent 系の機能のクラスの構造を見ていこうと思います。
この記事は、特にわかりやすく説明しようという類ではなく、自分がコードを見た時に思ったことをメモしているだけです。

Agents.Abstractions プロジェクト

まずは Agents.Abstractions プロジェクトを覗いてみようと思います。
まずは、おおもとである Agent クラスです。

Agent クラス自体には Azure OpenAI Service などとやり取りするような機能はありません。恐らく CreateChannelAsync メソッドで作られる AgentChannel がその機能を持っているのだと思われます。ということで AgentChannel クラスを見てみましょう。

AgentChannel クラスに Agent を呼び出したり、チャットの履歴を操作する系の機能がありました。
この Agent クラスを継承して ChatHistoryKernelAgent クラス ChatHistoryChannel があります。

まずは ChatHistoryKernelAgent クラスを見てみましょう。これは Agent からの直継承ではなく間に KernelAgent というクラスが挟まっています。

この KernelAgentInstructionsAgent への指示を受け取るプロパティが追加されています。個人的には Agent のレベルで Instructions があると思ってたのですが KernelAgent からみたいです。恐らく Agent の子クラスには AggregatorAgent という複数の Agent を束ねるような Agent もあるので、そのために InstructionsKernelAgent にあるのかもしれません。
また、KernelAgent には Kernel プロパティもあります。これはデフォルトでは new Kernel() しているだけの空っぽの Kernel が設定されています。

次に ChatHistoryKernelAgent クラスを見てみましょう。このクラスは KernelAgent からの直継承です。このクラスは ChatHistory というクラスを受け取る InvokeAsyncInvokeStreamingAsync メソッドを持っています。この 2 つのメソッドは IChatHistoryHandler インターフェースを実装する形になっています。これで、AgentOpenAI を繋ぐ処理が動きそうなメソッドが出てきました。ただ、この時点では、まだ abstract メソッドなので中身はありません。

この ChatHistoryKernelAgent クラスと対になる AgentChannel クラスは ChatHistoryChannel です。ChatHistoryKernelAgent クラスの CreateChannelAsync メソッドは以下のように実装されています。

protected internal sealed override Task<AgentChannel> CreateChannelAsync(CancellationToken cancellationToken)
{
    ChatHistoryChannel channel =
        new()
        {
            Logger = this.LoggerFactory.CreateLogger<ChatHistoryChannel>()
        };

    return Task.FromResult<AgentChannel>(channel);
}

この ChatHistoryChannel クラスは AgentChannel を継承する形で定義されています。

この ChatHistoryChannel クラスくらいになってくると結構実装があります。コード量はそんなにないので、ここの丸ごと貼っておきます。

ChatHistoryChannel.cs
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SemanticKernel.Agents.Extensions;
using Microsoft.SemanticKernel.ChatCompletion;

namespace Microsoft.SemanticKernel.Agents;

/// <summary>
/// A <see cref="AgentChannel"/> specialization for that acts upon a <see cref="IChatHistoryHandler"/>.
/// </summary>
public class ChatHistoryChannel : AgentChannel
{
    private readonly ChatHistory _history;

    /// <inheritdoc/>
    protected internal sealed override async IAsyncEnumerable<(bool IsVisible, ChatMessageContent Message)> InvokeAsync(
        Agent agent,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (agent is not IChatHistoryHandler historyHandler)
        {
            throw new KernelException($"Invalid channel binding for agent: {agent.Id} ({agent.GetType().FullName})");
        }

        // Capture the current message count to evaluate history mutation.
        int messageCount = this._history.Count;
        HashSet<ChatMessageContent> mutatedHistory = [];

        // Utilize a queue as a "read-ahead" cache to evaluate message sequencing (i.e., which message is final).
        Queue<ChatMessageContent> messageQueue = [];

        ChatMessageContent? yieldMessage = null;
        await foreach (ChatMessageContent responseMessage in historyHandler.InvokeAsync(this._history, cancellationToken).ConfigureAwait(false))
        {
            // Capture all messages that have been included in the mutated the history.
            for (int messageIndex = messageCount; messageIndex < this._history.Count; messageIndex++)
            {
                ChatMessageContent mutatedMessage = this._history[messageIndex];
                mutatedHistory.Add(mutatedMessage);
                messageQueue.Enqueue(mutatedMessage);
            }

            // Update the message count pointer to reflect the current history.
            messageCount = this._history.Count;

            // Avoid duplicating any message included in the mutated history and also returned by the enumeration result.
            if (!mutatedHistory.Contains(responseMessage))
            {
                this._history.Add(responseMessage);
                messageQueue.Enqueue(responseMessage);
            }

            // Dequeue the next message to yield.
            yieldMessage = messageQueue.Dequeue();
            yield return (IsMessageVisible(yieldMessage), yieldMessage);
        }

        // Dequeue any remaining messages to yield.
        while (messageQueue.Count > 0)
        {
            yieldMessage = messageQueue.Dequeue();

            yield return (IsMessageVisible(yieldMessage), yieldMessage);
        }

        // Function content not visible, unless result is the final message.
        bool IsMessageVisible(ChatMessageContent message) =>
            (!message.Items.Any(i => i is FunctionCallContent || i is FunctionResultContent) ||
              messageQueue.Count == 0);
    }

    /// <inheritdoc/>
    protected internal sealed override Task ReceiveAsync(IEnumerable<ChatMessageContent> history, CancellationToken cancellationToken)
    {
        this._history.AddRange(history);

        return Task.CompletedTask;
    }

    /// <inheritdoc/>
    protected internal sealed override IAsyncEnumerable<ChatMessageContent> GetHistoryAsync(CancellationToken cancellationToken)
    {
        return this._history.ToDescendingAsync();
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ChatHistoryChannel"/> class.
    /// </summary>
    public ChatHistoryChannel()
    {
        this._history = [];
    }
}

ReceiveAsync はチャットメッセージを内部で管理している ChatHistory に追加するメソッドです。GetHistoryAsync はチャットメッセージの履歴を取得するメソッドです。InvokeAsyncIChatHistoryHandler に対してチャットメッセージを渡して、その結果を返すメソッドです。InvokeAsync 内では、関数呼び出しのメッセージの時は IsVisiblefalse になるようにしています。

次に、個人的に動きが良く分かっていない AggregatorAgentAggregatorChannel を見てみたいと思います。

AggregatorAgentAgentChat を束ねる Agent です。CreateChannelAsync メソッドでは AggregatorAgent のコンストラクタで渡した chatProvider を使った AgentChat を作って後述する AggregatorChannel を返します。

AggregatorChannel はコードをまるっと引用します。

AggregatorChannel.cs
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.SemanticKernel.Agents;

/// <summary>
/// Adapt channel contract to underlying <see cref="AgentChat"/>.
/// </summary>
internal sealed class AggregatorChannel(AgentChat chat) : AgentChannel<AggregatorAgent>
{
    private readonly AgentChat _chat = chat;

    protected internal override IAsyncEnumerable<ChatMessageContent> GetHistoryAsync(CancellationToken cancellationToken = default)
    {
        return this._chat.GetChatMessagesAsync(cancellationToken);
    }

    protected internal override async IAsyncEnumerable<(bool IsVisible, ChatMessageContent Message)> InvokeAsync(AggregatorAgent agent, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ChatMessageContent? lastMessage = null;

        await foreach (ChatMessageContent message in this._chat.InvokeAsync(cancellationToken).ConfigureAwait(false))
        {
            // For AggregatorMode.Flat, the entire aggregated chat is merged into the owning chat.
            if (agent.Mode == AggregatorMode.Flat)
            {
                yield return (IsVisible: true, message);
            }

            lastMessage = message;
        }

        // For AggregatorMode.Nested, only the final message is merged into the owning chat.
        // The entire history is always preserved within nested chat, however.
        if (agent.Mode == AggregatorMode.Nested && lastMessage is not null)
        {
            ChatMessageContent message =
                new(lastMessage.Role, lastMessage.Items, lastMessage.ModelId, lastMessage.InnerContent, lastMessage.Encoding, lastMessage.Metadata)
                {
                    AuthorName = agent.Name
                };

            yield return (IsVisible: true, message);
        }
    }

    protected internal override Task ReceiveAsync(IEnumerable<ChatMessageContent> history, CancellationToken cancellationToken = default)
    {
        // Always receive the initial history from the owning chat.
        this._chat.AddChatMessages([.. history]);

        return Task.CompletedTask;
    }
}

AggregatorChannelInvokeAsync を見ると AggregatorModeFlatNested によって、どのように挙動が変わるのかがわかります。Flat の場合は AgentChat の返したメッセージをそのまま履歴として使用しているのに対して、Nested の場合は最後のメッセージだけを使っています。

Agents.Core プロジェクト

Abstractions プロジェクトの方は見たいクラスは大体見たので最後に Core プロジェクトを見てみたいと思います。
こちらは実際にマルチエージェントのプログラムを組もうと思った時に使うことが多い ChatCompletionAgentAgentGroupChat があります。まずは ChatCompletionAgent から見ていきます。

クラス図にすると以下のようになります。

ここまでくると IChatCompletionService を使って実際に AI モデルとやり取りするような処理が入ってきています。これはコードを引用します。ストリーミングの処理は省略しています。基本的にストリーミングのほうはストリーミングじゃない方の InvokeAsync と基本的には同じです。

ChatCompletionAgent.cs
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SemanticKernel.ChatCompletion;

namespace Microsoft.SemanticKernel.Agents;

/// <summary>
/// A <see cref="KernelAgent"/> specialization based on <see cref="IChatCompletionService"/>.
/// </summary>
/// <remarks>
/// NOTE: Enable OpenAIPromptExecutionSettings.ToolCallBehavior for agent plugins.
/// (<see cref="ChatCompletionAgent.ExecutionSettings"/>)
/// </remarks>
public sealed class ChatCompletionAgent : ChatHistoryKernelAgent
{
    /// <summary>
    /// Optional execution settings for the agent.
    /// </summary>
    public PromptExecutionSettings? ExecutionSettings { get; set; }

    /// <inheritdoc/>
    public override async IAsyncEnumerable<ChatMessageContent> InvokeAsync(
        ChatHistory history,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        IChatCompletionService chatCompletionService = this.Kernel.GetRequiredService<IChatCompletionService>();

        ChatHistory chat = this.SetupAgentChatHistory(history);

        int messageCount = chat.Count;

        this.Logger.LogAgentChatServiceInvokingAgent(nameof(InvokeAsync), this.Id, chatCompletionService.GetType());

        IReadOnlyList<ChatMessageContent> messages =
            await chatCompletionService.GetChatMessageContentsAsync(
                chat,
                this.ExecutionSettings,
                this.Kernel,
                cancellationToken).ConfigureAwait(false);

        this.Logger.LogAgentChatServiceInvokedAgent(nameof(InvokeAsync), this.Id, chatCompletionService.GetType(), messages.Count);

        // Capture mutated messages related function calling / tools
        for (int messageIndex = messageCount; messageIndex < chat.Count; messageIndex++)
        {
            ChatMessageContent message = chat[messageIndex];

            message.AuthorName = this.Name;

            history.Add(message);
        }

        foreach (ChatMessageContent message in messages ?? [])
        {
            // TODO: MESSAGE SOURCE - ISSUE #5731
            message.AuthorName = this.Name;

            yield return message;
        }
    }

    private ChatHistory SetupAgentChatHistory(IReadOnlyList<ChatMessageContent> history)
    {
        ChatHistory chat = [];

        if (!string.IsNullOrWhiteSpace(this.Instructions))
        {
            chat.Add(new ChatMessageContent(AuthorRole.System, this.Instructions) { AuthorName = this.Name });
        }

        chat.AddRange(history);

        return chat;
    }
}

SetupAgentChatHistory では、AgentInstructions をチャット履歴の先頭にシステムメッセージとして仕込んだ ChatHistory を作成しています。その状態で AI の Chat completions API に渡して結果を返しています。ChatCompletionAgeent はシンプルに理解するとシステムメッセージをチャット履歴の先頭に差し込んで Chat Completions API を呼ぶためのヘルパー的なものになると思います。

最後に AgentGroupChat は複数の Agent が参加したチャットを管理するクラスです。これは真面目に読もうと思うので全部のコードを確認します。

// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.SemanticKernel.Agents.Chat;
using Microsoft.SemanticKernel.ChatCompletion;

namespace Microsoft.SemanticKernel.Agents;

/// <summary>
/// A an <see cref="AgentChat"/> that supports multi-turn interactions.
/// </summary>
public sealed class AgentGroupChat : AgentChat
{
    private readonly HashSet<string> _agentIds; // Efficient existence test O(1) vs O(n) for list.
    private readonly List<Agent> _agents; // Maintain order the agents joined the chat

    /// <summary>
    /// Indicates if completion criteria has been met.  If set, no further
    /// agent interactions will occur.  Clear to enable more agent interactions.
    /// </summary>
    public bool IsComplete { get; set; }

    /// <summary>
    /// Settings for defining chat behavior.
    /// </summary>
    public AgentGroupChatSettings ExecutionSettings { get; set; } = new AgentGroupChatSettings();

    /// <summary>
    /// The agents participating in the chat.
    /// </summary>
    public IReadOnlyList<Agent> Agents => this._agents.AsReadOnly();

    /// <summary>
    /// Add a <see cref="Agent"/> to the chat.
    /// </summary>
    /// <param name="agent">The <see cref="KernelAgent"/> to add.</param>
    public void AddAgent(Agent agent)
    {
        if (this._agentIds.Add(agent.Id))
        {
            this._agents.Add(agent);
        }
    }

    /// <summary>
    /// Process a series of interactions between the <see cref="AgentGroupChat.Agents"/> that have joined this <see cref="AgentGroupChat"/>.
    /// The interactions will proceed according to the <see cref="SelectionStrategy"/> and the <see cref="TerminationStrategy"/>
    /// defined via <see cref="AgentGroupChat.ExecutionSettings"/>.
    /// In the absence of an <see cref="AgentGroupChatSettings.SelectionStrategy"/>, this method will not invoke any agents.
    /// Any agent may be explicitly selected by calling <see cref="AgentGroupChat.InvokeAsync(Agent, bool, CancellationToken)"/>.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>Asynchronous enumeration of messages.</returns>
    public override async IAsyncEnumerable<ChatMessageContent> InvokeAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        this.EnsureStrategyLoggerAssignment();

        if (this.IsComplete)
        {
            // Throw exception if chat is completed and automatic-reset is not enabled.
            if (!this.ExecutionSettings.TerminationStrategy.AutomaticReset)
            {
                throw new KernelException("Agent Failure - Chat has completed.");
            }

            this.IsComplete = false;
        }

        this.Logger.LogAgentGroupChatInvokingAgents(nameof(InvokeAsync), this.Agents);

        for (int index = 0; index < this.ExecutionSettings.TerminationStrategy.MaximumIterations; index++)
        {
            // Identify next agent using strategy
            this.Logger.LogAgentGroupChatSelectingAgent(nameof(InvokeAsync), this.ExecutionSettings.SelectionStrategy.GetType());

            Agent agent;
            try
            {
                agent = await this.ExecutionSettings.SelectionStrategy.NextAsync(this.Agents, this.History, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception exception)
            {
                this.Logger.LogAgentGroupChatNoAgentSelected(nameof(InvokeAsync), exception);
                throw;
            }

            this.Logger.LogAgentGroupChatSelectedAgent(nameof(InvokeAsync), agent.GetType(), agent.Id, this.ExecutionSettings.SelectionStrategy.GetType());

            // Invoke agent and process messages along with termination
            await foreach (var message in base.InvokeAgentAsync(agent, cancellationToken).ConfigureAwait(false))
            {
                if (message.Role == AuthorRole.Assistant)
                {
                    var task = this.ExecutionSettings.TerminationStrategy.ShouldTerminateAsync(agent, this.History, cancellationToken);
                    this.IsComplete = await task.ConfigureAwait(false);
                }

                yield return message;
            }

            if (this.IsComplete)
            {
                break;
            }
        }

        this.Logger.LogAgentGroupChatYield(nameof(InvokeAsync), this.IsComplete);
    }

    /// <summary>
    /// Process a single interaction between a given <see cref="Agent"/> an a <see cref="AgentGroupChat"/>.
    /// </summary>
    /// <param name="agent">The agent actively interacting with the chat.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>Asynchronous enumeration of messages.</returns>
    /// <remark>
    /// Specified agent joins the chat.
    /// </remark>>
    public IAsyncEnumerable<ChatMessageContent> InvokeAsync(
        Agent agent,
        CancellationToken cancellationToken = default) =>
        this.InvokeAsync(agent, isJoining: true, cancellationToken);

    /// <summary>
    /// Process a single interaction between a given <see cref="KernelAgent"/> an a <see cref="AgentGroupChat"/> irregardless of
    /// the <see cref="SelectionStrategy"/> defined via <see cref="AgentGroupChat.ExecutionSettings"/>.  Likewise, this does
    /// not regard <see cref="TerminationStrategy.MaximumIterations"/> as it only takes a single turn for the specified agent.
    /// </summary>
    /// <param name="agent">The agent actively interacting with the chat.</param>
    /// <param name="isJoining">Optional flag to control if agent is joining the chat.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>Asynchronous enumeration of messages.</returns>
    public async IAsyncEnumerable<ChatMessageContent> InvokeAsync(
        Agent agent,
        bool isJoining,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        this.EnsureStrategyLoggerAssignment();

        this.Logger.LogAgentGroupChatInvokingAgent(nameof(InvokeAsync), agent.GetType(), agent.Id);

        if (isJoining)
        {
            this.AddAgent(agent);
        }

        await foreach (var message in base.InvokeAgentAsync(agent, cancellationToken).ConfigureAwait(false))
        {
            if (message.Role == AuthorRole.Assistant)
            {
                var task = this.ExecutionSettings.TerminationStrategy.ShouldTerminateAsync(agent, this.History, cancellationToken);
                this.IsComplete = await task.ConfigureAwait(false);
            }

            yield return message;
        }

        this.Logger.LogAgentGroupChatYield(nameof(InvokeAsync), this.IsComplete);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="AgentGroupChat"/> class.
    /// </summary>
    /// <param name="agents">The agents initially participating in the chat.</param>
    public AgentGroupChat(params Agent[] agents)
    {
        this._agents = new(agents);
        this._agentIds = new(this._agents.Select(a => a.Id));
    }

    private void EnsureStrategyLoggerAssignment()
    {
        // Only invoke logger factory when required.
        if (this.ExecutionSettings.SelectionStrategy.Logger == NullLogger.Instance)
        {
            this.ExecutionSettings.SelectionStrategy.Logger = this.LoggerFactory.CreateLogger(this.ExecutionSettings.SelectionStrategy.GetType());
        }

        if (this.ExecutionSettings.TerminationStrategy.Logger == NullLogger.Instance)
        {
            this.ExecutionSettings.TerminationStrategy.Logger = this.LoggerFactory.CreateLogger(this.ExecutionSettings.TerminationStrategy.GetType());
        }
    }
}

InvokeAsync メソッドを呼び出すと参加している Agent 達に対して ExecutionSettingsSelectionStrategy で次に話す Agent を選択して TerminationStrategy がチャットを完了させるかどうかを判断するようになっています。TerminationStrategyMaximumIterations を最大のやり取り回数として会話が完了したと判定されるまで会話を続けます。チャットが完了したかどうかは IsComplete プロパティで管理されています。

Agent を受け取る方の InvokeAsync は、必要であれば Agent を追加してから 1 ターンだけ会話をさせるという事も出来ます。意外と融通が利きますね。

あと、あまり使わなさそうな機能として TerminationStrategyAutomaticResettrue に設定しておくと一度完了したチャットに対しても、シームレスに続きを開始することが出来ます。ただ、AgentChatGroup とその親クラスの AgentChat クラス通してみても内部で管理されている ChatHistory をクリアする方法はないので、あんまり使いどころはないかもしれません。

まとめ

ということでコードを読みながらだらだらと思ったことを書いていきました。
この内容を踏まえて、今後、色々試していこうと思います。

因みに Agent 系機能は、まだ alpha なのと .Net: Enhance Extensibility of Agent Classes by Unsealing and Implementing Interfaces (.NET, Agents) という issue にもある通り interface が追加される予定なので今後も変わっていきます。
あくまで、今回の記事内にあるコードは 2024 年 9 月 15 日次点のものになります。

Microsoft (有志)

Discussion