Tips for using StreamingResponse, LangGraph's astream, and an MCP Client together
Introduction
I couldn't find simple official documentation that shows FastAPI's StreamingResponse, a LangGraph agent's astream, and an MCP Client used together, so I'm sharing what worked for me here, along with sample code.
Sample code
# Use server from examples/servers/streamable-http-stateless/
import json
from typing import AsyncGenerator

import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from langchain_core.messages import AIMessageChunk
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def response_generator() -> AsyncGenerator[str, None]:
    # The MCP transport and session must be opened INSIDE this generator
    # (see the note below the code).
    async with streamablehttp_client("http://localhost:8080/mcp") as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools = await load_mcp_tools(session)
            agent = create_react_agent("openai:gpt-4.1", tools)
            # stream_mode="messages" yields (message_chunk, metadata) tuples.
            async for msg, _ in agent.astream(
                {"messages": "what's (3 + 5) x 12?"}, stream_mode="messages"
            ):
                if isinstance(msg, AIMessageChunk):
                    json_str = json.dumps({"content": msg.content}, ensure_ascii=False)
                    # Server-Sent Events framing: "data: <payload>\n\n" per chunk.
                    yield f"data: {json_str}\n\n"
    yield "data: [DONE]\n\n"


app = FastAPI()


@app.get("/")  # @app.route is deprecated in Starlette; use the method decorator
async def root(request: Request):
    return StreamingResponse(response_generator(), media_type="text/event-stream")


if __name__ == "__main__":
    uvicorn.run(app)
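
The comment on the first line refers to the streamable-http-stateless example server in the MCP Python SDK repository. If you'd rather have something self-contained, the following is a minimal sketch of a compatible stateless server, assuming the FastMCP API from the mcp package; the Math server name and the add/multiply tools are made up for this example:

# Minimal stateless MCP server sketch (FastMCP from the MCP Python SDK).
# The name and tools are illustrative; port 8080 matches the client above,
# and the streamable HTTP endpoint is served at /mcp by default.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Math", port=8080, stateless_http=True)


@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    return a * b


if __name__ == "__main__":
    mcp.run(transport="streamable-http")

With the server running and OPENAI_API_KEY set, start the FastAPI app and you can watch the chunks arrive with curl -N http://localhost:8000/ (uvicorn's default port).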
streamablehttp_client and ClientSession must be opened inside the generator. If you set them up outside the generator (for example, in the endpoint function), a ClosedResourceError is raised: StreamingResponse only starts iterating the generator after the endpoint function has returned, and by that point the context managers opened in the endpoint have already exited and the underlying streams are closed.
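
To make the failure mode concrete, here is a minimal sketch of the broken variant, reusing the imports and app from the sample above; the /broken route and the stream_from helper are hypothetical names for illustration only:

# Anti-pattern: the MCP contexts are opened in the endpoint, not in the generator.
async def stream_from(agent) -> AsyncGenerator[str, None]:
    async for msg, _ in agent.astream(
        {"messages": "what's (3 + 5) x 12?"}, stream_mode="messages"
    ):
        if isinstance(msg, AIMessageChunk):
            yield f"data: {json.dumps({'content': msg.content}, ensure_ascii=False)}\n\n"


@app.get("/broken")
async def broken(request: Request):
    async with streamablehttp_client("http://localhost:8080/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await load_mcp_tools(session)
            agent = create_react_agent("openai:gpt-4.1", tools)
            # Both `async with` blocks exit as soon as this return executes.
            # StreamingResponse iterates stream_from() only afterwards, so the
            # first tool call goes through an already-closed session and raises
            # ClosedResourceError.
            return StreamingResponse(stream_from(agent), media_type="text/event-stream")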