😎
A2Aプロトコル開発ガイド
概要
A2A(Agent-to-Agent)プロトコルは、エージェント間の通信のために設計されたJSON-RPCベースの通信プロトコルです。このガイドでは、A2Aプロトコル仕様に準拠するサーバーとクライアントの両方のコンポーネントを開発するための包括的な手順を提供します。
目次
プロトコルの基本
A2Aプロトコルは、JSON-RPC 2.0の上に構築されており、エージェント通信のための一連のメソッドを定義しています。主要なコンポーネントには以下が含まれます:
メッセージ構造
すべてのA2Aメッセージは、以下の基本構造を持つJSON-RPC 2.0フォーマットに従います:
interface JSONRPCMessage {
jsonrpc?: "2.0";
id?: number | string | null;
method?: string;
params?: unknown;
result?: unknown;
error?: JSONRPCError;
}
プロトコルフロー
以下のシーケンス図は、A2Aプロトコルの主要な相互作用フローを示しています:
コアメソッド
プロトコルはいくつかのコアメソッドをサポートしています:
-
tasks/send
: エージェントにタスクメッセージを送信 -
tasks/get
: タスクのステータスを取得 -
tasks/cancel
: 実行中のタスクをキャンセル -
tasks/pushNotification/set
: タスクのプッシュ通知を設定 -
tasks/pushNotification/get
: プッシュ通知の設定を取得 -
tasks/resubscribe
: タスク更新の再購読 -
tasks/sendSubscribe
: タスクメッセージを送信し、更新を購読
タスクの状態
タスクは以下の状態のいずれかになります:
-
submitted
(提出済み) -
working
(処理中) -
input-required
(入力が必要) -
completed
(完了) -
canceled
(キャンセル済み) -
failed
(失敗) -
unknown
(不明)
サーバー実装
コアコンポーネント
サーバー実装はいくつかの主要なコンポーネントで構成されています:
- サーバー (server.ts): HTTPリクエストを処理するメインサーバー実装
- ハンドラー (handler.ts): A2Aプロトコルメッセージを処理するリクエストハンドラー
- ストア (store.ts): タスクの保存と管理
- ユーティリティ (utils.ts): ユーティリティ関数
- エラー処理 (error.ts): エラー定義と処理
基本的な使用法(概念)
import {
A2AServer,
InMemoryTaskStore,
TaskContext,
TaskYieldUpdate,
} from "./index"; // Assuming imports from the server package
// 1. Define your agent's logic as a TaskHandler
async function* myAgentLogic(
context: TaskContext
): AsyncGenerator<TaskYieldUpdate> {
console.log(`Handling task: ${context.task.id}`);
yield {
state: "working",
message: { role: "agent", parts: [{ text: "Processing..." }] },
};
// Simulate work...
await new Promise((resolve) => setTimeout(resolve, 1000));
if (context.isCancelled()) {
console.log("Task cancelled!");
yield { state: "canceled" };
return;
}
// Yield an artifact
yield {
name: "result.txt",
mimeType: "text/plain",
parts: [{ text: `Task ${context.task.id} completed.` }],
};
// Yield final status
yield {
state: "completed",
message: { role: "agent", parts: [{ text: "Done!" }] },
};
}
// 2. Create and start the server
const store = new InMemoryTaskStore(); // Or new FileStore()
const server = new A2AServer(myAgentLogic, { taskStore: store });
server.start(); // Starts listening on default port 41241
console.log("A2A Server started.");
サーバーは設定されたポート(デフォルト:3000)で起動します。
クライアント実装
主な機能:
- JSON-RPC通信: JSON-RPC 2.0仕様に従ってリクエストの送信とレスポンスの受信(標準およびServer-Sent Eventsを介したストリーミング)を処理します。
-
A2Aメソッド:
sendTask
、sendTaskSubscribe
、getTask
、cancelTask
、setTaskPushNotification
、getTaskPushNotification
、resubscribeTask
などの標準A2Aメソッドを実装しています。 - エラー処理: ネットワーク問題やJSON-RPCエラーに対する基本的なエラー処理を提供します。
-
ストリーミングサポート: リアルタイムのタスク更新(
sendTaskSubscribe
、resubscribeTask
)のためのServer-Sent Events(SSE)を管理します。 -
拡張性: 異なる環境(例:Node.js)向けにカスタム
fetch
実装の提供が可能です。
基本的な使用法
import { A2AClient, Task, TaskQueryParams, TaskSendParams } from "./client"; // Import necessary types
import { v4 as uuidv4 } from "uuid"; // Example for generating task IDs
const client = new A2AClient("http://localhost:41241"); // Replace with your server URL
async function run() {
try {
// Send a simple task (pass only params)
const taskId = uuidv4();
const sendParams: TaskSendParams = {
id: taskId,
message: { role: "user", parts: [{ text: "Hello, agent!" }] },
};
// Method now returns Task | null directly
const taskResult: Task | null = await client.sendTask(sendParams);
console.log("Send Task Result:", taskResult);
// Get task status (pass only params)
const getParams: TaskQueryParams = { id: taskId };
// Method now returns Task | null directly
const getTaskResult: Task | null = await client.getTask(getParams);
console.log("Get Task Result:", getTaskResult);
} catch (error) {
console.error("A2A Client Error:", error);
}
}
run();
ストリーミングの使用法
import {
A2AClient,
TaskStatusUpdateEvent,
TaskArtifactUpdateEvent,
TaskSendParams, // Use params type directly
} from "./client"; // Adjust path if necessary
import { v4 as uuidv4 } from "uuid";
const client = new A2AClient("http://localhost:41241");
async function streamTask() {
const streamingTaskId = uuidv4();
try {
console.log(`\n--- Starting streaming task ${streamingTaskId} ---`);
// Construct just the params
const streamParams: TaskSendParams = {
id: streamingTaskId,
message: { role: "user", parts: [{ text: "Stream me some updates!" }] },
};
// Pass only params to the client method
const stream = client.sendTaskSubscribe(streamParams);
// Stream now yields the event payloads directly
for await (const event of stream) {
// Type guard to differentiate events based on structure
if ("status" in event) {
// It's a TaskStatusUpdateEvent
const statusEvent = event as TaskStatusUpdateEvent; // Cast for clarity
console.log(
`[${streamingTaskId}] Status Update: ${statusEvent.status.state} - ${
statusEvent.status.message?.parts[0]?.text ?? "No message"
}`
);
if (statusEvent.final) {
console.log(`[${streamingTaskId}] Stream marked as final.`);
break; // Exit loop when server signals completion
}
} else if ("artifact" in event) {
// It's a TaskArtifactUpdateEvent
const artifactEvent = event as TaskArtifactUpdateEvent; // Cast for clarity
console.log(
`[${streamingTaskId}] Artifact Update: ${
artifactEvent.artifact.name ??
`Index ${artifactEvent.artifact.index}`
} - Part Count: ${artifactEvent.artifact.parts.length}`
);
// Process artifact content (e.g., artifactEvent.artifact.parts[0].text)
} else {
console.warn("Received unknown event structure:", event);
}
}
console.log(`--- Streaming task ${streamingTaskId} finished ---`);
} catch (error) {
console.error(`Error during streaming task ${streamingTaskId}:`, error);
}
}
streamTask();
Coderデモの実行
Coderデモは、コード関連のタスクを処理できるA2Aエージェントの実装例です。
セットアップ
- 依存関係をインストールします:
git clone https://github.com/sing1ee/a2a-agent-coder.git
#または
git clone git@github.com:sing1ee/a2a-agent-coder.git
bun install
- 必要な環境変数があることを確認します:
# set .env first
export $(cat .env | xargs)
デモの実行
- a2aサーバーを起動します(Node.js環境が必要):
bun run agents:coder
- a2aクライアントを起動します:
bun run a2a:cli
# 結果
$ bun x tsx src/cli.ts
A2A Terminal Client
Agent URL: http://localhost:41241
Attempting to fetch agent card from: http://localhost:41241/.well-known/agent.json
✓ Agent Card Found:
Name: Coder Agent
Description: An agent that generates code based on natural language instructions and streams file outputs.
Version: 0.0.1
Starting Task ID: a1a608b3-3015-4404-a83f-6ccc05083761
Enter messages, or use '/new' to start a new task.
Coder Agent > You: implement binary search
Sending...
Coder Agent [4:28:00 PM]: ⏳ Status: working
Part 1: 📝 Text: Generating code...
Coder Agent [4:28:02 PM]: ⏳ Status: working
Part 1: 📄 File: Name: src/algorithms/binary_search.py, Source: """
Implementation of the binary search algorithm in Python.
"""
def binary_search(arr, target):
"""
Performs a binary search on a sorted array to find the index of a target value.
Args:
arr (list): A sorted list of elements.
target: The value to search for in the array.
Returns:
int: The index of the target value if found, otherwise -1.
"""
low = 0
high = len(arr) - 1
while low <= high:
mid = (low + high) // 2 # Integer division to find the middle index
if arr[mid] == target:
return mid # Target found at index mid
elif arr[mid] < target:
low = mid + 1 # Target is in the right half
else:
high = mid - 1 # Target is in the left half
return -1 # Target not found in the array
Coder Agent [4:28:02 PM]: ✅ Status: completed
SSE stream finished for method tasks/sendSubscribe.
--- End of response for this input ---
Coder Agent > You:
Exiting terminal client. Goodbye!
エラー処理
A2Aプロトコルはいくつかの標準エラーコードを定義しています:
-
-32700
: 解析エラー -
-32600
: 無効なリクエスト -
-32601
: メソッドが見つからない -
-32602
: 無効なパラメータ -
-32603
: 内部エラー -
-32000
: タスクが見つからない -
-32001
: タスクがキャンセル不可 -
-32002
: プッシュ通知がサポートされていない -
-32003
: サポートされていない操作
ベストプラクティス
- エラー処理: すべてのA2Aプロトコルメソッドに適切なエラー処理を実装する
- 認証: 安全な通信のための適切な認証メカニズムを実装する
- タスク管理: 適切なタスク状態管理とクリーンアップを維持する
- プッシュ通知: サポートされている場合、リアルタイム更新のためのプッシュ通知を実装する
- ロギング: デバッグとモニタリングのための包括的なロギングを実装する
Discussion