💨

Mastra で作る AIエージェント(6) AIの考え中にユーザを待たせない

に公開

Mastra で作るAI エージェント というシリーズの第6回です。


AIエージェントの構成を「三国志」になぞらえて以下のように把握しました。

  • フロントに立つリーダーの劉備=エージェント:何をやるにしても軍師に相談
  • 天才軍師・諸葛孔明=LLM:劉備に何かと助言するが、決して自分が直接前面に出ない
  • 将軍たち=ツール:劉備に呼ばれて定型作業を遂行、RAG/API/MCPなどなど

ひととおり、「チーム劉備」の組成の仕方が分かったところで、前回からしばらくは、チームの組成以外の話をまとめてご紹介しています。今回はストリーミングです。

ストリーミングとは

AI チャットアプリケーションでは、ユーザーの質問に対する回答が長くなることがあります。従来の「リクエスト → 全文レスポンス」方式だと、回答が完成するまでユーザーは何も見えず待たされてしまいイライラします。

そこでストリーミングを使うと、AIが生成したテキストを少しずつリアルタイムに表示できます。ChatGPT のように文字が流れるように表示されるあの挙動のことです。

今回はMastra でのストリーミング処理を見ていきます。実際の現場ではNext.js + assistant-ui などのフレームワークやライブラリを使って高速に開発することが多いですが、ここでは「呼び出し側が何をやっているか」を分かりやすくするため、あえて「ピュアなJavaScript」でAPIを呼び出すコードにしています。

通常のリクエスト(非ストリーミング)

  • ユーザーは5秒とか20秒の間、真っ白な画面で待つ
  • 回答は一度にドンと表示される

ストリーミング

  • テキストが生成されるたびにクライアントに送られる
  • ユーザーはリアルタイムで回答を読める
  • 待ち時間が体感的に短く感じられる

Mastra のストリーミング API

ストリーミングが何か分かったところで、MastraサーバにアクセスするクライアントサイドのHTMLを作っていきます。

npm run dev でローカルサーバを起動しておいて、ローカルに作ったHTMLファイル中のJavaScriptから直接APIをたたいてしまおう、というものです。

最終的な出来上がりイメージは以下のようなものです。

エンドポイントの選択

執筆時点で最新の Mastra v1.2.0 + AI SDK v5 の構成では、LanguageModelV2 インターフェースに対応したストリーミングエンドポイントを使用していると想定します。

エンドポイント 対応インターフェース 形式 プロバイダー
/stream LanguageModelV2 Server-Sent Events(SSE) AI SDK v5 互換プロバイダー(@ai-sdk/openai v2.x/v3.x)
/stream-legacy LanguageModelV1 Data Stream Protocol AI SDK v4 互換プロバイダー(@ai-sdk/openai v1.x)

ちなみに、最新のMastraであっても、 AI SDK v4 互換プロバイダー(@ai-sdk/openai v1.x)の/stream-legacy も使えます。ただ、その場合はストリーミングデータの取り扱い方がData Stream Protocol になりますので、本ページでご案内する方法(SSE)とかなり(まったく、と言っていいほど)違いますので、ご注意ください。

なお、AI SDK v6 が2025年12月に公開されています。今後のMastraアップデートでAI SDK v6対応が追加される可能性があります。

リクエスト形式

const response = await fetch(`${API_BASE}/agents/${AGENT_ID}/${STREAM_ENDPOINT}`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    messages: [{ role: 'user', content: 'こんにちは' }],
    threadId: 'optional-thread-id'  // 会話継続時に指定
  })
});

Server-Sent Events (SSE) プロトコル

プロトコルの概要

LanguageModelV2 では Server-Sent Events (SSE) 形式でストリーミングレスポンスが送られます。これは以前の Data Stream Protocol とは異なり、各行が以下の形式になっています:

data: {JSON}

各イベントは JSON オブジェクトとして送信され、type フィールドでイベントの種類を識別します。

イベントタイプ一覧

イベントタイプ 意味 主なペイロードフィールド
start エージェント実行開始 id, runId
step-start 処理ステップ開始 warnings
reasoning-start 推論開始 -
reasoning-end 推論終了 -
tool-call-input-streaming-start ツール入力ストリーミング開始 -
tool-call-delta ツール引数の増分データ toolCallId, argsTextDelta
tool-call-input-streaming-end ツール入力ストリーミング終了 -
tool-call ツール呼び出し確定 toolCallId, toolName, args
tool-result ツール実行結果 toolCallId, result
text-delta テキストチャンク textDelta (または delta, text)
step-finish 処理ステップ終了 messages
finish エージェント実行完了 usage, finishReason

実際のレスポンス例

data: {"type":"start","runId":"abc-123","from":"AGENT","payload":{"id":"project-knowledge-agent-github"}}
data: {"type":"step-start","runId":"abc-123","from":"AGENT","payload":{}}
data: {"type":"tool-call","runId":"abc-123","from":"AGENT","payload":{"toolCallId":"call_xyz","toolName":"searchByFullTextGitHubTool","args":{"query":"デプロイ"}}}
data: {"type":"tool-result","runId":"abc-123","from":"AGENT","payload":{"toolCallId":"call_xyz","result":[...]}}
data: {"type":"text-delta","runId":"abc-123","from":"AGENT","payload":{"textDelta":"デプロイ"}}
data: {"type":"text-delta","runId":"abc-123","from":"AGENT","payload":{"textDelta":"方法について"}}
data: {"type":"text-delta","runId":"abc-123","from":"AGENT","payload":{"textDelta":"説明します"}}
data: {"type":"finish","runId":"abc-123","from":"AGENT","payload":{"usage":{"promptTokens":150,"completionTokens":80}}}

この例では:

  1. エージェント実行が開始(start
  2. ステップが開始(step-start
  3. ツールが呼び出され(tool-call
  4. ツールの結果が返り(tool-result
  5. テキストが3回に分けて送られ(text-delta
  6. 処理が完了(finish

クライアント側の実装の全体像

クライアント側のストリーミング処理は、例えば以下の流れで実装します:

それでは、以下ステップ・バイ・ステップでソースコード例を見てみましょう。

ステップ1~2 : ストリームの読み取り準備

const response = await fetch(`${API_BASE}/agents/${AGENT_ID}/${STREAM_ENDPOINT}`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(requestBody)
});

// レスポンスが正常か確認
if (!response.ok) {
  throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

// ストリームリーダーとデコーダーを準備
const reader = response.body.getReader();
const decoder = new TextDecoder();

ポイント:

  • response.body は ReadableStream オブジェクト
  • getReader() でストリームを読み取るリーダーを取得
  • TextDecoder でバイナリをテキストに変換

ステップ3: チャンク読み取りループ

let buffer = '';  // 未処理データを保持するバッファ

while (true) {
  const { done, value } = await reader.read();
  if (done) break;  // ストリーム終了

  // バイナリをテキストに変換してバッファに追加
  buffer += decoder.decode(value, { stream: true });

  // ... 行ごとの処理(後述)
}

ポイント:

  • reader.read(){done, value} を返す Promise
  • done === true でストリーム終了
  • decoder.decode(value, { stream: true })stream: true オプションは、マルチバイト文字が途中で分割された場合に対応

ステップ4~5: 行ごとに分割してSSE形式をパース

// 行ごとに分割
const lines = buffer.split('\\n');

// 最後の要素は不完全な行の可能性があるのでバッファに残す
buffer = lines.pop() || '';

for (const line of lines) {
  if (!line.trim()) continue;  // 空行はスキップ

  // SSE形式: "data: {JSON}" を検出
  if (!line.startsWith('data: ')) {
    console.debug('Non-data line:', line);
    continue;
  }

  // "data: " の後のJSON部分を抽出
  const jsonStr = line.substring(6);
  if (!jsonStr.trim()) continue;

  try {
    const event = JSON.parse(jsonStr);
    const eventType = event.type;
    const payload = event.payload || {};

    // ... イベントタイプに応じた処理(後述)
  } catch (e) {
    // JSON解析失敗は無視(不完全なチャンクの可能性)
    console.debug('Parse error:', e);
  }
}

なぜバッファが必要か?

ストリームのチャンクは、必ずしも行の区切りで分割されるわけではありません:

[チャンク1] "data: {\\"type\\":\\"text-delta\\",\\"payload\\":{\\"textDel"
[チャンク2] "ta\\":\\"こんにちは\\"}}\\ndata: {\\"type\\":..."

チャンク1の最後は不完全な行なので、次のチャンクと結合してから処理する必要があります。


ステップ6~7: イベントタイプごとの処理

switch (eventType) {
  case 'text-delta':  // テキストチャンク
    const delta = payload.textDelta || payload.delta || payload.text || '';
    textContent += delta;
    contentEl.textContent = textContent;
    scrollToBottom();
    break;

  case 'tool-call':  // ツール呼び出し開始
    const toolName = payload.toolName || 'unknown';
    const toolCallId = payload.toolCallId || '';
    const args = payload.args || {};

    console.log('Tool call:', toolName, 'ID:', toolCallId);

    const panel = createToolPanel(toolName, args);
    toolPanels[toolCallId] = panel;
    assistantMsg.insertBefore(panel, contentEl);
    scrollToBottom();
    break;

  case 'tool-result':  // ツール実行結果
    const resultToolCallId = payload.toolCallId || '';
    const result = payload.result || {};

    if (toolPanels[resultToolCallId]) {
      updateToolPanel(toolPanels[resultToolCallId], result);
    }
    break;

  case 'finish':  // 完了
    if (payload.usage) {
      usage = payload.usage;
    }
    break;

  case 'start':
  case 'step-start':
  case 'step-finish':
  case 'reasoning-start':
  case 'reasoning-end':
  case 'tool-call-delta':
  case 'tool-call-input-streaming-start':
  case 'tool-call-input-streaming-end':
    // これらのイベントはログのみ(必要に応じて処理を追加)
    console.debug(eventType, payload);
    break;

  default:
    console.debug('Unknown event type:', eventType, payload);
}

重要な変更点(旧 Data Stream Protocol との違い):

項目 Data Stream Protocol (旧) Server-Sent Events (新)
プレフィックス 0:, 9:, a:, e:, f: data: のみ
イベント識別 プレフィックス文字 JSON内の type フィールド
テキストデルタ 0:"文字列" data: {"type":"text-delta","payload":{"textDelta":"文字列"}}
ツール呼び出し 9:{...} data: {"type":"tool-call","payload":{...}}
ツール結果 a:{...} data: {"type":"tool-result","payload":{...}}
完了通知 e:{...} data: {"type":"finish","payload":{...}}

ツールパネルの実装

ツールパネルは、エージェントがツールを呼んだり、ツール処理が終了するたびに画面に表示するパネルです。エージェントの返却値のみを画面にストリーミング表示するのでもよいのですが、実際はしばらくツール呼び出しが連続するので、その間ユーザーは白い画面を見続けることになります。

ツールパネルを見せてあげると、ユーザーも「何かエージェントが頑張っているんだな」と分かり安心する、というものです。

function createToolPanel(toolName, args) {
  const panel = document.createElement('div');
  panel.className = 'tool-panel';
  panel.innerHTML = `
    <div class="tool-header" onclick="this.nextElementSibling.classList.toggle('open')">
      <span class="tool-icon">🔧</span>
      <span class="tool-name">${escapeHtml(toolName)}</span>
      <span class="tool-status running">実行中<span class="thinking"></span></span>
    </div>
    <div class="tool-details">
      <div><strong>引数:</strong></div>
      <pre>${escapeHtml(JSON.stringify(args, null, 2))}</pre>
      <div class="tool-result"></div>
    </div>
  `;
  return panel;
}

function updateToolPanel(panel, result) {
  const statusEl = panel.querySelector('.tool-status');
  if (statusEl) {
    statusEl.className = 'tool-status completed';
    statusEl.innerHTML = '完了';
  }

  const resultEl = panel.querySelector('.tool-result');
  if (resultEl) {
    resultEl.innerHTML = `
      <div><strong>結果:</strong></div>
      <pre>${escapeHtml(JSON.stringify(result, null, 2))}</pre>
    `;
  }
}

ポイント:

  • クリックで tool-details の表示/非表示を切り替え
  • toolCallId をキーにしたマップで管理し、結果が返ってきたら対応するパネルを更新
const toolPanels = {};  // toolCallId → panel のマップ

// 呼び出し時
toolPanels[toolCallId] = panel;

// 結果時
if (toolPanels[resultToolCallId]) {
  updateToolPanel(toolPanels[resultToolCallId], result);
}


ユーザーの満足度には、クライアントサイドも重要

AIエージェントシステムは、本質的にはサーバサイドのAIエージェントの働きこそが価値の中心ですが、一方で時間のかかる処理の待ち時間にユーザーをいかにイライラさせないでおくか、クライアントサイドの挙動も非常に重要です。

今回はピュアな JavaScript でストリーミング処理のサンプルを示しましたが、AI チャット の実現にフォーカスしたReactやVue向けのコンポーネントもいろいろ出ていますので、ぜひ活用してみてください。

>> 次回 : (7) JSONで結果を返してほしい

Discussion