Semantic Kernel のマルチエージェント AI 機能入門してみよう その 2
はじめに
今回は、Agent 系の機能のクラスの構造を見ていこうと思います。
この記事は、特にわかりやすく説明しようという類ではなく、自分がコードを見た時に思ったことをメモしているだけです。
Agents.Abstractions プロジェクト
まずは Agents.Abstractions
プロジェクトを覗いてみようと思います。
まずは、おおもとである Agent クラスです。
Agent クラス自体には Azure OpenAI Service などとやり取りするような機能はありません。恐らく CreateChannelAsync
メソッドで作られる AgentChannel
がその機能を持っているのだと思われます。ということで AgentChannel
クラスを見てみましょう。
AgentChannel
クラスに Agent
を呼び出したり、チャットの履歴を操作する系の機能がありました。
この Agent
クラスを継承して ChatHistoryKernelAgent
クラス ChatHistoryChannel
があります。
まずは ChatHistoryKernelAgent
クラスを見てみましょう。これは Agent
からの直継承ではなく間に KernelAgent
というクラスが挟まっています。
この KernelAgent
は Instructions
で Agent
への指示を受け取るプロパティが追加されています。個人的には Agent
のレベルで Instructions
があると思ってたのですが KernelAgent
からみたいです。恐らく Agent
の子クラスには AggregatorAgent
という複数の Agent
を束ねるような Agent
もあるので、そのために Instructions
が KernelAgent
にあるのかもしれません。
また、KernelAgent
には Kernel
プロパティもあります。これはデフォルトでは new Kernel()
しているだけの空っぽの Kernel
が設定されています。
次に ChatHistoryKernelAgent
クラスを見てみましょう。このクラスは KernelAgent
からの直継承です。このクラスは ChatHistory
というクラスを受け取る InvokeAsync
と InvokeStreamingAsync
メソッドを持っています。この 2 つのメソッドは IChatHistoryHandler
インターフェースを実装する形になっています。これで、Agent
と OpenAI
を繋ぐ処理が動きそうなメソッドが出てきました。ただ、この時点では、まだ abstract
メソッドなので中身はありません。
この ChatHistoryKernelAgent
クラスと対になる AgentChannel
クラスは ChatHistoryChannel
です。ChatHistoryKernelAgent
クラスの CreateChannelAsync
メソッドは以下のように実装されています。
protected internal sealed override Task<AgentChannel> CreateChannelAsync(CancellationToken cancellationToken)
{
ChatHistoryChannel channel =
new()
{
Logger = this.LoggerFactory.CreateLogger<ChatHistoryChannel>()
};
return Task.FromResult<AgentChannel>(channel);
}
この ChatHistoryChannel
クラスは AgentChannel
を継承する形で定義されています。
この ChatHistoryChannel
クラスくらいになってくると結構実装があります。コード量はそんなにないので、ここの丸ごと貼っておきます。
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SemanticKernel.Agents.Extensions;
using Microsoft.SemanticKernel.ChatCompletion;
namespace Microsoft.SemanticKernel.Agents;
/// <summary>
/// A <see cref="AgentChannel"/> specialization for that acts upon a <see cref="IChatHistoryHandler"/>.
/// </summary>
public class ChatHistoryChannel : AgentChannel
{
private readonly ChatHistory _history;
/// <inheritdoc/>
protected internal sealed override async IAsyncEnumerable<(bool IsVisible, ChatMessageContent Message)> InvokeAsync(
Agent agent,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (agent is not IChatHistoryHandler historyHandler)
{
throw new KernelException($"Invalid channel binding for agent: {agent.Id} ({agent.GetType().FullName})");
}
// Capture the current message count to evaluate history mutation.
int messageCount = this._history.Count;
HashSet<ChatMessageContent> mutatedHistory = [];
// Utilize a queue as a "read-ahead" cache to evaluate message sequencing (i.e., which message is final).
Queue<ChatMessageContent> messageQueue = [];
ChatMessageContent? yieldMessage = null;
await foreach (ChatMessageContent responseMessage in historyHandler.InvokeAsync(this._history, cancellationToken).ConfigureAwait(false))
{
// Capture all messages that have been included in the mutated the history.
for (int messageIndex = messageCount; messageIndex < this._history.Count; messageIndex++)
{
ChatMessageContent mutatedMessage = this._history[messageIndex];
mutatedHistory.Add(mutatedMessage);
messageQueue.Enqueue(mutatedMessage);
}
// Update the message count pointer to reflect the current history.
messageCount = this._history.Count;
// Avoid duplicating any message included in the mutated history and also returned by the enumeration result.
if (!mutatedHistory.Contains(responseMessage))
{
this._history.Add(responseMessage);
messageQueue.Enqueue(responseMessage);
}
// Dequeue the next message to yield.
yieldMessage = messageQueue.Dequeue();
yield return (IsMessageVisible(yieldMessage), yieldMessage);
}
// Dequeue any remaining messages to yield.
while (messageQueue.Count > 0)
{
yieldMessage = messageQueue.Dequeue();
yield return (IsMessageVisible(yieldMessage), yieldMessage);
}
// Function content not visible, unless result is the final message.
bool IsMessageVisible(ChatMessageContent message) =>
(!message.Items.Any(i => i is FunctionCallContent || i is FunctionResultContent) ||
messageQueue.Count == 0);
}
/// <inheritdoc/>
protected internal sealed override Task ReceiveAsync(IEnumerable<ChatMessageContent> history, CancellationToken cancellationToken)
{
this._history.AddRange(history);
return Task.CompletedTask;
}
/// <inheritdoc/>
protected internal sealed override IAsyncEnumerable<ChatMessageContent> GetHistoryAsync(CancellationToken cancellationToken)
{
return this._history.ToDescendingAsync();
}
/// <summary>
/// Initializes a new instance of the <see cref="ChatHistoryChannel"/> class.
/// </summary>
public ChatHistoryChannel()
{
this._history = [];
}
}
ReceiveAsync
はチャットメッセージを内部で管理している ChatHistory
に追加するメソッドです。GetHistoryAsync
はチャットメッセージの履歴を取得するメソッドです。InvokeAsync
は IChatHistoryHandler
に対してチャットメッセージを渡して、その結果を返すメソッドです。InvokeAsync
内では、関数呼び出しのメッセージの時は IsVisible
が false
になるようにしています。
次に、個人的に動きが良く分かっていない AggregatorAgent
と AggregatorChannel
を見てみたいと思います。
AggregatorAgent
は AgentChat
を束ねる Agent
です。CreateChannelAsync
メソッドでは AggregatorAgent
のコンストラクタで渡した chatProvider
を使った AgentChat
を作って後述する AggregatorChannel
を返します。
AggregatorChannel
はコードをまるっと引用します。
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.SemanticKernel.Agents;
/// <summary>
/// Adapt channel contract to underlying <see cref="AgentChat"/>.
/// </summary>
internal sealed class AggregatorChannel(AgentChat chat) : AgentChannel<AggregatorAgent>
{
private readonly AgentChat _chat = chat;
protected internal override IAsyncEnumerable<ChatMessageContent> GetHistoryAsync(CancellationToken cancellationToken = default)
{
return this._chat.GetChatMessagesAsync(cancellationToken);
}
protected internal override async IAsyncEnumerable<(bool IsVisible, ChatMessageContent Message)> InvokeAsync(AggregatorAgent agent, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ChatMessageContent? lastMessage = null;
await foreach (ChatMessageContent message in this._chat.InvokeAsync(cancellationToken).ConfigureAwait(false))
{
// For AggregatorMode.Flat, the entire aggregated chat is merged into the owning chat.
if (agent.Mode == AggregatorMode.Flat)
{
yield return (IsVisible: true, message);
}
lastMessage = message;
}
// For AggregatorMode.Nested, only the final message is merged into the owning chat.
// The entire history is always preserved within nested chat, however.
if (agent.Mode == AggregatorMode.Nested && lastMessage is not null)
{
ChatMessageContent message =
new(lastMessage.Role, lastMessage.Items, lastMessage.ModelId, lastMessage.InnerContent, lastMessage.Encoding, lastMessage.Metadata)
{
AuthorName = agent.Name
};
yield return (IsVisible: true, message);
}
}
protected internal override Task ReceiveAsync(IEnumerable<ChatMessageContent> history, CancellationToken cancellationToken = default)
{
// Always receive the initial history from the owning chat.
this._chat.AddChatMessages([.. history]);
return Task.CompletedTask;
}
}
AggregatorChannel
の InvokeAsync
を見ると AggregatorMode
の Flat
と Nested
によって、どのように挙動が変わるのかがわかります。Flat
の場合は AgentChat
の返したメッセージをそのまま履歴として使用しているのに対して、Nested
の場合は最後のメッセージだけを使っています。
Agents.Core プロジェクト
Abstractions
プロジェクトの方は見たいクラスは大体見たので最後に Core
プロジェクトを見てみたいと思います。
こちらは実際にマルチエージェントのプログラムを組もうと思った時に使うことが多い ChatCompletionAgent
と AgentGroupChat
があります。まずは ChatCompletionAgent
から見ていきます。
クラス図にすると以下のようになります。
ここまでくると IChatCompletionService
を使って実際に AI モデルとやり取りするような処理が入ってきています。これはコードを引用します。ストリーミングの処理は省略しています。基本的にストリーミングのほうはストリーミングじゃない方の InvokeAsync
と基本的には同じです。
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SemanticKernel.ChatCompletion;
namespace Microsoft.SemanticKernel.Agents;
/// <summary>
/// A <see cref="KernelAgent"/> specialization based on <see cref="IChatCompletionService"/>.
/// </summary>
/// <remarks>
/// NOTE: Enable OpenAIPromptExecutionSettings.ToolCallBehavior for agent plugins.
/// (<see cref="ChatCompletionAgent.ExecutionSettings"/>)
/// </remarks>
public sealed class ChatCompletionAgent : ChatHistoryKernelAgent
{
/// <summary>
/// Optional execution settings for the agent.
/// </summary>
public PromptExecutionSettings? ExecutionSettings { get; set; }
/// <inheritdoc/>
public override async IAsyncEnumerable<ChatMessageContent> InvokeAsync(
ChatHistory history,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
IChatCompletionService chatCompletionService = this.Kernel.GetRequiredService<IChatCompletionService>();
ChatHistory chat = this.SetupAgentChatHistory(history);
int messageCount = chat.Count;
this.Logger.LogAgentChatServiceInvokingAgent(nameof(InvokeAsync), this.Id, chatCompletionService.GetType());
IReadOnlyList<ChatMessageContent> messages =
await chatCompletionService.GetChatMessageContentsAsync(
chat,
this.ExecutionSettings,
this.Kernel,
cancellationToken).ConfigureAwait(false);
this.Logger.LogAgentChatServiceInvokedAgent(nameof(InvokeAsync), this.Id, chatCompletionService.GetType(), messages.Count);
// Capture mutated messages related function calling / tools
for (int messageIndex = messageCount; messageIndex < chat.Count; messageIndex++)
{
ChatMessageContent message = chat[messageIndex];
message.AuthorName = this.Name;
history.Add(message);
}
foreach (ChatMessageContent message in messages ?? [])
{
// TODO: MESSAGE SOURCE - ISSUE #5731
message.AuthorName = this.Name;
yield return message;
}
}
private ChatHistory SetupAgentChatHistory(IReadOnlyList<ChatMessageContent> history)
{
ChatHistory chat = [];
if (!string.IsNullOrWhiteSpace(this.Instructions))
{
chat.Add(new ChatMessageContent(AuthorRole.System, this.Instructions) { AuthorName = this.Name });
}
chat.AddRange(history);
return chat;
}
}
SetupAgentChatHistory
では、Agent
の Instructions
をチャット履歴の先頭にシステムメッセージとして仕込んだ ChatHistory
を作成しています。その状態で AI の Chat completions API に渡して結果を返しています。ChatCompletionAgeent
はシンプルに理解するとシステムメッセージをチャット履歴の先頭に差し込んで Chat Completions API を呼ぶためのヘルパー的なものになると思います。
最後に AgentGroupChat
は複数の Agent
が参加したチャットを管理するクラスです。これは真面目に読もうと思うので全部のコードを確認します。
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.SemanticKernel.Agents.Chat;
using Microsoft.SemanticKernel.ChatCompletion;
namespace Microsoft.SemanticKernel.Agents;
/// <summary>
/// A an <see cref="AgentChat"/> that supports multi-turn interactions.
/// </summary>
public sealed class AgentGroupChat : AgentChat
{
private readonly HashSet<string> _agentIds; // Efficient existence test O(1) vs O(n) for list.
private readonly List<Agent> _agents; // Maintain order the agents joined the chat
/// <summary>
/// Indicates if completion criteria has been met. If set, no further
/// agent interactions will occur. Clear to enable more agent interactions.
/// </summary>
public bool IsComplete { get; set; }
/// <summary>
/// Settings for defining chat behavior.
/// </summary>
public AgentGroupChatSettings ExecutionSettings { get; set; } = new AgentGroupChatSettings();
/// <summary>
/// The agents participating in the chat.
/// </summary>
public IReadOnlyList<Agent> Agents => this._agents.AsReadOnly();
/// <summary>
/// Add a <see cref="Agent"/> to the chat.
/// </summary>
/// <param name="agent">The <see cref="KernelAgent"/> to add.</param>
public void AddAgent(Agent agent)
{
if (this._agentIds.Add(agent.Id))
{
this._agents.Add(agent);
}
}
/// <summary>
/// Process a series of interactions between the <see cref="AgentGroupChat.Agents"/> that have joined this <see cref="AgentGroupChat"/>.
/// The interactions will proceed according to the <see cref="SelectionStrategy"/> and the <see cref="TerminationStrategy"/>
/// defined via <see cref="AgentGroupChat.ExecutionSettings"/>.
/// In the absence of an <see cref="AgentGroupChatSettings.SelectionStrategy"/>, this method will not invoke any agents.
/// Any agent may be explicitly selected by calling <see cref="AgentGroupChat.InvokeAsync(Agent, bool, CancellationToken)"/>.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>Asynchronous enumeration of messages.</returns>
public override async IAsyncEnumerable<ChatMessageContent> InvokeAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
this.EnsureStrategyLoggerAssignment();
if (this.IsComplete)
{
// Throw exception if chat is completed and automatic-reset is not enabled.
if (!this.ExecutionSettings.TerminationStrategy.AutomaticReset)
{
throw new KernelException("Agent Failure - Chat has completed.");
}
this.IsComplete = false;
}
this.Logger.LogAgentGroupChatInvokingAgents(nameof(InvokeAsync), this.Agents);
for (int index = 0; index < this.ExecutionSettings.TerminationStrategy.MaximumIterations; index++)
{
// Identify next agent using strategy
this.Logger.LogAgentGroupChatSelectingAgent(nameof(InvokeAsync), this.ExecutionSettings.SelectionStrategy.GetType());
Agent agent;
try
{
agent = await this.ExecutionSettings.SelectionStrategy.NextAsync(this.Agents, this.History, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
this.Logger.LogAgentGroupChatNoAgentSelected(nameof(InvokeAsync), exception);
throw;
}
this.Logger.LogAgentGroupChatSelectedAgent(nameof(InvokeAsync), agent.GetType(), agent.Id, this.ExecutionSettings.SelectionStrategy.GetType());
// Invoke agent and process messages along with termination
await foreach (var message in base.InvokeAgentAsync(agent, cancellationToken).ConfigureAwait(false))
{
if (message.Role == AuthorRole.Assistant)
{
var task = this.ExecutionSettings.TerminationStrategy.ShouldTerminateAsync(agent, this.History, cancellationToken);
this.IsComplete = await task.ConfigureAwait(false);
}
yield return message;
}
if (this.IsComplete)
{
break;
}
}
this.Logger.LogAgentGroupChatYield(nameof(InvokeAsync), this.IsComplete);
}
/// <summary>
/// Process a single interaction between a given <see cref="Agent"/> an a <see cref="AgentGroupChat"/>.
/// </summary>
/// <param name="agent">The agent actively interacting with the chat.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>Asynchronous enumeration of messages.</returns>
/// <remark>
/// Specified agent joins the chat.
/// </remark>>
public IAsyncEnumerable<ChatMessageContent> InvokeAsync(
Agent agent,
CancellationToken cancellationToken = default) =>
this.InvokeAsync(agent, isJoining: true, cancellationToken);
/// <summary>
/// Process a single interaction between a given <see cref="KernelAgent"/> an a <see cref="AgentGroupChat"/> irregardless of
/// the <see cref="SelectionStrategy"/> defined via <see cref="AgentGroupChat.ExecutionSettings"/>. Likewise, this does
/// not regard <see cref="TerminationStrategy.MaximumIterations"/> as it only takes a single turn for the specified agent.
/// </summary>
/// <param name="agent">The agent actively interacting with the chat.</param>
/// <param name="isJoining">Optional flag to control if agent is joining the chat.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>Asynchronous enumeration of messages.</returns>
public async IAsyncEnumerable<ChatMessageContent> InvokeAsync(
Agent agent,
bool isJoining,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
this.EnsureStrategyLoggerAssignment();
this.Logger.LogAgentGroupChatInvokingAgent(nameof(InvokeAsync), agent.GetType(), agent.Id);
if (isJoining)
{
this.AddAgent(agent);
}
await foreach (var message in base.InvokeAgentAsync(agent, cancellationToken).ConfigureAwait(false))
{
if (message.Role == AuthorRole.Assistant)
{
var task = this.ExecutionSettings.TerminationStrategy.ShouldTerminateAsync(agent, this.History, cancellationToken);
this.IsComplete = await task.ConfigureAwait(false);
}
yield return message;
}
this.Logger.LogAgentGroupChatYield(nameof(InvokeAsync), this.IsComplete);
}
/// <summary>
/// Initializes a new instance of the <see cref="AgentGroupChat"/> class.
/// </summary>
/// <param name="agents">The agents initially participating in the chat.</param>
public AgentGroupChat(params Agent[] agents)
{
this._agents = new(agents);
this._agentIds = new(this._agents.Select(a => a.Id));
}
private void EnsureStrategyLoggerAssignment()
{
// Only invoke logger factory when required.
if (this.ExecutionSettings.SelectionStrategy.Logger == NullLogger.Instance)
{
this.ExecutionSettings.SelectionStrategy.Logger = this.LoggerFactory.CreateLogger(this.ExecutionSettings.SelectionStrategy.GetType());
}
if (this.ExecutionSettings.TerminationStrategy.Logger == NullLogger.Instance)
{
this.ExecutionSettings.TerminationStrategy.Logger = this.LoggerFactory.CreateLogger(this.ExecutionSettings.TerminationStrategy.GetType());
}
}
}
InvokeAsync
メソッドを呼び出すと参加している Agent
達に対して ExecutionSettings
の SelectionStrategy
で次に話す Agent
を選択して TerminationStrategy
がチャットを完了させるかどうかを判断するようになっています。TerminationStrategy
の MaximumIterations
を最大のやり取り回数として会話が完了したと判定されるまで会話を続けます。チャットが完了したかどうかは IsComplete
プロパティで管理されています。
Agent
を受け取る方の InvokeAsync
は、必要であれば Agent
を追加してから 1 ターンだけ会話をさせるという事も出来ます。意外と融通が利きますね。
あと、あまり使わなさそうな機能として TerminationStrategy
の AutomaticReset
を true
に設定しておくと一度完了したチャットに対しても、シームレスに続きを開始することが出来ます。ただ、AgentChatGroup
とその親クラスの AgentChat
クラス通してみても内部で管理されている ChatHistory
をクリアする方法はないので、あんまり使いどころはないかもしれません。
まとめ
ということでコードを読みながらだらだらと思ったことを書いていきました。
この内容を踏まえて、今後、色々試していこうと思います。
因みに Agent
系機能は、まだ alpha なのと .Net: Enhance Extensibility of Agent Classes by Unsealing and Implementing Interfaces (.NET, Agents) という issue にもある通り interface が追加される予定なので今後も変わっていきます。
あくまで、今回の記事内にあるコードは 2024 年 9 月 15 日次点のものになります。
Discussion