Cloudflare Agents Deep Dive
免責
Cloudflare Agentsコトハジメ用にGoogle AI StudioのGemini 2.5 Proで生成したAPIドキュメントです。
ソースコードを参照しているため、ハルシネーションは少ないと思いますが、より正しい情報を確認したい場合は公式リファレンスを確認してください。
2025/10/08更新:例でハルシネーションが発生していたので削除しました。
agents
概要 (Overview)
このコードは、Cloudflare Workers上で動作するステートフルなエージェントを構築するためのフレームワークです。partyserverライブラリをベースにしており、Durable Objectsの能力を活用して、永続的な状態、リアルタイム通信(WebSocket)、タスクスケジューリング、キューイング、Eメール処理などの機能を提供します。
主な機能は以下の通りです。
-
状態管理:
stateプロパティを通じて、エージェントの状態を永続化します。 -
RPC (Remote Procedure Call):
@callableデコレータを使用して、クライアントから安全に呼び出せるメソッドを定義できます。ストリーミング応答もサポートします。 -
タスクスケジューリング:
schedule()メソッドで、特定の日時、遅延時間、またはcron式に基づいてタスクを実行できます。 -
キューイング:
queue()メソッドで、先入れ先出し(FIFO)で非同期タスクを処理できます。 -
Eメール処理:
routeAgentEmail関数とonEmailメソッドで、エージェントへのEメールのルーティングと処理を実装できます。 -
MCP (Model Context Protocol) 統合:
addMcpServerなどのメソッドを通じて、外部のAIモデルやツールと連携できます。
主要なクラス (Main Classes)
Agent<Env, State>
エージェントを実装するための基本となる抽象クラスです。このクラスを継承して独自のエージェントを作成します。
-
Env: エージェントがアクセスできる環境変数やバインディングの型(wrangler.tomlで定義)。 -
State: エージェントが保持する状態の型。
プロパティ (Properties)
-
state: State- エージェントの現在の状態を取得します。
- このプロパティに初めてアクセスしたとき、Durable Object Storageから状態を読み込みます。データがない場合は
initialStateの値を使用します。 - 状態を変更するには
setState()メソッドを使用します。
-
initialState: State- エージェントが初めて作成されたとき、または状態がまだ保存されていないときに使用される初期状態。
- デフォルトの状態を提供したい場合に、サブクラスでこのプロパティをオーバーライドします。
-
mcp: MCPClientManager- Model Context Protocol (MCP) サーバーとの接続を管理するためのクライアントマネージャー。
-
observability?: Observability- エージェントのイベント(接続、状態更新、RPC呼び出しなど)を監視・ログ記録するための実装。
-
static options- エージェントの動作を設定する静的オプション。
-
hibernate: boolean:true(デフォルト)の場合、エージェントが非アクティブなときに休止状態になり、メモリとCPUを節約します。
メソッド (Methods)
-
setState(state: State): void- エージェントの状態を更新し、永続化します。
- 状態が更新されると、接続されているすべてのクライアントに新しい状態がブロードキャストされます。
- このメソッドが呼び出されると、
onStateUpdate()ライフサイクルメソッドがトリガーされます。
-
sql<T>(query: TemplateStringsArray, ...values: any[]): T[]- エージェントに関連付けられたDurable ObjectのSQLiteデータベースに対してSQLクエリを実行します。
- SQLインジェクションを防ぐために、テンプレートリテラル構文を使用します。
-
例:
this.sql<User>SELECT * FROM users WHERE id = ${userId}``
-
schedule<T>(when: Date | string | number, callback: keyof this, payload?: T): Promise<Schedule<T>>- 将来実行されるタスクをスケジュールします。
-
when:-
Date: 特定の日時に実行。 -
number: 指定した秒数後に実行。 -
string: cron式に基づいて定期的に実行。
-
-
callback: 実行するメソッドの名前(文字列)。 -
payload: コールバックメソッドに渡すデータ。 -
戻り値: スケジュールされたタスクを表す
Scheduleオブジェクト。
-
cancelSchedule(id: string): Promise<boolean>-
schedule()で作成されたタスクをIDでキャンセルします。
-
-
queue<T>(callback: keyof this, payload: T): Promise<string>- 非同期タスクをキューに追加します。タスクは追加された順に処理されます。
-
callback: 実行するメソッドの名前(文字列)。 -
payload: コールバックメソッドに渡すデータ。 - 戻り値: キューに追加されたタスクのID。
-
dequeue(id: string): Promise<void>- IDで指定されたタスクをキューから削除します。通常、タスクの処理が完了した後に内部的に呼び出されます。
-
addMcpServer(serverName: string, url: string, ...): Promise<{ id: string; authUrl: string | undefined }>- 新しいMCPサーバーに接続します。OAuth認証が必要な場合、認証用のURL (
authUrl) を返します。
- 新しいMCPサーバーに接続します。OAuth認証が必要な場合、認証用のURL (
-
removeMcpServer(id: string): Promise<void>- IDで指定されたMCPサーバーとの接続を解除します。
-
getMcpServers(): MCPServersState- 現在接続されているすべてのMCPサーバーの状態、利用可能なツール、プロンプト、リソースのリストを取得します。
-
destroy(): Promise<void>- エージェントを完全に破棄します。すべての状態、スケジュール、キュー、およびその他のデータが削除されます。
-
replyToEmail(email: AgentEmail, options: { ... }): Promise<void>-
onEmailで受信したEメールに対して返信を送信します。
-
ライフサイクルメソッド (Lifecycle Methods)
これらはAgentクラスで定義されており、必要に応じてサブクラスでオーバーライドしてカスタムロジックを実装します。
-
onConnect(connection: Connection, ctx: ConnectionContext): void | Promise<void>- クライアントがWebSocketで接続したときに呼び出されます。
-
onMessage(connection: Connection, message: WSMessage): void | Promise<void>- 接続されたクライアントからメッセージを受信したときに呼び出されます。RPCや状態更新メッセージはフレームワークによって自動的に処理されるため、カスタムメッセージを処理する場合にオーバーライドします。
-
onRequest(request: Request): Response | Promise<Response>- エージェントに対してHTTPリクエストがあった場合に呼び出されます。(例:
https://.../agents/my-agent/my-agent-id)
- エージェントに対してHTTPリクエストがあった場合に呼び出されます。(例:
-
onStart(): void | Promise<void>- エージェントが(休止状態から復帰して)起動したときに一度だけ呼び出されます。
-
onStateUpdate(state: State | undefined, source: Connection | "server"): void-
setState()が呼び出された後、またはクライアントから状態更新メッセージを受信した後にトリガーされます。 -
source: 状態変更の発生源(サーバーサイドのsetState呼び出しなら"server"、クライアントからの更新ならConnectionオブジェクト)。
-
-
onEmail(email: AgentEmail): Promise<void>-
routeAgentEmailによってこのエージェントにEメールがルーティングされたときに呼び出されます。このメソッドを実装してEメールを処理します。
-
-
onError(error: unknown): void | Promise<void> -
onError(connection: Connection, error: unknown): void | Promise<void>- エージェント内でエラーが発生したときに呼び出されます。WebSocket接続に関連するエラーと、それ以外のサーバーエラーでオーバーロードされています。
デコレーター (Decorators)
@callable(metadata?: CallableMetadata)
メソッドをクライアントからRPC経由で呼び出し可能としてマークします。セキュリティのため、このデコレータが付与されていないメソッドはクライアントから呼び出せません。
-
metadata: メソッドに関する追加情報。-
description?: string: メソッドの説明。 -
streaming?: boolean:trueに設定すると、メソッドが複数の応答をストリーミングで返すことを示します。この場合、メソッドの第一引数としてStreamingResponseオブジェクトを受け取ります。
-
@unstable_callable(metadata?: CallableMetadata)
@callableの古い名前です。非推奨であり、将来のバージョンで削除される予定です。@callableを使用してください。
型定義 (Type Definitions)
RPCRequest / RPCResponse
クライアントとサーバー間のRPC通信で使用されるメッセージの型定義。
StateUpdateMessage
クライアントからエージェントの状態を更新するために送信されるメッセージの型定義。
CallableMetadata
@callableデコレータに渡すメタデータの型。
QueueItem<T>
queue()で作成されるタスクの内部表現。
Schedule<T>
schedule()で作成されるタスクの表現。typeプロパティ('scheduled', 'delayed', 'cron')によってタスクの種類を示します。
MCPServersState / MCPServer
MCPサーバーの接続状態や能力に関する情報を格納する型。
AgentNamespace<Agentic>
特定のエージェントクラスに対応するDurable Objectネームスペースの型。
AgentContext
Durable Objectのstateオブジェクトの型エイリアス。
AgentEmail
onEmailメソッドに渡される、シリアライズ可能なEメールオブジェクト。元のEメールのヘッダーや本文へのアクセス、返信・転送機能を提供します。
ルーティング関数 (Routing Functions)
routeAgentRequest<Env>(request: Request, env: Env, options?: AgentOptions<Env>): Promise<Response | undefined>
受信したHTTPリクエストを適切なエージェントにルーティングします。ワーカーのエントリーポイント(fetchハンドラ)で使用します。
-
request: 受信したRequestオブジェクト。 -
env: ワーカーの環境変数・バインディング。 -
options:-
prefix?: string: URLパスのプレフィックス(デフォルトは"agents")。 -
cors?: boolean | HeadersInit: CORSヘッダーを自動的に付与するかどうかを設定します。
-
routeAgentEmail<Env>(email: ForwardableEmailMessage, env: Env, options: EmailRoutingOptions<Env>): Promise<void>
受信したEメールを適切なエージェントにルーティングします。ワーカーのemailハンドラで使用します。
-
email: 受信したForwardableEmailMessageオブジェクト。 -
env: ワーカーの環境変数・バインディング。 -
options:-
resolver: EmailResolver<Env>: Eメールをどのエージェント(名前とID)にルーティングするかを決定する関数。
-
ヘルパー関数 (Helper Functions)
getCurrentAgent<T>(): { agent: T, ... }
エージェントのメソッド内から、現在のエージェントインスタンス、リクエスト、接続コンテキストを取得するために使用します。フレームワークがメソッドを自動的にラップするため、通常は直接呼び出す必要はありません。
getAgentByName<Env, T>(namespace: AgentNamespace<T>, name: string, ...): Promise<DurableObjectStub<T>>
エージェントのDurable Objectネームスペースとインスタンス名(ID)から、そのエージェントのスタブを取得します。これにより、他の場所(別のエージェントやワーカーなど)から特定のエージェントと対話できます。
Eメールリゾルバ (Email Resolvers)
routeAgentEmailのresolverオプションに渡すための、事前定義されたリゾルバ関数。
-
createHeaderBasedEmailResolver<Env>():
Eメールヘッダー(Message-ID,References,X-Agent-Name/X-Agent-ID)を解析して、返信メールなどを正しいエージェントにルーティングします。 -
createAddressBasedEmailResolver<Env>(defaultAgentName: string):
Eメールの宛先アドレスに基づいてルーティングします。-
agent-name+agent-id@domain.com->agentName: "agent-name",agentId: "agent-id" -
agent-id@domain.com->agentName: defaultAgentName,agentId: "agent-id"
-
-
createCatchAllEmailResolver<Env>(agentName: string, agentId: string):
すべてのEメールを、指定された単一のエージェントにルーティングします。
補助的なクラス (Auxiliary Classes)
StreamingResponse
ストリーミングをサポートする@callableメソッドに渡されるヘルパークラス。
メソッド (Methods)
-
send(chunk: unknown): void- クライアントにデータのかたまり(チャンク)を送信します。ストリームの途中で複数回呼び出せます。
-
end(finalChunk?: unknown): void- ストリームを終了します。任意で最後のデータを送信できます。このメソッドを呼び出した後は
send()を呼び出すことはできません。
- ストリームを終了します。任意で最後のデータを送信できます。このメソッドを呼び出した後は
agents/client
概要 (Overview)
このコードは、前のリファレンスで解説したAgentサーバーと対話するための、クライアントサイド(ブラウザやNode.js環境など)のライブラリです。partysocketをベースにしており、エージェントへのWebSocket接続、状態の同期、RPC(Remote Procedure Call)の実行を簡単に行うための機能を提供します。
主な機能は以下の通りです。
-
WebSocket接続:
AgentClientクラスを使って、特定のAgentインスタンスへの永続的な接続を確立します。 -
状態同期: サーバーからプッシュされた状態の更新を
onStateUpdateコールバックで受信したり、クライアントからsetStateメソッドで状態を更新したりできます。 -
RPC実行:
callメソッドを使って、サーバー側のAgentで@callableとして定義されたメソッドを安全に呼び出せます。通常のRPCとストリーミングRPCの両方をサポートします。 -
HTTPリクエスト:
agentFetch関数を使って、WebSocket接続を確立せずに、Agentに対して単発のHTTPリクエストを送信できます。
主要なクラス (Main Classes)
AgentClient<State>
AgentサーバーへのWebSocket接続を管理し、対話するための主要なクライアントクラスです。
-
State: 接続先のAgentが持つ状態の型。これにより、onStateUpdateコールバックなどで型安全性が確保されます。
コンストラクタ (Constructor)
-
constructor(options: AgentClientOptions<State>)- 新しい
AgentClientインスタンスを作成し、指定されたAgentへのWebSocket接続を開始します。 -
options: 接続設定。詳細はAgentClientOptionsのセクションを参照してください。
- 新しい
メソッド (Methods)
-
setState(state: State): void- クライアントからサーバー上の
Agentの状態を更新するために使用します。 - このメソッドを呼び出すと、状態更新メッセージがサーバーに送信されます。また、
AgentClientOptionsで指定されたonStateUpdateコールバックがsource: "client"として即座に呼び出されます。
- クライアントからサーバー上の
-
call<T>(method: string, args?: unknown[], streamOptions?: StreamOptions): Promise<T>- サーバー上の
Agentで@callableデコレータが付与されたメソッドをリモートで呼び出します。 -
method: 呼び出すメソッドの名前(文字列)。 -
args: メソッドに渡す引数の配列。 -
streamOptions: 応答がストリーミングされる場合の処理方法を定義するオプション。詳細はStreamOptionsのセクションを参照してください。 -
戻り値: サーバーからの応答を解決する
Promise。ストリーミングでない場合はメソッドの戻り値が、ストリーミングの場合はストリームが完了したときの最終的な値が返されます。
- サーバー上の
静的メソッド (Static Methods)
-
static fetch(_opts: PartyFetchOptions): Promise<Response>-
非推奨 (Deprecated): このメソッドは実装されておらず、呼び出すとエラーがスローされます。代わりにスタンドアロンの
agentFetch関数を使用してください。
-
非推奨 (Deprecated): このメソッドは実装されておらず、呼び出すとエラーがスローされます。代わりにスタンドアロンの
スタンドアロン関数 (Standalone Functions)
agentFetch(opts: AgentClientFetchOptions, init?: RequestInit): Promise<Response>
Agentに対して(WebSocketではなく)通常のHTTPリクエストを行います。Agent側でonRequestライフサイクルメソッドが処理します。
-
opts: 接続先のAgentを指定するためのオプション。詳細はAgentClientFetchOptionsのセクションを参照してください。 -
init:fetchAPIに渡す標準的なリクエスト初期化オプション(method,headers,bodyなど)。
camelCaseToKebabCase(str: string): string
キャメルケース(myAgentName)の文字列をケバブケース(my-agent-name)に変換するユーティリティ関数です。内部的に、AgentClientOptionsで指定されたagent名を、PartyKitのルーティングが期待する形式に変換するために使用されます。
型定義 (Type Definitions)
AgentClientOptions<State>
AgentClientを初期化するための設定オプションです。PartySocketOptionsを拡張しています。
-
agent: string(必須)- 接続したい
Agentのクラス名(キャメルケース)。
- 接続したい
-
name?: string- 接続したい
Agentの特定のインスタンス名(ID)。指定しない場合、"default"が使用されます。
- 接続したい
-
onStateUpdate?: (state: State, source: "server" | "client") => void-
Agentの状態が更新されたときに呼び出されるコールバック関数。 -
state: 新しい状態オブジェクト。 -
source: 状態変更の発生源。サーバーからのプッシュ通知の場合は"server"、クライアント自身のsetState呼び出しによる場合は"client"となります。
-
StreamOptions
AgentClientのcallメソッドでストリーミング応答を処理するためのコールバック関数をまとめたものです。サーバー側の@callable({ streaming: true })メソッドに対して使用します。
-
onChunk?: (chunk: unknown) => void- ストリームのデータの一部(チャンク)を受信するたびに呼び出されます。
-
onDone?: (finalChunk: unknown) => void- ストリームが完了したときに呼び出されます。
finalChunkには、サーバーがストリームを終了する際に送信した最後のデータが含まれます。
- ストリームが完了したときに呼び出されます。
-
onError?: (error: string) => void- ストリーミング中にエラーが発生した場合に呼び出されます。
AgentClientFetchOptions
agentFetch関数に渡す設定オプションです。PartyFetchOptionsを拡張しています。
-
agent: string(必須)- リクエストを送信したい
Agentのクラス名(キャメルケース)。
- リクエストを送信したい
-
name?: string- リクエストを送信したい
Agentの特定のインスタンス名(ID)。指定しない場合、"default"が使用されます。
- リクエストを送信したい
agents/ai-chat-agent
概要 (Overview)
AIChatAgentは、基本的なAgentクラスを拡張し、AIチャットボット機能を組み込んだ特化型のエージェントです。Vercel AI SDK (v3/v5) との互換性を持ち、チャット履歴の永続化、ストリーミング応答、ツール呼び出し、リクエストのキャンセルといった、高度なチャットアプリケーションに必要な機能を提供します。
このクラスを継承することで、開発者はonChatMessageメソッドを実装するだけで、独自のAIロジックを持つチャットエージェントを簡単に構築できます。
主要なクラス (Main Classes)
AIChatAgent<Env, State>
Agentを継承した、AIチャット機能を持つエージェントの基本クラス。
-
Env: エージェントがアクセスできる環境変数やバインディングの型。 -
State:Agentクラスから継承した、エージェントが保持する状態の型。AIChatAgent自体は直接このstateを使用しませんが、継承クラスで利用可能です。
プロパティ (Properties)
-
messages: ChatMessage[]- 現在の会話のチャットメッセージの配列。
ChatMessageはVercel AI SDKのUIMessage型です。 - このプロパティはエージェントの起動時にDurable Objectストレージから自動的に読み込まれ、常に最新の状態が保たれます。
- Vercel AI SDKのv3からv5への移行を支援するため、古い形式のメッセージは自動的に新しい形式に変換されます。
- 現在の会話のチャットメッセージの配列。
ライフサイクル/オーバーライドすべきメソッド (Lifecycle/Methods to Override)
-
async onChatMessage(onFinish: StreamTextOnFinishCallback<ToolSet>, options?: { abortSignal: AbortSignal | undefined }): Promise<Response | undefined>- [実装必須] このクラスを継承する際に、必ずオーバーライドする必要がある中心的なメソッドです。
- ユーザーから新しいチャットメッセージを受信したときに呼び出されます。
- このメソッド内で、AIモデルへのリクエストを行い、その応答を
Responseオブジェクトとして返すロジックを実装します。 -
戻り値: AIモデルからの応答を含む
Responseオブジェクト。ストリーミング応答(Server-Sent Events)を返すことが期待されます。Vercel AI SDKのstreamTextやstreamObjectなどのユーティリティを使うと便利です。応答がない場合はundefinedを返します。 -
onFinish: ストリーミングが完了したときに呼び出されるコールバック。AI SDKによって内部的に使用されます。 -
options.abortSignal: クライアントがチャットリクエストをキャンセルした場合に発火するAbortSignal。AIモデルへのfetchリクエストなどにこのシグナルを渡すことで、進行中のリクエストを中断できます。
主要なメソッド (Key Methods)
-
async saveMessages(messages: ChatMessage[]): Promise<void>- チャットメッセージの配列をサーバーサイドで永続化します。
- このメソッドは、チャット履歴を完全に上書きします。通常は、
this.messagesに新しいメッセージを追加し、その結果の配列を渡して使用します。 - 呼び出されると、更新されたメッセージリストが接続されているすべてのクライアントにブロードキャストされます。
内部メソッドとフレームワークの動作 (Internal Methods and Framework Behavior)
AIChatAgentは、クライアントからの特定のメッセージタイプをリッスンし、チャットのロジックを自動的に処理します。開発者がこれらの内部メソッドを直接呼び出す必要は通常ありません。
-
onMessage(connection, message):-
Agentからオーバーライドされ、チャット関連のWebSocketメッセージを処理します。-
CF_AGENT_USE_CHAT_REQUEST: 新しいチャットメッセージの開始。onChatMessageをトリガーします。 -
CF_AGENT_CHAT_CLEAR: チャット履歴をクリアするリクエスト。 -
CF_AGENT_CHAT_MESSAGES: クライアントから送信されたメッセージリストでサーバーの状態を置き換えるリクエスト。 -
CF_AGENT_CHAT_REQUEST_CANCEL: 進行中のチャットリクエストをキャンセルするリクエスト。
-
-
-
onRequest(request):-
/get-messagesエンドポイントへのHTTP GETリクエストを処理し、現在のチャット履歴をJSON形式で返します。
-
-
persistMessages(messages, excludeBroadcastIds):-
saveMessagesの内部実装。メッセージをSQLiteデータベースに保存し、クライアントにブロードキャストします。
-
-
_reply(id, response):-
onChatMessageから返されたResponseオブジェクトを処理します。 - レスポンスボディがストリームの場合、それを読み取り、Server-Sent Events形式のチャンクをクライアントにリアルタイムで転送します。
- Vercel AI SDK v5のデータ形式(
text-delta,tool-input-available,tool-output-availableなど)を解釈します。 - ストリームが完了した後、アシスタントの完全な応答(テキストとツール呼び出しの結果を含む)を
ChatMessageとして構築し、persistMessagesを呼び出してチャット履歴に追加・保存します。
-
-
キャンセル処理関連メソッド (
_getAbortSignal,_cancelChatRequest, etc.):- クライアントからのキャンセル要求を処理するための内部メカニズム。
- 各チャットリクエストに
AbortControllerを関連付け、キャンセルメッセージを受信したときにabort()を呼び出すことで、onChatMessageに渡されるabortSignalを発火させます。
-
destroy():- エージェントが破棄される際に、進行中のすべてのリクエストをキャンセルし、親クラスの
destroyメソッドを呼び出します。
- エージェントが破棄される際に、進行中のすべてのリクエストをキャンセルし、親クラスの
hono-agents
概要 (Overview)
このコードは、Webフレームワーク Hono でCloudflare Agentsを簡単に利用するためのミドルウェアを提供します。
このミドルウェアをHonoアプリケーションに組み込むことで、特定のパスへのリクエスト(通常のHTTPリクエストとWebSocketアップグレードリクエストの両方)を自動的にCloudflare Agentsのルーティングシステムに渡し、適切なエージェントに処理させることができます。
これにより、Honoアプリケーションのルーティングの一部として、ステートフルなエージェントをシームレスに統合できます。
主要な関数 (Main Functions)
agentsMiddleware<E extends Env>(ctx?: AgentMiddlewareContext<E>)
Cloudflare Agentsのリクエストを処理するためのHonoミドルウェアを作成します。
-
E extends Env: Honoの環境変数の型。c.envの型安全性を確保します。 -
ctx?: AgentMiddlewareContext<E>: ミドルウェアの動作をカスタマイズするための設定オブジェクト。詳細はAgentMiddlewareContextのセクションを参照してください。 -
戻り値: Honoアプリケーションの
.use()メソッドで使用できるミドルウェア関数。
型定義 (Type Definitions)
AgentMiddlewareContext<E>
agentsMiddleware関数に渡す設定オブジェクトの型です。
-
options?: AgentOptions<E>- Cloudflare Agentsのコア機能である
routeAgentRequest関数に渡されるオプションです。 - これを使って、エージェントのURLプレフィックスの変更や、CORS設定の有効化などが可能です。
- Cloudflare Agentsのコア機能である
-
onError?: (error: Error) => void- ミドルウェアの処理中にエラーが発生した場合に呼び出される、オプショナルなエラーハンドリング関数です。
- この関数を指定すると、ミドルウェアはエラーをスローせず、代わりにこの関数を呼び出してから次のミドルウェア(またはHonoのエラーハンドラ)に処理を移します。
- ログ記録やカスタムのエラー応答生成に利用できます。
- 指定しない場合、エラーはそのままスローされ、Honoのデフォルトのエラー処理機構に委ねられます。