🙆

Express.js + Next.jsで実装するMCP Streaming Chat

に公開

Express.js + Next.jsで実装するMCP Streaming Chat



1. はじめに

前回の記事「Express.js + Next.jsで実装するMCP Streamable HTTP Transport」では、MCPの通信基盤となるStreamable HTTPトランスポートの実装方法について解説しました。本記事はその続編として、最新のMCP仕様(2025-03-26)を活用したストリーミングチャットの実装方法について詳しく解説します。

MCPの基礎知識については前回の記事を参照していただくとして、今回は特にストリーミング機能の実装に焦点を当て、サーバー側とクライアント側の両方の実装を見ていきます。

Express.js + Next.jsで実装するMCP Streamable HTTP Transport:
https://zenn.dev/sbk0716/articles/ac111eca8ec263

対象読者

  • MCPを使ったリアルタイムアプリケーション開発に興味がある方
  • ストリーミングチャットの実装方法を学びたい方
  • Express.jsとNext.jsを使った開発経験がある方
  • 前回の記事を読んでMCPの基礎を理解している方

本記事で学べること

  • 最新のMCP仕様(2025-03-26)の主要な変更点と特徴
  • MCPを使用したストリーミングチャットの実装方法
  • サーバー側とクライアント側の連携方法
  • 再開可能性(Resumability)やエラー回復などの高度な機能の実装方法

GitHubリポジトリ

このプロジェクトのソースコードは以下のGitHubリポジトリで公開しています:

MCP Streaming Chat Repository:
https://github.com/sbk0716/mcp-streaming-chat

リポジトリには、バックエンドとフロントエンドの完全なソースコードが含まれており、READMEにはセットアップと実行手順が詳細に記載されています。

注意: 本記事では、コードの理解に必要な最小限の部分のみを抜粋して紹介しています。完全なコードはGitHubリポジトリを参照してください。

2. MCP 2025-03-26の主要な変更点

【参照情報】

最新のMCP仕様(2025-03-26)では、前回のバージョン(2024-11-05)から以下の主要な変更が行われました:

  1. Streamable HTTPトランスポート: 以前のHTTP+SSEトランスポートをより柔軟なStreamable HTTPトランスポートに置き換え
  2. JSON-RPCバッチング: 複数のリクエスト/レスポンスをバッチ処理する機能のサポート
  3. ツールアノテーション: ツールの動作をより詳細に説明するためのアノテーション機能の追加
  4. OAuth 2.1ベースの認証フレームワーク: 包括的な認証フレームワークの追加

これらの変更点の中でも、特にStreamable HTTPトランスポートとJSON-RPCバッチングは、ストリーミングチャットの実装において重要な役割を果たします。

Streamable HTTPトランスポートの詳細

【参照情報】

  • MCP仕様書: Streamable HTTP
  • MCP TypeScript SDK: server/streamableHttp.tshandleRequestメソッド
  • MCP TypeScript SDK: client/streamableHttp.tssendメソッド
  • MCP仕様書: セッション管理

Streamable HTTPトランスポートは、以前のHTTP+SSEトランスポートを置き換える新しい通信方式です。このトランスポートは、クライアントとサーバー間の通信を単一のHTTPエンドポイントで効率的に行うことを可能にします。

以下のシーケンス図は、Streamable HTTPトランスポートの基本的な通信フローを示しています。この図では、クライアントとサーバー間の通信が3つの主要なフェーズ(初期化、ツール呼び出し、セッション終了)に分かれていることが分かります。特に重要なのは、ツール呼び出しフェーズでサーバーがSSEストリーミングレスポンスまたはJSON直接レスポンスのいずれかを返せる点です。ストリーミングチャットの実装では、SSEストリーミングレスポンスを使用して、メッセージを複数のチャンクに分割して送信します。

「SSE: 空のレスポンス」について

シーケンス図に示されている「SSE: 空のレスポンス」について説明します。これは、Streamable HTTPトランスポートの重要な特徴の一つです。

なぜ空のレスポンスを返すのか?

  1. データ配信の分離: 実際のコンテンツは「通知(notification)」として送信され、最終的なJSON-RPCレスポンスは単なるプレースホルダーとして空になっています。

  2. プロトコル設計: MCPのStreamable HTTPトランスポートでは、ツール呼び出しの結果を2つの方法で返すことができます:

    • 通知メカニズムを使用してストリーミング形式でデータを送信
    • 最終的なJSON-RPCレスポンスとして結果を返す
  3. ストリーミングパターン: ストリーミングチャットの場合、すべてのコンテンツは既に通知として送信済みのため、最終レスポンスは「空」になります。

技術的な実装

chat-stream.tsの実装では、以下のようにコメントされています:

// 重要な変更点: 空のコンテンツを返す
// すべてのチャンクは通知として送信されるため、レスポンスは単なるプレースホルダー
return {
  content: [
    {
      type: 'text' as const,
      text: '', // テキスト内容 - 空文字列(すべての内容は通知として送信済み)
      // ...
    },
  ],
}

技術的な理由

このパターンには以下のような技術的な理由があります:

  1. プロトコル要件: MCPのJSON-RPCプロトコルでは、リクエストに対して必ずレスポンスを返す必要があります。しかし、実際のデータは既に通知として送信済みなので、レスポンスの内容は空になります。

  2. クライアント-サーバー通信の完了: 空のレスポンスは「すべてのデータが送信完了した」という合図として機能します。これにより、クライアントはストリーミングが完了したことを知ることができます。

  3. SSEストリームの終了: Streamable HTTPトランスポートの仕様によると、「すべてのJSON-RPCレスポンスが送信された後、サーバーはSSEストリームを閉じるべき」とされています。空のレスポンスを送信することで、このプロセスが完了します。

Streamable HTTPトランスポートの主な特徴は以下の通りです:

  • 単一エンドポイント: /mcpなどの単一のHTTPエンドポイントでPOSTとGETの両方をサポート
  • メッセージ送信: クライアントからサーバーへのメッセージはHTTP POSTで送信
  • レスポンス形式: サーバーからクライアントへのレスポンスはSSE形式で返却(enableJsonResponse: falseの設定により)
  • セッション管理: Mcp-Session-Idヘッダーを使用した強化されたセッション管理機能
  • 再開可能性: Last-Event-IDヘッダーを使用した再開可能性(Resumability)のサポート

JSON-RPCバッチングのサポート

【参照情報】

JSON-RPCバッチングは、複数のリクエストを一度に送信することで、ネットワークオーバーヘッドを削減し、パフォーマンスを向上させる機能です。ストリーミングチャットでは、複数のメッセージや通知を効率的に送信するために活用できます。

以下は、JSON-RPCバッチングを使用して複数のツール呼び出しを一度に行う例です:

// バッチリクエストの例 - 複数のリクエストを1回のHTTPリクエストで送信して効率化
const batchRequests = [
  {
    method: 'tools/call', // JSON-RPCメソッド名 - ツール呼び出しを指定
    params: { 
      name: 'chat_stream', // 呼び出すツール名 - ストリーミングチャットツール
      arguments: { message: 'こんにちは' } // ツールに渡す引数 - ユーザーメッセージ
    },
    id: '1' // リクエスト識別子 - レスポンスとの対応付けに使用
  },
  {
    method: 'tools/call', // 2つ目のツール呼び出し
    params: { 
      name: 'dice', // サイコロツール
      arguments: { sides: 6 } // 6面サイコロを指定
    },
    id: '2' // 2つ目のリクエスト識別子
  }
];

// MCPクライアントのrequestBatchメソッドを使用してバッチリクエストを送信
// 戻り値はリクエストと同じ順序で返される配列
const batchResults = await mcpClient.requestBatch(batchRequests);

ツールアノテーションの追加

【参照情報】

ツールアノテーションは、ツールの動作をより詳細に説明するための機能です。これにより、クライアントはツールの特性をより正確に理解し、適切な使用方法を判断できます。

以下は、ツールアノテーションを使用してストリーミングチャットツールを定義する例です:

// ツールの定義例 - MCPサーバーにツールを登録する関数呼び出し
mcpServer.tool(
  'chat_stream', // ツール名 - クライアントからの呼び出しに使用される識別子
  'ストリーミングチャットツール', // ツールの説明 - クライアントに表示される説明文
  {
    message: z.string().describe('ユーザーからのメッセージ'), // 入力パラメータの定義 - zodスキーマを使用して型と説明を指定
  },
  { // ツールアノテーション - ツールの特性を示す追加情報
    title: 'ストリーミングチャット', // ツールのタイトル - UIに表示される名前
    readOnlyHint: true, // 読み取り専用フラグ - データを変更しないことを示す
    openWorldHint: false, // オープンワールドフラグ - 限定的な入力のみ受け付けることを示す
    destructiveHint: false // 破壊的操作フラグ - 破壊的な操作を行わないことを示す
  },
  async (args, context) => { // ツールのハンドラ関数 - 実際の処理を行う非同期関数
    // ツールの実装(省略) - 実際のコードはchat-stream.tsを参照
  }
);

OAuth 2.1ベースの認証フレームワーク

【参照情報】

  • MCP仕様書: 認証フレームワーク
  • MCP TypeScript SDK: server/auth/provider.tsOAuthServerProviderインターフェース
  • MCP TypeScript SDK: server/auth/router.tsmcpAuthRouter関数

最新のMCP仕様では、OAuth 2.1ベースの包括的な認証フレームワークが追加されました。これにより、セキュアな認証と認可が可能になります。認証フレームワークは、クライアントの認証、アクセストークンの検証、スコープベースの認可などの機能を提供します。

以下は、OAuth認証プロバイダーを設定する例です:

// OAuth認証プロバイダーの設定 - 簡略化バージョン
const authProvider = new ProxyOAuthServerProvider({
  endpoints: {
    authorizationUrl: "https://auth.example.com/oauth2/v1/authorize", 
    tokenUrl: "https://auth.example.com/oauth2/v1/token", 
    revocationUrl: "https://auth.example.com/oauth2/v1/revoke", 
  },
  verifyAccessToken: async (token) => {
    // トークン検証ロジック(実際の実装では署名検証やDB照会などを行う)
    return { token, clientId: "client123", scopes: ["chat", "dice"] }
  }
});

3. ストリーミングチャットの実装アーキテクチャ

【参照情報】

  • MCP TypeScript SDK: server/streamableHttp.tsclient/streamableHttp.tsの実装
  • プロジェクトコード: backend/src/tools/chat-stream.tsの実装
  • プロジェクトコード: frontend/lib/mcp-client.tsの実装
  • プロジェクトコード: frontend/src/app/components/ChatClient.tsxの実装
  • MCP仕様書: 通知機能

ストリーミングチャットの実装では、サーバー側とクライアント側の連携が重要です。この連携を効果的に行うためには、全体のシステム構成を理解することが必要です。

以下のシステム構成図は、ストリーミングチャットアプリケーションの主要コンポーネントとその関係を示しています。この図では、フロントエンド(Next.js)とバックエンド(Express.js)の両方のコンポーネントが表示されています。特に重要なのは、フロントエンドのStreamableHTTPClientTransportとバックエンドのStreamableHTTPServerTransportが直接通信している点です。これにより、MCPプロトコルに基づいた効率的な通信が可能になります。

サーバー側とクライアント側の責務分担

【参照情報】

  • プロジェクトコード: backend/src/tools/chat-stream.tsの実装
  • プロジェクトコード: frontend/lib/mcp-client.tsの実装
  • MCP仕様書: ツール実装
  • MCP仕様書: 通知ハンドラ

ストリーミングチャットの実装では、サーバー側とクライアント側で明確な責務分担が必要です。これにより、コードの保守性が向上し、拡張も容易になります。

サーバー側の責務:

  • ストリーミングチャットツールの実装 - chat-stream.tsでツールを定義
  • メッセージの生成と分割 - 文章単位でメッセージをチャンクに分割
  • チャンクの送信とプログレス管理 - 通知機能を使用して各チャンクを送信
  • セッション管理とイベントストア - セッション状態の保持と再開可能性のサポート

クライアント側の責務:

  • MCPクライアントの初期化と管理 - シングルトンパターンでクライアントを管理
  • 通知ハンドラの設定 - チャンク受信のためのハンドラを設定
  • チャンクの受信と処理 - 受信したチャンクをUIに反映
  • UIの更新とアニメーション - プログレスバーやタイピングインジケーターの実装

データフローとシーケンス図

【参照情報】

  • MCP TypeScript SDK: server/streamableHttp.tshandleRequestメソッド
  • MCP TypeScript SDK: client/streamableHttp.tssendメソッド
  • プロジェクトコード: backend/src/tools/chat-stream.tsの実装
  • MCP仕様書: Streamable HTTP

ストリーミングチャットのデータフローを理解することは、効果的な実装を行うために重要です。以下のシーケンス図は、ユーザーがメッセージを入力してから、チャンクに分割された応答が表示されるまでの一連の流れを示しています。

この図では、ユーザーからのメッセージがChatClientコンポーネントからMCPClientを通じてサーバーに送信され、サーバー側でChatStreamToolによって処理される様子を表しています。特に重要なのは、ChatStreamToolがメッセージを複数のチャンクに分割し、それぞれのチャンクを通知として順次送信する点です。各チャンクには進捗情報などのメタデータが付加され、クライアント側でプログレスバーやタイピングインジケーターなどのUI要素を制御するために使用されます。

この図を理解することで、ストリーミングチャットの実装における各コンポーネントの役割と相互作用が明確になり、効率的な実装が可能になります。

4. サーバー側の実装

【参照情報】

  • MCP仕様書: ツール実装
  • MCP仕様書: 通知機能
  • プロジェクトコード: backend/src/tools/chat-stream.tsの実装
  • MCP TypeScript SDK: server/mcp.tstoolメソッド
  • MCP TypeScript SDK: server/mcp.tssendNotificationメソッド

サーバー側の実装では、ストリーミングチャットツールを定義し、メッセージをチャンクに分割して送信する機能を実装します。この実装の中心となるのは、MCPの通知機能(Notification)を使用したストリーミング処理です。

以下のシーケンス図は、サーバー側でのストリーミングチャット処理の流れを示しています。クライアントからのリクエストを受け取ると、メッセージを生成し、チャンクに分割して、各チャンクを通知として送信します。最後に空のレスポンスを返します。

ストリーミングチャットツールの実装

ストリーミングチャットツールの核となる部分は以下のようになります:

export const chatStreamToolDefinition = {
  name: 'chat_stream',
  description: '質問に対して段階的に回答を返します(ストリーミング)',
  parameters: {
    message: z.string().describe('ユーザーからの質問'),
  },
  handler: async (args: { message: string }, context: any) => {
    const { message } = args
    const { sendNotification } = context
    
    let generatedMessage = generateResponseFromClientMessage(message)
    generatedMessage = generatedMessage.replace(/\n+/g, ' ')
    const chunks = generatedMessage.split(/(?<=\.|\。)/)
    const filteredChunks = chunks.filter((chunk) => chunk.trim().length > 0)
    
    if (filteredChunks.length > 0) {
      const progress = Math.floor((1 / filteredChunks.length) * 100)
      const isComplete = filteredChunks.length === 1
      
      await sendNotification({
        method: 'notifications/message',
        params: {
          level: 'info',
          data: filteredChunks[0],
          metadata: {
            streaming: !isComplete,
            progress: progress,
            totalChunks: filteredChunks.length,
            currentChunk: 1,
            isComplete: isComplete,
          },
        },
      })
    }
    
    for (let i = 1; i < filteredChunks.length; i++) {
      await new Promise(resolve => setTimeout(resolve, 300))
      
      const chunkProgress = Math.floor(((i + 1) / filteredChunks.length) * 100)
      const chunkIsComplete = i >= filteredChunks.length - 1
      
      await sendNotification({
        method: 'notifications/message',
        params: {
          level: 'info',
          data: filteredChunks[i],
          metadata: {
            streaming: !chunkIsComplete,
            progress: chunkProgress,
            totalChunks: filteredChunks.length,
            currentChunk: i + 1,
            isComplete: chunkIsComplete,
          },
        },
      })
    }
    
    return {
      content: [{ type: 'text' as const, text: '' }],
    }
  }
}

チャンク分割とメタデータの活用

【参照情報】

  • MCP仕様書: 通知メタデータ
  • プロジェクトコード: frontend/src/app/components/ChatClient.tsxの実装

ストリーミングチャットの実装では、メッセージをチャンクに分割し、各チャンクにメタデータを付加することが重要です。このメタデータは、クライアント側でプログレスバーやタイピングインジケーターなどのUI要素を制御するために使用されます。

メタデータの主な要素は以下の通りです:

  1. streaming: ストリーミング中かどうかを示すブール値。最後のチャンクではfalseに設定します。
  2. progress: 進捗率(0-100%)を示す数値。チャンク番号と全チャンク数から計算します。
  3. totalChunks: 全チャンク数を示す数値。
  4. currentChunk: 現在のチャンク番号を示す数値。
  5. isComplete: 最後のチャンクかどうかを示すブール値。

これらのメタデータを活用することで、クライアント側では以下のようなUI要素を実装できます:

  • プログレスバー: progress値に基づいて進捗状況を視覚的に表示
  • タイピングインジケーター: streamingtrueの間、タイピング中のアニメーションを表示
  • チャンクカウンター: currentChunktotalChunksを使用して「X/Y チャンク」のように表示

通知機能を使用したストリーミング処理

【参照情報】

MCPの通知機能(Notification)は、サーバーからクライアントへの一方向のメッセージ送信に使用されます。ストリーミングチャットでは、この通知機能を活用して、メッセージのチャンクを順次送信します。

通知の送信には、sendNotification関数を使用します。この関数は、MCPサーバーのコンテキストから提供され、以下のような形式で使用します:

// 通知を送信する例 - MCPの通知機能を使用してチャンクを送信
await sendNotification({
  method: "notifications/message", // 通知メソッド - MCP仕様で定義された標準的なメッセージ通知メソッド
  params: {
    level: "info", // 通知レベル - 情報レベルの通知(他にwarning, errorなどがある)
    data: chunk, // 通知データ - チャンクの実際のテキスト内容(ユーザーに表示される部分)
    metadata: { // メタデータ - クライアント側でUIの制御に使用する追加情報
      streaming: true, // ストリーミングフラグ - まだストリーミング中であることを示す(最後のチャンクではfalse)
      progress: 50, // 進捗率 - 全体の50%まで進んだことを示す(0-100%の値)
      totalChunks: 10, // 全チャンク数 - 全部で10チャンクあることを示す
      currentChunk: 5, // 現在のチャンク番号 - 現在5チャンク目であることを示す
      isComplete: false // 完了フラグ - まだ完了していないことを示す(最後のチャンクではtrue)
    }
  }
});

サーバー側で送信された通知は、Streamable HTTPトランスポートによってSSEイベントとしてクライアントに送信されます。クライアント側では、通知ハンドラを設定して、これらのイベントを受信・処理します。

MCPの通知メカニズム

MCPの通知機能はサーバーからクライアントへの一方向のメッセージ送信を実現します。主要なコンポーネントと流れは以下の通りです:

  1. サーバー側: sendNotification関数を使用して通知を送信
  2. トランスポート層: 通知をSSEイベントに変換
  3. クライアント側: setNotificationHandlerで登録したハンドラが通知を受信・処理

サーバー側のコード例:

await sendNotification({
  method: "notifications/message",
  params: {
    level: "info",
    data: chunk,
    metadata: { /* プログレス情報など */ }
  }
});

クライアント側のコード例:

mcpClient.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => {
  const text = notification.params.data as string;
  const metadata = notification.params.metadata || {};
  onChunkReceived(text, metadata);
});

この仕組みにより、サーバーからクライアントへのリアルタイムなメッセージ送信が可能になります。

SSEイベントの送受信

【参照情報】

  • MCP TypeScript SDK: server/streamableHttp.tswriteSSEEventメソッド
  • MCP TypeScript SDK: client/streamableHttp.ts_handleSseStreamメソッド
  • Server-Sent Events (SSE) 仕様: MDN Web Docs

トランスポート層では、JSONオブジェクトをSSE(Server-Sent Events)形式に変換して送信し、クライアント側で再度JSONオブジェクトに変換しています。

サーバー側の送信処理

サーバー側では、writeSSEEventメソッドがJSONオブジェクトをSSEイベントとして整形します:

// SSEイベントのフォーマット
eventData = `event: message\n`; // イベントタイプ
if (eventId) eventData += `id: ${eventId}\n`; // イベントID(再開可能性用)
eventData += `data: ${JSON.stringify(message)}\n\n`; // JSON文字列化したデータ

実際の送信データは以下のような形式になります:

event: message
id: 1234-5678-90ab-cdef
data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"こんにちは","metadata":{"streaming":true,"progress":25,"totalChunks":4,"currentChunk":1}}}

クライアント側の受信処理

クライアント側では、_handleSseStreamメソッドがSSEイベントを処理します:

  1. バイナリストリーム → テキストデコーダ → SSEパーサーを通してイベントを抽出
  2. イベントIDがあれば保存(再開可能性のため)
  3. event.dataJSON.parseでJSONオブジェクトに変換
  4. スキーマ検証後、登録済みのハンドラに渡す

このように、JSONオブジェクト → JSON文字列 → SSEイベント → HTTP送信 → SSEイベント解析 → JSON文字列 → JSONオブジェクトという変換が行われ、構造化されたデータをストリーミング形式で効率的に送受信できます。

エラーハンドリングとログ記録

【参照情報】

  • プロジェクトコード: backend/src/utils/logger.tsの実装
  • MCP TypeScript SDK: エラーハンドリングのベストプラクティス

ストリーミング処理中のエラーを適切に処理するために、各チャンク送信時にtry-catchブロックを使用することが重要です。エラーが発生した場合は、ログに記録し、可能であれば処理を継続します。

エラーハンドリングの基本的なパターンは以下の通りです:

// エラーハンドリングとログ記録の例 - 簡略化
try {
  await sendNotification({
    method: "notifications/message",
    params: { level: "info", data: chunk }
  });
  logMessage(`チャンク送信成功: ${i + 1}/${totalChunks}`);
} catch (err) {
  logMessage(`通知送信エラー: ${err}`, 'error');
  if (isCriticalError(err)) throw err;
}

このようなエラーハンドリングにより、一部のチャンク送信に失敗しても、全体の処理が中断されることなく、可能な限りメッセージを送信することができます。また、エラーの詳細をログに記録することで、問題の診断と解決が容易になります。

5. クライアント側の実装

【参照情報】

  • MCP TypeScript SDK: client/index.tsClientクラス
  • MCP TypeScript SDK: client/streamableHttp.tsStreamableHTTPClientTransportクラス
  • MCP仕様書: 通知ハンドラ
  • MCP仕様書: 再開可能性
  • プロジェクトコード: frontend/lib/mcp-client.tsの実装

クライアント側では、MCPクライアントを初期化し、通知ハンドラを設定して、チャンクを受信・処理します。クライアント側の実装は、サーバーから送信されるストリーミングチャンクを効率的に処理し、UIに反映するための重要な役割を担っています。

以下のシーケンス図は、クライアント側の初期化から通知処理までの流れを示しています。

以下は、mcp-client.tsの主要な実装です:

// frontend/lib/mcp-client.ts
import { Client } from '@modelcontextprotocol/sdk/client/index.js'
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'
import {
  CallToolRequest,
  CallToolResultSchema,
  LoggingMessageNotificationSchema,
} from '@modelcontextprotocol/sdk/types.js'

// シングルトンインスタンス
let client: Client | null = null
let transport: StreamableHTTPClientTransport | null = null
let sessionId: string | undefined

const SERVER_URL = 'http://localhost:3000/mcp'

// MCPクライアントを初期化する関数
export async function initializeClient() {
  if (client && transport) return client

  try {
    console.log('Initializing MCP client...')
    
    client = new Client({
      name: 'mcp-streaming-chat-frontend',
      version: '1.0.0',
    })

    client.onerror = (error) => {
      console.error('Client error:', error)
    }

    transport = new StreamableHTTPClientTransport(
      new URL(SERVER_URL),
      {
        sessionId,
        requestInit: {
          headers: {
            'Content-Type': 'application/json',
            Accept: 'application/json, text/event-stream',
          },
          credentials: 'include',
          mode: 'cors',
        },
      }
    )

    await client.connect(transport)
    
    if (transport.sessionId) {
      sessionId = transport.sessionId
      console.log('Session initialized with ID:', sessionId)
    }

    return client
  } catch (error) {
    console.error('Error initializing client:', error)
    throw error
  }
}

// ストリーミングチャットメッセージを送信する関数
export async function sendStreamingChatMessage(
  message: string,
  onChunkReceived: (text: string, metadata: any) => void
) {
  if (!message.trim()) {
    throw new Error('Message cannot be empty')
  }

  const mcpClient = await initializeClient()

  try {
    mcpClient.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => {
      const text = notification.params.data as string
      const metadata = notification.params.metadata || {}
      
      onChunkReceived(text, metadata)
    })

    const req: CallToolRequest = {
      method: 'tools/call',
      params: {
        name: 'chat_stream',
        arguments: { message },
      },
    }

    let lastEventId: string | undefined

    const res = await mcpClient.request(req, CallToolResultSchema, {
      onresumptiontoken: (eventId) => {
        lastEventId = eventId
      }
    })

    return res
  } catch (error) {
    console.error('Error sending streaming chat message:', error)
    throw error
  }
}

通知ハンドラの設定とチャンク処理

【参照情報】

クライアント側では、LoggingMessageNotificationSchemaを使用して通知ハンドラを設定し、サーバーから送信されるチャンクを受信・処理します。通知ハンドラは、サーバーから送信される通知メッセージを受信し、適切に処理するためのコールバック関数です。

// 通知ハンドラの設定
mcpClient.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => {
  // 通知からテキストとメタデータを抽出
  const text = notification.params.data as string // チャンクのテキスト内容
  const metadata = notification.params.metadata || {} // メタデータ(進捗情報など)
  
  // コールバック関数を呼び出してチャンクを通知
  // これにより、UIコンポーネントがチャンクを受け取り、表示を更新できる
  onChunkReceived(text, metadata)
})

通知ハンドラを使用することで、サーバーからのストリーミングチャンクをリアルタイムで処理し、UIに反映することができます。各チャンクには、テキスト内容だけでなく、進捗情報などのメタデータも含まれており、これらを活用してプログレスバーやタイピングインジケーターなどのUI要素を制御できます。

再接続メカニズムと再開可能性の実装

【参照情報】

最新のMCP仕様では、再接続メカニズムと再開可能性(Resumability)が強化されています。これにより、ネットワーク接続が一時的に切断された場合でも、最後に受信したイベントIDを使用して、そこから再開することができます。

// 再開可能性の実装
const res = await mcpClient.request(req, CallToolResultSchema, {
  // 再接続時に最後のイベントIDを使用するための設定
  // このコールバックは、各イベントが受信されるたびに呼び出される
  onresumptiontoken: (eventId) => {
    // 最後のイベントIDを保存
    lastEventId = eventId
    
    // 必要に応じて、ローカルストレージなどに保存することも可能
    // localStorage.setItem('last-event-id', eventId);
  }
})

再接続時には、保存した最後のイベントIDを使用して、サーバーに再接続リクエストを送信します。サーバーは、このイベントID以降のイベントを再送信します。これにより、接続が切断された場合でも、メッセージの欠落を防ぐことができます。

セッション管理の実装

【参照情報】

  • MCP仕様書: セッション管理
  • MCP TypeScript SDK: client/streamableHttp.tssessionIdプロパティ
  • MCP TypeScript SDK: client/streamableHttp.tsterminateSessionメソッド

クライアント側では、セッションIDを保存し、再接続時に使用します。また、セッションを明示的に終了するためのterminateSession関数も実装しています。セッション管理は、ステートフルなチャットアプリケーションを実装する上で重要な要素です。

// セッションIDの保存
if (transport.sessionId) {
  // トランスポートから取得したセッションIDを保存
  sessionId = transport.sessionId
  console.log('Session initialized with ID:', sessionId)
  
  // 必要に応じて、ローカルストレージなどに保存することも可能
  // localStorage.setItem('mcp-session-id', sessionId);
}

// セッションの終了
export async function terminateSession() {
  // トランスポートが初期化されていない場合は何もしない
  if (!transport) {
    return
  }

  try {
    // サーバーにDELETEリクエストを送信してセッションを終了
    await transport.terminateSession()
    // セッションIDをクリア
    sessionId = undefined
    // クライアントとトランスポートを閉じる
    await closeClient()
  } catch (error) {
    console.error('Error terminating session:', error)
    throw error
  }
}

セッション管理を適切に実装することで、ユーザーは会話の文脈を維持したまま、チャットを継続することができます。また、セッションを明示的に終了することで、サーバー側のリソースを適切に解放することができます。

6. UIコンポーネントの実装

【参照情報】

  • プロジェクトコード: frontend/src/app/components/ChatClient.tsxの実装
  • MCP TypeScript SDK: client/index.tssetNotificationHandlerメソッド
  • React Hooks: useState, useEffect, useRefの使用方法
  • Tailwind CSS: UIコンポーネントのスタイリング

UIコンポーネントでは、ストリーミングメッセージの表示とアニメーション、プログレスバーとタイピングインジケーター、接続状態の視覚的フィードバックなどを実装します。以下の図は、UIコンポーネントの主要な部分と、それらがどのように相互作用するかを示しています。

以下は、ChatClient.tsxの主要な実装です:

// frontend/src/app/components/ChatClient.tsx
'use client'

import { useState, useEffect, useRef } from 'react'
import {
  initializeClient,
  getSessionId,
  terminateSession,
  sendStreamingChatMessage,
} from '../../../lib/mcp-client'

type ConnectionState = 'connecting' | 'connected' | 'reconnecting' | 'disconnected'

interface Message {
  id: string
  text: string
  sender: 'user' | 'bot'
  timestamp: Date
  isStreaming?: boolean
  progress?: number
  totalChunks?: number
  currentChunk?: number
}

export default function ChatClient() {
  const [sessionId, setSessionId] = useState<string | undefined>()
  const [messages, setMessages] = useState<Message[]>([])
  const [inputMessage, setInputMessage] = useState('')
  const [loading, setLoading] = useState(false)
  const [error, setError] = useState<string | null>(null)
  const [connectionState, setConnectionState] = useState<ConnectionState>('connecting')

  const messagesEndRef = useRef<HTMLDivElement>(null)

  useEffect(() => {
    const initialize = async () => {
      setLoading(true)
      setError(null)
      setConnectionState('connecting')

      try {
        await initializeClient()
        const currentSessionId = getSessionId()
        setSessionId(currentSessionId)

        if (currentSessionId) {
          setConnectionState('connected')
        } else {
          setConnectionState('disconnected')
        }

        setMessages([
          {
            id: Date.now().toString(),
            text: 'こんにちは!何か質問があればどうぞ。',
            sender: 'bot',
            timestamp: new Date(),
          },
        ])
      } catch (err) {
        setError(`初期化エラー: ${err instanceof Error ? err.message : String(err)}`)
        setConnectionState('disconnected')
      } finally {
        setLoading(false)
      }
    }

    initialize()
  }, [])

  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' })
  }, [messages])

  const handleSendMessage = async () => {
    if (!inputMessage.trim()) {
      return
    }

    const userMessage: Message = {
      id: Date.now().toString(),
      text: inputMessage,
      sender: 'user',
      timestamp: new Date(),
    }
    setMessages((prevMessages) => [...prevMessages, userMessage])
    setInputMessage('')
    setLoading(true)
    setError(null)

    try {
      const placeholderMessage: Message = {
        id: (Date.now() + 1).toString(),
        text: '考え中...',
        sender: 'bot',
        timestamp: new Date(),
        isStreaming: true,
        progress: 0,
      }
      setMessages((prevMessages) => [...prevMessages, placeholderMessage])

      await sendStreamingChatMessage(
        userMessage.text,
        (text: string, metadata: any) => {
          setMessages((prevMessages) => {
            const updatedMessages = [...prevMessages]
            const lastMessage = updatedMessages[updatedMessages.length - 1]

            if (lastMessage && lastMessage.isStreaming) {
              updatedMessages[updatedMessages.length - 1] = {
                ...lastMessage,
                text: lastMessage.text === '考え中...' || metadata.currentChunk === 1 
                  ? text
                  : lastMessage.text + text,
                progress: metadata.progress || 0,
                totalChunks: metadata.totalChunks || 1,
                currentChunk: metadata.currentChunk || 1,
                isStreaming: metadata.streaming !== false && !metadata.isComplete,
              }
            }

            return updatedMessages
          })
        }
      )
    } catch (err) {
      setError(`メッセージ送信エラー: ${err instanceof Error ? err.message : String(err)}`)
      
      setMessages((prevMessages) => {
        const updatedMessages = [...prevMessages]
        
        if (
          updatedMessages.length > 0 &&
          updatedMessages[updatedMessages.length - 1].isStreaming
        ) {
          updatedMessages[updatedMessages.length - 1] = {
            id: (Date.now() + 1).toString(),
            text: `エラーが発生しました: ${err instanceof Error ? err.message : String(err)}`,
            sender: 'bot',
            timestamp: new Date(),
          }
          return updatedMessages
        } else {
          return [...prevMessages, {
            id: (Date.now() + 1).toString(),
            text: `エラーが発生しました: ${err instanceof Error ? err.message : String(err)}`,
            sender: 'bot',
            timestamp: new Date(),
          }]
        }
      })
    } finally {
      setLoading(false)
    }
  }

  return (
    <div className="flex flex-col h-screen max-w-4xl mx-auto">
      <div className="p-4">
        <h1 className="text-3xl font-bold mb-4 text-center">MCP ストリーミングチャット</h1>
        
        <div className="bg-white dark:bg-gray-800 rounded-lg shadow-md p-4">
          <div className="flex justify-between items-center">
            <div className={`px-3 py-1 rounded-full text-sm font-medium ${
              connectionState === 'connected' 
                ? 'bg-green-100 text-green-800'
                : connectionState === 'connecting' 
                  ? 'bg-yellow-100 text-yellow-800'
                  : 'bg-red-100 text-red-800'
            }`}>
              {connectionState === 'connected' ? '接続済み' : 
               connectionState === 'connecting' ? '接続中...' : 
               connectionState === 'reconnecting' ? '再接続中...' : '切断'}
            </div>
            
            <div className="p-2 rounded-md bg-gray-100 text-gray-700">
              {sessionId ? `セッション: ${sessionId.substring(0, 8)}...` : 'セッションなし'}
            </div>
            
            <button
              onClick={async () => {
                try {
                  setLoading(true)
                  await terminateSession()
                  setSessionId(undefined)
                  setConnectionState('disconnected')
                  setMessages(prev => [...prev, {
                    id: Date.now().toString(),
                    text: 'セッションが終了しました。',
                    sender: 'bot',
                    timestamp: new Date(),
                  }])
                } catch (err) {
                  setError(`セッション終了エラー: ${err instanceof Error ? err.message : String(err)}`)
                } finally {
                  setLoading(false)
                }
              }}
              disabled={!sessionId || loading}
              className="bg-red-600 hover:bg-red-700 text-white font-medium py-1 px-3 text-sm rounded-md disabled:bg-gray-400 disabled:cursor-not-allowed"
            >
              セッション終了
            </button>
          </div>
        </div>
      </div>

      <div className="flex-grow overflow-hidden bg-white dark:bg-gray-800 rounded-lg shadow-md mx-4 mb-2">
        <div className="h-full overflow-y-auto p-4">
          <div className="space-y-4">
            {messages.map((message) => (
              <div
                key={message.id}
                className={`flex ${message.sender === 'user' ? 'justify-end' : 'justify-start'}`}
              >
                <div
                  className={`max-w-[80%] rounded-lg p-3 ${
                    message.sender === 'user'
                      ? 'bg-blue-600 text-white'
                      : message.isStreaming
                        ? 'bg-gray-100 text-gray-800 relative'
                        : 'bg-gray-100 text-gray-800'
                  }`}
                >
                  <p className="whitespace-pre-wrap">
                    {message.text}
                    {message.isStreaming && (
                      <span className="ml-1 inline-block w-2 h-4 bg-transparent border-r-2 border-current animate-pulse"></span>
                    )}
                  </p>

                  {message.isStreaming && message.progress !== undefined && (
                    <div className="w-full bg-gray-200 h-1 mt-2 rounded-full overflow-hidden">
                      <div
                        className="bg-blue-500 h-1 rounded-full transition-all duration-300 ease-in-out"
                        style={{ width: `${message.progress}%` }}
                      ></div>
                    </div>
                  )}

                  {message.isStreaming &&
                    message.currentChunk !== undefined &&
                    message.totalChunks !== undefined && (
                      <p className="text-xs mt-1 text-gray-500">
                        {message.currentChunk}/{message.totalChunks} チャンク (
                        {message.progress !== undefined ? message.progress : 0}%)
                      </p>
                    )}

                  <p className="text-xs mt-1 text-gray-500">
                    {message.timestamp.toLocaleTimeString()}
                  </p>
                </div>
              </div>
            ))}

            <div ref={messagesEndRef} />
          </div>
        </div>
      </div>

      {error && (
        <div className="mx-4 mb-2 p-3 bg-red-100 text-red-800 rounded-md">
          <p className="font-medium">エラーが発生しました</p>
          <p className="text-sm">{error}</p>
        </div>
      )}

      <div className="bg-white dark:bg-gray-800 rounded-lg shadow-md p-4 mx-4 mb-4">
        <div className="flex space-x-2">
          <textarea
            value={inputMessage}
            onChange={(e) => setInputMessage(e.target.value)}
            onKeyDown={(e) => {
              if (e.key === 'Enter' && !e.shiftKey && !e.nativeEvent.isComposing) {
                e.preventDefault()
                handleSendMessage()
              }
            }}
            placeholder="メッセージを入力してください..."
            className="flex-grow p-2 border border-gray-300 rounded-md resize-none"
            rows={2}
            disabled={loading || !sessionId}
          />
          <button
            onClick={handleSendMessage}
            disabled={loading || !sessionId || !inputMessage.trim()}
            className="bg-blue-600 hover:bg-blue-700 text-white font-medium py-2 px-4 rounded-md disabled:bg-gray-400 disabled:cursor-not-allowed"
          >
            送信
          </button>
        </div>
        <p className="text-xs text-gray-500 mt-1">Enterキーで送信、Shift+Enterで改行</p>
      </div>
    </div>
  )
}

ストリーミングメッセージの表示とアニメーション

ストリーミングメッセージの表示では、以下の要素を実装しています:

  1. プレースホルダーメッセージ: 最初に「考え中...」というプレースホルダーを表示
  2. チャンク追加: 受信したチャンクをメッセージに追加
  3. タイピングインジケーター: ストリーミング中はタイピングインジケーターを表示
  4. プログレスバー: 進捗率に応じてプログレスバーを更新
  5. 進捗情報: 現在のチャンク番号と全チャンク数を表示
// プレースホルダーメッセージの追加
const placeholderMessage: Message = {
  id: (Date.now() + 1).toString(),
  text: '考え中...',
  sender: 'bot',
  timestamp: new Date(),
  isStreaming: true,
  progress: 0,
}
setMessages((prevMessages) => [...prevMessages, placeholderMessage])

// タイピングインジケーターの実装
{message.isStreaming && (
  <span className="ml-1 inline-block w-2 h-4 bg-transparent border-r-2 border-current animate-pulse"></span>
)}

// プログレスバーの実装
{message.isStreaming && message.progress !== undefined && (
  <div className="w-full bg-gray-200 h-1 mt-2 rounded-full overflow-hidden">
    <div
      className="bg-blue-500 h-1 rounded-full transition-all duration-300 ease-in-out"
      style={{ width: `${message.progress}%` }}
    ></div>
  </div>
)}

接続状態の視覚的フィードバック

接続状態に応じて、異なる色とテキストを表示することで、ユーザーに現在の接続状態を視覚的にフィードバックします。

<div className={`px-3 py-1 rounded-full text-sm font-medium ${
  connectionState === 'connected' 
    ? 'bg-green-100 text-green-800' 
    : connectionState === 'connecting' 
      ? 'bg-yellow-100 text-yellow-800'
      : 'bg-red-100 text-red-800'
}`}>
  {connectionState === 'connected' ? '接続済み' : 
   connectionState === 'connecting' ? '接続中...' : 
   connectionState === 'reconnecting' ? '再接続中...' : '切断'}
</div>

7. デバッグとトラブルシューティング

【参照情報】

  • MCP仕様書: Streamable HTTP
  • MCP TypeScript SDK: client/streamableHttp.tsの実装
  • プロジェクトコード: frontend/lib/mcp-client.tssendChatMessage関数
  • プロジェクトコード: backend/src/tools/chat.tsの実装

ストリーミングチャットアプリケーションの開発中に発生する可能性のある問題をデバッグし、トラブルシューティングするための方法について解説します。特に、ストリーミングON/OFFの違いによるDevToolsでの表示の違いや、コンソールログを活用したデバッグ方法について詳しく見ていきます。

ブラウザDevToolsを使用したデバッグ

【参照情報】

  • ブラウザDevTools: Network タブの使用方法
  • MCP仕様書: SSEストリーミング
  • プロジェクトコード: frontend/lib/mcp-client.tsの実装

ブラウザのDevToolsは、MCPアプリケーションのデバッグに非常に役立ちます。特に、Network タブを使用することで、クライアントとサーバー間の通信を詳細に確認することができます。

ストリーミングチャットでは、ストリーミングON/OFFの違いによって、DevToolsでの表示が異なる場合があります。これは、使用しているツールの違いによるものです:

  1. ストリーミングONの場合:

    • chat_streamツールを使用
    • SSEイベントストリームとして表示される
    • DevToolsのNetworkタブで「EventStream」として表示
  2. ストリーミングOFFの場合:

    • chatツールを使用
    • 実際にはSSE形式で通知を送信しているが、DevToolsのNetworkタブでは通常のHTTPリクエストとして表示される
    • コンソールログを使用してSSEイベントの内容を確認できる
// ストリーミングONの場合のコード例
await sendStreamingChatMessage(
  userMessage.text,
  (text, metadata) => {
    // チャンクを受信するたびに呼び出される
    // ...
  }
)

// ストリーミングOFFの場合のコード例
const response = await sendChatMessage(userMessage.text)

ストリーミングON/OFFの実装の違い

【参照情報】

  • プロジェクトコード: frontend/lib/mcp-client.tssendChatMessage関数とsendStreamingChatMessage関数
  • プロジェクトコード: backend/src/tools/chat.tsbackend/src/tools/chat-stream.tsの実装

ストリーミングON/OFFには以下の主な違いがあります:

1. 実装の違い

項目 ストリーミングON ストリーミングOFF
使用ツール chat_stream chat
メッセージ処理 複数チャンクに分割 単一チャンクで送信
通知回数 複数回 (チャンクごと) 1回 (メッセージ全体)
進捗表示 段階的に更新 一度に表示

2. データフローの違い

3. 共通点と注意点

  • 両方のモードで enableJsonResponse: false を使用(SSE形式で通信)
  • 両方のモードで最終的に空のレスポンスを返す
  • DevToolsでの表示の違いはブラウザの表示仕様によるもの
    • 複数チャンク: 「EventStream」として表示
    • 単一チャンク: 通常のHTTPリクエストとして表示

この違いを理解することで、デバッグ時に適切な情報を確認できます。

デバッグとコンソールログの活用

【参照情報】

  • プロジェクトコード: frontend/lib/mcp-client.tsの実装
  • ブラウザDevTools: Console タブと Network タブの使用方法

ストリーミングチャットアプリケーションの開発では、ブラウザのDevToolsとコンソールログを活用することが重要です。特に、ストリーミングON/OFFの違いによるDevToolsでの表示の違いを理解し、適切なデバッグ方法を選択することが必要です。

// デバッグ用コンソールログ(簡略化)
mcpClient.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => {
  const text = notification.params.data as string;
  const metadata = notification.params.metadata || {};
  
  // デバッグ出力
  console.log('通知データ:', text);
  console.log('メタデータ:', metadata);
  
  onChunkReceived(text, metadata);
});

また、ストリーミングOFFの場合でもSSE形式を使用するようにサーバー側の設定を調整することで、DevToolsでの表示を統一することができます:

// サーバー側のトランスポート設定(簡略化)
const transport = new StreamableHTTPServerTransport({
  sessionIdGenerator: () => randomUUID(),
  eventStore,
  enableJsonResponse: false, // 常にSSE形式を使用
});

8. 高度な機能と最適化

再開可能性(Resumability)の実装詳細

最新のMCP仕様では、再開可能性(Resumability)が強化されています。これにより、接続が切断された場合でも、最後に受信したイベントIDを使用して、そこから再開することができます。

// サーバー側の実装 - 再開可能性をサポートするためのサーバー側の設定
// イベントストアを使用してイベントを保存
const eventStore = new InMemoryEventStore() // 本番環境では永続的なストレージを使用することを推奨

// StreamableHTTPServerTransportの設定
const transport = new StreamableHTTPServerTransport({
  sessionIdGenerator: () => randomUUID(),
  eventStore,
  // その他の設定...
})

// クライアント側の実装
// 最後のイベントIDを保存
let lastEventId: string | undefined

// リクエスト送信時に再開可能性をサポートするための設定
const res = await mcpClient.request(req, CallToolResultSchema, {
  onresumptiontoken: (eventId) => {
    lastEventId = eventId
    // 必要に応じて永続化することも可能
    // localStorage.setItem('last-event-id', eventId);
  }
})

// 再接続時の実装
transport = new StreamableHTTPClientTransport(
  new URL(SERVER_URL),
  {
    // その他の設定...
    lastEventId,
  }
)

イベントストアを使用したセッション状態の保持

イベントストアは、セッション状態を保持するための重要なコンポーネントです。最新のMCP仕様では、InMemoryEventStoreが提供されていますが、本番環境では永続的なストレージを使用することが推奨されます。

// InMemoryEventStoreの概略実装
class InMemoryEventStore implements EventStore {
  private events: { [streamId: string]: { id: string, message: JSONRPCMessage }[] } = {};

  async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
    if (!this.events[streamId]) this.events[streamId] = [];
    const eventId = randomUUID();
    this.events[streamId].push({ id: eventId, message });
    return eventId;
  }

  async replayEventsAfter(lastEventId: string, options: { 
    send: (eventId: string, message: JSONRPCMessage) => Promise<void>
  }): Promise<string> {
    // ストリームを検索して見つかったイベントIDの後のイベントを再送信
    // 詳細はGitHubリポジトリのコードを参照
    return streamId; // 見つかったストリームIDを返す
  }
}

パフォーマンス最適化テクニック

ストリーミングチャットのパフォーマンスを最適化するために、以下のテクニックを使用できます:

  1. メモ化: React.memoやuseMemoを使用して、不要な再レンダリングを防ぐ
  2. 仮想スクロール: 大量のメッセージがある場合、仮想スクロールを使用して表示するメッセージを制限する
  3. バッチ更新: 複数のメッセージを一度に更新する
  4. 遅延読み込み: 古いメッセージは必要に応じて読み込む
// メモ化と仮想スクロールの簡略化例
const MemoizedMessage = React.memo(({ message }) => (
  <div className="message-container">
    <p>{message.text}</p>
    <span>{message.timestamp.toLocaleTimeString()}</span>
  </div>
));

// 仮想スクロール例(簡略化)
const MessageList = ({ messages }) => (
  <FixedSizeList height={500} width="100%" itemCount={messages.length} itemSize={100}>
    {({ index, style }) => (
      <div style={style}><MemoizedMessage message={messages[index]} /></div>
    )}
  </FixedSizeList>
);

エラー回復と再接続戦略

ネットワークエラーや一時的なサーバーエラーに対応するために、エラー回復と再接続戦略を実装することが重要です。

// 指数バックオフを使用した再接続戦略
async function reconnectWithExponentialBackoff() {
  const maxRetries = 5;
  const baseDelay = 1000;
  let retries = 0;
  
  while (retries < maxRetries) {
    try {
      await initializeClient();
      console.log("再接続に成功しました");
      return true;
    } catch (error) {
      retries++;
      
      if (retries >= maxRetries) {
        console.error("最大再試行回数に達しました");
        return false;
      }
      
      const delay = baseDelay * Math.pow(2, retries - 1);
      
      console.log(`${delay}ミリ秒後に再試行します (${retries}/${maxRetries})`);
      
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  return false;
}

8. セキュリティと運用上の考慮事項

OAuth 2.1認証の実装

最新のMCP仕様では、OAuth 2.1ベースの認証フレームワークが追加されました。これにより、セキュアな認証と認可が可能になります。

// OAuth認証プロバイダーの設定 - 簡略化バージョン
const authProvider = new ProxyOAuthServerProvider({
  endpoints: {
    authorizationUrl: "https://auth.example.com/oauth2/v1/authorize", 
    tokenUrl: "https://auth.example.com/oauth2/v1/token", 
    revocationUrl: "https://auth.example.com/oauth2/v1/revoke", 
  },
  verifyAccessToken: async (token) => {
    // トークン検証ロジック(実際の実装では署名検証やDB照会などを行う)
    return { token, clientId: "client123", scopes: ["chat", "dice"] }
  }
});

セッションタイムアウトと自動クリーンアップ

長時間未使用のセッションを自動的に終了させるために、セッションタイムアウトと自動クリーンアップを実装することが重要です。

// セッションタイムアウトの実装
function setupSessionCleanup(transports, timeoutMs = 3600000) { // デフォルト1時間
  setInterval(() => {
    const now = Date.now();
    
    for (const sessionId in transports) {
      const transport = transports[sessionId];
      const lastActivity = transport.lastActivityTime || 0;
      
      if (now - lastActivity > timeoutMs) {
        console.log(`セッションタイムアウト: ${sessionId}`);
        transport.close();
        delete transports[sessionId];
      }
    }
  }, 60000); // 1分ごとにチェック
}

レート制限とリソース保護

DoS攻撃やリソース枯渇を防ぐために、レート制限とリソース保護を実装することが重要です。

// レート制限の実装(express-rate-limit使用)
import rateLimit from 'express-rate-limit';

// APIレート制限の設定
const apiLimiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15分間
  max: 100, // 15分あたり100リクエストまで許可
  standardHeaders: true,
  legacyHeaders: false,
});

// MCPエンドポイントにレート制限を適用
app.use('/mcp', apiLimiter);

本番環境での運用ベストプラクティス

本番環境でMCPアプリケーションを運用する際のベストプラクティスを以下に示します:

  1. HTTPS: すべての通信をHTTPS経由で行う
  2. ロギングとモニタリング: 詳細なログを記録し、アプリケーションの状態をモニタリングする
  3. スケーリング: 負荷に応じてサーバーをスケールアップ/スケールアウトする
  4. バックアップ: セッション状態や重要なデータを定期的にバックアップする
  5. セキュリティ更新: 依存ライブラリを定期的に更新し、セキュリティパッチを適用する
// HTTPSの設定例
import https from 'https';
import fs from 'fs';

// HTTPSオプションの設定
const options = {
  key: fs.readFileSync('key.pem'),
  cert: fs.readFileSync('cert.pem'),
};

// HTTPSサーバーの作成と起動
https.createServer(options, app).listen(443, () => {
  console.log('HTTPS server running on port 443');
});

9. まとめと今後の展望

本記事では、最新のMCP仕様(2025-03-26)を活用したストリーミングチャットの実装方法について解説しました。サーバー側とクライアント側の両方の実装を見ていくことで、MCPの通知機能を使用したストリーミング処理の実装方法を理解できたと思います。

実装のまとめと主要なポイント

  1. サーバー側: ストリーミングチャットツールの実装、メッセージのチャンク分割、通知機能を使用したストリーミング送信
  2. クライアント側: MCPクライアントの初期化、通知ハンドラの設定、チャンクの受信と処理、UIの更新
  3. 高度な機能: 再開可能性(Resumability)、イベントストアを使用したセッション状態の保持、エラー回復と再接続戦略
  4. セキュリティ: OAuth 2.1認証、セッションタイムアウト、レート制限、HTTPS

MCPの今後の発展方向

MCPは今後も発展を続け、以下のような方向性が考えられます:

  1. より高度なストリーミング機能: 双方向ストリーミング、マルチメディアストリーミングなど
  2. AIモデルとの統合: より高度なAIモデルとの統合、モデル固有の機能のサポート
  3. セキュリティの強化: より高度な認証・認可機能、エンドツーエンド暗号化など
  4. パフォーマンスの最適化: より効率的なプロトコル、圧縮機能など

応用例と拡張可能性

MCPを使用したストリーミングチャットの実装は、以下のような応用例が考えられます:

  1. AIアシスタント: リアルタイムで応答するAIアシスタント
  2. ライブコーディング: リアルタイムでコードを生成・編集するツール
  3. 教育プラットフォーム: リアルタイムで質問に回答する教育ツール
  4. カスタマーサポート: AIを活用したリアルタイムカスタマーサポート

MCPの柔軟性と拡張性を活かして、さまざまなアプリケーションを開発することができます。

参考リソース

Discussion