🛸

MCPサーバーをSSE経由でエージェントと接続する

に公開

執筆日

2025/06/30

概要

先日FastAPIを使ってMCPサーバーを立て、OpenAIのAgent SDKと接続する方法について解説するブログを書きました。
https://zenn.dev/headwaters/articles/8d9a3e56ad4e1c
こちらの記事ではmcp.run(transport="stdio")を使って標準入出力形式でMCPサーバーを起動するコードを書いたため、以下のようにMCPサーバーをサブプロセスを使ったコマンド実行で呼び出してエージェントに渡す必要がありました。

<前略>
params = MCPServerStdioParams({
        "command": "python",
        "args": ["news.py"],
    })

# MCPサーバーの設定
async with MCPServerStdio(
    params=params
) as mcp_server:
    agent = Agent(
        name="Assistant",
        instructions="あなたはニュース検索エージェントです。最新のニュースを検索して、ユーザーに提供します。",
        mcp_servers=[mcp_server],
        model=OpenAIResponsesModel( 
            model=MODEL_NAME,
            openai_client=client,
        )
    )

しかし、エージェントの呼び出しが何回も行われる場合や、同じMCPサーバーを複数のエージェントで共有する場合、リモートでサーバーを立ち上げたい場合などでは、毎回コマンド実行をしサブプロセスを立ち上げる実装はどうなんだろうという気がします。MCPサーバーは常時起動しておいて、必要な時にエージェントと接続する方が一般的な使い方になると思います。
そこで使えるのがmcp.run(transport="sse")で、SSE通信を利用してMCPサーバーとクライアントを通信させることができます。(Server-Sent Events (SSE) は、サーバーからクライアントへリアルタイムにデータを送信するための技術です)


stdioとsseを使ってAPIを作った場合の違い
図にするとざっくりこんな感じでしょうか(各枠の正確な単位は気にしないでください)。MCPサーバーが完全に分離されていると、別プロセス・別ネットワーク・別マシン(コンテナ)で立ち上げられるので色々便利だろうな~ということが伝われば。

依存ライブラリインストール

前回同様python環境はuvを使います。今回はmcpライブラリからではなくfastmcpライブラリからfastmcpを使ってみます。(ほとんど同じように使えそうですが、runの引数の仕様が違うので注意)

uv add "fastmcp"

スクリプト

MCPサーバー
mcp_news_sse.py
from GoogleNews import GoogleNews
from fastmcp import FastMCP

# MCPサーバーのインスタンスを作成
mcp = FastMCP("NEWS Searcher")

RESULT_TEMPLATE = """\
タイトル: {title}
メディア: {media}
日時: {date}
概要: {text}
URL: {url}"""

@mcp.tool()
def search_news(keyword: str) -> float:
    """検索キーワードを使ってGoogle Newsで最新のニュースを検索します。
Args:
    search_target (str): 検索キーワード
Returns:
    str: 検索結果のまとめ
    """
    googlenews = GoogleNews()
    googlenews.search(key=keyword)

    # 記事の一覧を取得
    results = googlenews.results()

    ret = []
    # 上位3件を取得
    for i, article in enumerate(results[:3], start=1):
        ret.append(RESULT_TEMPLATE.format(
            title=article["title"],
            media=article["media"],
            date=article["date"],
            text=article["desc"],
            url=article["link"]
        ))

    return "\n\n".join(ret)

# サーバーを起動
if __name__ == "__main__":
    # mcp.run(transport="sse") # host, port, pathを指定しなかった場合エンドポイントは"http://127.0.0.1:8000/sse/"になる
    mcp.run(transport="sse", host="0.0.0.0", port=8080, path="/news") # "http://0.0.0.0:8080/news/"
エージェント実行
mcp_agent_sse.py
import json
import asyncio
from agents import Agent, Runner, OpenAIResponsesModel, set_default_openai_client, set_tracing_disabled
from openai import AsyncAzureOpenAI
from agents.mcp import MCPServerSse, MCPServerSseParams

OPENAI_ENDPOINT = "<endpoint>"
OPENAI_KEY = "<key>"
OPENAI_API_VERSION = "2025-03-01-preview" # 2025-03-01-preview以降であればResponses APIが使える
MODEL_NAME = "gpt-4.1" # Azure OpenAIではデプロイ名

async def main():
    # Azure OpenAIの設定
    client = AsyncAzureOpenAI(
        api_key=OPENAI_KEY,
        api_version=OPENAI_API_VERSION,
        azure_endpoint=OPENAI_ENDPOINT,
    )
    # Agents SDK にクライアントを設定
    set_default_openai_client(client)
    # トレース機能を無効にする(OpenAIのダッシュボードと接続しない場合オフにしないと401エラーが出る。)
    set_tracing_disabled(disabled=True)

    # SSEサーバー接続設定
    params = MCPServerSseParams({
            "url": "http://localhost:8080/news",
            "timeout": 60,
    })
    mcp_server = MCPServerSse(params=params)
    # MCPサーバーの接続チェック(事前にconnectを走らせないとエラーになる)
    await mcp_server.connect()

    # エージェントの作成
    agent = Agent(
        name="Assistant",
        instructions="あなたはニュース検索エージェントです。最新のニュースを検索して、ユーザーに提供します。",
        mcp_servers=[mcp_server],
        model=OpenAIResponsesModel( 
            model=MODEL_NAME,
            openai_client=client,
        )
    )

    input_list = []
    while True:
        # ユーザーからの入力を待つ
        user_input = input("\nユーザー入力 (終了は'exit'or'q'): ")
        if user_input.lower() == 'exit' or user_input.lower() == 'q':
            break
        input_list.append({
            "role": "user",
            "content": user_input,
        })

        # エージェントに入力を渡す
        result = await Runner.run(
            starting_agent=agent, 
            input=input_list,
        )
        print(result)
        input_list.extend(result.to_input_list())
    
    print("全てのやり取り:")
    for item in input_list:
        print(json.dumps(item, ensure_ascii=False, indent=2))

    await mcp_server.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

実行

2つのターミナルでterminal_1から順にスクリプトを実行してください。

terminal_1
$ uv run mcp_news_sse.py

INFO     Starting MCP server 'NEWS Searcher' with transport 'sse' on http://0.0.0.0:8080/news
INFO:     Started server process [29264]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
terminal_2
$ uv run mcp_agent_sse.py

ユーザー入力 (終了は'exit'or'q'): |

Note

  • SSEサーバーのインスタンスを立ち上げた後、mcp_server.connect()関数を実行せずにエージェントを走らせると以下のようなエラーが出て怒られた。
agents.exceptions.UserError: Server not initialized. Make sure you call `connect()` first.
  • withブロックでサーバーを立ち上げなかったので、mcp_server.cleanup()関数を実行せずサーバーとの接続を切らないでプロセスを終了すると以下のようなエラーが出て怒られた。
unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-43' coro=<<async_generator_athrow without __name__>()> exception=RuntimeError('Attempted to exit cancel scope in a different task than it was entered in')>       
Traceback (most recent call last):
<中略>
RuntimeError: Attempted to exit cancel scope in a different task than it was entered in

まとめ

今回はスクリプト内でサブプロセスを立ち上げて標準入出力をエージェントに渡すのではなく、別プロセスでサーバーを立ち上げてSSE通信でエージェントにMCPサーバーの出力を渡す方法を試しました。
実際にシステムを作る場合はSSEを使う場合が多いのではないかと思います。APIサーバーだったら、MCPサーバーを起動してアプリケーションコンテキストに追加しておけば使いまわせるか……という気もするけど別で管理した方がMCPサーバーだけに許可したい通信とか管理しやすいかもとかとか。

ヘッドウォータース

Discussion