🚀

Exposing an agent built with ADK through an MCP server

Published on 2025/04/12

This is more or less a continuation of the previous article:
https://zenn.dev/satohjohn/articles/b23bd65c289257

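I was looking into A2A, but realized I knew nothing about MCP, so I'm going to learn it hands-on by implementing something first.
I'll take the code_agent (a sequential_agent) from last time and make it available externally.
ADK provides AgentTool, which wraps an agent as a tool, and that tool can then be exposed through an MCP server, so that is what I'll use.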
https://google.github.io/adk-docs/tools/function-tools/#3-agent-as-a-tool
As a side note, ADK can also use an MCP client as a tool inside an agent.

Preparation

In addition to the previous setup, add mcp.

uv add uvicorn fastapi google-adk mcp

Implementation

Implement the server part following the sample:
https://google.github.io/adk-docs/tools/mcp-tools/#2-building-an-mcp-server-with-adk-tools-mcp-server-exposing-adk

adk_mcp_server.py
import json
import os
from dotenv import load_dotenv

# MCP Server Imports
from fastapi import FastAPI, Request
from fastapi.routing import Mount
from mcp import types as mcp_types # Use alias to avoid conflict with genai.types
from mcp.server.lowlevel import Server
from mcp.server.sse import SseServerTransport

# ADK Tool Imports
from google.adk.tools.agent_tool import AgentTool
# ADK <-> MCP Conversion Utility
from google.adk.tools.mcp_tool.conversion_utils import adk_to_mcp_tool_type
import uvicorn

from agents.code_agent.agent import root_agent

# --- Load Environment Variables (If ADK tools need them) ---
load_dotenv()

# --- Prepare the ADK Tool ---
# Instantiate the ADK tool you want to expose
print("Initializing ADK AgentTool for code_agent...")
adk_tool = AgentTool(agent=root_agent)
print(f"ADK tool '{adk_tool.name}' initialized.")
# --- End ADK Tool Prep ---

# --- MCP Server Setup ---
print("Creating MCP Server instance...")
# Create a named MCP Server instance
app = Server("adk-tool-mcp-server")
sse = SseServerTransport("/messages/")

# Implement the MCP server's @app.list_tools handler
@app.list_tools()
async def list_tools() -> list[mcp_types.Tool]:
  """MCP handler to list available tools."""
  print("MCP Server: Received list_tools request.")
  # Convert the ADK tool's definition to MCP format
  mcp_tool_schema = adk_to_mcp_tool_type(adk_tool)
  print(f"MCP Server: Advertising tool: {mcp_tool_schema.name}")
  return [mcp_tool_schema]

# Implement the MCP server's @app.call_tool handler
@app.call_tool()
async def call_tool(
    name: str, arguments: dict
) -> list[mcp_types.TextContent | mcp_types.ImageContent | mcp_types.EmbeddedResource]:
  """MCP handler to execute a tool call."""
  print(f"MCP Server: Received call_tool request for '{name}' with args: {arguments}")

  # Check if the requested tool name matches our wrapped ADK tool
  if name == adk_tool.name:
    try:
      # Execute the ADK tool's run_async method
      # Note: tool_context is None as we are not within a full ADK Runner invocation
      adk_response = await adk_tool.run_async(
          args=arguments,
          tool_context=None, # No ADK context available here
      )
      print(f"MCP Server: ADK tool '{name}' executed successfully.")
      # Format the ADK tool's response (often a dict) into MCP format.
      # Here, we serialize the response dictionary as a JSON string within TextContent.
      # Adjust formatting based on the specific ADK tool's output and client needs.
      response_text = json.dumps(adk_response, indent=2)
      return [mcp_types.TextContent(type="text", text=response_text)]

    except Exception as e:
      print(f"MCP Server: Error executing ADK tool '{name}': {e}")
      # Return an error message in MCP format
      # Creating a proper MCP error response might be more robust
      error_text = json.dumps({"error": f"Failed to execute tool '{name}': {str(e)}"})
      return [mcp_types.TextContent(type="text", text=error_text)]
  else:
    # Handle calls to unknown tools
    print(f"MCP Server: Tool '{name}' not found.")
    error_text = json.dumps({"error": f"Tool '{name}' not implemented."})
    # Returning error as TextContent for simplicity
    return [mcp_types.TextContent(type="text", text=error_text)]

if __name__ == "__main__":

    # Mount the SSE server to the existing ASGI server

    fastAPI = FastAPI(
        routes=[
            Mount('/messages', app=sse.handle_post_message),
        ]
    )

    # Add documentation for the /messages endpoint
    @fastAPI.get("/messages", tags=["MCP"], include_in_schema=True)
    def messages_docs():
        """
        Messages endpoint for SSE communication

        This endpoint is used for posting messages to SSE clients.
        Note: This route is for documentation purposes only.
        The actual implementation is handled by the SSE transport.
        """
        pass  # This is just for documentation, the actual handler is mounted above


    @fastAPI.get("/sse", tags=["MCP"])
    async def handle_sse(request: Request):
        """
        SSE endpoint that connects to the MCP server

        This endpoint establishes a Server-Sent Events connection with the client
        and forwards communication to the Model Context Protocol server.
        """
        # Use sse.connect_sse to establish an SSE connection with the MCP server
        async with sse.connect_sse(request.scope, request.receive, request._send) as (
            read_stream,
            write_stream,
        ):
            # Run the MCP server with the established streams
            await app.run(
                read_stream,
                write_stream,
                app.create_initialization_options(),
            )

    uvicorn.run(fastAPI, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
  • adk_tool = AgentTool(agent=root_agent) is where the agent is wrapped as a tool.
  • The FastAPI part is the plumbing needed to run the SSE transport.
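
The call_tool handler above serializes whatever the wrapped agent returns with json.dumps. Below is a minimal sketch of that formatting step on its own, assuming the result may be either a plain string or a dict. The helper names to_text_payload and to_error_payload are hypothetical and are not part of ADK or the MCP SDK. Unlike the handler above, a string result is passed through unquoted; whether that is desirable depends on the client.

```python
import json
from typing import Any


def to_text_payload(result: Any) -> str:
    """Turn a tool result into the text carried by an MCP TextContent."""
    if isinstance(result, str):
        # Already text: pass it through so json.dumps does not add quotes.
        return result
    # default=str is an assumption: fall back to str() for values that
    # json cannot encode instead of raising TypeError.
    return json.dumps(result, indent=2, ensure_ascii=False, default=str)


def to_error_payload(tool_name: str, error: Exception) -> str:
    """Mirror the error shape used in call_tool."""
    return json.dumps({"error": f"Failed to execute tool '{tool_name}': {error}"})
```

Inside call_tool, response_text = to_text_payload(adk_response) could replace the direct json.dumps call.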

Connectivity check

There is no real point to this, but ADK itself can act as an MCP client and I wanted to try it, so I'll use that for the test.

There is a class called SseServerParams; passing it as the connection parameters makes the client send its requests to the server.

test_mcp_client.py
import asyncio
from google.genai import types
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, SseServerParams
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

async def get_tools_async():
  """Gets tools from the ADK agent MCP server over SSE."""
  print("Attempting to connect to MCP Filesystem server...")
  tools, exit_stack = await MCPToolset.from_server(
    connection_params=SseServerParams(url="http://localhost:8080/sse")
  )
  return tools, exit_stack

async def get_agent_async():
  """Creates an ADK Agent equipped with tools from the MCP Server."""
  tools, exit_stack = await get_tools_async()
  print(f"Fetched {len(tools)} tools from MCP server.")
  root_agent = LlmAgent(
      model='gemini-2.0-flash', # Adjust model name if needed based on availability
      name='mcp_code_gen_app',
      instruction='You are a helpful coding assistant. Help users create and generate source code using the available tools. Prefer practical, working solutions and explain your code when appropriate.',
      tools=tools, # Provide the MCP tools to the ADK agent
  )
  return root_agent, exit_stack

async def async_main():
  session_service = InMemorySessionService()

  session = session_service.create_session(
      state={}, app_name='mcp_code_gen_app', user_id='user_fs'
  )

  query = "Write a Python function to calculate the factorial of a number."
  print(f"User Query: '{query}'")
  content = types.Content(role='user', parts=[types.Part(text=query)])

  root_agent, exit_stack = await get_agent_async()

  runner = Runner(
      app_name='mcp_code_gen_app',
      agent=root_agent,
      session_service=session_service,
  )

  print("Running agent...")
  events_async = runner.run_async(
      session_id=session.id, user_id=session.user_id, new_message=content
  )

  async for event in events_async:
    print(f"Event received: {event}")

  # Crucial Cleanup: Ensure the MCP server process connection is closed.
  print("Closing MCP server connection...")
  await exit_stack.aclose()
  print("Cleanup complete.")

if __name__ == '__main__':
  try:
    asyncio.run(async_main())
  except Exception as e:
    print(f"An error occurred: {e}")

Running it produces log output like the following.

Log
$ python3 ./test_mcp_client.py
User Query: 'Write a Python function to calculate the factorial of a number.'
Attempting to connect to MCP Filesystem server...
Fetched 1 tools from MCP server.
Running agent...
Event received: content=Content(parts=[Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, inline_data=None, text='```python\ndef factorial(n):\n  """\n  Calculate the factorial of a non-negative integer.\n\n  Args:\n    n: A non-negative integer.\n\n  Returns:\n    The factorial of n, which is the product of all positive integers less than or equal to n.\n  """\n  if n == 0:\n    return 1\n  else:\n    return n * factorial(n-1)\n```'), Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, inline_data=None, text='\nThis function uses recursion to calculate the factorial. If n is 0, it returns 1 (base case). Otherwise, it returns n multiplied by the factorial of n-1.')], role='model') grounding_metadata=None partial=None turn_complete=None error_code=None error_message=None interrupted=None invocation_id='e-d88342e3-dc9f-4f16-ac77-05433a357d4d' author='mcp_code_gen_app' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='B0ojtBVD' timestamp=1744385022.879246
Closing MCP server connection...
Cleanup complete.
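
Printing the whole event is noisy. Going by the fields visible in the log above (content, parts, text), a small helper can pull out only the text. This is a sketch that relies on duck typing rather than ADK's actual classes; extract_text is a hypothetical name, and the stand-in object only imitates the shape of the logged event.

```python
from types import SimpleNamespace


def extract_text(event) -> str:
    """Join the text of every part in an event, skipping parts without text."""
    content = getattr(event, "content", None)
    parts = getattr(content, "parts", None) or []
    return "".join(p.text for p in parts if getattr(p, "text", None))


# Stand-in object shaped like the logged event (not a real ADK Event).
fake_event = SimpleNamespace(
    content=SimpleNamespace(
        parts=[SimpleNamespace(text="def f(): pass"), SimpleNamespace(text=None)]
    )
)
print(extract_text(fake_event))
```

In the loop in async_main, print(extract_text(event)) could replace the existing print of the full event.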

Running on Cloud Run

Since this runs on uvicorn and fastapi, the deployment is the same as last time. This time, however, I dropped IAP and added --no-allow-unauthenticated.

gcloud beta run deploy adk-agent-mcp \
--region $GOOGLE_CLOUD_LOCATION \
--project $GOOGLE_CLOUD_PROJECT \
--image asia-northeast1-docker.pkg.dev/$GOOGLE_CLOUD_PROJECT/samples/adk-python-mcp \
--no-allow-unauthenticated \
--set-env-vars="GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT,GOOGLE_CLOUD_LOCATION=$GOOGLE_CLOUD_LOCATION,GOOGLE_GENAI_USE_VERTEXAI=$GOOGLE_GENAI_USE_VERTEXAI"

Because --no-allow-unauthenticated is set, the service cannot be reached without credentials. So on the client side, add an Authorization header to SseServerParams and point the URL at the Cloud Run service.

  token = "xxxxxx" # Should be obtained dynamically, but hard-coded here since this is a demo
  tools, exit_stack = await MCPToolset.from_server(
    connection_params=SseServerParams(url="https://xxxxxxx.us-central1.run.app/sse", headers={
      'Authorization': f"Bearer {token}",
    })
  )

With this, the MCP server can be called using the invoke permission of the authenticated user.
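
Identity tokens are short-lived, so the hard-coded token in the snippet above is better fetched at runtime. One option, assuming the gcloud CLI is installed and logged in with an account that has the invoke permission, is to shell out to gcloud auth print-identity-token. The function names below are hypothetical; this is a sketch, not the only way to obtain a token.

```python
import subprocess
from typing import Callable


def fetch_identity_token() -> str:
    """Ask the gcloud CLI for an identity token of the logged-in account."""
    result = subprocess.run(
        ["gcloud", "auth", "print-identity-token"],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if gcloud fails
    )
    return result.stdout.strip()


def build_auth_headers(
    token_provider: Callable[[], str] = fetch_identity_token,
) -> dict[str, str]:
    """Build the headers dict to pass as SseServerParams(headers=...)."""
    return {"Authorization": f"Bearer {token_provider()}"}
```

The client would then pass headers=build_auth_headers() instead of building the dict from a fixed token. The token provider is injectable so that another source, such as a service account, can be swapped in later.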

Thoughts

As someone who had only been saying "I should build an MCP server", I went from nearly zero to something working after about 5 hours of reading around.

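Because the call goes through ADK and therefore through an agent, the success rate feels a bit low, and it becomes unclear why I am writing a prompt at this point.
For calling agents, my sense is that something like A2A, where a Card is shared, is the better fit.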
