🍣

OpenAI Agent SDKによるMCP clientサンプル

に公開

はじめに

この記事はOpenAI Agents SDK用のMCP clientのサンプルの解説です。OpenAI AgentsはAIエージェントを作るためのSDKで、チャット機能よりは外部システムとの連携に重点が置かれています。
MCPはModel Context Protocolの略で、Anthropic社が発表したLLMと外部システムの連携用プロトコルで、MCPによりシステム間連携がかんたんになりました。
この記事で解説するコードはただのサンプルレベルのものですが、MCPサーバを設定ファイルで追加するだけで、ここここにあるアプリやシステムから情報取得、操作が可能になります。

参考

OpenAI Agents SDK を使った MCP Client サンプルコード解説

コード全体はこちらにあります。
main.pyの重要なところだけ解説します。


パッケージと実行方法

パッケージはOpenAI Agents SDKが必要です。

pip install openai-agents

また、エージェント同士のつながりの可視化をする場合はgraphvizが必要になります。可視化をしない場合は不要です。

pip install graphviz

また、実行にはOpenAIのAPIキーが必要です。プログラム内で環境変数から読み取るので、

export OPENAI_API_KEY=<APIキー>

を実行しておく必要があります。

実行は下記のコマンドのように第1引数に設定ファイルを指定します。

python main.py <設定ファイル(JSON)>

1. 必要なモジュールのインポート

import asyncio
from agents import Agent, Runner, gen_trace_id, trace, ItemHelpers
from agents.mcp import MCPServerStdio
from contextlib import AsyncExitStack
import json
import readline  # for accepting Japanese input
import sys
  • agents モジュール: OpenAI Agents SDK のメインモジュールです。ここから必要なものをインポートすることになります。
  • readline: 日本語入力をサポートするためのモジュール。これがないと日本語入力でバックスペースが使えません。

2. 設定ファイルの読み込み関数

def parse_config(config_file: str) -> dict:
  with open(config_file, 'r') as file:
    config = file.read()
  return json.loads(config)

MCPサーバの情報(起動コマンドや引数)、エージェント間のハンドオフ情報をJSONに書いておき、これを起動時に読み込みます。

設定ファイルのサンプルは以下のようになっています。

{
  "starting_agent": "Triage Agent",
  "agents": [
    {
      "name": "Triage Agent",
      "model": "gpt-4o-mini",
      "instructions": "You determine which agent to use.",
      "assistants": ["Translation Agent", "FileSystem Agent"],
      "mcp_servers": []
    },
    {
      "name": "Translation Agent",
      "model": "gpt-4o-mini",
      "instructions": "You are a specialized assistant of translation.",
      "assistants": [],
      "mcp_servers": []
    },
    {
      "name": "FileSystem Agent",
      "model": "gpt-4o-mini",
      "instructions": "You are a specialized assistant of read and write files.",
      "assistants": [],
      "mcp_servers": [
        {
          "name": "filesystem",
          "command": "npx",
          "args": [
            "-y",
            "@modelcontextprotocol/server-filesystem",
            "./"
          ],
          "cache_tools_list": false
        }
      ]
    }
  ]
}
  • starting_agent : ルートとなるエージェントを指定します。最初に質問する相手となるエージェントです。
  • agents : ここにエージェントを登録します。
    • Triage Agent: この設定ファイルでは他のエージェント(assistant)を選択して質問を投げる役割を担います。
    • Translation Agent: 翻訳用のエージェントです。LLMが翻訳能力を持っているので外部システムであるMCPサーバは登録していません。
    • FileSystem Agent: ローカルファイル操作をするエージェントです。ローカルファイル操作用のMCPであるFilesystem MCP serverを登録しています。オプションでカレントディレクトリを指定しており、このディレクトリのファイルの読み書きができます。

agentsでは

  • name: エージェントの名前で、自由に設定できます。
  • model: 使用するモデルです。
  • instruction: エージェントの役割を書きます。ここの書き方はエージェントの判断能力に影響するので、具体的にできること、やってもらいたいことを書くのがおすすめです。
  • assistants: 仕事を任せる可能性のあるエージェントを列挙します。
  • mcp_servers: MCPサーバを使う場合はここに列挙します。複数登録できます。
    • name: MCPサーバの名前です。
    • command: 起動コマンドを指定します。
    • args: リスト形式でコマンド引数を列挙します。
    • cache_tools_list: MCPサーバをキャッシュするかどうか指定します。

3. MCP サーバとエージェントの初期化

  async with AsyncExitStack() as stack:
    agents = dict()
    agents_candidates = {ag['name']: ag for ag in config['agents']}
    # check if all assistants are available
    for agent_name, ag in agents_candidates.items():
      if not set(ag['assistants']).issubset(agents_candidates.keys()):
        print(f"Error: Agent '{agent_name}' has invalid assistants.")
        sys.exit(1)
    # TODO: Currently, circular dependencies are not checked.

    # initialize agents
    while len(agents_candidates) > len(agents):
      for agent_name, ag in agents_candidates.items():
        if agent_name not in agents.keys() and set(ag['assistants']).issubset(agents.keys()):
          mcp_servers = [
            await stack.enter_async_context(
              MCPServerStdio(
                name=agent_name,
                params={
                  'command': server['command'],
                  'args': server['args']
                },
                cache_tools_list=ag['cache_tools_list'],
              )
            )
            for server in ag['mcp_servers']
          ]
          agents[agent_name] = Agent(
            model=ag['model'],
            name=agent_name,
            instructions=ag['instructions'],
            mcp_servers=mcp_servers,
            handoffs=[agents[assistant_name] for assistant_name in ag['assistants']],
          )

ここでは設定ファイルに書いてあるとおりにエージェントとMCPサーバを設定しています。
前半で、存在していないエージェントがassistantとして指定されていないかチェックをしています。
エージェントの循環参照はできない仕様ですが、そのチェックは実装していません。
設定ファイルに循環参照があるとここで無限ループになるので、注意してください。
また、非同期リソースであるMCPサーバを管理するためにAsyncExitStackを使っています。

4. エージェントの実行

    starting_agent = agents[config['starting_agent']]
    print(f'Starting agent: {starting_agent.name}')

    trace_id = gen_trace_id()
    print(f'Trace ID: {trace_id}')
    with trace(workflow_name='MCP test', trace_id=trace_id):
      result = None

      while True:
        try:
          print()
          print("Enter your query (or 'exit' to quit):")
          text = input('>>> ')
          if text == '':
            continue
          elif text.lower() in ['exit', 'quit', 'q']:
            break

          print(f'Query: {text}')
          print()

          if result is None:
            new_input = text
          else:
            new_input = result.to_input_list() + [
              {
                'role': 'user',
                'content': text,
              }
            ]

          result = Runner.run_streamed(starting_agent, new_input)
          async for event in result.stream_events():
            if event.type == 'raw_response_event':
              continue
            elif event.type == 'agent_updated_stream_event':
              print(f'Agent updated: {event.new_agent.name}')
              continue
            elif event.type == 'run_item_stream_event':
              if event.item.type == 'tool_call_item':
                print(f'-- Tool was called: {event.item.to_input_item()}')
              elif event.item.type == 'tool_call_output_item':
                print(f'-- Tool output: {event.item.output}')
              elif event.item.type == 'message_output_item':
                print(f'-- Message output:\n {ItemHelpers.text_message_output(event.item)}')
              else:
                pass

        except KeyboardInterrupt:
          print('Stopped')
          break
        except Exception as e:
          print(f'Error occurred:\n{e}')
          print('Please try again.')

gen_trace_id()でトレースIDを生成します。トレース機能を使うことでダッシュボードから確認できるようになります。
無限ループで入力を受け付けます。2回目以降は過去のやり取りも含めてエージェントに渡す必要があるため、new_inputはresultと新しい入力の連結になります。
Runner.run_streamed()でエージェントを実行します。このとき1つのエージェントとユーザ入力を引数にしています。
エージェントからの返答をストリーミングしない場合はRunner.run()を使います。
ここでは、ストリーミングにして、途中経過やMCPサーバとのやり取りを出力しています。

Discussion