MCPの通信に対する深い考察と実装
MCPの考察
MCPについて調査を始めたとき、ある問題に気づきました。ほとんどのドキュメントは @mcp.tool
のようなアノテーション方式で注入する方法を紹介しています。しかし、多くの非同期ビジネスプロセスがある場合、接続は非常に面倒になります。アノテーションを付けられるコードエンティティが存在しないのです。プロセスごとに同期関数を書く必要があるのでしょうか?好奇心に駆られて、MCPの通信原理をさらに分析し、MCPに接続する便利な方法があるかどうか、MCPの原理を理解しようとしました。
MCPの通信方式
MCPはSTDIOとSSEの2つの転送プロトコルを提供しています。現在、多くの実験的ツールはSTDIO転送を使用しています。しかし、サービスを提供する場合は、基本的にSSE(Server-Sent Events)を使用します。そのため、本記事ではSSEのMCP接続モードを重点的に分析・議論します。
特性 | STDIO | SSE |
---|---|---|
遅延 | 非常に低い(ローカル) | 比較的低いが、ネットワーク条件に左右される |
信頼性 | ローカルプロセスで高い信頼性 | 自動再接続とエラー処理メカニズムを備えている |
接続モデル | 1:1(クライアントとサーバープロセス) | 多クライアント(一つのサーバーで複数クライアントを処理可能) |
デプロイの複雑さ | 低い(ローカルプロセス間通信) | 中程度(HTTPサーバーのサポートが必要) |
ネットワーク転送 | 非サポート(ローカルのみ) | サポート(ネットワーク通信可能) |
セキュリティ | ローカルプロセスで安全 | HTTPSによりセキュリティ強化可能 |
リソース消費 | 低い | 中程度(HTTP接続維持が必要) |
拡張性 | 限定的(単一クライアントのみサポート) | 優れている(複数クライアント接続をサポート) |
SSEはWebSocketと同様に、ブラウザとサーバー間の通信チャネルを確立し、サーバーからブラウザに情報をプッシュします。全体的に見ると、WebSocketの方が強力で柔軟です。WebSocketは全二重チャネルで双方向通信が可能です。一方、SSEは一方向チャネルで、サーバーからブラウザへの送信のみ可能です。これはストリーミング情報の本質が単なるダウンロードだからです。ブラウザがサーバーに情報を送信する場合、それは別のHTTPリクエストになります。この特徴により、私はさらに好奇心を抱きました。stdioではstdinを使用して入力し、stdoutを使用して出力できます。しかし、SSEは一方向チャネルです。MCPはどのように双方向通信を実現するのでしょうか?2つのSSEチャネルを確立するのでしょうか?この疑問を持って、実践的な検証を始めました。
MCPのSSE通信フロー
MCP公式が提供するツール「npx @modelcontextprotocol/inspector」を使用すると、MCP検証用の管理ページを比較的簡単に立ち上げることができます。この管理ページのパケットをキャプチャすることで、SSE通信の手がかりを見つけることができます。
- /sseというURLは情報のプッシュのみを担当し、情報を送信することはできません。情報の送信には別のURLが必要です。
- クライアントが/sseアドレスに接続した最初のイベントは、クライアントに情報送信先のURLを伝えるものです。このURLには通常、一意のセッションIDが含まれています。
このパケットキャプチャを観察すると、先の双方向通信に関する疑問にほぼ答えが出ます:
- SSE長接続は1つだけで、サーバーからクライアントへのデータプッシュに使用されます。クライアントからサーバーへのリクエスト送信チャネルは、通常のHTTP POSTリクエストを使用します。
- クライアントからサーバーへのHTTP POSTリクエストでは、命令が受信されたかどうかのフィードバックに2xx応答コードのみを使用し、すべてのデータ返信は最初に確立されたSSE長接続を通じてプッシュされます。
この仮説を検証するため、私はcurlを使用してPOST /message?sessionId=***リクエストパケットをシミュレートする実験も行いました。案の定、SSEイベントストリームに新しいイベントが追加されました。
MCPのSSE通信実装
前節のパケットキャプチャを通じて、私たちはMCPのSSE通信フローをほぼ解明しました:
- /sseのURLでSSE長接続を確立した後、まずendpoint(一般的には/message)を返します。データ形式は純粋なテキストの同一ドメイン名URLの文字列です。
- クライアントはPOSTリクエストをendpoint(/message)に送信して呼び出しリクエストを行います。POSTのbodyはJSON-RPC仕様に準拠し、jsonrpc、method、params、idフィールドを含みます。
- /sse長接続で返されるイベントはJSON-RPC仕様に準拠し、jsonrpc、result、id、error(実行エラー時)フィールドを含みます。
それほど複雑には見えませんので、Python(MCP Python SDKを使用せず)で実装してみましょう。
from fastapi import FastAPI, Request
import uuid
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import json
app = FastAPI()
mcpHub = {}
class McpRequest(BaseModel):
id: Optional[int] = None
jsonrpc: str
method: str
params: Optional[dict] = None
class MCPServer:
def __init__(self):
self.queue = asyncio.Queue()
async def reader(self):
while True:
event = await self.queue.get()
yield event
async def request(self, payload: McpRequest):
if payload.method == "initialize":
await self.queue.put({"event": "message", "data": ..})
elif payload.method == "tools/list":
...
@app.get("/sse")
async def sse():
client_id = str(uuid.uuid4())
mcp = MCPServer()
mcpHub[client_id] = mcp
await mcp.queue.put({"event": "endpoint", "data": f"/message?client_id={client_id}"})
return EventSourceResponse(mcp.reader())
@app.post("/message")
async def message(request: Request, payload: McpRequest):
client_id = request.query_params.get("client_id")
if client_id not in mcpHub:
return "no client"
await mcpHub[client_id].request(payload)
return "ok"
このコードでは、以下のようないくつかの設計を導入しました:
-
asyncio.Queue()を使用してビジネスフローとMCPサービスフローを分離しました。このメッセージキューはEventSourceResponseのデータストリームと連動しています。このメッセージキューにメッセージを1つ送信するたびに、EventSourceResponseのデータストリームを通じてクライアントに自動的にメッセージがプッシュされます。これにより、サーバー側から見たクライアントは、MQを購読する標準的な消費者のように見えます。
-
メモリ内にclient_idとメッセージキューのマッピング辞書を維持しています。これにより、メッセージが入ってきたときに、どのMQが使用されているかを知ることができ、対応するMQにメッセージを配信できます。分散システムでは、このclient_idはメッセージキューのグローバルに一意な識別子となり、どのマシンに送信されても正しいキューを見つけることができます。
-
サーバー側が処理した後、メッセージをメッセージキューに戻すと、クライアントはそれを認識できます。MCPServerとMCPClientが長期接続を維持した後、後方のビジネスシステム側は理論的には無限の時間実行できます(クライアント側がタイムアウトで積極的に終了しない限り)。すべてはメッセージが戻ってくることを基準としています。
文書を参照して、サポートする必要があるmethodを確認してみましょう:
リクエストメソッド | 発信元 | 応答元 | 説明 |
---|---|---|---|
initialize | client | server | 会話の初期化 |
tools/list | client | server | 利用可能なツールの検出 |
tools/call | client | server | ツールの呼び出し |
resources/list | client | server | 利用可能なリソースの検出 |
resources/read | client | server | リソースの内容を取得 |
resources/templates/list | client | server | 利用可能なパラメータ化リソースの検出 |
resources/subscribe | client | server | 特定のリソースを購読し、その変更時に通知を受け取る |
prompts/list | client | server | 利用可能なプロンプトの検出 |
prompts/get | client | server | 特定のプロンプトを取得 |
roots/list | client | server | serverがclientのファイルシステムRootポイントにアクセスできる権限を列挙 |
sampling/createMessage | client | server | serverがAI能力の生成機能を利用できるようにする |
MCPのサブスクリプションモードに関する拡張的考察
MCPのresourceのmethod中に、あまり目立たない「resources/subscribe」が私の注目を引きました。まず、resourceとは何かを見てみましょう。公式の定義によると:
Resources represent any kind of data that an MCP server wants to make available to clients. This can include:File contents、Database records、API responses、Live system data、Screenshots and images、Log files、And more
したがって、resources/subscribeを使用してデータベースをサブスクライブすると、そのデータベースのすべての変更が次々とプッシュされてくることになり、これはストリーム処理の一般的な使用形態に非常に似ています。
SSEはすでにサーバーからクライアントへの一方向データストリームを確立しているため、クライアントがサブスクリプションを開始すると、Flinkストリーム処理タスクを作成してMQにメッセージを送信することで、リソースのサブスクリプションを非常にネイティブに実装できます。上記のトポロジー構造を拡張してみましょう。
-
大規模言語モデルの視点から見た流計算:
MCPプロトコルに基づき、大規模言語モデルは流計算の能力に非常にエレガントに接続し、複雑なビジネスロジックを構築することができます。 -
流計算の視点から見た大規模言語モデル:
MCPプロトコルを使用すると、大規模言語モデルは標準的な流計算処理ノードのようになり、ストリーミングメッセージを受信したり、他のMQにメッセージを配信したりすることができます。これはMCP設計の利点の一つです。MCPはRPCに少し似ていて、MQにも少し似ています。では、これは実際には何なのでしょうか?プログラミングモデルの観点から考えてみましょう。
MCPのプログラミングモデルに関する考察
MCPは、プログラミングモデルの観点から見ると、本質的にはステートフルな双方向RPC(リモートプロシージャコール)モデルであり、イベント駆動型とリクエスト-レスポンス型の特性を組み合わせています。この混合モードにより、AIアプリケーションと外部システムの統合において独自の利点を持っています。
MCPの主要な特徴は以下の通りです:
-
ステートフルセッション:
従来のステートレスなREST APIとは異なり、MCPはセッション状態を維持し、クライアントとサーバー間に長期的な接続を確立します。セッションには明確なライフサイクルがあります。 -
双方向通信:
クライアントがサーバーを呼び出す(従来のRPCモード)だけでなく、サーバーもクライアントを呼び出す(逆RPCモード)ことができます。例えば、サーバーはクライアントにAIサンプリングの実行を要求できます。 -
能力ベースの交渉:
初期化段階で能力交渉を行い、利用可能な機能を動的に発見し、異なる実装やバージョンに適応します。 -
イベント通知メカニズム:
一方向通知、リソース変更サブスクリプションモード、非同期イベント処理をサポートします。 -
標準化されたインターフェース:
一連の標準操作を定義し、JSON Schemaを使用してパラメータとリターン値を定義し、相互運用性を促進します。
MCPと他のプロトコルの比較
MCP vs REST API
特性 | MCP | REST API |
---|---|---|
状態管理 | ステートフルな会話 | ステートレス |
通信方向 | 双方向 | 単方向(クライアント→サーバー) |
通信モード | 同期リクエスト-レスポンス + 非同期通知 | 主に同期リクエスト-レスポンス |
リソース表現 | 構造化リソース + ツール | リソース中心 |
発見メカニズム | 動的な能力交渉とリソース/ツール発見 | 通常は事前にAPIを理解する必要がある |
MCP vs Message Queuing(MQ)
特性 | MCP | メッセージキュー(RabbitMQ、Kafkaなど) |
---|---|---|
通信モード | 直接ポイントツーポイント通信 | パブリッシュ-サブスクライブまたはプロデューサー-コンシューマー |
ミドルウェア依存性 | 専用のミドルウェア不要 | 専用のメッセージ処理サービスが必要 |
メッセージルーティング | 会話IDに基づく直接ルーティング | トピック/キューに基づく間接ルーティング |
メッセージ保証 | 通常はリアルタイム配信 | 永続化とメッセージ保証をサポート |
MCP vs WebSocket
特性 | MCP | WebSocket |
---|---|---|
プロトコル層 | アプリケーション層プロトコル(複数の転送層上で実装可能) | 転送層プロトコル |
メッセージ構造 | 構造化JSON-RPCメッセージ | 特定のメッセージフォーマットなし(自己定義が必要) |
状態管理 | 内蔵された会話状態管理 | 状態管理は自身で実装する必要がある |
機能発見 | 内蔵されたツールとリソース発見メカニズム | 発見メカニズムは自身で実装する必要がある |
MCPの独自性と実装
MCPは各種プログラミングモデルにおいて独特な位置を占めています:
- REST APIよりもステートフルで双方向通信が可能
- メッセージキューよりも直接的で軽量
- WebSocketよりも構造化され標準化されている
- gRPCよりも柔軟で理解しやすい
- GraphQLよりもツール呼び出しに焦点を当てている
- RPCよりもリソースとコンテキストを重視している
MCPがこのように独特な機能的位置にあるからこそ、現在の能力の制限を理由にMCPのネイティブな適応を諦めるべきではありません。非同期タスク、イベント駆動型アーキテクチャなどは本来MCPとネイティブに接続できるはずです。
MCPサービスの簡易実装
MCPの動作原理は複雑ではないので、優れた設計に敬意を表して自分たちで実装してみましょう。
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
import uuid
from pydantic import BaseModel
from typing import Optional
import uvicorn
import inspect
app = FastAPI()
mcpHub = {}
class McpRequest(BaseModel):
id: Optional[int] = None
jsonrpc: str
method: str
params: Optional[dict] = None
class MCPServer:
def __init__(self, name, message_path, tools):
self.queue = asyncio.Queue()
self.client_id = str(uuid.uuid4())
self.message_path = message_path
self.info = {
"protocolVersion": "2024-11-05",
"capabilities": {
"experimental": {},
"tools": {
"listChanged": False
}
},
"serverInfo": {
"name": name,
"version": "1.6.0"
}
}
self.tools = tools
def list_tool(self):
result = []
for tool in self.tools:
toolInfo = {
"name": tool.__name__,
"description": tool.__doc__,
"inputSchema": {"type": "object","properties":{}},
}
for name, param in inspect.signature(tool).parameters.items():
toolInfo["inputSchema"]["properties"][name] = {
"title": name,
"type": "string",
}
result.append(toolInfo)
return result
async def reader(self):
while True:
event = await self.queue.get()
yield event
@staticmethod
def response(result, id):
message = {
"jsonrpc": "2.0",
"result": result,
}
if id is not None:
message["id"] = id
return json.dumps(message)
async def request(self, req: McpRequest):
if req.method == "initialize":
await self.queue.put({"event": "message", "data": self.response(self.info, req.id)})
elif req.method == "tools/list":
await self.queue.put({"event": "message", "data": self.response({"tools": self.list_tool()}, req.id)})
elif req.method == "tools/call":
for tool in self.tools:
if tool.__name__ == req.params.get("name"):
result = await tool(**req.params["arguments"])
await self.queue.put({"event": "message", "data": self.response({"content": result, "isError": False}, req.id)})
break
async def test(state=None):
"""
description
"""
result = f"hi {state}"
await asyncio.sleep(1)
result += "!"
return result
@app.get("/receive_test")
async def receive_test():
mcp = MCPServer(name="mcp-test",message_path="/send_test", tools=[test])
mcpHub[mcp.client_id] = mcp
await mcp.queue.put({"event": "endpoint", "data": f"{mcp.message_path}?client_id={mcp.client_id}"})
return EventSourceResponse(mcp.reader())
@app.post("/send_test")
async def send_test(request: Request, payload: McpRequest):
client_id = request.query_params.get("client_id")
if client_id not in mcpHub:
return "no client"
await mcpHub[client_id].request(payload)
return "ok"
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8001)
約100行ほどのコードで、簡易版MCPサービスを実装しました。公式のMCP Python SDKと比較して、いくつかの重要な特性を最適化しています:
-
ツール登録が
@mcp.tool
のようなアノテーションに依存せず、完全に動的に渡すことができます。異なるシナリオに対して異なるMCP URLを提供し、それぞれ異なるツールを提供できます。 -
プログラミングモデルはMQ駆動型サービスで、非同期システム、イベント駆動型システムやプラットフォームとの連携に適しています。このPython実装を参考に、他の言語版への変換も比較的容易です。
-
/sse
や/message
などのデフォルトルートアドレスに依存せずに正常に動作し、MCPのURLが完全にカスタマイズ可能であることを証明しています。
総括:MCPの本質を理解する
MCPの原理、通信メカニズム、プログラミングモデルの本質を深く検討した結果、MCPが単なるAPIやSDKではなく、精巧に設計されたプロトコルであることがわかりました:
- クライアント-ホスト-サーバーアーキテクチャを採用し、複数のサーバー接続をサポート
- ステートフルな双方向RPCモデルを実現し、イベント駆動型の特性を組み合わせている
- 標準化されたツール呼び出しとリソースアクセスメカニズムを提供
- 動的な能力交渉と機能発見をサポート
- MQ、API、WSと比較して独自の機能的位置を占め、AIアプリケーションと外部システムの統合に最適化されている
100行のコードで見たように、MCPのコア原理は複雑ではありませんが、その設計は巧妙です。このような深い理解があれば、単純なSDK使用を超えて、より強力で柔軟なAIアプリケーション統合ソリューションを作成することができます。
Discussion