Koogで始めるA2Aプロトコル
こちらはMoney Forward Kansai Advent Calendarの1日目の記事です。
はじめに
こんにちは。マネーフォワード 大阪開発拠点にてバックエンドエンジニアをしているTaskです。
普段はKotlinを用いてバックエンド開発をしています。
巷で話題になっているA2Aプロトコルを理解するために、KotlinのAIエージェントフレームワークであるKoogで簡単なA2Aサーバーとクライアントを実装してみました。
その実装をもとに、A2Aプロトコルを紹介したいと思います。
この記事では、以下の読者を想定しています。
- A2Aプロトコルに興味がある方
- KotlinおよびKoogに興味がある方
- ドキュメントよりも、実装から入るボトムアップな理解を得意とする方
また、この記事では以下のトピックを扱いません。
- A2Aプロトコルに関する詳細
- KoogおよびAIエージェントに対する包括的な説明
A2Aプロトコルとは
A2AプロトコルとはAIエージェント間の通信方法の規定した標準です。
Googleによって2025年4月9日に公開され[1]、2025年12月1日現在のバージョンは0.3.0[2]です。
Agent間の相互作用について定義されており、以下の特徴があります[3]。
- クライアントとサーバーが存在する
- サーバーの情報はAgentCardというJSON形式のデータに集約される
- クライアント側はメッセージを送信し、サーバー側はそれに対してメッセージまたはTaskを返答する
- 上記のやりとりを通じて、サーバー側のAIエージェントが目的を達成する
- リアルタイム通信のためにSSEが利用される
- 一連のやりとりを記憶するためのContextが存在する
また、A2Aプロトコルはエンタープライズ開発で必須になる認証・認可やObservabilityに関してもサポートしています[4]。
Koogとは
Koogとは、JetBrainsから提供されているKotlinのAIエージェントフレームワークです。
Koogを利用することで、最小の設定でAIエージェント開発を開始できます。
fun main() = runBlocking {
// Before you run the example, assign a corresponding API key as an environment variable.
val apiKey = System.getenv("OPENAI_API_KEY") // or Anthropic, Google, OpenRouter, etc.
val agent = AIAgent(
executor = simpleOpenAIExecutor(apiKey), // or Anthropic, Google, OpenRouter, etc.
systemPrompt = "You are a helpful assistant. Answer user questions concisely.",
llmModel = OpenAIModels.Chat.GPT4o
)
val result = agent.run("Hello! How can you help me?")
println(result)
}
(README.mdより抜粋)
A2Aクライアントおよびサーバーもサポートされています。
以降では、実装にKoogを利用して挨拶と天気情報を教えてくれるエージェントを実装したA2Aサーバーおよびそのクライアントの実装例について紹介します。
挨拶エージェントは送られてきたメッセージに対し適切な挨拶を返し、天気情報エージェントは与えられた日時と場所の天気を検索して返します。
リポジトリはこちらです。
Koogのバージョンは2025年12月1日現在最新の0.5.3を利用しています。
A2Aサーバーの実装
まずはA2Aサーバーの実装を見ていきます。
AgentCard
最初にAgentCardを実装します。
AgentCardには、A2Aサーバーが扱うAIエージェントの情報を書きます。
名前や詳細、URLの他に、特徴的な要素としてCapabilitiesやSkillsといった情報を埋め込むことができます。
CapabilitiesにはStreaming(SSE)やPush通知をサポートしているかといった情報を書けます。
また、Skillsには、そのA2AサーバーがどのようなAIエージェントを所持しているかといった情報を複数書けます。
今回の実装では、挨拶と天気情報を教えてくれるAIエージェントがあり、Streamingをサポートしているので、AgentCardの実装は以下になります。
val agentCard = AgentCard(
name = "Greeting and Weather Search Agent",
description = "An agent that can greet you or search for weather information",
url = "http://localhost:8080/a2a",
version = "0.0.1",
capabilities = AgentCapabilities(streaming = true),
defaultInputModes = listOf("text"),
defaultOutputModes = listOf("text"),
skills = listOf(
AgentSkill(
id = "greetings",
name = "Greetings",
description = "Returns appropriate greetings when greeted",
tags = listOf("greeting", "hello", "hi"),
examples = listOf("Hello", "Hi", "Good morning", "Guten Tag"),
),
AgentSkill(
id = "weather-search",
name = "Weather Search",
description = "Searches for weather information for a specific date and location",
tags = listOf("weather", "forecast", "天気"),
examples = listOf("今日の大阪の天気は?", "明日の東京の天気を教えて"),
),
),
)
A2AクライアントはAgentCardをもとにA2Aサーバーとの通信を行います。
AgentExecutor
A2Aサーバーのメインの処理はAgentExecutor上で行われます。
AgentExecutorは以下のメソッドを持つインターフェースなので、継承した上でexecuteメソッドを実装していきます。
public interface AgentExecutor {
public suspend fun execute(context: RequestContext<MessageSendParams>, eventProcessor: SessionEventProcessor)
// cancelについては割愛
}
executeメソッドは、RequestContextオブジェクトとSessionEventProcessorオブジェクトの2つを受け取ります。
RequestContext
RequestContextはA2Aサーバーとクライアント間のやり取りをまとめて保持できるオブジェクトです。
Webアプリにおけるセッションに近い概念です。
RequestContextはContextTaskStorageとContextMessageStorageという2つのストレージを持っています(現在、インメモリストレージのみがサポートされています)。
また、ユーザーからのリクエストもこのRequestContextに保持されています。
1つのMessageオブジェクト中に複数のPartオブジェクトが保持されています。
Partはクライアントが送信できる最小の単位であり、テキストの他に複数のファイルなどを一度のMessageで送信可能です。
こちらは、RequestContextからクライアントが送信したテキストメッセージを取得する例
val userMessage = context.params.message
val userInput = userMessage.parts
.filterIsInstance<TextPart>()
.joinToString(" ") { it.text }
SessionEventProcessor
SessionEventProcessorは、サーバーからクライアントにイベントを送信するためのクラスです。
イベントにはMessageとTaskの2種類があり、それぞれsendMessageとsendTaskEventから実行できます。
Messageイベントでは、クライアント側に単一のMessageを送信します。
Messageは一度のリクエストに対して一つしか送信できません(複数送信しようとしてもエラーになります)。
今回の実装例では、挨拶を行うAgentExecutorはこの機能を使って実装しています。
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor,
) {
val message = Message(
messageId = UUID.randomUUID().toString(),
role = Role.Agent,
parts = listOf(TextPart("Hello World")),
contextId = context.contextId,
taskId = context.taskId
)
eventProcessor.sendMessage(message)
}
一方で、Taskイベントでは、一度のリクエストに対して複数のレスポンスを返すことができます。
sendTaskEventで利用できるイベントはTask、TaskStatusUpdateEventおよびTaskArtifactUpdateEventの3種類があります。
Taskはステートフルなオブジェクトであり、大まかな流れとしては、はじめにTaskを作成し、TaskStatusUpdateEventあるいはTaskArtifactUpdateEventを通してそのTaskを更新しつつ、目的を達成してく、という流れになります。
また、Taskイベントを利用する場合、最後にstatusがfinal = trueになるように注意する必要があります。
天気情報エージェントでは、こちらを利用することで、複数のメッセージを返せるように実装しています。
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor,
) {
eventProcessor.sendTaskEvent(
Task(
id = context.taskId,
contextId = context.contextId,
status = TaskStatus(TaskState.Submitted),
)
)
// Messageからdateとlocationを抜き出す
val date = ...
val location = ...
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
taskId = context.taskId,
contextId = context.contextId,
status = TaskStatus(
state = TaskState.Working,
message = Message(
messageId = UUID.randomUUID().toString(),
role = Role.Agent,
parts = listOf(TextPart("${date}の${location}の天気を検索します...")),
contextId = context.contextId,
taskId = context.taskId
)
),
final = false
)
)
// 結果を検索する
val weatherResult = ...
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
taskId = context.taskId,
contextId = context.contextId,
status = TaskStatus(
state = TaskState.Completed,
message = Message(
messageId = UUID.randomUUID().toString(),
role = Role.Agent,
parts = listOf(TextPart(weatherResult)),
contextId = context.contextId,
taskId = context.taskId
)
),
final = true
)
)
}
A2AServer
最後に、上記で実装したAgentCardとAgentExecutorを公開します。
Koogでは、A2AServerというクラスが定義されており、そのコンストラクタにそれぞれのオブジェクトを渡すことでサーバーのリクエストハンドラを設定できます。
val server = A2AServer(
agentExecutor,
agentCard,
)
最後に、設定したリクエストハンドラをサーバーランタイム(Ktor)に乗せて完成です。
HttpJSONRPCServerTransport(server).start(
engineFactory = ServerCIO,
port = 8080,
path = "/a2a",
wait = true,
agentCard = agentCard,
)
A2Aクライアントの実装
最初に、A2AClientオブジェクトを作成します。
A2AServerで設定した情報を入力します。
val transport = HttpJSONRPCClientTransport(url = "http://localhost:8080/a2a")
val agentCardResolver = UrlAgentCardResolver(
baseUrl = "http://localhost:8080",
)
val client = A2AClient(transport, agentCardResolver)
client.connect()
次にメッセージを送信します。
val text = "Message"
val message = Message(
messageId = UUID.randomUUID().toString(),
role = Role.User,
parts = listOf(TextPart(text)),
contextId = contextId,
)
val request = Request(data = MessageSendParams(message))
val response = client.sendMessageStreaming(request)
sendMessageStreamingオブジェクトの返り値はFlowになるので、終端まで読み込みます。
SessionEventProcessorオブジェクトから送信されたデータがそのままデシリアライズされるので、データの種類ごとに処理を分けます。
response.collect { eventResponse ->
when (val event = eventResponse.data) {
is Message -> {
val responseText = event.parts
.filterIsInstance<TextPart>()
.joinToString { part -> part.text }
println(responseText)
}
is Task -> {
println(event.status.state)
}
is TaskStatusUpdateEvent -> {
event.status.message?.parts
?.filterIsInstance<TextPart>()
?.joinToString { part -> part.text }
?.let { responseText ->
println(responseText)
}
if (event.final) {
println("Completed")
}
}
is TaskArtifactUpdateEvent -> {
val artifactText = event.artifact.parts
.filterIsInstance<TextPart>()
.joinToString { part -> part.text }
val artifactName = event.artifact.name ?: "unnamed"
println(artifactText)
if (event.lastChunk == true) {
println("Completed")
}
}
}
}
おわりに
今回は、簡単なA2Aサーバーとクライアントの実装を通して、A2Aプロトコルを紹介してみました。
A2Aプロトコルの全てを紹介できたわけではありませんが、読んでくださった方の理解にふんわりと貢献できると幸いです。
Discussion