A2Aプロトコルで『勝手に終わる問題』をどう制御するか
はじめに
現在、コーディングなど、AIエージェントを使った自動化や効率化が進んでいます。
そのうち、AIエージェントが呼び出すtoolやAIエージェントをサービスとして公開するビジネスなども出てくるかと思いますし、AWS AgentCoreなど、それを可能にするサービスも登場しています。
AIエージェントを公開するときに使われるプロトコルとしてAgent to Agentがありますが、ちょっとクセがあり、アプリ側で工夫が必要かなと感じる場面があったので、備忘録として記録しようと思います。
A2Aプロトコルとは
MCPを補完するプロトコルとして、Googleが提唱した**A2A(Agent to Agent Protocol)**という仕組みがあります。
これはAIエージェント同士が連携するための仕組みで、Claude Codeでいうところのサブエージェントに近い機能です。
サブエージェントは自分で使うことが目的ですが、今後作ったAIエージェントを他人に共有したい! となった場合や、AIエージェントをサービスとして公開するとき、エージェントとやり取りするときの共通ルールがあればいいよね、という背景のもと考え出されたプロトコルになります。
A2Aプロトコルの処理と問題点
A2Aでは、メインのエージェントがサブエージェントに対してメッセージを送り、サブエージェントが実際の作業(タスク)を行います。
詳しい仕様については公式ページを参照してください。
タスクの進捗は 状態遷移(ステートマシン) で管理され、状態は主に以下の5つです。
| 状態 | 意味 |
|---|---|
submitted |
メッセージが送信され、エージェントが受け付けた |
working |
エージェントが処理中 |
input-required |
エージェントが追加の入力を待っている |
completed |
タスクが正常に完了した |
canceled |
タスクがキャンセルされた |
処理の流れをシーケンス図で表現すると以下のようになります。
フローには記載していませんが、タスクをキャンセルするAPIが別で用意され、それを呼び出すことでいつでもcanceledにすることができます。
ここで問題なのは、タスクをcompletedにするのがサブエージェント側であり、原則として一度completedとなったタスクではもうやり取りができなくなるという点です。
つまりサブエージェントが一方的にcompletedを宣言し、変な成果物を出してきたとしても、メインのエージェントはやり直しや修正をさせることができません。
試しにA2Aの公式SDKを使って、適当なA2Aサーバーとクライアントを作って実行してみます。pyproject.tomlとテストコードを以下に示します。
[project]
name = "snippets"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"a2a-sdk[http-server]>=1.0.1",
"fastapi>=0.136.1",
"uvicorn>=0.46.0",
]
from a2a.server.agent_execution import AgentExecutor
from a2a.types import Part, TaskState, TaskStatus, TaskArtifactUpdateEvent, TaskStatusUpdateEvent
from a2a.helpers import new_task_from_user_message, new_artifact
#############################################################
# Agent Executor #
#############################################################
class TestExecutor(AgentExecutor):
async def execute(self, context, event_queue):
if not context.current_task:
task = new_task_from_user_message(context.message)
await event_queue.enqueue_event(task)
else:
task = context.current_task
task_id = task.id
context_id = task.context_id
# 状態をSUBMITTEDにする
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED)
)
)
# 状態をWORKINGにする
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_WORKING)
)
)
# AIからの回答(Artifact)を生成。今回はAIを使わずHelloと返すだけ
artifact = new_artifact(parts=[Part(text="Hello!")], name="answer")
await event_queue.enqueue_event(
TaskArtifactUpdateEvent(
task_id=task_id,
context_id=context_id,
artifact=artifact,
append=False,
last_chunk=True
)
)
# 状態をCOMPLETEDにする
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED)
)
)
async def cancel(self, context, event_queue):
if not context.current_task:
return
# 状態をCANCELEDにする
task = context.current_task
status = TaskStatus(state=TaskState.TASK_STATE_CANCELED)
event = {"context_id": task.context_id, "task_id": task.id}
event = event | {"status": status}
event = TaskStatusUpdateEvent(**event)
await event_queue.enqueue_event(event)
#############################################################
# ASGI APP #
#############################################################
from fastapi import FastAPI
from a2a.server.routes import create_jsonrpc_routes, create_agent_card_routes
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCard, AgentCapabilities, AgentInterface, AgentSkill
jsonrpc_path = "/hello-agent/v1/jsonrpc/"
hello_agent_interface = [
AgentInterface(url="http://localhost:8000" + jsonrpc_path, protocol_binding="JSONRPC")
]
hello_agent_card = AgentCard(
name="hello agent",
description="Say Hello",
supported_interfaces=hello_agent_interface,
skills=[AgentSkill(id="say-hello-skill", name="say-hello-skill")],
default_input_modes=["text/plain"],
default_output_modes=["text/plain"]
)
request_handler = DefaultRequestHandler(
agent_card=hello_agent_card,
agent_executor=TestExecutor(),
task_store=InMemoryTaskStore()
)
card_routes = create_agent_card_routes(hello_agent_card)
api_routes = create_jsonrpc_routes(request_handler=request_handler, rpc_url=jsonrpc_path)
app = FastAPI(routes=card_routes + api_routes)
from uuid import uuid4
from asyncio import run
from httpx import AsyncClient
from a2a.client import create_client, A2ACardResolver, ClientConfig
from a2a.types import Part, Message, Role, SendMessageRequest, Task, ListTasksRequest
async def main():
async with AsyncClient() as httpx_client:
# Agent Cardを探して取得する
card_url = "http://localhost:8000"
resolver = A2ACardResolver(httpx_client=httpx_client, base_url=card_url)
agent_card = (await resolver.get_agent_card())
# Agent Cardの情報を使ってクライアントを作成
print(agent_card)
config = ClientConfig(streaming=False)
a2a_client = await create_client(agent_card, config)
# メッセージを送る(1回目)
parts = [Part(text='Say hello.')]
message = Message(role=Role.ROLE_USER, parts=parts, message_id=uuid4().hex)
response = a2a_client.send_message(request=SendMessageRequest(message=message))
# 応答を格納する
tasks = list[Task]()
async for chunk in response:
tasks.append(chunk.task)
print("----応答確認(1回目)----")
print(tasks)
print("-----応答確認(1回目)ここまで----")
# 現在のタスクIDとコンテクストIDを使い、再度メッセージ送信
task_id = tasks[-1].id
context_id = tasks[-1].context_id
parts = [Part(text='Say hello again.')]
message = Message(task_id=task_id, context_id=context_id, role=Role.ROLE_USER, parts=parts, message_id=uuid4().hex)
response = a2a_client.send_message(request=SendMessageRequest(message=message))
# 応答を格納する(COMPLETEDになっている場合、ここでエラー)
async for chunk in response:
tasks.append(chunk.task)
# 応答を確認する
print("-----応答確認(2回目)-----")
print(tasks)
print("-----応答確認(2回目)ここまで-----")
run(main())
uvicornを使ってserver.pyを起動し、その状態でclient.pyを実行すると
$ python ./client.py
> name: "hello agent"
> description: "Say Hello"
> supported_interfaces {
> url: "http://localhost:8000/hello-agent/v1/jsonrpc/"
> protocol_binding: "JSONRPC"
> }
> capabilities {
> }
> default_input_modes: "text/plain"
> default_output_modes: "text/plain"
> skills {
> id: "say-hello-skill"
> name: "say-hello-skill"
> }
>
> ----応答確認(1回目)----
> [id: "78f1680e-3c12-4915-83a7-a85f937d9722"
> context_id: "8558c2f4-53d1-47a4-a543-cc7a8f41a66c"
> status {
> state: TASK_STATE_COMPLETED
> }
> artifacts {
> artifact_id: "8fafd81d-e92b-43af-abef-6ad6a9b21aa5"
> name: "answer"
> parts {
> text: "Hello!"
> }
> }
> history {
> message_id: "7a254b36f5aa4987914a9a28b459adde"
> context_id: "8558c2f4-53d1-47a4-a543-cc7a8f41a66c"
> task_id: "78f1680e-3c12-4915-83a7-a85f937d9722"
> role: ROLE_USER
> parts {
> text: "Say hello."
> }
> }
> ]
> -----応答確認(1回目)ここまで----
> a2a.utils.errors.InvalidParamsError: Task 78f1680e-3c12-4915-83a7-a85f937d9722 is in terminal state: 3
という結果になります。
タスクの状態COMPLETEDが"terminal state"なので、もうメッセージは受け入れませんということです。
アプリ側での応急処置
A2Aの利点として、公式ページではstateful、マルチターンが挙げられていますが、一方的にタスク完了を宣言し、その後の会話ができなくなるのであれば、あまりその長所を活かせず、「AIエージェントを呼び出すMCPツール」とあまり変わりません。
とはいえ、費用を抑えるためにもタスクを管理するデータベースの容量は少ないほうがよいですし、タスクをデータベースから削除して良いというフラグとして、会話を継続できなくするcompletedがあっても良いと思います。
現時点ではアプリ側で対処すべきと考えたので、server.pyを以下のように変更しました。
class TestExecutor(AgentExecutor):
async def execute(self, context, event_queue):
if not context.current_task:
task = new_task_from_user_message(context.message)
await event_queue.enqueue_event(task)
else:
task = context.current_task
task_id = task.id
context_id = task.context_id
# 【追加】メッセージが空かつ`state`が`INPUT_REQUIRED`だったら`COMPLETED`にして終了
if not context.message:
if task.status.state != TaskState.TASK_STATE_INPUT_REQUIRED:
return
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED)
)
)
return
# 【追加】状態をSUBMITTEDにする
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED)
)
)
# 状態をWORKINGにする
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_WORKING)
)
)
# AIからの回答(Artifact)を生成。今回はAIを使わずHelloと返すだけ
artifact = new_artifact(parts=[Part(text="Hello!")], name="answer")
await event_queue.enqueue_event(
TaskArtifactUpdateEvent(
task_id=task_id,
context_id=context_id,
artifact=artifact,
append=False,
last_chunk=True
)
)
# 【変更】状態をINPUT_REQUIREDにする
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_INPUT_REQUIRED)
)
)
...
この状態で再度client.pyを実行してみます。
$python ./client.py
> name: "hello agent"
> description: "Say Hello"
> supported_interfaces {
> url: "http://localhost:8000/hello-agent/v1/jsonrpc/"
> protocol_binding: "JSONRPC"
> }
> capabilities {
> }
> default_input_modes: "text/plain"
> default_output_modes: "text/plain"
> skills {
> id: "say-hello-skill"
> name: "say-hello-skill"
> }
>
> ----応答確認(1回目)----
> [id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> status {
> state: TASK_STATE_INPUT_REQUIRED
> }
> artifacts {
> artifact_id: "ea173f49-b3b5-42d9-977c-68831b8faba5"
> name: "answer"
> parts {
> text: "Hello!"
> }
> }
> history {
> message_id: "1cc27e7f7778405ca65d223c3067063f"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> task_id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> role: ROLE_USER
> parts {
> text: "Say hello."
> }
> }
> ]
> -----応答確認(1回目)ここまで----
> -----応答確認(2回目)-----
> [id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> status {
> state: TASK_STATE_INPUT_REQUIRED
> }
> artifacts {
> artifact_id: "ea173f49-b3b5-42d9-977c-68831b8faba5"
> name: "answer"
> parts {
> text: "Hello!"
> }
> }
> history {
> message_id: "1cc27e7f7778405ca65d223c3067063f"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> task_id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> role: ROLE_USER
> parts {
> text: "Say hello."
> }
> }
> , id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> status {
> state: TASK_STATE_INPUT_REQUIRED
> }
> artifacts {
> artifact_id: "ea173f49-b3b5-42d9-977c-68831b8faba5"
> name: "answer"
> parts {
> text: "Hello!"
> }
> }
> artifacts {
> artifact_id: "f30b31cd-1ea8-48c4-b8e8-05954c4277da"
> name: "answer"
> parts {
> text: "Hello!"
> }
> }
> history {
> message_id: "1cc27e7f7778405ca65d223c3067063f"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> task_id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> role: ROLE_USER
> parts {
> text: "Say hello."
> }
> }
> history {
> message_id: "20df4b7361d8433e9a1633c15874a185"
> context_id: "7fa03275-7cfe-4c17-a514-339f4f5da7f1"
> task_id: "9ba28658-9b6a-4899-b47a-a1290f4902e0"
> role: ROLE_USER
> parts {
> text: "Say hello again."
> }
> }
> ]
> -----応答確認(2回目)ここまで-----
これで作業完了後も会話を継続でき、完全に作業を終わらせたければ空のメッセージを送るか、タスクをcancelすればOKです。
まとめ
今回は、A2Aのstateful、マルチターンという利点を活かすため、input-requiredのまま放置するというアプローチをご紹介しました。
あくまで私個人のやり方ですが、実装例のひとつとして参考になれば幸いです。
Discussion