Mastra で作る AIエージェント(6) AIの考え中にユーザを待たせない
Mastra で作るAI エージェント というシリーズの第6回です。
AIエージェントの構成を「三国志」になぞらえて以下のように把握しました。
- フロントに立つリーダーの劉備=エージェント:何をやるにしても軍師に相談
- 天才軍師・諸葛孔明=LLM:劉備に何かと助言するが、決して自分が直接前面に出ない
- 将軍たち=ツール:劉備に呼ばれて定型作業を遂行、RAG/API/MCPなどなど

ひととおり、「チーム劉備」の組成の仕方が分かったところで、前回からしばらくは、チームの組成以外の話をまとめてご紹介しています。今回はストリーミングです。
ストリーミングとは
AI チャットアプリケーションでは、ユーザーの質問に対する回答が長くなることがあります。従来の「リクエスト → 全文レスポンス」方式だと、回答が完成するまでユーザーは何も見えず待たされてしまいイライラします。
そこでストリーミングを使うと、AIが生成したテキストを少しずつリアルタイムに表示できます。ChatGPT のように文字が流れるように表示されるあの挙動のことです。
今回はMastra でのストリーミング処理を見ていきます。実際の現場ではNext.js + assistant-ui などのフレームワークやライブラリを使って高速に開発することが多いですが、ここでは「呼び出し側が何をやっているか」を分かりやすくするため、あえて「ピュアなJavaScript」でAPIを呼び出すコードにしています。
通常のリクエスト(非ストリーミング)

- ユーザーは5秒とか20秒の間、真っ白な画面で待つ
- 回答は一度にドンと表示される
ストリーミング

- テキストが生成されるたびにクライアントに送られる
- ユーザーはリアルタイムで回答を読める
- 待ち時間が体感的に短く感じられる
Mastra のストリーミング API
ストリーミングが何か分かったところで、MastraサーバにアクセスするクライアントサイドのHTMLを作っていきます。
npm run dev でローカルサーバを起動しておいて、ローカルに作ったHTMLファイル中のJavaScriptから直接APIをたたいてしまおう、というものです。

最終的な出来上がりイメージは以下のようなものです。

エンドポイントの選択
執筆時点で最新の Mastra v1.2.0 + AI SDK v5 の構成では、LanguageModelV2 インターフェースに対応したストリーミングエンドポイントを使用していると想定します。
| エンドポイント | 対応インターフェース | 形式 | プロバイダー |
|---|---|---|---|
/stream |
LanguageModelV2 | Server-Sent Events(SSE) | AI SDK v5 互換プロバイダー(@ai-sdk/openai v2.x/v3.x) |
/stream-legacy |
LanguageModelV1 | Data Stream Protocol | AI SDK v4 互換プロバイダー(@ai-sdk/openai v1.x) |
ちなみに、最新のMastraであっても、 AI SDK v4 互換プロバイダー(@ai-sdk/openai v1.x)の/stream-legacy も使えます。ただ、その場合はストリーミングデータの取り扱い方がData Stream Protocol になりますので、本ページでご案内する方法(SSE)とかなり(まったく、と言っていいほど)違いますので、ご注意ください。
なお、AI SDK v6 が2025年12月に公開されています。今後のMastraアップデートでAI SDK v6対応が追加される可能性があります。
リクエスト形式
const response = await fetch(`${API_BASE}/agents/${AGENT_ID}/${STREAM_ENDPOINT}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [{ role: 'user', content: 'こんにちは' }],
threadId: 'optional-thread-id' // 会話継続時に指定
})
});
Server-Sent Events (SSE) プロトコル
プロトコルの概要
LanguageModelV2 では Server-Sent Events (SSE) 形式でストリーミングレスポンスが送られます。これは以前の Data Stream Protocol とは異なり、各行が以下の形式になっています:
data: {JSON}
各イベントは JSON オブジェクトとして送信され、type フィールドでイベントの種類を識別します。
イベントタイプ一覧
| イベントタイプ | 意味 | 主なペイロードフィールド |
|---|---|---|
start |
エージェント実行開始 |
id, runId
|
step-start |
処理ステップ開始 | warnings |
reasoning-start |
推論開始 | - |
reasoning-end |
推論終了 | - |
tool-call-input-streaming-start |
ツール入力ストリーミング開始 | - |
tool-call-delta |
ツール引数の増分データ |
toolCallId, argsTextDelta
|
tool-call-input-streaming-end |
ツール入力ストリーミング終了 | - |
tool-call |
ツール呼び出し確定 |
toolCallId, toolName, args
|
tool-result |
ツール実行結果 |
toolCallId, result
|
text-delta |
テキストチャンク |
textDelta (または delta, text) |
step-finish |
処理ステップ終了 | messages |
finish |
エージェント実行完了 |
usage, finishReason
|
実際のレスポンス例
data: {"type":"start","runId":"abc-123","from":"AGENT","payload":{"id":"project-knowledge-agent-github"}}
data: {"type":"step-start","runId":"abc-123","from":"AGENT","payload":{}}
data: {"type":"tool-call","runId":"abc-123","from":"AGENT","payload":{"toolCallId":"call_xyz","toolName":"searchByFullTextGitHubTool","args":{"query":"デプロイ"}}}
data: {"type":"tool-result","runId":"abc-123","from":"AGENT","payload":{"toolCallId":"call_xyz","result":[...]}}
data: {"type":"text-delta","runId":"abc-123","from":"AGENT","payload":{"textDelta":"デプロイ"}}
data: {"type":"text-delta","runId":"abc-123","from":"AGENT","payload":{"textDelta":"方法について"}}
data: {"type":"text-delta","runId":"abc-123","from":"AGENT","payload":{"textDelta":"説明します"}}
data: {"type":"finish","runId":"abc-123","from":"AGENT","payload":{"usage":{"promptTokens":150,"completionTokens":80}}}
この例では:
- エージェント実行が開始(
start) - ステップが開始(
step-start) - ツールが呼び出され(
tool-call) - ツールの結果が返り(
tool-result) - テキストが3回に分けて送られ(
text-delta) - 処理が完了(
finish)
クライアント側の実装の全体像
クライアント側のストリーミング処理は、例えば以下の流れで実装します:

それでは、以下ステップ・バイ・ステップでソースコード例を見てみましょう。
ステップ1~2 : ストリームの読み取り準備
const response = await fetch(`${API_BASE}/agents/${AGENT_ID}/${STREAM_ENDPOINT}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(requestBody)
});
// レスポンスが正常か確認
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
// ストリームリーダーとデコーダーを準備
const reader = response.body.getReader();
const decoder = new TextDecoder();
ポイント:
-
response.bodyは ReadableStream オブジェクト -
getReader()でストリームを読み取るリーダーを取得 -
TextDecoderでバイナリをテキストに変換
ステップ3: チャンク読み取りループ
let buffer = ''; // 未処理データを保持するバッファ
while (true) {
const { done, value } = await reader.read();
if (done) break; // ストリーム終了
// バイナリをテキストに変換してバッファに追加
buffer += decoder.decode(value, { stream: true });
// ... 行ごとの処理(後述)
}
ポイント:
-
reader.read()は{done, value}を返す Promise -
done === trueでストリーム終了 -
decoder.decode(value, { stream: true })のstream: trueオプションは、マルチバイト文字が途中で分割された場合に対応
ステップ4~5: 行ごとに分割してSSE形式をパース
// 行ごとに分割
const lines = buffer.split('\\n');
// 最後の要素は不完全な行の可能性があるのでバッファに残す
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.trim()) continue; // 空行はスキップ
// SSE形式: "data: {JSON}" を検出
if (!line.startsWith('data: ')) {
console.debug('Non-data line:', line);
continue;
}
// "data: " の後のJSON部分を抽出
const jsonStr = line.substring(6);
if (!jsonStr.trim()) continue;
try {
const event = JSON.parse(jsonStr);
const eventType = event.type;
const payload = event.payload || {};
// ... イベントタイプに応じた処理(後述)
} catch (e) {
// JSON解析失敗は無視(不完全なチャンクの可能性)
console.debug('Parse error:', e);
}
}
なぜバッファが必要か?
ストリームのチャンクは、必ずしも行の区切りで分割されるわけではありません:
[チャンク1] "data: {\\"type\\":\\"text-delta\\",\\"payload\\":{\\"textDel"
[チャンク2] "ta\\":\\"こんにちは\\"}}\\ndata: {\\"type\\":..."
チャンク1の最後は不完全な行なので、次のチャンクと結合してから処理する必要があります。
ステップ6~7: イベントタイプごとの処理
switch (eventType) {
case 'text-delta': // テキストチャンク
const delta = payload.textDelta || payload.delta || payload.text || '';
textContent += delta;
contentEl.textContent = textContent;
scrollToBottom();
break;
case 'tool-call': // ツール呼び出し開始
const toolName = payload.toolName || 'unknown';
const toolCallId = payload.toolCallId || '';
const args = payload.args || {};
console.log('Tool call:', toolName, 'ID:', toolCallId);
const panel = createToolPanel(toolName, args);
toolPanels[toolCallId] = panel;
assistantMsg.insertBefore(panel, contentEl);
scrollToBottom();
break;
case 'tool-result': // ツール実行結果
const resultToolCallId = payload.toolCallId || '';
const result = payload.result || {};
if (toolPanels[resultToolCallId]) {
updateToolPanel(toolPanels[resultToolCallId], result);
}
break;
case 'finish': // 完了
if (payload.usage) {
usage = payload.usage;
}
break;
case 'start':
case 'step-start':
case 'step-finish':
case 'reasoning-start':
case 'reasoning-end':
case 'tool-call-delta':
case 'tool-call-input-streaming-start':
case 'tool-call-input-streaming-end':
// これらのイベントはログのみ(必要に応じて処理を追加)
console.debug(eventType, payload);
break;
default:
console.debug('Unknown event type:', eventType, payload);
}
重要な変更点(旧 Data Stream Protocol との違い):
| 項目 | Data Stream Protocol (旧) | Server-Sent Events (新) |
|---|---|---|
| プレフィックス |
0:, 9:, a:, e:, f:
|
data: のみ |
| イベント識別 | プレフィックス文字 | JSON内の type フィールド |
| テキストデルタ | 0:"文字列" |
data: {"type":"text-delta","payload":{"textDelta":"文字列"}} |
| ツール呼び出し | 9:{...} |
data: {"type":"tool-call","payload":{...}} |
| ツール結果 | a:{...} |
data: {"type":"tool-result","payload":{...}} |
| 完了通知 | e:{...} |
data: {"type":"finish","payload":{...}} |
ツールパネルの実装
ツールパネルは、エージェントがツールを呼んだり、ツール処理が終了するたびに画面に表示するパネルです。エージェントの返却値のみを画面にストリーミング表示するのでもよいのですが、実際はしばらくツール呼び出しが連続するので、その間ユーザーは白い画面を見続けることになります。
ツールパネルを見せてあげると、ユーザーも「何かエージェントが頑張っているんだな」と分かり安心する、というものです。

function createToolPanel(toolName, args) {
const panel = document.createElement('div');
panel.className = 'tool-panel';
panel.innerHTML = `
<div class="tool-header" onclick="this.nextElementSibling.classList.toggle('open')">
<span class="tool-icon">🔧</span>
<span class="tool-name">${escapeHtml(toolName)}</span>
<span class="tool-status running">実行中<span class="thinking"></span></span>
</div>
<div class="tool-details">
<div><strong>引数:</strong></div>
<pre>${escapeHtml(JSON.stringify(args, null, 2))}</pre>
<div class="tool-result"></div>
</div>
`;
return panel;
}
function updateToolPanel(panel, result) {
const statusEl = panel.querySelector('.tool-status');
if (statusEl) {
statusEl.className = 'tool-status completed';
statusEl.innerHTML = '完了';
}
const resultEl = panel.querySelector('.tool-result');
if (resultEl) {
resultEl.innerHTML = `
<div><strong>結果:</strong></div>
<pre>${escapeHtml(JSON.stringify(result, null, 2))}</pre>
`;
}
}
ポイント:
- クリックで
tool-detailsの表示/非表示を切り替え -
toolCallIdをキーにしたマップで管理し、結果が返ってきたら対応するパネルを更新
const toolPanels = {}; // toolCallId → panel のマップ
// 呼び出し時
toolPanels[toolCallId] = panel;
// 結果時
if (toolPanels[resultToolCallId]) {
updateToolPanel(toolPanels[resultToolCallId], result);
}
ユーザーの満足度には、クライアントサイドも重要
AIエージェントシステムは、本質的にはサーバサイドのAIエージェントの働きこそが価値の中心ですが、一方で時間のかかる処理の待ち時間にユーザーをいかにイライラさせないでおくか、クライアントサイドの挙動も非常に重要です。
今回はピュアな JavaScript でストリーミング処理のサンプルを示しましたが、AI チャット の実現にフォーカスしたReactやVue向けのコンポーネントもいろいろ出ていますので、ぜひ活用してみてください。
Discussion