📑
A2AサーバーのTaskをRedisで永続化する
A2A サーバーの Task はデフォルトではメモリ上に保存されており、サーバーが再起動すると過去の Task が失われてしまいます。
今回は Redis を使って Task を永続化する方法を紹介します。
TaskStore の実装
公式のSDK サンプル では Task を保存するバックエンドが TaskStore インターフェースとして定義されており、標準では InMemoryTaskStore が利用されます。
// Helper type for the simplified store
export interface TaskAndHistory {
task: schema.Task;
history: schema.Message[];
}
/**
* Simplified interface for task storage providers.
* Stores and retrieves both the task and its full message history together.
*/
export interface TaskStore {
/**
* Saves a task and its associated message history.
* Overwrites existing data if the task ID exists.
* @param data An object containing the task and its history.
* @returns A promise resolving when the save operation is complete.
*/
save(data: TaskAndHistory): Promise<void>;
/**
* Loads a task and its history by task ID.
* @param taskId The ID of the task to load.
* @returns A promise resolving to an object containing the Task and its history, or null if not found.
*/
load(taskId: string): Promise<TaskAndHistory | null>;
}
// ========================
// InMemoryTaskStore
// ========================
// Use TaskAndHistory directly for storage
export class InMemoryTaskStore implements TaskStore {
private store: Map<string, TaskAndHistory> = new Map();
async load(taskId: string): Promise<TaskAndHistory | null> {
const entry = this.store.get(taskId);
// Return copies to prevent external mutation
return entry
? { task: { ...entry.task }, history: [...entry.history] }
: null;
}
async save(data: TaskAndHistory): Promise<void> {
// Store copies to prevent internal mutation if caller reuses objects
this.store.set(data.task.id, {
task: { ...data.task },
history: [...data.history],
});
}
}
TaskStore に Redis を使用するには、Redis を使って TaskStore を実装する RedisStore を追加します。
import Redis from "ioredis";
import { TaskAndHistory } from "./store";
import { A2AError } from "./error";
export interface RedisStoreOptions {
host?: string;
port?: number;
username?: string;
password?: string;
db?: number;
keyPrefix?: string;
tls?: boolean;
}
export class RedisStore {
private redis: Redis;
private keyPrefix: string;
constructor(options: RedisStoreOptions = {}) {
const {
host = "localhost",
port = 6379,
username,
password,
db = 0,
keyPrefix = "a2a:",
tls = process.env.REDIS_TLS === "true",
} = options;
this.keyPrefix = keyPrefix;
this.redis = new Redis({
host,
port,
username,
password,
db,
tls: tls ? {} : undefined,
retryStrategy: (times) => {
const delay = Math.min(times * 50, 2000);
return delay;
},
});
this.redis.on("error", (error) => {
console.error("Redis connection error:", error);
});
}
private getTaskKey(taskId: string): string {
return `${this.keyPrefix}task:${taskId}`;
}
private getHistoryKey(taskId: string): string {
return `${this.keyPrefix}history:${taskId}`;
}
async load(taskId: string): Promise<TaskAndHistory | null> {
try {
const [taskData, historyData] = await Promise.all([
this.redis.get(this.getTaskKey(taskId)),
this.redis.get(this.getHistoryKey(taskId)),
]);
if (!taskData) {
return null;
}
const task = JSON.parse(taskData);
const history = historyData ? JSON.parse(historyData) : [];
return { task, history };
} catch (error: any) {
throw A2AError.internalError(
`Failed to load task ${taskId}: ${error.message}`,
error
);
}
}
async save(data: TaskAndHistory): Promise<void> {
try {
const { task, history } = data;
const taskKey = this.getTaskKey(task.id);
const historyKey = this.getHistoryKey(task.id);
await Promise.all([
this.redis.set(taskKey, JSON.stringify(task)),
this.redis.set(historyKey, JSON.stringify(history)),
]);
} catch (error: any) {
throw A2AError.internalError(
`Failed to save task ${data.task.id}: ${error.message}`,
error
);
}
}
async delete(taskId: string): Promise<void> {
try {
await Promise.all([
this.redis.del(this.getTaskKey(taskId)),
this.redis.del(this.getHistoryKey(taskId)),
]);
} catch (error: any) {
throw A2AError.internalError(
`Failed to delete task ${taskId}: ${error.message}`,
error
);
}
}
async disconnect(): Promise<void> {
await this.redis.quit();
}
}
あとはサーバー側で A2AServer を初期化する時にパラメータとして渡すだけです。
const server = new A2AServer(agent, {
card: agentCard,
taskStore: new RedisStore({
host: "localhost",
port: 6379,
}),
});
UpStash
今回はクラウドで使える Redis データベースとして UpStash を使いました。
簡単なサンプル利用だと無料枠でも十分使える (500k / month, 256MB) ので便利です。AOF (Append Only File) モードが有効になっているため安心して永続ストレージとして使えます。
UpStashの紹介記事はこちら
コンソール画面
パラメータの設定
const server = new A2AServer(agent, {
card: agentCard,
taskStore: new RedisStore({
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT),
username: process.env.REDIS_USERNAME,
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB),
tls: true,
}),
});
おわり
Discussion