ADK で作った agent を mcp server で公開する
ほぼ前回の続き
A2A を調べてたんですがその前に mcp 何も知らんということで実装しながら手で覚えていきます。
前回使っていた code_agent (sequential_agent) を公開できるようにします。
ADK の agent を作ったら、それを mcp server として公開ができる AgentTool というものがあるので、それを使います。
補足ですが、ADK 内で tool として MCP Client 使うこともできます。
前準備
前回に加えて mcp を追加してください。
uv add uvicorn fastapi google-adk mcp
実装
sample のようにサーバ部分を実装します
import json
import os
from dotenv import load_dotenv
# MCP Server Imports
from fastapi import FastAPI, Request
from fastapi.routing import Mount
from mcp import types as mcp_types # Use alias to avoid conflict with genai.types
from mcp.server.lowlevel import Server
from mcp.server.sse import SseServerTransport
# ADK Tool Imports
from google.adk.tools.agent_tool import AgentTool
# ADK <-> MCP Conversion Utility
from google.adk.tools.mcp_tool.conversion_utils import adk_to_mcp_tool_type
import uvicorn
from agents.code_agent.agent import root_agent
# --- Load Environment Variables (If ADK tools need them) ---
load_dotenv()
# --- Prepare the ADK Tool ---
# Instantiate the ADK tool you want to expose
print("Initializing ADK load_web_page tool...")
adk_tool = AgentTool(agent=root_agent)
print(f"ADK tool '{adk_tool.name}' initialized.")
# --- End ADK Tool Prep ---
# --- MCP Server Setup ---
print("Creating MCP Server instance...")
# Create a named MCP Server instance
app = Server("adk-tool-mcp-server")
sse = SseServerTransport("/messages/")
# Implement the MCP server's @app.list_tools handler
@app.list_tools()
async def list_tools() -> list[mcp_types.Tool]:
"""MCP handler to list available tools."""
print("MCP Server: Received list_tools request.")
# Convert the ADK tool's definition to MCP format
mcp_tool_schema = adk_to_mcp_tool_type(adk_tool)
print(f"MCP Server: Advertising tool: {mcp_tool_schema.name}")
return [mcp_tool_schema]
# Implement the MCP server's @app.call_tool handler
@app.call_tool()
async def call_tool(
name: str, arguments: dict
) -> list[mcp_types.TextContent | mcp_types.ImageContent | mcp_types.EmbeddedResource]:
"""MCP handler to execute a tool call."""
print(f"MCP Server: Received call_tool request for '{name}' with args: {arguments}")
# Check if the requested tool name matches our wrapped ADK tool
if name == adk_tool.name:
try:
# Execute the ADK tool's run_async method
# Note: tool_context is None as we are not within a full ADK Runner invocation
adk_response = await adk_tool.run_async(
args=arguments,
tool_context=None, # No ADK context available here
)
print(f"MCP Server: ADK tool '{name}' executed successfully.")
# Format the ADK tool's response (often a dict) into MCP format.
# Here, we serialize the response dictionary as a JSON string within TextContent.
# Adjust formatting based on the specific ADK tool's output and client needs.
response_text = json.dumps(adk_response, indent=2)
return [mcp_types.TextContent(type="text", text=response_text)]
except Exception as e:
print(f"MCP Server: Error executing ADK tool '{name}': {e}")
# Return an error message in MCP format
# Creating a proper MCP error response might be more robust
error_text = json.dumps({"error": f"Failed to execute tool '{name}': {str(e)}"})
return [mcp_types.TextContent(type="text", text=error_text)]
else:
# Handle calls to unknown tools
print(f"MCP Server: Tool '{name}' not found.")
error_text = json.dumps({"error": f"Tool '{name}' not implemented."})
# Returning error as TextContent for simplicity
return [mcp_types.TextContent(type="text", text=error_text)]
if __name__ == "__main__":
# Mount the SSE server to the existing ASGI server
fastAPI = FastAPI(
routes=[
Mount('/messages', app=sse.handle_post_message),
]
)
# Add documentation for the /messages endpoint
@fastAPI.get("/messages", tags=["MCP"], include_in_schema=True)
def messages_docs():
"""
Messages endpoint for SSE communication
This endpoint is used for posting messages to SSE clients.
Note: This route is for documentation purposes only.
The actual implementation is handled by the SSE transport.
"""
pass # This is just for documentation, the actual handler is mounted above
@fastAPI.get("/sse", tags=["MCP"])
async def handle_sse(request: Request):
"""
SSE endpoint that connects to the MCP server
This endpoint establishes a Server-Sent Events connection with the client
and forwards communication to the Model Context Protocol server.
"""
# Use sse.connect_sse to establish an SSE connection with the MCP server
async with sse.connect_sse(request.scope, request.receive, request._send) as (
read_stream,
write_stream,
):
# Run the MCP server with the established streams
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
)
uvicorn.run(fastAPI, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
-
adk_tool = AgentTool(agent=root_agent)
で agent を使っています。 - FastAPI で sse transport を動かすためにゴニョゴニョやっています。
疎通確認
意味はまったくないですが、私が使ってみたいということでテストは adk の方で mcp client が使えるため、やってみます。
SseServerParams
があり、これを使うと server の方にリクエストします。
import asyncio
from google.genai import types
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
async def get_tools_async():
"""Gets tools from the File System MCP Server."""
print("Attempting to connect to MCP Filesystem server...")
tools, exit_stack = await MCPToolset.from_server(
connection_params=SseServerParams(url="http://localhost:8080/sse")
)
return tools, exit_stack
async def get_agent_async():
"""Creates an ADK Agent equipped with tools from the MCP Server."""
tools, exit_stack = await get_tools_async()
print(f"Fetched {len(tools)} tools from MCP server.")
root_agent = LlmAgent(
model='gemini-2.0-flash', # Adjust model name if needed based on availability
name='mcp_code_gen_app',
instruction='You are a helpful coding assistant. Help users create and generate source code using the available tools. Prefer practical, working solutions and explain your code when appropriate.',
tools=tools, # Provide the MCP tools to the ADK agent
)
return root_agent, exit_stack
async def async_main():
session_service = InMemorySessionService()
session = session_service.create_session(
state={}, app_name='mcp_code_gen_app', user_id='user_fs'
)
query = "Write a Python function to calculate the factorial of a number"
print(f"User Query: '{query}'")
content = types.Content(role='user', parts=[types.Part(text=query)])
root_agent, exit_stack = await get_agent_async()
runner = Runner(
app_name='mcp_code_gen_app',
agent=root_agent,
session_service=session_service,
)
print("Running agent...")
events_async = runner.run_async(
session_id=session.id, user_id=session.user_id, new_message=content
)
async for event in events_async:
print(f"Event received: {event}")
# Crucial Cleanup: Ensure the MCP server process connection is closed.
print("Closing MCP server connection...")
await exit_stack.aclose()
print("Cleanup complete.")
if __name__ == '__main__':
try:
asyncio.run(async_main())
except Exception as e:
print(f"An error occurred: {e}")
実行すると以下のようにログが流れます
ログ
$ python3 ./test_mcp_client.py
User Query: 'Write a Python function to calculate the factorial of a number.'
Attempting to connect to MCP Filesystem server...
Fetched 1 tools from MCP server.
Running agent...
Event received: content=Content(parts=[Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, inline_data=None, text='```python\ndef factorial(n):\n """\n Calculate the factorial of a non-negative integer.\n\n Args:\n n: A non-negative integer.\n\n Returns:\n The factorial of n, which is the product of all positive integers less than or equal to n.\n """\n if n == 0:\n return 1\n else:\n return n * factorial(n-1)\n```'), Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, inline_data=None, text='\nThis function uses recursion to calculate the factorial. If n is 0, it returns 1 (base case). Otherwise, it returns n multiplied by the factorial of n-1.')], role='model') grounding_metadata=None partial=None turn_complete=None error_code=None error_message=None interrupted=None invocation_id='e-d88342e3-dc9f-4f16-ac77-05433a357d4d' author='mcp_code_gen_app' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='B0ojtBVD' timestamp=1744385022.879246
Closing MCP server connection...
Cleanup complete.
Cloud Run で動かしてみる
uvicorn, fastapi で動かすので、前回と一緒です。が iap を今回外しno-allow-unauthenticatedをつけます。
gcloud beta run deploy adk-agent-mcp \
--region $GOOGLE_CLOUD_LOCATION \
--project $GOOGLE_CLOUD_PROJECT \
--image asia-northeast1-docker.pkg.dev/$GOOGLE_CLOUD_PROJECT/samples/adk-python-mcp \
--no-allow-unauthenticated \
--set-env-vars="GOOGLE_CLOUD_PROJECT=$GOOGLE_CLOUD_PROJECT,GOOGLE_CLOUD_LOCATION=$GOOGLE_CLOUD_LOCATION,GOOGLE_GENAI_USE_VERTEXAI=$GOOGLE_GENAI_USE_VERTEXAI"
no-allow-unauthenticated をつけましたので普通にはアクセスできません。そのため client の SseServerParams
に auth header をつけ cloud run の URL にします。
token = "xxxxxx" # ここは動的にしておいたほうが良いですがデモなので
tools, exit_stack = await MCPToolset.from_server(
connection_params=SseServerParams(url="https://xxxxxxx.us-central1.run.app/sse", headers={
'Authorization': f"Bearer {token}",
})
)
これにより、認証したユーザの invoke 権限で mcp server を叩くことができます
感想
mcp サーバ作らねばみたいなこと言ってた人が ほぼ 0 から5時間でぐらいまぁ調べてなんとかできましたという感じです。
adk 通してということで agent を使っているため、チョット成功率は低い気がしますし、なんでここでpromptを書いているのかよくわからなくなるので、
agent 呼び出しという意味では A2A のような形で Card を共有する感じなのかなという気持ちです。
Discussion