🐕

ADK + MCP (PostgreSQL + GitHub) + RAG Engine (+ Notion API)を試す

に公開

昨今のAI関係の技術進化速度は目まぐるしいですね。

執筆時点では上記技術が結構熱い感じです。

これらの技術を一挙に試してみたので、備忘録として残します。
ちなみに私はpythonに強くないので、コードの8割はCursorで指示出して作ったものです。

作るAgent

Notionのデータベース内にあるページをナレッジとするルートエージェント
サブエージェントとして以下のエージェントをルートエージェントに登録

ちなみにGitHubは公式のMCPサーバーが公開されていますが、今回は使いません。(というか使えなかった)
軽い気持ちで使ったらレスポンスが充実しているようで、トークンが1回の実行で10万とか消費されてしまったためです。(トークンのリミットに引っ掛かってエラーにはなってくれた)

最初はNotionもMCPで情報を取得できるのでは?と期待しましたが、Notion APIはあまり検索って行為に強くないのでナレッジ化することにしました。

まずはNotionのナレッジ化

Notion APIでNotionデータベースからページを全部持ってくるマンを作成。
ページはNotionPageという形になる。
contentにページ内のテキストが入る。ページ内のテキストはある程度をマークダウンっぽく変換している。

@dataclass
class NotionPage:
    """Notionページの情報を格納するクラス"""

    page_id: str
    title: str
    url: str
    properties: Dict[str, Any]
    content: str


class NotionClient:
    """Notion APIクライアント"""

    def __init__(self):
        """Notion APIクライアントを初期化する"""
        notion_api_key = os.getenv("NOTION_API_KEY")
        print(notion_api_key)
        if not notion_api_key:
            raise ValueError("NOTION_API_KEYが設定されていません")
        self.client = Client(
            options=ClientOptions(
                auth=notion_api_key,
                base_url="https://api.notion.com",
                notion_version="2022-06-28",
                log_level=logging.DEBUG,
            )
        )

    def get_database_pages(
        self, database_id: str, limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """データベースに含まれるページの一覧を取得する

        Args:
            database_id: データベースID
            limit: 取得するページ数の上限

        Returns:
            ページ情報のリスト
        """
        logger.info(f"データベース {database_id} のページ一覧を取得します")

        pages = []

        start_cursor = None
        while len(pages) < limit:
            response = self.client.databases.query(
                database_id=database_id,
                filter={
                    "or": [
                        # {
                        #     "property": "有効期限",
                        #     "date": {
                        #         "is_empty": True,
                        #     },
                        # },
                    ]
                },
                sorts=[],
                start_cursor=start_cursor,
                **({"page_size": limit} if limit else {}),
            )

            pages.extend(response.get("results", []))
            start_cursor = response.get("next_cursor")

            if not response.get("has_more", False):
                break

        return pages

    def get_page_info(self, page_id: str) -> Dict[str, Any]:
        """ページの基本情報を取得する

        Args:
            page_id: ページID

        Returns:
            ページの基本情報
        """
        logger.info(f"ページ {page_id} の基本情報を取得します")
        return self.client.pages.retrieve(page_id=page_id)

    def get_page_content(self, block_id: str) -> List[Dict[str, Any]]:
        """ページのコンテンツを取得する

        Args:
            block_id: ブロックID(ページIDと同じ)

        Returns:
            ページのコンテンツ
        """
        logger.info(f"ブロック {block_id} のコンテンツを取得します")
        blocks = []

        # 再帰的にブロックを取得する関数
        def fetch_blocks(block_id: str):
            response = self.client.blocks.children.list(block_id=block_id)
            for block in response.get("results", []):
                blocks.append(block)
                # 子ブロックがあれば再帰的に取得
                if block.get("has_children", False):
                    fetch_blocks(block["id"])

        fetch_blocks(block_id)
        return blocks

    def extract_page_title(self, page_info: Dict[str, Any]) -> str:
        """ページのタイトルを抽出する

        Args:
            page_info: ページ情報

        Returns:
            ページのタイトル
        """
        properties = page_info.get("properties", {})
        for prop_name, prop_value in properties.items():
            prop_type = prop_value.get("type")
            if prop_type == "title":
                title_parts = prop_value.get("title", [])
                if title_parts:
                    return "".join(
                        [part.get("plain_text", "") for part in title_parts]
                    )
        return "無題"

    def extract_text_from_blocks(self, blocks: List[Dict[str, Any]]) -> str:
        """ブロックからテキストを抽出する

        Args:
            blocks: ブロックのリスト

        Returns:
            抽出されたテキスト
        """
        text_parts = []

        for block in blocks:
            block_type = block.get("type")
            if not block_type or block_type not in block:
                continue

            block_content = block[block_type]

            # 各ブロックタイプに応じてテキストを抽出
            if block_type in [
                "paragraph",
                "heading_1",
                "heading_2",
                "heading_3",
                "bulleted_list_item",
                "numbered_list_item",
                "toggle",
                "quote",
            ]:
                rich_text = block_content.get("rich_text", [])
                if rich_text:
                    text = "".join(
                        [rt.get("plain_text", "") for rt in rich_text]
                    )
                    prefix = ""
                    if block_type == "heading_1":
                        prefix = "# "
                    elif block_type == "heading_2":
                        prefix = "## "
                    elif block_type == "heading_3":
                        prefix = "### "
                    elif block_type == "bulleted_list_item":
                        prefix = "• "
                    elif block_type == "numbered_list_item":
                        prefix = "1. "  # 単純化のため番号は固定

                    text_parts.append(f"{prefix}{text}")

            elif block_type == "code":
                rich_text = block_content.get("rich_text", [])
                if rich_text:
                    code = "".join(
                        [rt.get("plain_text", "") for rt in rich_text]
                    )
                    language = block_content.get("language", "")
                    text_parts.append(f"```{language}\n{code}\n```")

            elif block_type == "table":
                # テーブルの処理は複雑なため、単純に「テーブルがあります」という情報だけ追加
                text_parts.append("[テーブル]")

        return "\n\n".join(text_parts)

    def extract_properties_text(self, properties: Dict[str, Any]) -> str:
        """プロパティからテキストを抽出する

        Args:
            properties: プロパティ情報

        Returns:
            抽出されたテキスト
        """
        property_texts = []

        for prop_name, prop_value in properties.items():
            prop_type = prop_value.get("type")

            if prop_type == "title":
                # タイトルはすでに別で取得するため、ここではスキップ
                continue

            if prop_type == "rich_text":
                rich_text = prop_value.get("rich_text", [])
                if rich_text:
                    text = "".join(
                        [rt.get("plain_text", "") for rt in rich_text]
                    )
                    property_texts.append(f"{prop_name}: {text}")

            elif prop_type == "select":
                select = prop_value.get("select")
                if select and "name" in select:
                    property_texts.append(f"{prop_name}: {select['name']}")

            elif prop_type == "multi_select":
                multi_select = prop_value.get("multi_select", [])
                if multi_select:
                    values = [
                        item.get("name", "")
                        for item in multi_select
                        if "name" in item
                    ]
                    if values:
                        property_texts.append(
                            f"{prop_name}: {', '.join(values)}"
                        )

            elif prop_type == "date":
                date = prop_value.get("date")
                if date and "start" in date:
                    date_str = date["start"]
                    if "end" in date and date["end"]:
                        date_str += f" - {date['end']}"
                    property_texts.append(f"{prop_name}: {date_str}")

            elif prop_type in [
                "number",
                "checkbox",
                "email",
                "phone_number",
                "url",
            ]:
                if prop_type in prop_value:
                    property_texts.append(
                        f"{prop_name}: {prop_value[prop_type]}"
                    )

        return "\n".join(property_texts)

    def get_page(self, page_id: str) -> NotionPage:
        """ページの全情報を取得する

        Args:
            page_id: ページID

        Returns:
            NotionPageオブジェクト
        """
        # ページの基本情報を取得
        page_info = self.get_page_info(page_id)

        # タイトルを抽出
        title = self.extract_page_title(page_info)

        # URLを取得
        url = page_info.get("url", "")

        # プロパティを取得
        properties = page_info.get("properties", {})

        # ページコンテンツを取得
        blocks = self.get_page_content(page_id)

        # ブロックからテキストを抽出
        content_text = self.extract_text_from_blocks(blocks)

        # プロパティからテキストを抽出
        properties_text = self.extract_properties_text(properties)

        # コンテンツとプロパティを結合
        combined_content = f"タイトル: {title}\n\n"
        if properties_text:
            combined_content += f"プロパティ:\n{properties_text}\n\n"
        combined_content += f"コンテンツ:\n{content_text}"

        return NotionPage(
            page_id=page_id,
            title=title,
            url=url,
            properties=properties,
            content=combined_content,
        )

ドキュメントクラスを用意。NotionPageをDocumentに変換する。

@dataclass
class Document:
    """ベクトルストアに保存するドキュメントクラス"""

    id: str
    content: str
    metadata: Dict[str, Any]

DocumentをEmbeddingしてVectorStoreに保存するマンを作成。

  1. Corpusを作成(指定されたDisplay Nameと同じものがあればそれを使用)
    コープスってログに出してるけど、コーパスだと思う。
  2. Documentはjsonに変換されてCloudStorageに保存。
  3. CloudStorageのファイルをRAG Engineにインポート(RAG Engineにインポートって言葉が適切かはわかりません)
    1回に読み込めるのは25ファイルまでっぽいので、その辺はいい感じにハンドリング。
def chunks(arr: List[Any], n: int) -> Generator[List[Any], None, None]:
    """リストを指定されたサイズのチャンクに分割するジェネレータ関数"""
    for i in range(0, len(arr), n):
        yield arr[i : i + n]

class VectorStoreInterface:
    """ベクトルストアインターフェース"""

    def add_documents(self, documents: List[Document]) -> None:
        """ドキュメントをベクトルストアに追加する

        Args:
            documents: 追加するドキュメントのリスト
        """
        raise NotImplementedError()


class GoogleRagEngine(VectorStoreInterface):
    """Google Cloud RAG Engineを使用したベクトルストア実装"""

    def __init__(
        self,
        project_id: str,
        location: str,
        corpus_display_name: str,
        bucket_name: Optional[str] = None,
    ):
        """初期化

        Args:
            project_id: GCPプロジェクトID
            location: GCPロケーション
            corpus_display_name: コープスの表示名
            bucket_name: Cloud Storage バケット名(指定がない場合はプロジェクトのデフォルトバケット)
        """
        self.project_id = project_id
        self.location = location
        self.corpus_display_name = corpus_display_name
        self.bucket_name = bucket_name or f"{project_id}-rag-documents"

        # Google Cloudの初期化
        vertexai.init(project=project_id, location=location)
        self.storage_client = storage.Client(project=project_id)

        # RAG Corpusの初期化
        self.corpus = None
        self.initialize_corpus()

        # バケットの作成または取得
        self.bucket = self.get_or_create_bucket()

        logger.info(
            f"Google RAG Corpus {corpus_display_name} を初期化しました"
        )

    def get_or_create_bucket(self):
        """Cloud Storageバケットを取得または作成する"""
        try:
            bucket = self.storage_client.get_bucket(self.bucket_name)
            logger.info(f"既存のバケット {self.bucket_name} を使用します")
            return bucket
        except Exception as e:
            logger.info(
                f"バケット {self.bucket_name} が存在しないため作成します: {e}"
            )
            return self.storage_client.create_bucket(
                self.bucket_name, location=self.location
            )

    def initialize_corpus(self):
        """RAG Corpusを初期化する"""
        try:
            self.corpus = get_corpus_by_display_name(self.corpus_display_name)
        except Exception as e:
            logger.info(
                f"コープス {self.corpus_display_name} が存在しないため作成します: {e}"
            )

        # マルチリンガルのembeddingモデルを使用(日本語対応)
        embedding_model_config = rag.EmbeddingModelConfig(
            publisher_model="publishers/google/models/text-multilingual-embedding-002",
        )

        self.corpus = rag.create_corpus(
            display_name=self.corpus_display_name,
            embedding_model_config=embedding_model_config,
        )

        logger.info(f"コープス {self.corpus_display_name} を作成しました")

    def upload_document_to_storage(self, document: Document) -> str:
        """ドキュメントをCloud Storageにアップロードする

        Args:
            document: アップロードするドキュメント

        Returns:
            Cloud Storage上のURI
        """
        # ドキュメントをJSONに変換
        document_data = {
            "id": document.id,
            "content": document.content,
            "metadata": document.metadata,
        }

        # 一時ファイルを作成してCloud Storageにアップロード
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", delete=False
        ) as temp_file:
            json.dump(document_data, temp_file)
            temp_file_path = temp_file.name

        try:
            # Cloud Storageへのパスを構築
            file_name = f"documents/{document.id}.json"
            blob = self.bucket.blob(file_name)
            blob.upload_from_filename(temp_file_path)

            # アップロードした一時ファイルを削除
            os.unlink(temp_file_path)

            gcs_uri = f"gs://{self.bucket_name}/{file_name}"
            logger.info(
                f"ドキュメント {document.id}{gcs_uri} にアップロードしました"
            )
            return gcs_uri
        except Exception as e:
            logger.error(f"ドキュメントのアップロードに失敗しました: {e}")
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)
            raise

    def add_documents(self, documents: List[Document]) -> None:
        """ドキュメントをRAG Engineに追加する

        Args:
            documents: 追加するドキュメントのリスト
        """
        if not self.corpus:
            logger.error("RAG Corpusが初期化されていません")
            return

        logger.info(
            f"{len(documents)}件のドキュメントをRAG Engineに追加します"
        )

        # 各ドキュメントをCloud Storageにアップロードして、データソースとして登録
        gcs_uris = []
        for document in documents:
            try:
                gcs_uri = self.upload_document_to_storage(document)
                gcs_uris.append(gcs_uri)
            except Exception as e:
                logger.error(
                    f"ドキュメント {document.id} のアップロードに失敗しました: {e}"
                )

        if not gcs_uris:
            logger.error("アップロードに成功したドキュメントがありません")
            return

        # 一時JSONファイルを作成して、データソースを定義
        try:
            # データソースからコーパスにインポート
            gcs_uris_chunks = list(chunks(gcs_uris, 25))
            logger.info(
                f"ドキュメントのインポートを開始します: {len(gcs_uris_chunks)}チャンク"
            )
            for gcs_uris_chunk in gcs_uris_chunks:
                import_task = rag.import_files(
                    corpus_name=self.corpus.name,
                    paths=gcs_uris_chunk,
                    chunk_size=512,
                    chunk_overlap=64,
                )
                logger.info(
                    f"ドキュメントのインポートが完了しました: {import_task.imported_rag_files_count}件"
                )

        except Exception as e:
            logger.error(f"ドキュメントのインポートに失敗しました: {e}")
            raise e

        logger.info("ドキュメントの追加が完了しました")

Notionページ持ってくるマンとVectorStoreに保存するマンを組み合わせて使うマンを作成。

class NotionEmbedder:
    """NotionページをEmbeddingして保存するクラス"""

    def __init__(
        self,
        vector_store: VectorStoreInterface,
    ):
        """初期化

        Args:
            vector_store: ベクトルストアインターフェース
        """
        self.notion_client = NotionClient()
        self.vector_store = vector_store

    def process_database(
        self,
        database_id: str,
        category: str,
        limit: Optional[int] = None,
    ) -> None:
        """データベースの処理

        Args:
            database_id: NotionデータベースID
            category: カテゴリ文字列
            limit: 処理するページ数の上限
        """
        logger.info(
            f"データベース {database_id} の処理を開始します(カテゴリ: {category})"
        )

        # データベース内のページIDを取得
        pages = self.notion_client.get_database_pages(database_id, limit)

        logger.info(f"{len(pages)}件のページが見つかりました")

        # 各ページを処理
        documents = []
        for page in pages:
            page_id = page["id"]

            try:
                # ページ情報を取得
                notion_page = self.notion_client.get_page(page_id)

                # ドキュメントを作成
                document = Document(
                    id=notion_page.page_id,
                    content=notion_page.content,
                    metadata={
                        "title": notion_page.title,
                        "url": notion_page.url,
                        "category": category,
                        "database_id": database_id,
                    },
                )

                documents.append(document)
                logger.info(
                    f"ページ「{notion_page.title}」の処理が完了しました"
                )

            except Exception as e:
                logger.error(
                    f"ページ {page_id} の処理中にエラーが発生しました: {e}"
                )

        # ベクトルストアに保存
        if documents:
            self.vector_store.add_documents(documents)
            logger.info(f"{len(documents)}件のドキュメントを保存しました")
        else:
            logger.warning("保存するドキュメントがありません")

実行した結果をGoogle Cloudのどの画面で見れるのかわからない・・・。普段はAWSを使っていてGoogle Cloudは初めてなんですよね。

RAG EngineはデフォルトでRAG Managed DBというDBにベクトルデータを保存するらしい。
そのDBはSpannerっぽいらしいが、どこ見ればそのDBを確認できるのかわからん。

とりあえず、ドキュメントを参考にLLM+RAGで質問してNotionに書いていることが回答されるか試す。

import os

import vertexai
from dotenv import load_dotenv
from shared_functions import get_corpus_by_display_name
from vertexai.generative_models import (
    GenerativeModel,
    Tool,
)
from vertexai.preview import rag

load_dotenv()
vertexai.init(
    project=os.environ.get("GOOGLE_CLOUD_PROJECT", "your project"),
    location="us-central1",
)

corpus = get_corpus_by_display_name(
    os.environ.get("GOOGLE_RAG_CORPUS_DISPLAY_NAME")
)
rag_retrieval_config = rag.RagRetrievalConfig(
    top_k=10,
    filter=rag.Filter(vector_distance_threshold=0.5),
)

rag_retrieval_tool = Tool.from_retrieval(
    retrieval=rag.Retrieval(
        source=rag.VertexRagStore(
            rag_resources=[rag.RagResource(rag_corpus=corpus.name)],
            rag_retrieval_config=rag_retrieval_config,
        )
    )
)

model = GenerativeModel(
    model_name="gemini-1.5-flash-002",
    tools=[rag_retrieval_tool],
)

response = model.generate_content(
    "〇〇って何ですか?", # Notionに書いてあることを質問
)
print(response)

「〇〇とは、~~~~」と回答が返ってきたことを確認し、うまくいったことを確認。

次はADKでAgentを作る

ADKで作るAgentにこのRAGを使うように設定。
https://github.com/google/adk-samples/tree/main/agents/RAGhttps://github.com/google/adk-samples/tree/main/agents/fomc-research を参考に作成

import logging
import os
import warnings

from google.adk.agents import Agent
from google.adk.tools.agent_tool import AgentTool
from google.adk.tools.retrieval.vertex_ai_rag_retrieval import (
    VertexAiRagRetrieval,
)
from vertexai.preview import rag

from . import MODEL
from .shared_libraries.callbacks import rate_limit_callback
from .shared_libraries.store_state import store_state_tool
from .sub_agents.database_agent import database_agent
from .sub_agents.github_agent import github_agent

def get_corpus_by_display_name(display_name: str) -> rag.RagCorpus:
    """指定された表示名に一致するRAGコープスを取得する"""
    corpus = None
    next_page_token = None
    while corpus is None:
        output = rag.list_corpora(page_token=next_page_token)
        for output in output.pages:
            for corpus in output.rag_corpora:
                if corpus.display_name == display_name:
                    return corpus
        next_page_token = output.next_page_token
        if next_page_token is None or next_page_token == "":
            raise ValueError(f"コープス {display_name} が見つかりません")

ROOT_PROMPT = """
あなたは〇〇の仕様に詳しいエージェントです。

基本的にあなたへのプロンプトは retrieve_rag_documentation を使って回答を探してください。
回答の中で、データベースに関することやGitHubリポジトリに関することがあれば、それらのサブエージェントを呼び出してください。
もし、あなたがわからない場合は、各サブエージェントに依頼してください。

各サブエージェントは特定の情報源にアクセスするようにMCP(Model Context Protocol)を通じて設計されています:

- GitHubリポジトリやソースコードに関する要求は、github_agentを呼び出してください。
  このエージェントは、https://github.com/owner/ のリポジトリにアクセスできます。

- データベースに関する要求は、database_agentを呼び出してください。
  このエージェントは、PostgreSQLデータベースのテーブル構造やデータを取得できます。

あなたがMCPの使い方を知っている必要はありません。
各エージェントが使い方を知っているので、それらを呼び出してください。

サブエージェントからの情報を整理して、ユーザーの質問に分かりやすく回答してください。
サブエージェントから「〇〇を参照する必要がある」という回答が返ってきた場合、該当するサブエージェントに依頼してください。
サブエージェントの回答から別のサブエージェントを呼び出すことも可能です。
"""

corpus = get_corpus_by_display_name(
    os.environ.get("GOOGLE_RAG_CORPUS_DISPLAY_NAME")
)

# RAGを使って回答を出すためのツールを定義
ask_vertex_retrieval_tool = VertexAiRagRetrieval(
    name="retrieve_rag_documentation",
    description="Use this tool to retrieve documentation and reference materials for the question from the RAG corpus,",
    rag_resources=[rag.RagResource(rag_corpus=corpus.name)],
    similarity_top_k=10,
    vector_distance_threshold=0.5,
)

root_agent = Agent(
    model=MODEL,
    name="root_agent",
    description="〇〇の仕様を調査するためのエージェント",
    instruction=ROOT_PROMPT,
    before_model_callback=rate_limit_callback,
    sub_agents=[
        github_agent,
        database_agent,
    ],
    tools=[
        store_state_tool,
        ask_vertex_retrieval_tool,
    ],
)

ADKは adk web . で作ったエージェントとの会話画面を簡単に用意できる。
そのためには、root_agentという名前でモジュール内にAgentが存在していなければならないっぽい。

AgentでLLMがMCPで何かするためには、MCPサーバーから取得できるツール一覧をAgentのツールとして設定する必要がある。

ADK + MCP 公式ドキュメント https://google.github.io/adk-docs/tools/mcp-tools/

ドキュメントを見てもらうとわかるように、MCPサーバーからツールを取得するのは非同期である。
これは root_agent を作るときに少し面倒な感じっぽい。
pythonに詳しくないですが、Node.jsで考えたら非同期でしか作れないものをモジュールの変数としてまるで同期的に作ったようにエクスポートすることはできないので理解はできる。

Agentクラスの実装を見て何かできないかなーと思ったら、AgentLlmAgentクラスのエイリアスであり、BaseAgentを継承していた。

BaseAgentの実装を見ると怪しげなメソッド発見。

  async def _run_async_impl(
      self, ctx: InvocationContext
  ) -> AsyncGenerator[Event, None]:
    """Core logic to run this agent via text-based conversaction.

    Args:
      ctx: InvocationContext, the invocation context for this agent.

    Yields:
      Event: the events generated by the agent.
    """
    raise NotImplementedError(
        f'_run_async_impl for {type(self)} is not implemented.'
    )
    yield  # AsyncGenerator requires having at least one yield statement

Agentを継承したクラスを作って、LlmAgentの実装を邪魔しない形で、コンテキストを処理する前にツールをMCPサーバーから取得して自身にセットすれば良さそう。
会話の中で何度もこの関数が動くと思われるので、スーパークラスの処理が終わった後にツールをセットする前に戻せば問題なし。

class McpAgent(LlmAgent):
    mcp_connection_params: StdioServerParameters

    filter_mcp_tools_callback: Callable[[List[ToolUnion]], List[ToolUnion]] = (
        None
    )

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        logger.info("Connecting to MCP server...")
        # MCPサーバーからツールを取得
        tools, exit_stack = await MCPToolset.from_server(
            connection_params=self.mcp_connection_params
        )
        logger.info(
            f"Successfully connected to MCP server. Found {len(tools)} tools."
        )

        logger.info(f"Tools: {tools}")

        valid_tools = []
        for tool in tools:
            try:
                tool._get_declaration()
                valid_tools.append(tool)
            except Exception as e:
                logger.warning(f"Tool {tool.name} has no declaration: {e}")

        if self.filter_mcp_tools_callback:
            valid_tools = self.filter_mcp_tools_callback(valid_tools)

        original_tools = self.tools
        self.tools = [*valid_tools, *original_tools]

        tool_names = [getattr(t, "name", str(t)) for t in self.tools]
        print(f"Agent '{self.name}': Tools reconfigured to: {tool_names}")

        try:
            async for event in super()._run_async_impl(ctx):
                yield event
        finally:
            self.tools = original_tools
            if exit_stack:
                await exit_stack.aclose()

念のためWrite系のツールを除外できるようにフィルター関数をセットできるようにする。
いきなりIssueとか作られても困るしね。

これはめちゃくちゃ悩まされたんですけど、MCPサーバーから得られたツールの一部がpydanticのバリデーションエラーになる問題がありました。
MCPサーバーから得られたツールをループで回し、tool._get_declaration()が無事に実行できるかどうかで判断しています。
このメソッドの中でバリデートされてるみたいなので。

        valid_tools = []
        for tool in tools:
            try:
                tool._get_declaration()
                valid_tools.append(tool)
            except Exception as e:
                logger.warning(f"Tool {tool.name} has no declaration: {e}")

McpAgentを使って github_agent を作成する。

import logging
import os

from google.adk.tools.mcp_tool.mcp_toolset import (
    StdioServerParameters,
)

from .. import MODEL
from ..shared_libraries.callbacks import (
    after_tool_callback,
    before_tool_callback,
    rate_limit_callback,
)
from ..shared_libraries.mcpagent import McpAgent
from ..shared_libraries.store_state import store_state_tool

logger = logging.getLogger(__name__)

GITHUB_PAT = os.getenv("GITHUB_PAT")
if not GITHUB_PAT:
    logger.warning("GITHUB_PAT environment variable not set")

GITHUB_PROMPT = """
You are an agent that searches for code and information from the GitHub repository.

## About tools
- search_code
    - You need to generate some similar words to search from user's question. You can use AND/OR operator to search multiple words.
    - A Parentheses is also available.
    - If you want to search code under specific {organization} then include `org:{organization}` in the query
    - If you want to search code under a specific repository then include `repo:{organization}/{repository-name}` in the query
- search_repositories
    - A response will be too big. So, please specify per_page and page parameters.

If you cannot find the information requested, please answer clearly that "The information is not found".
If you find the information, please answer in markdown format without summarizing.
If the information from GitHub is too large, please answer only the requested information in markdown format.

You cannot make any changes to the repository.
"""

# McpAgentを使用してGitHubエージェントを作成
github_agent = McpAgent(
    model=MODEL,
    name="github_agent",
    description="Get code and information from the GitHub repository(MCP based)",
    instruction=GITHUB_PROMPT,
    tools=[store_state_tool],
    before_model_callback=rate_limit_callback,
    before_tool_callback=before_tool_callback,
    after_tool_callback=after_tool_callback,
    mcp_connection_params=StdioServerParameters(
        command="npx",
        args=[
            "-y",
            "@modelcontextprotocol/server-github",
        ],
        env={
            "GITHUB_PERSONAL_ACCESS_TOKEN": GITHUB_PAT,
        },
    ),
    filter_mcp_tools_callback=lambda tools: [
        tool
        for tool in tools
        if hasattr(tool, "name")
        and not (
            tool.name.startswith("create") or tool.name.startswith("update")
        )
    ],
)

トークン節約のために英語でinstructionを設定してみる。
database_agentも同様に作成。

before_tool_callback とか after_tool_callback はただログを出してるだけなのでお気になさらず。

これで adk web . でトーク画面から会話してみる。



Notionに書かれていることをしっかりと回答できてる!
GitHub Repository(プライベート)にもアクセスできてるし、データベースから検索することもできてる!

このAgentをMCPサーバーでWrapしてあげれば、Cursorからも利用できるかもしれない。
そうなると開発がかなり支援されそうな予感。

おもろー。

Discussion