💬

A2AサーバーにSlackで接続するクライアントを作る

に公開

前回 の記事では A2A のプロトコルに従って A2A サーバーを実装しましたが、クライアントはサンプル実装から持ってきた単純な CLI でした。
今回はクライアント側を Slack アプリに接続することで Slack から A2A サーバーと通信できるようにします。

コードは以下のレポジトリに公開しています。

SDK

https://github.com/ryukez/a2a-sdk-ryukez

サンプル実装

https://github.com/ryukez/a2a-sample

A2A との接続

A2A サーバーとの通信部分については、公式のサンプル実装 を参考にストリーミングを使って次のように書けます。

async function communicateWithA2A(agentClient: AgentClient) {
  const params: schema.TaskSendParams = {
    id: "task_id",
    sessionId: "session_id",
    message: {
      role: "user",
      parts: [{ type: "text", text: "text" }],
    },
  };

  const stream: AsyncIterable<
    | schema.TaskStatusUpdateEvent
    | schema.TaskArtifactUpdateEvent
    | null
    | undefined
  > = this.agentClient.sendTaskSubscribe(params);

  for await (const event of stream) {
    if (!event) continue;
    if ("status" in event) {
      // Status更新時の処理
    }
    if ("artifact" in event) {
      // Artifact更新時の処理
    }
  }
}

Slack のアプリとして実装する場合は、ユーザーからの何らかのアクションを受け取った際にこの関数が呼ばれるようにすれば良いことになります。

const app = new App({
  token: env.SLACK_BOT_TOKEN,
  appToken: env.SLACK_APP_TOKEN,
  socketMode: true,
});

app.event("app_mention", async ({ event }) => {
  const threadTs = event.thread_ts || event.ts;

  communicateWithA2A(threadTs); // あえてawaitせず非同期的に処理する
});

実装上の都合ですが、Slack では 3 秒以内にレスポンスを返す必要があるため、長い処理の場合にタイムアウトしてしまわないように非同期的に処理するようにしています。

AgentMessageChannel

今回の実装では上をもう少し抽象化して AgentMessageChannel というクラスを定義しました。
ステータスの更新を処理する onStatusUpdate()、アーティファクトの更新を処理する onArtifactUpdate() を引数として受け取り、内部の Queue オブジェクトで順番に処理していきます。
利用側は userMessage() メソッドを呼び出すことでユーザーからのメッセージを message channel に通知し、内部で A2A との通信が実行されます。

import * as schema from "../schema.js";
import { A2AClient } from "./client.js";
import AsyncLock from "async-lock";

export type UserMessage<C> = {
  taskId: string;
  sessionId: string;
  text: string;
  context: C;
};

export type OnStatusUpdate<C> = (
  userMessage: UserMessage<C>,
  event: schema.TaskStatusUpdateEvent
) => Promise<void>;

export type OnArtifactUpdate<C> = (
  userMessage: UserMessage<C>,
  event: schema.TaskArtifactUpdateEvent
) => Promise<void>;

class Queue<T> {
  private queue: T[] = [];
  private lock = new AsyncLock();

  constructor(private readonly processItem: (item: T) => Promise<void>) {}

  async add(item: T): Promise<void> {
    this.queue.push(item);
    await this.process();
  }

  private async process(): Promise<void> {
    // 排他制御
    await this.lock.acquire("processing", async () => {
      while (this.queue.length > 0) {
        const item = this.queue.shift()!;

        try {
          await this.processItem(item);
          break;
        } catch (error) {
          console.error(`Item processing failed:`, item);
        }
      }
    });
  }
}

export class AgentMessageChannel<C> {
  private queue: Queue<UserMessage<C>>;

  constructor(
    private readonly agentClient: A2AClient,
    private onStatusUpdate: OnStatusUpdate<C>,
    private onArtifactUpdate: OnArtifactUpdate<C>
  ) {
    this.queue = new Queue(this.processMessage.bind(this));
  }

  private async processMessage(userMessage: UserMessage<C>): Promise<void> {
    const params: schema.TaskSendParams = {
      id: userMessage.taskId,
      sessionId: userMessage.sessionId,
      message: {
        role: "user",
        parts: [{ type: "text", text: userMessage.text }],
      },
    };

    const stream: AsyncIterable<
      | schema.TaskStatusUpdateEvent
      | schema.TaskArtifactUpdateEvent
      | null
      | undefined
    > = this.agentClient.sendTaskSubscribe(params);

    for await (const event of stream) {
      if (!event) continue;
      if ("status" in event) await this.onStatusUpdate(userMessage, event);
      if ("artifact" in event) await this.onArtifactUpdate(userMessage, event);
    }
  }

  userMessage(message: UserMessage<C>): Promise<void> {
    return this.queue.add(message);
  }
}

最後にエージェントからのメッセージを処理する onStatusUpdate()onArtifactUpdate() を実装します。
基本的に受け取ったメッセージを client.chat.postMessage() で送信するだけですが、メッセージのデータ型に応じて適切にフォーマッティングする必要があります。
ファイルデータをバイナリで受け取った場合には client.filesUploadV2() を使ってファイルをアップロードするようにしています。

// ファイルをアップロードする(bytesの場合)か、メッセージブロックとして返す(uriの場合)
const handleFile = async (
  slack: App,
  userMessage: UserMessage<MessageContext>,
  parts: FilePart[]
): Promise<{
  blocks: KnownBlock[];
  uploads: (() => Promise<void>)[];
}> => {
  const blocks: KnownBlock[] = [];
  const uploads: (() => Promise<void>)[] = [];

  for (const part of parts) {
    if (part.file.uri) {
      if (part.file.mimeType?.startsWith("image/")) {
        blocks.push({
          type: "image",
          image_url: part.file.uri,
          alt_text: part.file.name ?? "image",
        });
      } else {
        blocks.push({
          type: "section",
          text: {
            type: "mrkdwn",
            text: part.file.uri,
          },
        });
      }
    } else if (part.file.bytes) {
      const bytes = part.file.bytes;
      uploads.push(async () => {
        const upload = await slack.client.filesUploadV2({
          channels: userMessage.context.channel,
          thread_ts: userMessage.context.threadTs,
          file: Buffer.from(bytes, "base64"),
          filename: part.file.name ?? "file",
        });

        // wait 5s for upload to complete
        await new Promise((resolve) => setTimeout(resolve, 5000));
      });
    }
  }

  return { blocks, uploads };
};

// ステータス更新時の処理
const onStatusUpdate =
  (slack: App): OnStatusUpdate<MessageContext> =>
  async (userMessage, event) => {
    const agentMessage = event.status.message;
    for (const part of agentMessage?.parts ?? []) {
      switch (part.type) {
        // テキストはそのまま投稿
        case "text": {
          await slack.client.chat.postMessage({
            text: part.text,
            channel: userMessage.context.channel,
            thread_ts: userMessage.context.threadTs,
          });
          break;
        }

        // データはコードブロックで囲んで投稿
        case "data": {
          const text = "```\n" + JSON.stringify(part.data, null, 2) + "\n```";

          await slack.client.chat.postMessage({
            text,
            channel: userMessage.context.channel,
            thread_ts: userMessage.context.threadTs,
          });
          break;
        }

        // ファイルはhandleFile()で適切なハンドリングを行う
        case "file": {
          const { blocks, uploads } = await handleFile(slack, userMessage, [
            part,
          ]);
          if (blocks.length > 0) {
            await slack.client.chat.postMessage({
              blocks,
              channel: userMessage.context.channel,
              thread_ts: userMessage.context.threadTs,
            });
          }
          // アップロードを実行
          for (const upload of uploads) {
            await upload();
          }
          break;
        }

        default: {
          throw new Error("Invalid part type");
        }
      }
    }
  };

// onArtifactUpdate() も同様

エントリポイント

ここまできたらあとは Slack アプリを起動して、ユーザーのアクションに応じて agentMessageChannel.userMessage() を呼び出すだけです。
Slack アプリの詳細なセットアップ方法は割愛しますが公式リファレンス等を参照してください。

import { App } from "@slack/bolt";
import dotenv from "dotenv";
import { env } from "./config/env";
import { defaultSlackMessageChannel } from "a2a-sdk-ryukez/client/slack";

// 環境変数の読み込み
dotenv.config();

// Slackアプリの初期化
const app = new App({
  token: process.env.SLACK_BOT_TOKEN,
  appToken: process.env.SLACK_APP_TOKEN,
  socketMode: true,
});

const agentMessageChannel = defaultSlackMessageChannel(
  process.env.AGENT_URL,
  app
);

// メンションに対する応答
app.event("app_mention", async ({ event }) => {
  const threadTs = event.thread_ts || event.ts;

  agentMessageChannel.userMessage({
    taskId: threadTs,
    sessionId: threadTs,
    text: event.text,
    context: { channel: event.channel, threadTs },
  });
});

// アプリの起動
(async () => {
  await app.start();
  console.log("⚡️ Bolt app is running!");
})();

実際に実行してみると、上のようにレシピと美味しそうなカレーの画像を返してくれました。^1
テキストやファイルなどそれぞれのデータ形式を適切なフォーマットで表示できていますね。

今回の実装はエントリポイント以外の部分を a2a-sdk-ryukez に含めているので、こちらをインポートすれば上の数十行だけで今回のように Slack へ接続するA2Aアプリを作ることができます。よければ試してみてください!

おわりに

A2A はその名前からエージェント間の通信やマルチエージェントシステムのためのものとして捉えられがちですが、実はエージェントのインターフェース標準が定義されたことで、今回のように色んなアプリケーションをシームレスに繋ぎ込みやすくなったことが直近では大きな意味を持っていると思います。

当面はマルチエージェントをやらないとしても、今回のような形でアプリケーションからの接続のインターフェースとして乗っかっておけば、将来的にマルチエージェントなシステムに自然に移行していくことができそうです。

Discussion