📑

A2AサーバーのTaskをRedisで永続化する

に公開

A2A サーバーの Task はデフォルトではメモリ上に保存されており、サーバーが再起動すると過去の Task が失われてしまいます。
今回は Redis を使って Task を永続化する方法を紹介します。

TaskStore の実装

公式のSDK サンプル では Task を保存するバックエンドが TaskStore インターフェースとして定義されており、標準では InMemoryTaskStore が利用されます。

// Helper type for the simplified store
export interface TaskAndHistory {
  task: schema.Task;
  history: schema.Message[];
}

/**
 * Simplified interface for task storage providers.
 * Stores and retrieves both the task and its full message history together.
 */
export interface TaskStore {
  /**
   * Saves a task and its associated message history.
   * Overwrites existing data if the task ID exists.
   * @param data An object containing the task and its history.
   * @returns A promise resolving when the save operation is complete.
   */
  save(data: TaskAndHistory): Promise<void>;

  /**
   * Loads a task and its history by task ID.
   * @param taskId The ID of the task to load.
   * @returns A promise resolving to an object containing the Task and its history, or null if not found.
   */
  load(taskId: string): Promise<TaskAndHistory | null>;
}

// ========================
// InMemoryTaskStore
// ========================

// Use TaskAndHistory directly for storage
export class InMemoryTaskStore implements TaskStore {
  private store: Map<string, TaskAndHistory> = new Map();

  async load(taskId: string): Promise<TaskAndHistory | null> {
    const entry = this.store.get(taskId);
    // Return copies to prevent external mutation
    return entry
      ? { task: { ...entry.task }, history: [...entry.history] }
      : null;
  }

  async save(data: TaskAndHistory): Promise<void> {
    // Store copies to prevent internal mutation if caller reuses objects
    this.store.set(data.task.id, {
      task: { ...data.task },
      history: [...data.history],
    });
  }
}

TaskStore に Redis を使用するには、Redis を使って TaskStore を実装する RedisStore を追加します。

import Redis from "ioredis";
import { TaskAndHistory } from "./store";
import { A2AError } from "./error";

export interface RedisStoreOptions {
  host?: string;
  port?: number;
  username?: string;
  password?: string;
  db?: number;
  keyPrefix?: string;
  tls?: boolean;
}

export class RedisStore {
  private redis: Redis;
  private keyPrefix: string;

  constructor(options: RedisStoreOptions = {}) {
    const {
      host = "localhost",
      port = 6379,
      username,
      password,
      db = 0,
      keyPrefix = "a2a:",
      tls = process.env.REDIS_TLS === "true",
    } = options;

    this.keyPrefix = keyPrefix;
    this.redis = new Redis({
      host,
      port,
      username,
      password,
      db,
      tls: tls ? {} : undefined,
      retryStrategy: (times) => {
        const delay = Math.min(times * 50, 2000);
        return delay;
      },
    });

    this.redis.on("error", (error) => {
      console.error("Redis connection error:", error);
    });
  }

  private getTaskKey(taskId: string): string {
    return `${this.keyPrefix}task:${taskId}`;
  }

  private getHistoryKey(taskId: string): string {
    return `${this.keyPrefix}history:${taskId}`;
  }

  async load(taskId: string): Promise<TaskAndHistory | null> {
    try {
      const [taskData, historyData] = await Promise.all([
        this.redis.get(this.getTaskKey(taskId)),
        this.redis.get(this.getHistoryKey(taskId)),
      ]);

      if (!taskData) {
        return null;
      }

      const task = JSON.parse(taskData);
      const history = historyData ? JSON.parse(historyData) : [];

      return { task, history };
    } catch (error: any) {
      throw A2AError.internalError(
        `Failed to load task ${taskId}: ${error.message}`,
        error
      );
    }
  }

  async save(data: TaskAndHistory): Promise<void> {
    try {
      const { task, history } = data;
      const taskKey = this.getTaskKey(task.id);
      const historyKey = this.getHistoryKey(task.id);

      await Promise.all([
        this.redis.set(taskKey, JSON.stringify(task)),
        this.redis.set(historyKey, JSON.stringify(history)),
      ]);
    } catch (error: any) {
      throw A2AError.internalError(
        `Failed to save task ${data.task.id}: ${error.message}`,
        error
      );
    }
  }

  async delete(taskId: string): Promise<void> {
    try {
      await Promise.all([
        this.redis.del(this.getTaskKey(taskId)),
        this.redis.del(this.getHistoryKey(taskId)),
      ]);
    } catch (error: any) {
      throw A2AError.internalError(
        `Failed to delete task ${taskId}: ${error.message}`,
        error
      );
    }
  }

  async disconnect(): Promise<void> {
    await this.redis.quit();
  }
}

あとはサーバー側で A2AServer を初期化する時にパラメータとして渡すだけです。

const server = new A2AServer(agent, {
  card: agentCard,
  taskStore: new RedisStore({
    host: "localhost",
    port: 6379,
  }),
});

UpStash

今回はクラウドで使える Redis データベースとして UpStash を使いました。
簡単なサンプル利用だと無料枠でも十分使える (500k / month, 256MB) ので便利です。AOF (Append Only File) モードが有効になっているため安心して永続ストレージとして使えます。

UpStashの紹介記事はこちら

https://zenn.dev/tkithrta/articles/a56603a37b08f0

コンソール画面

パラメータの設定

const server = new A2AServer(agent, {
  card: agentCard,
  taskStore: new RedisStore({
    host: process.env.REDIS_HOST,
    port: parseInt(process.env.REDIS_PORT),
    username: process.env.REDIS_USERNAME,
    password: process.env.REDIS_PASSWORD,
    db: parseInt(process.env.REDIS_DB),
    tls: true,
  }),
});

おわり

Discussion