😎

A2Aプロトコル開発ガイド

に公開

概要

A2A(Agent-to-Agent)プロトコルは、エージェント間の通信のために設計されたJSON-RPCベースの通信プロトコルです。このガイドでは、A2Aプロトコル仕様に準拠するサーバーとクライアントの両方のコンポーネントを開発するための包括的な手順を提供します。

目次

  1. プロトコルの基本
  2. サーバー実装
  3. クライアント実装
  4. Coderデモの実行

プロトコルの基本

A2Aプロトコルは、JSON-RPC 2.0の上に構築されており、エージェント通信のための一連のメソッドを定義しています。主要なコンポーネントには以下が含まれます:

メッセージ構造

すべてのA2Aメッセージは、以下の基本構造を持つJSON-RPC 2.0フォーマットに従います:

interface JSONRPCMessage {
  jsonrpc?: "2.0";
  id?: number | string | null;
  method?: string;
  params?: unknown;
  result?: unknown;
  error?: JSONRPCError;
}

プロトコルフロー

以下のシーケンス図は、A2Aプロトコルの主要な相互作用フローを示しています:

コアメソッド

プロトコルはいくつかのコアメソッドをサポートしています:

  • tasks/send: エージェントにタスクメッセージを送信
  • tasks/get: タスクのステータスを取得
  • tasks/cancel: 実行中のタスクをキャンセル
  • tasks/pushNotification/set: タスクのプッシュ通知を設定
  • tasks/pushNotification/get: プッシュ通知の設定を取得
  • tasks/resubscribe: タスク更新の再購読
  • tasks/sendSubscribe: タスクメッセージを送信し、更新を購読

タスクの状態

タスクは以下の状態のいずれかになります:

  • submitted(提出済み)
  • working(処理中)
  • input-required(入力が必要)
  • completed(完了)
  • canceled(キャンセル済み)
  • failed(失敗)
  • unknown(不明)

サーバー実装

コアコンポーネント

サーバー実装はいくつかの主要なコンポーネントで構成されています:

  1. サーバー (server.ts): HTTPリクエストを処理するメインサーバー実装
  2. ハンドラー (handler.ts): A2Aプロトコルメッセージを処理するリクエストハンドラー
  3. ストア (store.ts): タスクの保存と管理
  4. ユーティリティ (utils.ts): ユーティリティ関数
  5. エラー処理 (error.ts): エラー定義と処理

基本的な使用法(概念)

import {
  A2AServer,
  InMemoryTaskStore,
  TaskContext,
  TaskYieldUpdate,
} from "./index"; // Assuming imports from the server package

// 1. Define your agent's logic as a TaskHandler
async function* myAgentLogic(
  context: TaskContext
): AsyncGenerator<TaskYieldUpdate> {
  console.log(`Handling task: ${context.task.id}`);
  yield {
    state: "working",
    message: { role: "agent", parts: [{ text: "Processing..." }] },
  };

  // Simulate work...
  await new Promise((resolve) => setTimeout(resolve, 1000));

  if (context.isCancelled()) {
    console.log("Task cancelled!");
    yield { state: "canceled" };
    return;
  }

  // Yield an artifact
  yield {
    name: "result.txt",
    mimeType: "text/plain",
    parts: [{ text: `Task ${context.task.id} completed.` }],
  };

  // Yield final status
  yield {
    state: "completed",
    message: { role: "agent", parts: [{ text: "Done!" }] },
  };
}

// 2. Create and start the server
const store = new InMemoryTaskStore(); // Or new FileStore()
const server = new A2AServer(myAgentLogic, { taskStore: store });

server.start(); // Starts listening on default port 41241

console.log("A2A Server started.");

サーバーは設定されたポート(デフォルト:3000)で起動します。

クライアント実装

主な機能:

  • JSON-RPC通信: JSON-RPC 2.0仕様に従ってリクエストの送信とレスポンスの受信(標準およびServer-Sent Eventsを介したストリーミング)を処理します。
  • A2Aメソッド: sendTasksendTaskSubscribegetTaskcancelTasksetTaskPushNotificationgetTaskPushNotificationresubscribeTaskなどの標準A2Aメソッドを実装しています。
  • エラー処理: ネットワーク問題やJSON-RPCエラーに対する基本的なエラー処理を提供します。
  • ストリーミングサポート: リアルタイムのタスク更新(sendTaskSubscriberesubscribeTask)のためのServer-Sent Events(SSE)を管理します。
  • 拡張性: 異なる環境(例:Node.js)向けにカスタムfetch実装の提供が可能です。

基本的な使用法

import { A2AClient, Task, TaskQueryParams, TaskSendParams } from "./client"; // Import necessary types
import { v4 as uuidv4 } from "uuid"; // Example for generating task IDs

const client = new A2AClient("http://localhost:41241"); // Replace with your server URL

async function run() {
  try {
    // Send a simple task (pass only params)
    const taskId = uuidv4();
    const sendParams: TaskSendParams = {
      id: taskId,
      message: { role: "user", parts: [{ text: "Hello, agent!" }] },
    };
    // Method now returns Task | null directly
    const taskResult: Task | null = await client.sendTask(sendParams);
    console.log("Send Task Result:", taskResult);

    // Get task status (pass only params)
    const getParams: TaskQueryParams = { id: taskId };
    // Method now returns Task | null directly
    const getTaskResult: Task | null = await client.getTask(getParams);
    console.log("Get Task Result:", getTaskResult);
  } catch (error) {
    console.error("A2A Client Error:", error);
  }
}

run();

ストリーミングの使用法

import {
  A2AClient,
  TaskStatusUpdateEvent,
  TaskArtifactUpdateEvent,
  TaskSendParams, // Use params type directly
} from "./client"; // Adjust path if necessary
import { v4 as uuidv4 } from "uuid";

const client = new A2AClient("http://localhost:41241");

async function streamTask() {
  const streamingTaskId = uuidv4();
  try {
    console.log(`\n--- Starting streaming task ${streamingTaskId} ---`);
    // Construct just the params
    const streamParams: TaskSendParams = {
      id: streamingTaskId,
      message: { role: "user", parts: [{ text: "Stream me some updates!" }] },
    };
    // Pass only params to the client method
    const stream = client.sendTaskSubscribe(streamParams);

    // Stream now yields the event payloads directly
    for await (const event of stream) {
      // Type guard to differentiate events based on structure
      if ("status" in event) {
        // It's a TaskStatusUpdateEvent
        const statusEvent = event as TaskStatusUpdateEvent; // Cast for clarity
        console.log(
          `[${streamingTaskId}] Status Update: ${statusEvent.status.state} - ${
            statusEvent.status.message?.parts[0]?.text ?? "No message"
          }`
        );
        if (statusEvent.final) {
          console.log(`[${streamingTaskId}] Stream marked as final.`);
          break; // Exit loop when server signals completion
        }
      } else if ("artifact" in event) {
        // It's a TaskArtifactUpdateEvent
        const artifactEvent = event as TaskArtifactUpdateEvent; // Cast for clarity
        console.log(
          `[${streamingTaskId}] Artifact Update: ${
            artifactEvent.artifact.name ??
            `Index ${artifactEvent.artifact.index}`
          } - Part Count: ${artifactEvent.artifact.parts.length}`
        );
        // Process artifact content (e.g., artifactEvent.artifact.parts[0].text)
      } else {
        console.warn("Received unknown event structure:", event);
      }
    }
    console.log(`--- Streaming task ${streamingTaskId} finished ---`);
  } catch (error) {
    console.error(`Error during streaming task ${streamingTaskId}:`, error);
  }
}

streamTask();

Coderデモの実行

Coderデモは、コード関連のタスクを処理できるA2Aエージェントの実装例です。

セットアップ

  1. 依存関係をインストールします:
git clone https://github.com/sing1ee/a2a-agent-coder.git
#または
git clone git@github.com:sing1ee/a2a-agent-coder.git

bun install
  1. 必要な環境変数があることを確認します:
# set .env first
export $(cat .env | xargs)

デモの実行

  1. a2aサーバーを起動します(Node.js環境が必要):
bun run agents:coder
  1. a2aクライアントを起動します:
bun run a2a:cli

# 結果
$ bun x tsx src/cli.ts
A2A Terminal Client
Agent URL: http://localhost:41241
Attempting to fetch agent card from: http://localhost:41241/.well-known/agent.json
✓ Agent Card Found:
  Name:        Coder Agent
  Description: An agent that generates code based on natural language instructions and streams file outputs.
  Version:     0.0.1
Starting Task ID: a1a608b3-3015-4404-a83f-6ccc05083761
Enter messages, or use '/new' to start a new task.
Coder Agent > You: implement binary search
Sending...

Coder Agent [4:28:00 PM]: ⏳ Status: working
  Part 1: 📝 Text: Generating code...

Coder Agent [4:28:02 PM]: ⏳ Status: working
  Part 1: 📄 File: Name: src/algorithms/binary_search.py, Source: """
Implementation of the binary search algorithm in Python.
"""

def binary_search(arr, target):
    """
    Performs a binary search on a sorted array to find the index of a target value.

    Args:
        arr (list): A sorted list of elements.
        target: The value to search for in the array.

    Returns:
        int: The index of the target value if found, otherwise -1.
    """
    low = 0
    high = len(arr) - 1

    while low <= high:
        mid = (low + high) // 2  # Integer division to find the middle index

        if arr[mid] == target:
            return mid  # Target found at index mid
        elif arr[mid] < target:
            low = mid + 1  # Target is in the right half
        else:
            high = mid - 1  # Target is in the left half

    return -1  # Target not found in the array


Coder Agent [4:28:02 PM]: ✅ Status: completed
SSE stream finished for method tasks/sendSubscribe.
--- End of response for this input ---
Coder Agent > You:
Exiting terminal client. Goodbye!

エラー処理

A2Aプロトコルはいくつかの標準エラーコードを定義しています:

  • -32700: 解析エラー
  • -32600: 無効なリクエスト
  • -32601: メソッドが見つからない
  • -32602: 無効なパラメータ
  • -32603: 内部エラー
  • -32000: タスクが見つからない
  • -32001: タスクがキャンセル不可
  • -32002: プッシュ通知がサポートされていない
  • -32003: サポートされていない操作

ベストプラクティス

  1. エラー処理: すべてのA2Aプロトコルメソッドに適切なエラー処理を実装する
  2. 認証: 安全な通信のための適切な認証メカニズムを実装する
  3. タスク管理: 適切なタスク状態管理とクリーンアップを維持する
  4. プッシュ通知: サポートされている場合、リアルタイム更新のためのプッシュ通知を実装する
  5. ロギング: デバッグとモニタリングのための包括的なロギングを実装する

追加リソース

A2A TypeScriptガイド

Discussion