🤖

StreamingResponseとLangGraphのastreamとMCP Clientを一緒に使う際のtips

に公開

はじめに

FastAPIのStreamingResponseとLangGraphのagentのastreamとMCP Client一緒に使ったシンプルな公式ドキュメントがなかったため、こちらにサンプルコードと共に共有します。

サンプルコード

# Use server from examples/servers/streamable-http-stateless/

import json
from typing import AsyncGenerator

import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

from langchain_core.messages import AIMessageChunk
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def response_generator() -> AsyncGenerator[str, None]:
    async with streamablehttp_client("http://localhost:8080/mcp") as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools = await load_mcp_tools(session)
            agent = create_react_agent("openai:gpt-4.1", tools)
            async for msg, _ in agent.astream({"messages": "what's (3 + 5) x 12?"}, stream_mode="messages"):
                if isinstance(msg, AIMessageChunk):
                    json_str = json.dumps({ "content": msg.content }, ensure_ascii=False)
                    yield f"data: {json_str}\n\n"
            yield "data: [DONE]\n\n"

app = FastAPI()
@app.route("/")
async def root(request: Request):
    return StreamingResponse(response_generator(), media_type="text/event-stream")

if __name__ == "__main__":
    uvicorn.run(app)

streamablehttp_clientClientSessionはジェネレータの内側に書く必要があり、ジェネレータの外側で定義してしまうと、ClosedResourceErrorが発生します。

参考

https://github.com/langchain-ai/langchain-mcp-adapters/blob/main/README.md

https://github.com/langchain-ai/langchain-mcp-adapters/issues/78

https://github.com/langchain-ai/langchain-mcp-adapters/tree/main/examples/servers/streamable-http-stateless

https://github.com/langchain-ai/langchain-mcp-adapters/pull/154

Discussion