🌟

A2Aプロトコルで『勝手に終わる問題』をどう制御するか

に公開

はじめに

現在、コーディングなど、AIエージェントを使った自動化や効率化が進んでいます。
そのうち、AIエージェントが呼び出すtoolやAIエージェントをサービスとして公開するビジネスなども出てくるかと思いますし、AWS AgentCoreなど、それを可能にするサービスも登場しています。
AIエージェントを公開するときに使われるプロトコルとしてAgent to Agentがありますが、ちょっとクセがあり、アプリ側で工夫が必要かなと感じる場面があったので、備忘録として記録しようと思います。

A2Aプロトコルとは

MCPを補完するプロトコルとして、Googleが提唱した**A2A(Agent to Agent Protocol)**という仕組みがあります。
これはAIエージェント同士が連携するための仕組みで、Claude Codeでいうところのサブエージェントに近い機能です。
サブエージェントは自分で使うことが目的ですが、今後作ったAIエージェントを他人に共有したい! となった場合や、AIエージェントをサービスとして公開するとき、エージェントとやり取りするときの共通ルールがあればいいよね、という背景のもと考え出されたプロトコルになります。

A2Aプロトコルの処理と問題点

A2Aでは、メインのエージェントがサブエージェントに対してメッセージを送り、サブエージェントが実際の作業(タスク)を行います。
詳しい仕様については公式ページを参照してください。
タスクの進捗は 状態遷移(ステートマシン) で管理され、状態は主に以下の5つです。

状態 意味
submitted メッセージが送信され、エージェントが受け付けた
working エージェントが処理中
input-required エージェントが追加の入力を待っている
completed タスクが正常に完了した
canceled タスクがキャンセルされた

処理の流れをシーケンス図で表現すると以下のようになります。
フローには記載していませんが、タスクをキャンセルするAPIが別で用意され、それを呼び出すことでいつでもcanceledにすることができます。

ここで問題なのは、タスクをcompletedにするのがサブエージェント側であり、原則として一度completedとなったタスクではもうやり取りができなくなるという点です。
つまりサブエージェントが一方的にcompletedを宣言し、変な成果物を出してきたとしても、メインのエージェントはやり直しや修正をさせることができません。
試しにA2Aの公式SDKを使って、適当なA2Aサーバーとクライアントを作って実行してみます。pyproject.tomlとテストコードを以下に示します。

pyproject.toml
[project]
name = "snippets"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "a2a-sdk[http-server]>=1.0.1",
    "fastapi>=0.136.1",
    "uvicorn>=0.46.0",
]
server.py
from a2a.server.agent_execution import AgentExecutor
from a2a.types import Part, TaskState, TaskStatus, TaskArtifactUpdateEvent, TaskStatusUpdateEvent
from a2a.helpers import new_task_from_user_message, new_artifact


#############################################################
#                   Agent Executor                          #
#############################################################
class TestExecutor(AgentExecutor):
    async def execute(self, context, event_queue):
        if not context.current_task:
            task = new_task_from_user_message(context.message)
            await event_queue.enqueue_event(task)
        else:
            task = context.current_task
        
        task_id = task.id
        context_id = task.context_id

        # 状態をSUBMITTEDにする
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED)
            )
        )

        # 状態をWORKINGにする
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.TASK_STATE_WORKING)
            )
        )

        # AIからの回答(Artifact)を生成。今回はAIを使わずHelloと返すだけ
        artifact = new_artifact(parts=[Part(text="Hello!")], name="answer")
        await event_queue.enqueue_event(
            TaskArtifactUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                artifact=artifact,
                append=False,
                last_chunk=True
            )
        )

        # 状態をCOMPLETEDにする
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED)
            )
        )
    
    async def cancel(self, context, event_queue):
        if not context.current_task:
            return
        
        # 状態をCANCELEDにする
        task = context.current_task
        status = TaskStatus(state=TaskState.TASK_STATE_CANCELED)
        event = {"context_id": task.context_id, "task_id": task.id}
        event = event | {"status": status}
        event = TaskStatusUpdateEvent(**event)
        await event_queue.enqueue_event(event)


#############################################################
#                   ASGI APP                                #
#############################################################
from fastapi import FastAPI
from a2a.server.routes import create_jsonrpc_routes, create_agent_card_routes
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCard, AgentCapabilities, AgentInterface, AgentSkill


jsonrpc_path = "/hello-agent/v1/jsonrpc/"
hello_agent_interface = [
    AgentInterface(url="http://localhost:8000" + jsonrpc_path, protocol_binding="JSONRPC")
]
hello_agent_card = AgentCard(
    name="hello agent",
    description="Say Hello",
    supported_interfaces=hello_agent_interface,
    skills=[AgentSkill(id="say-hello-skill", name="say-hello-skill")],
    default_input_modes=["text/plain"],
    default_output_modes=["text/plain"]
)
request_handler = DefaultRequestHandler(
    agent_card=hello_agent_card,
    agent_executor=TestExecutor(),
    task_store=InMemoryTaskStore()
)


card_routes = create_agent_card_routes(hello_agent_card)
api_routes = create_jsonrpc_routes(request_handler=request_handler, rpc_url=jsonrpc_path)
app = FastAPI(routes=card_routes + api_routes)
client.py
from uuid import uuid4
from asyncio import run
from httpx import AsyncClient
from a2a.client import create_client, A2ACardResolver, ClientConfig
from a2a.types import Part, Message, Role, SendMessageRequest, Task, ListTasksRequest


async def main():
    async with AsyncClient() as httpx_client:
        # Agent Cardを探して取得する
        card_url = "http://localhost:8000"
        resolver = A2ACardResolver(httpx_client=httpx_client, base_url=card_url)
        agent_card = (await resolver.get_agent_card())

    # Agent Cardの情報を使ってクライアントを作成
    print(agent_card)
    config = ClientConfig(streaming=False)
    a2a_client = await create_client(agent_card, config)

    # メッセージを送る(1回目)
    parts = [Part(text='Say hello.')]
    message = Message(role=Role.ROLE_USER, parts=parts, message_id=uuid4().hex)
    response = a2a_client.send_message(request=SendMessageRequest(message=message))

    # 応答を格納する
    tasks = list[Task]()

    async for chunk in response:
        tasks.append(chunk.task)
    
    print("----応答確認(1回目)----")
    print(tasks)
    print("-----応答確認(1回目)ここまで----")

    # 現在のタスクIDとコンテクストIDを使い、再度メッセージ送信
    task_id = tasks[-1].id
    context_id = tasks[-1].context_id
    parts = [Part(text='Say hello again.')]
    message = Message(task_id=task_id, context_id=context_id, role=Role.ROLE_USER, parts=parts, message_id=uuid4().hex)
    response = a2a_client.send_message(request=SendMessageRequest(message=message))
    
    # 応答を格納する(COMPLETEDになっている場合、ここでエラー)
    async for chunk in response:
        tasks.append(chunk.task)

    # 応答を確認する
    print("-----応答確認(2回目)-----")
    print(tasks)
    print("-----応答確認(2回目)ここまで-----")

run(main())

uvicornを使ってserver.pyを起動し、その状態でclient.pyを実行すると

$ python ./client.py
> name: "hello agent"
> description: "Say Hello"
> supported_interfaces {
>   url: "http://localhost:8000/hello-agent/v1/jsonrpc/"
>   protocol_binding: "JSONRPC"
> }
> capabilities {
> }
> default_input_modes: "text/plain"
> default_output_modes: "text/plain"
> skills {
>   id: "say-hello-skill"
>   name: "say-hello-skill"
> }
> 
> ----応答確認(1回目)----
> [id: "78f1680e-3c12-4915-83a7-a85f937d9722"
> context_id: "8558c2f4-53d1-47a4-a543-cc7a8f41a66c"
> status {
>   state: TASK_STATE_COMPLETED
> }
> artifacts {
>   artifact_id: "8fafd81d-e92b-43af-abef-6ad6a9b21aa5"
>   name: "answer"
>   parts {
>     text: "Hello!"
>   }
> }
> history {
>   message_id: "7a254b36f5aa4987914a9a28b459adde"
>   context_id: "8558c2f4-53d1-47a4-a543-cc7a8f41a66c"
>   task_id: "78f1680e-3c12-4915-83a7-a85f937d9722"
>   role: ROLE_USER
>   parts {
>     text: "Say hello."
>   }
> }
> ]
> -----応答確認(1回目)ここまで----
> a2a.utils.errors.InvalidParamsError: Task 78f1680e-3c12-4915-83a7-a85f937d9722 is in terminal state: 3

という結果になります。
タスクの状態COMPLETEDが"terminal state"なので、もうメッセージは受け入れませんということです。

アプリ側での応急処置

A2Aの利点として、公式ページではstateful、マルチターンが挙げられていますが、一方的にタスク完了を宣言し、その後の会話ができなくなるのであれば、あまりその長所を活かせず、「AIエージェントを呼び出すMCPツール」とあまり変わりません。
とはいえ、費用を抑えるためにもタスクを管理するデータベースの容量は少ないほうがよいですし、タスクをデータベースから削除して良いというフラグとして、会話を継続できなくするcompletedがあっても良いと思います。
現時点ではアプリ側で対処すべきと考えたので、server.pyを以下のように変更しました。

server.py
class TestExecutor(AgentExecutor):
    async def execute(self, context, event_queue):
        if not context.current_task:
            task = new_task_from_user_message(context.message)
            await event_queue.enqueue_event(task)
        else:
            task = context.current_task
        
        task_id = task.id
        context_id = task.context_id

        # 【追加】メッセージが空かつ`state`が`INPUT_REQUIRED`だったら`COMPLETED`にして終了
        if not context.message:
            if task.status.state != TaskState.TASK_STATE_INPUT_REQUIRED:
                return
            
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    task_id=task_id,
                    context_id=context_id,
                    status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED)
                )
            )
            return

        # 【追加】状態をSUBMITTEDにする
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED)
            )
        )

        # 状態をWORKINGにする
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.TASK_STATE_WORKING)
            )
        )

        # AIからの回答(Artifact)を生成。今回はAIを使わずHelloと返すだけ
        artifact = new_artifact(parts=[Part(text="Hello!")], name="answer")
        await event_queue.enqueue_event(
            TaskArtifactUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                artifact=artifact,
                append=False,
                last_chunk=True
            )
        )

        # 【変更】状態をINPUT_REQUIREDにする
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.TASK_STATE_INPUT_REQUIRED)
            )
        )
    ...

この状態で再度client.pyを実行してみます。

$python ./client.py
> name: "hello agent"
> description: "Say Hello"
> supported_interfaces {
>   url: "http://localhost:8000/hello-agent/v1/jsonrpc/"
>   protocol_binding: "JSONRPC"
> }
> capabilities {
> }
> default_input_modes: "text/plain"
> default_output_modes: "text/plain"
> skills {
>   id: "say-hello-skill"
>   name: "say-hello-skill"
> }
>
> ----応答確認(1回目)----
> [id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> status {
>   state: TASK_STATE_INPUT_REQUIRED
> }
> artifacts {
>   artifact_id: "ea173f49-b3b5-42d9-977c-68831b8faba5"
>   name: "answer"
>   parts {
>     text: "Hello!"
>   }
> }
> history {
>   message_id: "1cc27e7f7778405ca65d223c3067063f"
>   context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
>   task_id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
>   role: ROLE_USER
>   parts {
>     text: "Say hello."
>   }
> }
> ]
> -----応答確認(1回目)ここまで----
> -----応答確認(2回目)-----
> [id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> status {
>   state: TASK_STATE_INPUT_REQUIRED
> }
> artifacts {
>   artifact_id: "ea173f49-b3b5-42d9-977c-68831b8faba5"
>   name: "answer"
>   parts {
>     text: "Hello!"
>   }
> }
> history {
>   message_id: "1cc27e7f7778405ca65d223c3067063f"
>   context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
>   task_id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
>   role: ROLE_USER
>   parts {
>     text: "Say hello."
>   }
> }
> , id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> status {
>   state: TASK_STATE_INPUT_REQUIRED
> }
> artifacts {
>   artifact_id: "ea173f49-b3b5-42d9-977c-68831b8faba5"
>   name: "answer"
>   parts {
>     text: "Hello!"
>   }
> }
> artifacts {
>   artifact_id: "f30b31cd-1ea8-48c4-b8e8-05954c4277da"
>   name: "answer"
>   parts {
>     text: "Hello!"
>   }
> }
> history {
>   message_id: "1cc27e7f7778405ca65d223c3067063f"
>   context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
>   task_id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
>   role: ROLE_USER
>   parts {
>     text: "Say hello."
>   }
> }
> history {
>   message_id: "20df4b7361d8433e9a1633c15874a185"
>   context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
>   task_id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
>   role: ROLE_USER
>   parts {
>     text: "Say hello again."
>   }
> }
> ]
> -----応答確認(2回目)ここまで-----

これで作業完了後も会話を継続でき、完全に作業を終わらせたければ空のメッセージを送るか、タスクをcancelすればOKです。

まとめ

今回は、A2Aのstateful、マルチターンという利点を活かすため、input-requiredのまま放置するというアプローチをご紹介しました。
あくまで私個人のやり方ですが、実装例のひとつとして参考になれば幸いです。

Discussion