💭

MCPの通信に対する深い考察と実装

に公開

MCPの考察

MCPについて調査を始めたとき、ある問題に気づきました。ほとんどのドキュメントは @mcp.tool のようなアノテーション方式で注入する方法を紹介しています。しかし、多くの非同期ビジネスプロセスがある場合、接続は非常に面倒になります。アノテーションを付けられるコードエンティティが存在しないのです。プロセスごとに同期関数を書く必要があるのでしょうか?好奇心に駆られて、MCPの通信原理をさらに分析し、MCPに接続する便利な方法があるかどうか、MCPの原理を理解しようとしました。

MCPの通信方式

MCPはSTDIOとSSEの2つの転送プロトコルを提供しています。現在、多くの実験的ツールはSTDIO転送を使用しています。しかし、サービスを提供する場合は、基本的にSSE(Server-Sent Events)を使用します。そのため、本記事ではSSEのMCP接続モードを重点的に分析・議論します。

特性 STDIO SSE
遅延 非常に低い(ローカル) 比較的低いが、ネットワーク条件に左右される
信頼性 ローカルプロセスで高い信頼性 自動再接続とエラー処理メカニズムを備えている
接続モデル 1:1(クライアントとサーバープロセス) 多クライアント(一つのサーバーで複数クライアントを処理可能)
デプロイの複雑さ 低い(ローカルプロセス間通信) 中程度(HTTPサーバーのサポートが必要)
ネットワーク転送 非サポート(ローカルのみ) サポート(ネットワーク通信可能)
セキュリティ ローカルプロセスで安全 HTTPSによりセキュリティ強化可能
リソース消費 低い 中程度(HTTP接続維持が必要)
拡張性 限定的(単一クライアントのみサポート) 優れている(複数クライアント接続をサポート)

SSEはWebSocketと同様に、ブラウザとサーバー間の通信チャネルを確立し、サーバーからブラウザに情報をプッシュします。全体的に見ると、WebSocketの方が強力で柔軟です。WebSocketは全二重チャネルで双方向通信が可能です。一方、SSEは一方向チャネルで、サーバーからブラウザへの送信のみ可能です。これはストリーミング情報の本質が単なるダウンロードだからです。ブラウザがサーバーに情報を送信する場合、それは別のHTTPリクエストになります。この特徴により、私はさらに好奇心を抱きました。stdioではstdinを使用して入力し、stdoutを使用して出力できます。しかし、SSEは一方向チャネルです。MCPはどのように双方向通信を実現するのでしょうか?2つのSSEチャネルを確立するのでしょうか?この疑問を持って、実践的な検証を始めました。

MCPのSSE通信フロー

MCP公式が提供するツール「npx @modelcontextprotocol/inspector」を使用すると、MCP検証用の管理ページを比較的簡単に立ち上げることができます。この管理ページのパケットをキャプチャすることで、SSE通信の手がかりを見つけることができます。

  1. /sseというURLは情報のプッシュのみを担当し、情報を送信することはできません。情報の送信には別のURLが必要です。
  2. クライアントが/sseアドレスに接続した最初のイベントは、クライアントに情報送信先のURLを伝えるものです。このURLには通常、一意のセッションIDが含まれています。

このパケットキャプチャを観察すると、先の双方向通信に関する疑問にほぼ答えが出ます:

  1. SSE長接続は1つだけで、サーバーからクライアントへのデータプッシュに使用されます。クライアントからサーバーへのリクエスト送信チャネルは、通常のHTTP POSTリクエストを使用します。
  2. クライアントからサーバーへのHTTP POSTリクエストでは、命令が受信されたかどうかのフィードバックに2xx応答コードのみを使用し、すべてのデータ返信は最初に確立されたSSE長接続を通じてプッシュされます。

この仮説を検証するため、私はcurlを使用してPOST /message?sessionId=***リクエストパケットをシミュレートする実験も行いました。案の定、SSEイベントストリームに新しいイベントが追加されました。

MCPのSSE通信実装

前節のパケットキャプチャを通じて、私たちはMCPのSSE通信フローをほぼ解明しました:

  1. /sseのURLでSSE長接続を確立した後、まずendpoint(一般的には/message)を返します。データ形式は純粋なテキストの同一ドメイン名URLの文字列です。
  2. クライアントはPOSTリクエストをendpoint(/message)に送信して呼び出しリクエストを行います。POSTのbodyはJSON-RPC仕様に準拠し、jsonrpc、method、params、idフィールドを含みます。
  3. /sse長接続で返されるイベントはJSON-RPC仕様に準拠し、jsonrpc、result、id、error(実行エラー時)フィールドを含みます。

それほど複雑には見えませんので、Python(MCP Python SDKを使用せず)で実装してみましょう。

from fastapi import FastAPI, Request
import uuid
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import json


app = FastAPI()
mcpHub = {}


class McpRequest(BaseModel):
    id: Optional[int] = None
    jsonrpc: str
    method: str
    params: Optional[dict] = None


class MCPServer:
    def __init__(self):
      self.queue = asyncio.Queue()
        
    async def reader(self):
        while True:
            event = await self.queue.get()
            yield event
    async def request(self, payload: McpRequest):
        if payload.method == "initialize":
            await self.queue.put({"event": "message", "data": ..})
        elif payload.method == "tools/list":
            ...


@app.get("/sse")
async def sse():
    client_id = str(uuid.uuid4())
    mcp = MCPServer()
    mcpHub[client_id] = mcp
    await mcp.queue.put({"event": "endpoint", "data": f"/message?client_id={client_id}"})
    return EventSourceResponse(mcp.reader())
    
@app.post("/message")
async def message(request: Request, payload: McpRequest):
    client_id = request.query_params.get("client_id")
    if client_id not in mcpHub:
        return "no client"
    await mcpHub[client_id].request(payload)
    return "ok"

このコードでは、以下のようないくつかの設計を導入しました:

  1. asyncio.Queue()を使用してビジネスフローとMCPサービスフローを分離しました。このメッセージキューはEventSourceResponseのデータストリームと連動しています。このメッセージキューにメッセージを1つ送信するたびに、EventSourceResponseのデータストリームを通じてクライアントに自動的にメッセージがプッシュされます。これにより、サーバー側から見たクライアントは、MQを購読する標準的な消費者のように見えます。

  2. メモリ内にclient_idとメッセージキューのマッピング辞書を維持しています。これにより、メッセージが入ってきたときに、どのMQが使用されているかを知ることができ、対応するMQにメッセージを配信できます。分散システムでは、このclient_idはメッセージキューのグローバルに一意な識別子となり、どのマシンに送信されても正しいキューを見つけることができます。

  3. サーバー側が処理した後、メッセージをメッセージキューに戻すと、クライアントはそれを認識できます。MCPServerとMCPClientが長期接続を維持した後、後方のビジネスシステム側は理論的には無限の時間実行できます(クライアント側がタイムアウトで積極的に終了しない限り)。すべてはメッセージが戻ってくることを基準としています。

文書を参照して、サポートする必要があるmethodを確認してみましょう:

リクエストメソッド 発信元 応答元 説明
initialize client server 会話の初期化
tools/list client server 利用可能なツールの検出
tools/call client server ツールの呼び出し
resources/list client server 利用可能なリソースの検出
resources/read client server リソースの内容を取得
resources/templates/list client server 利用可能なパラメータ化リソースの検出
resources/subscribe client server 特定のリソースを購読し、その変更時に通知を受け取る
prompts/list client server 利用可能なプロンプトの検出
prompts/get client server 特定のプロンプトを取得
roots/list client server serverがclientのファイルシステムRootポイントにアクセスできる権限を列挙
sampling/createMessage client server serverがAI能力の生成機能を利用できるようにする

MCPのサブスクリプションモードに関する拡張的考察

MCPのresourceのmethod中に、あまり目立たない「resources/subscribe」が私の注目を引きました。まず、resourceとは何かを見てみましょう。公式の定義によると:
Resources represent any kind of data that an MCP server wants to make available to clients. This can include:File contents、Database records、API responses、Live system data、Screenshots and images、Log files、And more
したがって、resources/subscribeを使用してデータベースをサブスクライブすると、そのデータベースのすべての変更が次々とプッシュされてくることになり、これはストリーム処理の一般的な使用形態に非常に似ています。
SSEはすでにサーバーからクライアントへの一方向データストリームを確立しているため、クライアントがサブスクリプションを開始すると、Flinkストリーム処理タスクを作成してMQにメッセージを送信することで、リソースのサブスクリプションを非常にネイティブに実装できます。上記のトポロジー構造を拡張してみましょう。

  1. 大規模言語モデルの視点から見た流計算:
    MCPプロトコルに基づき、大規模言語モデルは流計算の能力に非常にエレガントに接続し、複雑なビジネスロジックを構築することができます。

  2. 流計算の視点から見た大規模言語モデル:
    MCPプロトコルを使用すると、大規模言語モデルは標準的な流計算処理ノードのようになり、ストリーミングメッセージを受信したり、他のMQにメッセージを配信したりすることができます。これはMCP設計の利点の一つです。MCPはRPCに少し似ていて、MQにも少し似ています。では、これは実際には何なのでしょうか?プログラミングモデルの観点から考えてみましょう。

MCPのプログラミングモデルに関する考察

MCPは、プログラミングモデルの観点から見ると、本質的にはステートフルな双方向RPC(リモートプロシージャコール)モデルであり、イベント駆動型とリクエスト-レスポンス型の特性を組み合わせています。この混合モードにより、AIアプリケーションと外部システムの統合において独自の利点を持っています。

MCPの主要な特徴は以下の通りです:

  1. ステートフルセッション:
    従来のステートレスなREST APIとは異なり、MCPはセッション状態を維持し、クライアントとサーバー間に長期的な接続を確立します。セッションには明確なライフサイクルがあります。

  2. 双方向通信:
    クライアントがサーバーを呼び出す(従来のRPCモード)だけでなく、サーバーもクライアントを呼び出す(逆RPCモード)ことができます。例えば、サーバーはクライアントにAIサンプリングの実行を要求できます。

  3. 能力ベースの交渉:
    初期化段階で能力交渉を行い、利用可能な機能を動的に発見し、異なる実装やバージョンに適応します。

  4. イベント通知メカニズム:
    一方向通知、リソース変更サブスクリプションモード、非同期イベント処理をサポートします。

  5. 標準化されたインターフェース:
    一連の標準操作を定義し、JSON Schemaを使用してパラメータとリターン値を定義し、相互運用性を促進します。

MCPと他のプロトコルの比較

MCP vs REST API

特性 MCP REST API
状態管理 ステートフルな会話 ステートレス
通信方向 双方向 単方向(クライアント→サーバー)
通信モード 同期リクエスト-レスポンス + 非同期通知 主に同期リクエスト-レスポンス
リソース表現 構造化リソース + ツール リソース中心
発見メカニズム 動的な能力交渉とリソース/ツール発見 通常は事前にAPIを理解する必要がある

MCP vs Message Queuing(MQ)

特性 MCP メッセージキュー(RabbitMQ、Kafkaなど)
通信モード 直接ポイントツーポイント通信 パブリッシュ-サブスクライブまたはプロデューサー-コンシューマー
ミドルウェア依存性 専用のミドルウェア不要 専用のメッセージ処理サービスが必要
メッセージルーティング 会話IDに基づく直接ルーティング トピック/キューに基づく間接ルーティング
メッセージ保証 通常はリアルタイム配信 永続化とメッセージ保証をサポート

MCP vs WebSocket

特性 MCP WebSocket
プロトコル層 アプリケーション層プロトコル(複数の転送層上で実装可能) 転送層プロトコル
メッセージ構造 構造化JSON-RPCメッセージ 特定のメッセージフォーマットなし(自己定義が必要)
状態管理 内蔵された会話状態管理 状態管理は自身で実装する必要がある
機能発見 内蔵されたツールとリソース発見メカニズム 発見メカニズムは自身で実装する必要がある

MCPの独自性と実装

MCPは各種プログラミングモデルにおいて独特な位置を占めています:

  • REST APIよりもステートフルで双方向通信が可能
  • メッセージキューよりも直接的で軽量
  • WebSocketよりも構造化され標準化されている
  • gRPCよりも柔軟で理解しやすい
  • GraphQLよりもツール呼び出しに焦点を当てている
  • RPCよりもリソースとコンテキストを重視している

MCPがこのように独特な機能的位置にあるからこそ、現在の能力の制限を理由にMCPのネイティブな適応を諦めるべきではありません。非同期タスク、イベント駆動型アーキテクチャなどは本来MCPとネイティブに接続できるはずです。

MCPサービスの簡易実装

MCPの動作原理は複雑ではないので、優れた設計に敬意を表して自分たちで実装してみましょう。

from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
import uuid
from pydantic import BaseModel
from typing import Optional
import uvicorn
import inspect
app = FastAPI()
mcpHub = {}
class McpRequest(BaseModel):
    id: Optional[int] = None
    jsonrpc: str
    method: str
    params: Optional[dict] = None
class MCPServer:
    def __init__(self, name, message_path, tools):
        self.queue = asyncio.Queue()
        self.client_id = str(uuid.uuid4())
        self.message_path = message_path
        self.info = {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "experimental": {},
                "tools": {
                    "listChanged": False
                }
            },
            "serverInfo": {
                "name": name,
                "version": "1.6.0"
            }
        }
        self.tools = tools
    def list_tool(self):
        result = []
        for tool in self.tools:
            toolInfo = {
                "name": tool.__name__,
                "description": tool.__doc__,
                "inputSchema": {"type": "object","properties":{}},
            }
            for name, param in inspect.signature(tool).parameters.items():
                toolInfo["inputSchema"]["properties"][name] = {
                    "title": name,
                    "type": "string",
                }
            result.append(toolInfo)
        return result
    async def reader(self):
        while True:
            event = await self.queue.get()
            yield event
    @staticmethod
    def response(result, id):
        message = {
            "jsonrpc": "2.0",
            "result": result,
        }
        if id is not None:
            message["id"] = id
        return json.dumps(message)
    async def request(self, req: McpRequest):
        if req.method == "initialize":
            await self.queue.put({"event": "message", "data": self.response(self.info, req.id)})
        elif req.method == "tools/list":
            await self.queue.put({"event": "message", "data": self.response({"tools": self.list_tool()}, req.id)})
        elif req.method == "tools/call":
            for tool in self.tools:
                if tool.__name__ == req.params.get("name"):
                    result = await tool(**req.params["arguments"])
                    await self.queue.put({"event": "message", "data": self.response({"content": result, "isError": False}, req.id)})
                    break
async def test(state=None):
    """
    description
    """
    result = f"hi {state}"
    await asyncio.sleep(1)
    result += "!"
    return result
@app.get("/receive_test")
async def receive_test():
    mcp = MCPServer(name="mcp-test",message_path="/send_test", tools=[test])
    mcpHub[mcp.client_id] = mcp
    await mcp.queue.put({"event": "endpoint", "data": f"{mcp.message_path}?client_id={mcp.client_id}"})
    return EventSourceResponse(mcp.reader())
@app.post("/send_test")
async def send_test(request: Request, payload: McpRequest):
    client_id = request.query_params.get("client_id")
    if client_id not in mcpHub:
        return "no client"
    await mcpHub[client_id].request(payload)
    return "ok"
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8001)

約100行ほどのコードで、簡易版MCPサービスを実装しました。公式のMCP Python SDKと比較して、いくつかの重要な特性を最適化しています:

  1. ツール登録が@mcp.toolのようなアノテーションに依存せず、完全に動的に渡すことができます。異なるシナリオに対して異なるMCP URLを提供し、それぞれ異なるツールを提供できます。

  2. プログラミングモデルはMQ駆動型サービスで、非同期システム、イベント駆動型システムやプラットフォームとの連携に適しています。このPython実装を参考に、他の言語版への変換も比較的容易です。

  3. /sse/messageなどのデフォルトルートアドレスに依存せずに正常に動作し、MCPのURLが完全にカスタマイズ可能であることを証明しています。

総括:MCPの本質を理解する

MCPの原理、通信メカニズム、プログラミングモデルの本質を深く検討した結果、MCPが単なるAPIやSDKではなく、精巧に設計されたプロトコルであることがわかりました:

  1. クライアント-ホスト-サーバーアーキテクチャを採用し、複数のサーバー接続をサポート
  2. ステートフルな双方向RPCモデルを実現し、イベント駆動型の特性を組み合わせている
  3. 標準化されたツール呼び出しとリソースアクセスメカニズムを提供
  4. 動的な能力交渉と機能発見をサポート
  5. MQ、API、WSと比較して独自の機能的位置を占め、AIアプリケーションと外部システムの統合に最適化されている

100行のコードで見たように、MCPのコア原理は複雑ではありませんが、その設計は巧妙です。このような深い理解があれば、単純なSDK使用を超えて、より強力で柔軟なAIアプリケーション統合ソリューションを作成することができます。

Acrosstudioテックブログ

Discussion