🔭

A2AサーバをOpenTelemetryで計装する

に公開

A2A におけるオブザーバビリティの必要性

A2A[1]は Google が主導し開発を進めている、エージェント間の通信を可能にするオープンプロトコルです。
A2A を利用することで生成 AI アプリケーションはマルチエージェントシステムとして実装されます。

マルチエージェントシステムは分散システムであり、マイクロサービスと同様にオブザーバビリティが重要となります。
小さなエージェントであればわざわざ A2A でクライアントとサーバに分ける必要はありませんが、エージェントが巨大化すれば従来の Web アプリケーションの潮流と同様に分割される方向で進化するでしょう。

本記事ではA2Aサーバ(とクライアント)に対してOpenTelemetryによる計装を検証します。

A2A クライアントとサーバの準備

A2A サーバ

サーバはなんでも良いので公式のチュートリアル[2]をほとんどそのまま使います。

コード
task_manager.py
from typing import AsyncIterable

from common.server.task_manager import InMemoryTaskManager
from common.types import (
    Artifact,
    JSONRPCResponse,
    Message,
    SendTaskRequest,
    SendTaskResponse,
    SendTaskStreamingRequest,
    SendTaskStreamingResponse,
    Task,
    TaskState,
    TaskStatus,
)


class MyAgentTaskManager(InMemoryTaskManager):
    def __init__(self):
        super().__init__()

    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        # Upsert a task stored by InMemoryTaskManager
        await self.upsert_task(request.params)

        task_id = request.params.id
        # Our custom logic that simply marks the task as complete
        # and returns the echo text
        received_text = request.params.message.parts[0].text
        task = await self._update_task(
            task_id=task_id,
            task_state=TaskState.COMPLETED,
            response_text=f"on_send_task received: {received_text}",
        )

        # Send the response
        return SendTaskResponse(id=request.id, result=task)

    async def on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
        pass

    async def _update_task(
        self,
        task_id: str,
        task_state: TaskState,
        response_text: str,
    ) -> Task:
        task = self.tasks[task_id]
        agent_response_parts = [
            {
                "type": "text",
                "text": response_text,
            }
        ]
        task.status = TaskStatus(
            state=task_state,
            message=Message(
                role="agent",
                parts=agent_response_parts,
            ),
        )
        task.artifacts = [
            Artifact(
                parts=agent_response_parts,
            )
        ]
        return task
server.py
import logging

import click
from common.types import AgentSkill, AgentCapabilities, AgentCard
from common.server import A2AServer

from task_manager import MyAgentTaskManager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
def main(host, port):
    skill = AgentSkill(
        id="my-project-echo-skill",
        name="Echo Tool",
        description="Echos the input given",
        tags=["echo", "repeater"],
        examples=["I will see this echoed back to me"],
        inputModes=["text"],
        outputModes=["text"],
    )

    capabilities = AgentCapabilities()
    agent_card = AgentCard(
        name="Echo Agent",
        description="This agent echos the input given",
        url=f"http://{host}:{port}/",
        version="0.1.0",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        capabilities=capabilities,
        skills=[skill],
    )

    task_manager = MyAgentTaskManager()
    server = A2AServer(
        agent_card=agent_card,
        task_manager=task_manager,
        host=host,
        port=port,
    )
    server.start()


if __name__ == "__main__":
    main()

A2A クライアント

クライアントはチュートリアルでは google-a2a-cli なるものを使っているのですが、今回はあとで分散トレースを確認したかったため自作します。
(追記:いらなかったかも)
タスクを 1 つ送信するだけのスクリプトを用意します。

client.py
from common.client import A2AClient
import asyncio
from uuid import uuid4

client = A2AClient(url="http://localhost:10002")
res = asyncio.run(
    client.send_task(
        {
            "id": str(uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Hello world"}],
            },
        }
    )
)
print(res.result)

動作確認

クライアントを実行するとタスクの結果が表示されます。

uv run client.py
id='a3b1ed42-d64c-455c-bbbe-8b1738b8a29f' sessionId='25d73c6a2c5c444186838eeff4c6cd5f' status=TaskStatus(state=<TaskState.COMPLETED: 'completed'>, message=Message(role='agent', parts=[TextPart(type='text', text='on_send_task received: Hello world', metadata=None)], metadata=None), timestamp=datetime.datetime(2025, 5, 5, 19, 14, 43, 656293)) artifacts=[Artifact(name=None, description=None, parts=[TextPart(type='text', text='on_send_task received: Hello world', metadata=None)], metadata=None, index=0, append=None, lastChunk=None)] history=[Message(role='user', parts=[TextPart(type='text', text='Hello world', metadata=None)], metadata=None)] metadata=None

OpenTelemetry による計装

実装

基本的な流れは OpenTelemetry の Getting Started[3]に従います。
ただし opentelemetry-bootstrap による依存関係のインストールをしてしまうと、サーバ側のルートスパンが Click のスパンになります。
クライアントとの分散トレースができなくなってしまうため、opentelemetry-bootstrap によるインストールはしません。

A2A サーバは HTTP サーバとして Starlette を利用しています。
https://github.com/google/A2A/blob/f8cff21a15667fd5223946574a9ed33ecccd6151/samples/python/common/server/server.py#L44-L48

この Starlette アプリケーションは A2AServer の app インスタンス変数なので、以下のように計装します。

uv add opentelemetry-instrumentation-starlette
server.py
+StarletteInstrumentor.instrument_app(server.app)

A2A クライアントは HTTP クライアントとして httpx を利用しています。
https://github.com/google/A2A/blob/f8cff21a15667fd5223946574a9ed33ecccd6151/samples/python/common/client/client.py#L55-L62

こちらは instrumentation をインストールするだけです。

uv add opentelemetry-instrumentation-httpx

動作確認

トレースを確認するため、Jaeger を起動しておきます。

docker run -d --name jaeger \
  -p 16686:16686 \
  -p 4317:4317 \
  -p 4318:4318 \
  -p 5778:5778 \
  -p 9411:9411 \
  jaegertracing/jaeger:2.5.0

再度 A2A クライアントを実行すると分散トレースが取得できました。

以上です。
コードは以下のリポジトリにあります。
https://github.com/YunosukeY/otel-and-a2a-sample

脚注
  1. https://github.com/google/A2A/tree/main ↩︎

  2. https://google.github.io/A2A/tutorials/python/1-introduction/ ↩︎

  3. https://opentelemetry.io/docs/languages/python/getting-started/ ↩︎

Discussion