ADK + MCP (PostgreSQL + GitHub) + RAG Engine (+ Notion API)を試す
昨今のAI関係の技術進化速度は目まぐるしいですね。
- Google提供のADK(https://google.github.io/adk-docs/)
- Google提供のRAG Engine(https://cloud.google.com/vertex-ai/generative-ai/docs/rag-engine/rag-overview?hl=ja)
- Anthropicが発表したMCP(https://github.com/modelcontextprotocol)
執筆時点では上記技術が結構熱い感じです。
これらの技術を一挙に試してみたので、備忘録として残します。
ちなみに私はpythonに強くないので、コードの8割はCursorで指示出して作ったものです。
作るAgent
Notionのデータベース内にあるページをナレッジとするルートエージェント
サブエージェントとして以下のエージェントをルートエージェントに登録
- PostgreSQL MCPサーバーを利用してクエリできるエージェント
- GitHub MCPサーバーを利用してクエリできるエージェント
ちなみにGitHubは公式のMCPサーバーが公開されていますが、今回は使いません。(というか使えなかった)
軽い気持ちで使ったらレスポンスが充実しているようで、トークンが1回の実行で10万とか消費されてしまったためです。(トークンのリミットに引っ掛かってエラーにはなってくれた)
最初はNotionもMCPで情報を取得できるのでは?と期待しましたが、Notion APIはあまり検索って行為に強くないのでナレッジ化することにしました。
まずはNotionのナレッジ化
Notion APIでNotionデータベースからページを全部持ってくるマンを作成。
ページはNotionPageという形になる。
contentにページ内のテキストが入る。ページ内のテキストはある程度をマークダウンっぽく変換している。
@dataclass
class NotionPage:
"""Notionページの情報を格納するクラス"""
page_id: str
title: str
url: str
properties: Dict[str, Any]
content: str
class NotionClient:
"""Notion APIクライアント"""
def __init__(self):
"""Notion APIクライアントを初期化する"""
notion_api_key = os.getenv("NOTION_API_KEY")
print(notion_api_key)
if not notion_api_key:
raise ValueError("NOTION_API_KEYが設定されていません")
self.client = Client(
options=ClientOptions(
auth=notion_api_key,
base_url="https://api.notion.com",
notion_version="2022-06-28",
log_level=logging.DEBUG,
)
)
def get_database_pages(
self, database_id: str, limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""データベースに含まれるページの一覧を取得する
Args:
database_id: データベースID
limit: 取得するページ数の上限
Returns:
ページ情報のリスト
"""
logger.info(f"データベース {database_id} のページ一覧を取得します")
pages = []
start_cursor = None
while len(pages) < limit:
response = self.client.databases.query(
database_id=database_id,
filter={
"or": [
# {
# "property": "有効期限",
# "date": {
# "is_empty": True,
# },
# },
]
},
sorts=[],
start_cursor=start_cursor,
**({"page_size": limit} if limit else {}),
)
pages.extend(response.get("results", []))
start_cursor = response.get("next_cursor")
if not response.get("has_more", False):
break
return pages
def get_page_info(self, page_id: str) -> Dict[str, Any]:
"""ページの基本情報を取得する
Args:
page_id: ページID
Returns:
ページの基本情報
"""
logger.info(f"ページ {page_id} の基本情報を取得します")
return self.client.pages.retrieve(page_id=page_id)
def get_page_content(self, block_id: str) -> List[Dict[str, Any]]:
"""ページのコンテンツを取得する
Args:
block_id: ブロックID(ページIDと同じ)
Returns:
ページのコンテンツ
"""
logger.info(f"ブロック {block_id} のコンテンツを取得します")
blocks = []
# 再帰的にブロックを取得する関数
def fetch_blocks(block_id: str):
response = self.client.blocks.children.list(block_id=block_id)
for block in response.get("results", []):
blocks.append(block)
# 子ブロックがあれば再帰的に取得
if block.get("has_children", False):
fetch_blocks(block["id"])
fetch_blocks(block_id)
return blocks
def extract_page_title(self, page_info: Dict[str, Any]) -> str:
"""ページのタイトルを抽出する
Args:
page_info: ページ情報
Returns:
ページのタイトル
"""
properties = page_info.get("properties", {})
for prop_name, prop_value in properties.items():
prop_type = prop_value.get("type")
if prop_type == "title":
title_parts = prop_value.get("title", [])
if title_parts:
return "".join(
[part.get("plain_text", "") for part in title_parts]
)
return "無題"
def extract_text_from_blocks(self, blocks: List[Dict[str, Any]]) -> str:
"""ブロックからテキストを抽出する
Args:
blocks: ブロックのリスト
Returns:
抽出されたテキスト
"""
text_parts = []
for block in blocks:
block_type = block.get("type")
if not block_type or block_type not in block:
continue
block_content = block[block_type]
# 各ブロックタイプに応じてテキストを抽出
if block_type in [
"paragraph",
"heading_1",
"heading_2",
"heading_3",
"bulleted_list_item",
"numbered_list_item",
"toggle",
"quote",
]:
rich_text = block_content.get("rich_text", [])
if rich_text:
text = "".join(
[rt.get("plain_text", "") for rt in rich_text]
)
prefix = ""
if block_type == "heading_1":
prefix = "# "
elif block_type == "heading_2":
prefix = "## "
elif block_type == "heading_3":
prefix = "### "
elif block_type == "bulleted_list_item":
prefix = "• "
elif block_type == "numbered_list_item":
prefix = "1. " # 単純化のため番号は固定
text_parts.append(f"{prefix}{text}")
elif block_type == "code":
rich_text = block_content.get("rich_text", [])
if rich_text:
code = "".join(
[rt.get("plain_text", "") for rt in rich_text]
)
language = block_content.get("language", "")
text_parts.append(f"```{language}\n{code}\n```")
elif block_type == "table":
# テーブルの処理は複雑なため、単純に「テーブルがあります」という情報だけ追加
text_parts.append("[テーブル]")
return "\n\n".join(text_parts)
def extract_properties_text(self, properties: Dict[str, Any]) -> str:
"""プロパティからテキストを抽出する
Args:
properties: プロパティ情報
Returns:
抽出されたテキスト
"""
property_texts = []
for prop_name, prop_value in properties.items():
prop_type = prop_value.get("type")
if prop_type == "title":
# タイトルはすでに別で取得するため、ここではスキップ
continue
if prop_type == "rich_text":
rich_text = prop_value.get("rich_text", [])
if rich_text:
text = "".join(
[rt.get("plain_text", "") for rt in rich_text]
)
property_texts.append(f"{prop_name}: {text}")
elif prop_type == "select":
select = prop_value.get("select")
if select and "name" in select:
property_texts.append(f"{prop_name}: {select['name']}")
elif prop_type == "multi_select":
multi_select = prop_value.get("multi_select", [])
if multi_select:
values = [
item.get("name", "")
for item in multi_select
if "name" in item
]
if values:
property_texts.append(
f"{prop_name}: {', '.join(values)}"
)
elif prop_type == "date":
date = prop_value.get("date")
if date and "start" in date:
date_str = date["start"]
if "end" in date and date["end"]:
date_str += f" - {date['end']}"
property_texts.append(f"{prop_name}: {date_str}")
elif prop_type in [
"number",
"checkbox",
"email",
"phone_number",
"url",
]:
if prop_type in prop_value:
property_texts.append(
f"{prop_name}: {prop_value[prop_type]}"
)
return "\n".join(property_texts)
def get_page(self, page_id: str) -> NotionPage:
"""ページの全情報を取得する
Args:
page_id: ページID
Returns:
NotionPageオブジェクト
"""
# ページの基本情報を取得
page_info = self.get_page_info(page_id)
# タイトルを抽出
title = self.extract_page_title(page_info)
# URLを取得
url = page_info.get("url", "")
# プロパティを取得
properties = page_info.get("properties", {})
# ページコンテンツを取得
blocks = self.get_page_content(page_id)
# ブロックからテキストを抽出
content_text = self.extract_text_from_blocks(blocks)
# プロパティからテキストを抽出
properties_text = self.extract_properties_text(properties)
# コンテンツとプロパティを結合
combined_content = f"タイトル: {title}\n\n"
if properties_text:
combined_content += f"プロパティ:\n{properties_text}\n\n"
combined_content += f"コンテンツ:\n{content_text}"
return NotionPage(
page_id=page_id,
title=title,
url=url,
properties=properties,
content=combined_content,
)
ドキュメントクラスを用意。NotionPageをDocumentに変換する。
@dataclass
class Document:
"""ベクトルストアに保存するドキュメントクラス"""
id: str
content: str
metadata: Dict[str, Any]
DocumentをEmbeddingしてVectorStoreに保存するマンを作成。
- Corpusを作成(指定されたDisplay Nameと同じものがあればそれを使用)
コープスってログに出してるけど、コーパスだと思う。 - Documentはjsonに変換されてCloudStorageに保存。
- CloudStorageのファイルをRAG Engineにインポート(RAG Engineにインポートって言葉が適切かはわかりません)
1回に読み込めるのは25ファイルまでっぽいので、その辺はいい感じにハンドリング。
def chunks(arr: List[Any], n: int) -> Generator[List[Any], None, None]:
"""リストを指定されたサイズのチャンクに分割するジェネレータ関数"""
for i in range(0, len(arr), n):
yield arr[i : i + n]
class VectorStoreInterface:
"""ベクトルストアインターフェース"""
def add_documents(self, documents: List[Document]) -> None:
"""ドキュメントをベクトルストアに追加する
Args:
documents: 追加するドキュメントのリスト
"""
raise NotImplementedError()
class GoogleRagEngine(VectorStoreInterface):
"""Google Cloud RAG Engineを使用したベクトルストア実装"""
def __init__(
self,
project_id: str,
location: str,
corpus_display_name: str,
bucket_name: Optional[str] = None,
):
"""初期化
Args:
project_id: GCPプロジェクトID
location: GCPロケーション
corpus_display_name: コープスの表示名
bucket_name: Cloud Storage バケット名(指定がない場合はプロジェクトのデフォルトバケット)
"""
self.project_id = project_id
self.location = location
self.corpus_display_name = corpus_display_name
self.bucket_name = bucket_name or f"{project_id}-rag-documents"
# Google Cloudの初期化
vertexai.init(project=project_id, location=location)
self.storage_client = storage.Client(project=project_id)
# RAG Corpusの初期化
self.corpus = None
self.initialize_corpus()
# バケットの作成または取得
self.bucket = self.get_or_create_bucket()
logger.info(
f"Google RAG Corpus {corpus_display_name} を初期化しました"
)
def get_or_create_bucket(self):
"""Cloud Storageバケットを取得または作成する"""
try:
bucket = self.storage_client.get_bucket(self.bucket_name)
logger.info(f"既存のバケット {self.bucket_name} を使用します")
return bucket
except Exception as e:
logger.info(
f"バケット {self.bucket_name} が存在しないため作成します: {e}"
)
return self.storage_client.create_bucket(
self.bucket_name, location=self.location
)
def initialize_corpus(self):
"""RAG Corpusを初期化する"""
try:
self.corpus = get_corpus_by_display_name(self.corpus_display_name)
except Exception as e:
logger.info(
f"コープス {self.corpus_display_name} が存在しないため作成します: {e}"
)
# マルチリンガルのembeddingモデルを使用(日本語対応)
embedding_model_config = rag.EmbeddingModelConfig(
publisher_model="publishers/google/models/text-multilingual-embedding-002",
)
self.corpus = rag.create_corpus(
display_name=self.corpus_display_name,
embedding_model_config=embedding_model_config,
)
logger.info(f"コープス {self.corpus_display_name} を作成しました")
def upload_document_to_storage(self, document: Document) -> str:
"""ドキュメントをCloud Storageにアップロードする
Args:
document: アップロードするドキュメント
Returns:
Cloud Storage上のURI
"""
# ドキュメントをJSONに変換
document_data = {
"id": document.id,
"content": document.content,
"metadata": document.metadata,
}
# 一時ファイルを作成してCloud Storageにアップロード
with tempfile.NamedTemporaryFile(
mode="w", suffix=".json", delete=False
) as temp_file:
json.dump(document_data, temp_file)
temp_file_path = temp_file.name
try:
# Cloud Storageへのパスを構築
file_name = f"documents/{document.id}.json"
blob = self.bucket.blob(file_name)
blob.upload_from_filename(temp_file_path)
# アップロードした一時ファイルを削除
os.unlink(temp_file_path)
gcs_uri = f"gs://{self.bucket_name}/{file_name}"
logger.info(
f"ドキュメント {document.id} を {gcs_uri} にアップロードしました"
)
return gcs_uri
except Exception as e:
logger.error(f"ドキュメントのアップロードに失敗しました: {e}")
if os.path.exists(temp_file_path):
os.unlink(temp_file_path)
raise
def add_documents(self, documents: List[Document]) -> None:
"""ドキュメントをRAG Engineに追加する
Args:
documents: 追加するドキュメントのリスト
"""
if not self.corpus:
logger.error("RAG Corpusが初期化されていません")
return
logger.info(
f"{len(documents)}件のドキュメントをRAG Engineに追加します"
)
# 各ドキュメントをCloud Storageにアップロードして、データソースとして登録
gcs_uris = []
for document in documents:
try:
gcs_uri = self.upload_document_to_storage(document)
gcs_uris.append(gcs_uri)
except Exception as e:
logger.error(
f"ドキュメント {document.id} のアップロードに失敗しました: {e}"
)
if not gcs_uris:
logger.error("アップロードに成功したドキュメントがありません")
return
# 一時JSONファイルを作成して、データソースを定義
try:
# データソースからコーパスにインポート
gcs_uris_chunks = list(chunks(gcs_uris, 25))
logger.info(
f"ドキュメントのインポートを開始します: {len(gcs_uris_chunks)}チャンク"
)
for gcs_uris_chunk in gcs_uris_chunks:
import_task = rag.import_files(
corpus_name=self.corpus.name,
paths=gcs_uris_chunk,
chunk_size=512,
chunk_overlap=64,
)
logger.info(
f"ドキュメントのインポートが完了しました: {import_task.imported_rag_files_count}件"
)
except Exception as e:
logger.error(f"ドキュメントのインポートに失敗しました: {e}")
raise e
logger.info("ドキュメントの追加が完了しました")
Notionページ持ってくるマンとVectorStoreに保存するマンを組み合わせて使うマンを作成。
class NotionEmbedder:
"""NotionページをEmbeddingして保存するクラス"""
def __init__(
self,
vector_store: VectorStoreInterface,
):
"""初期化
Args:
vector_store: ベクトルストアインターフェース
"""
self.notion_client = NotionClient()
self.vector_store = vector_store
def process_database(
self,
database_id: str,
category: str,
limit: Optional[int] = None,
) -> None:
"""データベースの処理
Args:
database_id: NotionデータベースID
category: カテゴリ文字列
limit: 処理するページ数の上限
"""
logger.info(
f"データベース {database_id} の処理を開始します(カテゴリ: {category})"
)
# データベース内のページIDを取得
pages = self.notion_client.get_database_pages(database_id, limit)
logger.info(f"{len(pages)}件のページが見つかりました")
# 各ページを処理
documents = []
for page in pages:
page_id = page["id"]
try:
# ページ情報を取得
notion_page = self.notion_client.get_page(page_id)
# ドキュメントを作成
document = Document(
id=notion_page.page_id,
content=notion_page.content,
metadata={
"title": notion_page.title,
"url": notion_page.url,
"category": category,
"database_id": database_id,
},
)
documents.append(document)
logger.info(
f"ページ「{notion_page.title}」の処理が完了しました"
)
except Exception as e:
logger.error(
f"ページ {page_id} の処理中にエラーが発生しました: {e}"
)
# ベクトルストアに保存
if documents:
self.vector_store.add_documents(documents)
logger.info(f"{len(documents)}件のドキュメントを保存しました")
else:
logger.warning("保存するドキュメントがありません")
実行した結果をGoogle Cloudのどの画面で見れるのかわからない・・・。普段はAWSを使っていてGoogle Cloudは初めてなんですよね。
RAG EngineはデフォルトでRAG Managed DBというDBにベクトルデータを保存するらしい。
そのDBはSpannerっぽいらしいが、どこ見ればそのDBを確認できるのかわからん。
とりあえず、ドキュメントを参考にLLM+RAGで質問してNotionに書いていることが回答されるか試す。
import os
import vertexai
from dotenv import load_dotenv
from shared_functions import get_corpus_by_display_name
from vertexai.generative_models import (
GenerativeModel,
Tool,
)
from vertexai.preview import rag
load_dotenv()
vertexai.init(
project=os.environ.get("GOOGLE_CLOUD_PROJECT", "your project"),
location="us-central1",
)
corpus = get_corpus_by_display_name(
os.environ.get("GOOGLE_RAG_CORPUS_DISPLAY_NAME")
)
rag_retrieval_config = rag.RagRetrievalConfig(
top_k=10,
filter=rag.Filter(vector_distance_threshold=0.5),
)
rag_retrieval_tool = Tool.from_retrieval(
retrieval=rag.Retrieval(
source=rag.VertexRagStore(
rag_resources=[rag.RagResource(rag_corpus=corpus.name)],
rag_retrieval_config=rag_retrieval_config,
)
)
)
model = GenerativeModel(
model_name="gemini-1.5-flash-002",
tools=[rag_retrieval_tool],
)
response = model.generate_content(
"〇〇って何ですか?", # Notionに書いてあることを質問
)
print(response)
「〇〇とは、~~~~」と回答が返ってきたことを確認し、うまくいったことを確認。
次はADKでAgentを作る
ADKで作るAgentにこのRAGを使うように設定。
https://github.com/google/adk-samples/tree/main/agents/RAG と https://github.com/google/adk-samples/tree/main/agents/fomc-research を参考に作成
import logging
import os
import warnings
from google.adk.agents import Agent
from google.adk.tools.agent_tool import AgentTool
from google.adk.tools.retrieval.vertex_ai_rag_retrieval import (
VertexAiRagRetrieval,
)
from vertexai.preview import rag
from . import MODEL
from .shared_libraries.callbacks import rate_limit_callback
from .shared_libraries.store_state import store_state_tool
from .sub_agents.database_agent import database_agent
from .sub_agents.github_agent import github_agent
def get_corpus_by_display_name(display_name: str) -> rag.RagCorpus:
"""指定された表示名に一致するRAGコープスを取得する"""
corpus = None
next_page_token = None
while corpus is None:
output = rag.list_corpora(page_token=next_page_token)
for output in output.pages:
for corpus in output.rag_corpora:
if corpus.display_name == display_name:
return corpus
next_page_token = output.next_page_token
if next_page_token is None or next_page_token == "":
raise ValueError(f"コープス {display_name} が見つかりません")
ROOT_PROMPT = """
あなたは〇〇の仕様に詳しいエージェントです。
基本的にあなたへのプロンプトは retrieve_rag_documentation を使って回答を探してください。
回答の中で、データベースに関することやGitHubリポジトリに関することがあれば、それらのサブエージェントを呼び出してください。
もし、あなたがわからない場合は、各サブエージェントに依頼してください。
各サブエージェントは特定の情報源にアクセスするようにMCP(Model Context Protocol)を通じて設計されています:
- GitHubリポジトリやソースコードに関する要求は、github_agentを呼び出してください。
このエージェントは、https://github.com/owner/ のリポジトリにアクセスできます。
- データベースに関する要求は、database_agentを呼び出してください。
このエージェントは、PostgreSQLデータベースのテーブル構造やデータを取得できます。
あなたがMCPの使い方を知っている必要はありません。
各エージェントが使い方を知っているので、それらを呼び出してください。
サブエージェントからの情報を整理して、ユーザーの質問に分かりやすく回答してください。
サブエージェントから「〇〇を参照する必要がある」という回答が返ってきた場合、該当するサブエージェントに依頼してください。
サブエージェントの回答から別のサブエージェントを呼び出すことも可能です。
"""
corpus = get_corpus_by_display_name(
os.environ.get("GOOGLE_RAG_CORPUS_DISPLAY_NAME")
)
# RAGを使って回答を出すためのツールを定義
ask_vertex_retrieval_tool = VertexAiRagRetrieval(
name="retrieve_rag_documentation",
description="Use this tool to retrieve documentation and reference materials for the question from the RAG corpus,",
rag_resources=[rag.RagResource(rag_corpus=corpus.name)],
similarity_top_k=10,
vector_distance_threshold=0.5,
)
root_agent = Agent(
model=MODEL,
name="root_agent",
description="〇〇の仕様を調査するためのエージェント",
instruction=ROOT_PROMPT,
before_model_callback=rate_limit_callback,
sub_agents=[
github_agent,
database_agent,
],
tools=[
store_state_tool,
ask_vertex_retrieval_tool,
],
)
ADKは adk web . で作ったエージェントとの会話画面を簡単に用意できる。
そのためには、root_agentという名前でモジュール内にAgentが存在していなければならないっぽい。
AgentでLLMがMCPで何かするためには、MCPサーバーから取得できるツール一覧をAgentのツールとして設定する必要がある。
ADK + MCP 公式ドキュメント https://google.github.io/adk-docs/tools/mcp-tools/
ドキュメントを見てもらうとわかるように、MCPサーバーからツールを取得するのは非同期である。
これは root_agent を作るときに少し面倒な感じっぽい。
pythonに詳しくないですが、Node.jsで考えたら非同期でしか作れないものをモジュールの変数としてまるで同期的に作ったようにエクスポートすることはできないので理解はできる。
Agentクラスの実装を見て何かできないかなーと思ったら、AgentはLlmAgentクラスのエイリアスであり、BaseAgentを継承していた。
BaseAgentの実装を見ると怪しげなメソッド発見。
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""Core logic to run this agent via text-based conversaction.
Args:
ctx: InvocationContext, the invocation context for this agent.
Yields:
Event: the events generated by the agent.
"""
raise NotImplementedError(
f'_run_async_impl for {type(self)} is not implemented.'
)
yield # AsyncGenerator requires having at least one yield statement
Agentを継承したクラスを作って、LlmAgentの実装を邪魔しない形で、コンテキストを処理する前にツールをMCPサーバーから取得して自身にセットすれば良さそう。
会話の中で何度もこの関数が動くと思われるので、スーパークラスの処理が終わった後にツールをセットする前に戻せば問題なし。
class McpAgent(LlmAgent):
mcp_connection_params: StdioServerParameters
filter_mcp_tools_callback: Callable[[List[ToolUnion]], List[ToolUnion]] = (
None
)
@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
logger.info("Connecting to MCP server...")
# MCPサーバーからツールを取得
tools, exit_stack = await MCPToolset.from_server(
connection_params=self.mcp_connection_params
)
logger.info(
f"Successfully connected to MCP server. Found {len(tools)} tools."
)
logger.info(f"Tools: {tools}")
valid_tools = []
for tool in tools:
try:
tool._get_declaration()
valid_tools.append(tool)
except Exception as e:
logger.warning(f"Tool {tool.name} has no declaration: {e}")
if self.filter_mcp_tools_callback:
valid_tools = self.filter_mcp_tools_callback(valid_tools)
original_tools = self.tools
self.tools = [*valid_tools, *original_tools]
tool_names = [getattr(t, "name", str(t)) for t in self.tools]
print(f"Agent '{self.name}': Tools reconfigured to: {tool_names}")
try:
async for event in super()._run_async_impl(ctx):
yield event
finally:
self.tools = original_tools
if exit_stack:
await exit_stack.aclose()
念のためWrite系のツールを除外できるようにフィルター関数をセットできるようにする。
いきなりIssueとか作られても困るしね。
これはめちゃくちゃ悩まされたんですけど、MCPサーバーから得られたツールの一部がpydanticのバリデーションエラーになる問題がありました。
MCPサーバーから得られたツールをループで回し、tool._get_declaration()が無事に実行できるかどうかで判断しています。
このメソッドの中でバリデートされてるみたいなので。
valid_tools = []
for tool in tools:
try:
tool._get_declaration()
valid_tools.append(tool)
except Exception as e:
logger.warning(f"Tool {tool.name} has no declaration: {e}")
McpAgentを使って github_agent を作成する。
import logging
import os
from google.adk.tools.mcp_tool.mcp_toolset import (
StdioServerParameters,
)
from .. import MODEL
from ..shared_libraries.callbacks import (
after_tool_callback,
before_tool_callback,
rate_limit_callback,
)
from ..shared_libraries.mcpagent import McpAgent
from ..shared_libraries.store_state import store_state_tool
logger = logging.getLogger(__name__)
GITHUB_PAT = os.getenv("GITHUB_PAT")
if not GITHUB_PAT:
logger.warning("GITHUB_PAT environment variable not set")
GITHUB_PROMPT = """
You are an agent that searches for code and information from the GitHub repository.
## About tools
- search_code
- You need to generate some similar words to search from user's question. You can use AND/OR operator to search multiple words.
- A Parentheses is also available.
- If you want to search code under specific {organization} then include `org:{organization}` in the query
- If you want to search code under a specific repository then include `repo:{organization}/{repository-name}` in the query
- search_repositories
- A response will be too big. So, please specify per_page and page parameters.
If you cannot find the information requested, please answer clearly that "The information is not found".
If you find the information, please answer in markdown format without summarizing.
If the information from GitHub is too large, please answer only the requested information in markdown format.
You cannot make any changes to the repository.
"""
# McpAgentを使用してGitHubエージェントを作成
github_agent = McpAgent(
model=MODEL,
name="github_agent",
description="Get code and information from the GitHub repository(MCP based)",
instruction=GITHUB_PROMPT,
tools=[store_state_tool],
before_model_callback=rate_limit_callback,
before_tool_callback=before_tool_callback,
after_tool_callback=after_tool_callback,
mcp_connection_params=StdioServerParameters(
command="npx",
args=[
"-y",
"@modelcontextprotocol/server-github",
],
env={
"GITHUB_PERSONAL_ACCESS_TOKEN": GITHUB_PAT,
},
),
filter_mcp_tools_callback=lambda tools: [
tool
for tool in tools
if hasattr(tool, "name")
and not (
tool.name.startswith("create") or tool.name.startswith("update")
)
],
)
トークン節約のために英語でinstructionを設定してみる。
database_agentも同様に作成。
before_tool_callback とか after_tool_callback はただログを出してるだけなのでお気になさらず。
これで adk web . でトーク画面から会話してみる。



Notionに書かれていることをしっかりと回答できてる!
GitHub Repository(プライベート)にもアクセスできてるし、データベースから検索することもできてる!
このAgentをMCPサーバーでWrapしてあげれば、Cursorからも利用できるかもしれない。
そうなると開発がかなり支援されそうな予感。
おもろー。
Discussion