A2AサーバをOpenTelemetryで計装する
A2A におけるオブザーバビリティの必要性
A2A[1]は Google が主導し開発を進めている、エージェント間の通信を可能にするオープンプロトコルです。
A2A を利用することで生成 AI アプリケーションはマルチエージェントシステムとして実装されます。
マルチエージェントシステムは分散システムであり、マイクロサービスと同様にオブザーバビリティが重要となります。
小さなエージェントであればわざわざ A2A でクライアントとサーバに分ける必要はありませんが、エージェントが巨大化すれば従来の Web アプリケーションの潮流と同様に分割される方向で進化するでしょう。
本記事ではA2Aサーバ(とクライアント)に対してOpenTelemetryによる計装を検証します。
A2A クライアントとサーバの準備
A2A サーバ
サーバはなんでも良いので公式のチュートリアル[2]をほとんどそのまま使います。
コード
from typing import AsyncIterable
from common.server.task_manager import InMemoryTaskManager
from common.types import (
Artifact,
JSONRPCResponse,
Message,
SendTaskRequest,
SendTaskResponse,
SendTaskStreamingRequest,
SendTaskStreamingResponse,
Task,
TaskState,
TaskStatus,
)
class MyAgentTaskManager(InMemoryTaskManager):
def __init__(self):
super().__init__()
async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
# Upsert a task stored by InMemoryTaskManager
await self.upsert_task(request.params)
task_id = request.params.id
# Our custom logic that simply marks the task as complete
# and returns the echo text
received_text = request.params.message.parts[0].text
task = await self._update_task(
task_id=task_id,
task_state=TaskState.COMPLETED,
response_text=f"on_send_task received: {received_text}",
)
# Send the response
return SendTaskResponse(id=request.id, result=task)
async def on_send_task_subscribe(
self, request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
pass
async def _update_task(
self,
task_id: str,
task_state: TaskState,
response_text: str,
) -> Task:
task = self.tasks[task_id]
agent_response_parts = [
{
"type": "text",
"text": response_text,
}
]
task.status = TaskStatus(
state=task_state,
message=Message(
role="agent",
parts=agent_response_parts,
),
)
task.artifacts = [
Artifact(
parts=agent_response_parts,
)
]
return task
import logging
import click
from common.types import AgentSkill, AgentCapabilities, AgentCard
from common.server import A2AServer
from task_manager import MyAgentTaskManager
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
def main(host, port):
skill = AgentSkill(
id="my-project-echo-skill",
name="Echo Tool",
description="Echos the input given",
tags=["echo", "repeater"],
examples=["I will see this echoed back to me"],
inputModes=["text"],
outputModes=["text"],
)
capabilities = AgentCapabilities()
agent_card = AgentCard(
name="Echo Agent",
description="This agent echos the input given",
url=f"http://{host}:{port}/",
version="0.1.0",
defaultInputModes=["text"],
defaultOutputModes=["text"],
capabilities=capabilities,
skills=[skill],
)
task_manager = MyAgentTaskManager()
server = A2AServer(
agent_card=agent_card,
task_manager=task_manager,
host=host,
port=port,
)
server.start()
if __name__ == "__main__":
main()
A2A クライアント
クライアントはチュートリアルでは google-a2a-cli なるものを使っているのですが、今回はあとで分散トレースを確認したかったため自作します。
(追記:いらなかったかも)
タスクを 1 つ送信するだけのスクリプトを用意します。
from common.client import A2AClient
import asyncio
from uuid import uuid4
client = A2AClient(url="http://localhost:10002")
res = asyncio.run(
client.send_task(
{
"id": str(uuid4()),
"message": {
"role": "user",
"parts": [{"type": "text", "text": "Hello world"}],
},
}
)
)
print(res.result)
動作確認
クライアントを実行するとタスクの結果が表示されます。
uv run client.py
id='a3b1ed42-d64c-455c-bbbe-8b1738b8a29f' sessionId='25d73c6a2c5c444186838eeff4c6cd5f' status=TaskStatus(state=<TaskState.COMPLETED: 'completed'>, message=Message(role='agent', parts=[TextPart(type='text', text='on_send_task received: Hello world', metadata=None)], metadata=None), timestamp=datetime.datetime(2025, 5, 5, 19, 14, 43, 656293)) artifacts=[Artifact(name=None, description=None, parts=[TextPart(type='text', text='on_send_task received: Hello world', metadata=None)], metadata=None, index=0, append=None, lastChunk=None)] history=[Message(role='user', parts=[TextPart(type='text', text='Hello world', metadata=None)], metadata=None)] metadata=None
OpenTelemetry による計装
実装
基本的な流れは OpenTelemetry の Getting Started[3]に従います。
ただし opentelemetry-bootstrap による依存関係のインストールをしてしまうと、サーバ側のルートスパンが Click のスパンになります。
クライアントとの分散トレースができなくなってしまうため、opentelemetry-bootstrap によるインストールはしません。
A2A サーバは HTTP サーバとして Starlette を利用しています。
この Starlette アプリケーションは A2AServer の app インスタンス変数なので、以下のように計装します。
uv add opentelemetry-instrumentation-starlette
+StarletteInstrumentor.instrument_app(server.app)
A2A クライアントは HTTP クライアントとして httpx を利用しています。
こちらは instrumentation をインストールするだけです。
uv add opentelemetry-instrumentation-httpx
動作確認
トレースを確認するため、Jaeger を起動しておきます。
docker run -d --name jaeger \
-p 16686:16686 \
-p 4317:4317 \
-p 4318:4318 \
-p 5778:5778 \
-p 9411:9411 \
jaegertracing/jaeger:2.5.0
再度 A2A クライアントを実行すると分散トレースが取得できました。
以上です。
コードは以下のリポジトリにあります。
Discussion