🔗
マルチエージェントAI完全解説 ─ A2AプロトコルとMCPで実現するエージェント連携2026
単一エージェント時代の終焉
2024年時点では、ほぼすべてのAI応用が「単一エージェント(1つのAI)」で実装されていました。2026年現在、複雑なタスクの95%以上がマルチエージェント構成で実装されています。
理由は明確です。1つのAIに全てを要求するより、複数の専門AIに役割分担させた方が、品質・速度・コスト全てで優位だからです。
このガイドでは、Google CloudのA2A(Agent-to-Agent)プロトコルとMCP(Model Context Protocol)を活用した、実戦的なマルチエージェント構築方法を解説します。
マルチエージェント:なぜ今なのか
課題:単一エージェントの限界
ユースケース:「ブログ記事を自動生成し、SEO最適化して、
自動翻訳し、SNS投稿も自動化」
- 単一エージェントアプローチ:
Agent-V1 が全部やろうとする
├─ ブログ執筆
├─ SEO最適化
├─ 翻訳
└─ SNS戦略
問題点:
× エラー連鎖:前工程の誤りが後工程に波及
× 品質低下:1つのAIが全領域で最高品質を出せない
× 制御困難:失敗時のロールバック不可能
× 遅延:1つのAIが順次処理
× コスト:高度な推論が全ステップで必要
解決策:マルチエージェント体制
- マルチエージェントアプローチ:
[Orchestrator エージェント]
(全体指揮・エラー処理)
↓↓↓↓
┌─────┬──────┬──────┬─────┐
↓ ↓ ↓ ↓ ↓
Writer SEO Trans Social Monitor
(執筆) (最適) (翻訳) (投稿) (品質)
各エージェント:
✓ 単一責任(1つの役割に特化)
✓ 並列処理(複数同時実行)
✓ エラー処理:失敗時は該当エージェント再実行
✓ 高速化:I/O待機を別エージェントに任せる
✓ コスト最適:簡単タスクは小さいLLMで実行
A2A(Agent-to-Agent)プロトコル:Google Cloud標準
A2Aプロトコルは、エージェント間通信の標準化仕様です。2024年にGoogle Cloudが正式化し、2026年には業界標準となっています。
A2Aプロトコルの3層構造
┌─────────────────────────────────────────┐
│ アプリケーション層 │
│ (ビジネスロジック:LLMが実行) │
└─────────────────────────────────────────┘
↑ ↓
[A2A Protocol Layer]
│ エージェント通信仕様 │
│ ・メッセージフォーマット │
│ ・エラーハンドリング │
│ ・リトライメカニズム │
└─────────────────────────────────────────┘
↑ ↓
┌─────────────────────────────────────────┐
│ インフラ層 │
│ (メッセージキュー、認証など) │
└─────────────────────────────────────────┘
A2Aメッセージフォーマット(JSON標準)
{
"version": "1.0",
"agent_id": "writing-agent-v2",
"message_id": "msg_2026_001_uuid",
"timestamp": "2026-02-13T10:30:45Z",
"sender": "orchestrator",
"recipient": ["writer-agent", "seo-agent"],
"task": {
"type": "document_generation",
"priority": "high",
"payload": {
"topic": "2026年のAI技術トレンド",
"target_words": 3000,
"keywords": ["AI", "2026", "技術"],
"language": "ja",
"style": "technical_blog"
}
},
"dependencies": [
{
"agent": "research-agent",
"output_field": "research_data"
}
],
"timeout_seconds": 300,
"callback_url": "https://orchestrator.example.com/callbacks"
}
オーケストレーションパターン:3つの構成方式
パターン1:シーケンシャル(順序実行)
Agent A 実行
↓ 完了
Agent B 実行(A の結果を入力)
↓ 完了
Agent C 実行(B の結果を入力)
↓ 完了
最終出力
使用例:
ブログ執筆 → SEO最適化 → 校正 → 公開
コード例:
```python
from langgraph.graph import StateGraph
class WritingWorkflow:
def __init__(self):
self.graph = StateGraph()
def add_sequential_agents(self):
# エージェント定義
self.graph.add_node(
"writer",
self.execute_writer_agent
)
self.graph.add_node(
"seo_optimizer",
self.execute_seo_agent
)
self.graph.add_node(
"proofreader",
self.execute_proofreader_agent
)
# 順序実行を定義
self.graph.add_edge("writer", "seo_optimizer")
self.graph.add_edge("seo_optimizer", "proofreader")
self.graph.set_entry_point("writer")
self.graph.set_finish_point("proofreader")
async def execute_writer_agent(self, state):
"""ブログ記事執筆エージェント"""
topic = state.get("topic")
response = await llm.agenerate(
prompt=f"Write a technical blog post about: {topic}",
temperature=0.7,
max_tokens=3000
)
return {
"article": response.text,
"word_count": len(response.text.split())
}
async def execute_seo_agent(self, state):
"""SEO最適化エージェント"""
article = state.get("article")
keywords = state.get("keywords", [])
seo_prompt = f"""
Optimize this article for SEO:
{article}
Target keywords: {', '.join(keywords)}
Return optimized version with:
1. Meta description
2. H1, H2 optimization
3. Keyword density improvement
"""
response = await llm.agenerate(
prompt=seo_prompt,
temperature=0.3
)
return {
"optimized_article": response.text,
"seo_score": self.calculate_seo_score(response.text)
}
async def execute_proofreader_agent(self, state):
"""校正エージェント"""
article = state.get("optimized_article")
proofread_prompt = f"""
Proofread this Japanese article for:
1. Grammar errors
2. Tone consistency
3. Technical accuracy
4. Readability improvements
Article:
{article}
"""
response = await llm.agenerate(
prompt=proofread_prompt,
temperature=0.2
)
return {
"final_article": response.text,
"review_notes": self.extract_review_notes(response.text)
}
async def execute(self, input_state):
"""ワークフロー実行"""
compiled_graph = self.graph.compile()
result = await compiled_graph.ainvoke(input_state)
return result
# 使用例
workflow = WritingWorkflow()
workflow.add_sequential_agents()
result = await workflow.execute({
"topic": "2026年のエッジAI",
"keywords": ["EdgeAI", "推論", "低遅延"]
})
print(f"完成記事: {result['final_article']}")
**特性:**
- 制御が単純
- 各ステップのエラー処理が容易
- 依存関係が明確
**欠点:**
- 処理時間が加算される(並列化不可)
- 後段でのエラーが前段の修正を要求する
### パターン2:パラレル(並列実行)
┌─ Agent A ─┐
│ │
├─ Agent B ─┤ → 統合 → 最終出力
│ │
└─ Agent C ─┘
使用例:
同時に複数言語への翻訳を並列実行
```python
import asyncio
from concurrent.futures import as_completed
class ParallelTranslationWorkflow:
async def translate_parallel(self, article, target_languages):
"""複数言語への並列翻訳"""
translation_tasks = []
for lang in target_languages:
task = self.translate_to_language(article, lang)
translation_tasks.append(task)
# 全翻訳を並列実行
results = await asyncio.gather(*translation_tasks)
return {
"original": article,
"translations": dict(zip(target_languages, results)),
"total_time": time.time() - start_time
}
async def translate_to_language(self, article, target_lang):
"""単一言語への翻訳エージェント"""
prompt = f"""
Translate this article to {target_lang}:
{article}
"""
response = await llm.agenerate(prompt)
return response.text
# 使用例
workflow = ParallelTranslationWorkflow()
result = await workflow.translate_parallel(
article=blog_article,
target_languages=["en", "zh", "ko", "es"]
)
# 結果:4言語への翻訳が同時進行
# 順次実行の場合 4分 → 並列実行で 1分に短縮
print(f"翻訳完了: {result['total_time']}秒")
特性:
- 高速化(並列処理)
- リソース効率的
- I/O待機を隠蔽
欠点:
- 結果の統合が複雑
- デバッグが困難
パターン3:階層的(ツリー構造)
[メインオーケストレータ]
↓
┌─────────┼─────────┐
↓ ↓ ↓
[分析管理] [生成管理] [発行管理]
↓ ↓ ↓
┌──┴──┐ ┌──┴──┐ ┌──┴──┐
↓ ↓ ↓ ↓ ↓ ↓
調査 評価 執筆 最適 校正 配信
from langgraph.graph import StateGraph, START, END
class HierarchicalAgentWorkflow:
def __init__(self):
self.graph = StateGraph()
self.setup_hierarchy()
def setup_hierarchy(self):
# レベル1:メインオーケストレータ
self.graph.add_node("main_orchestrator", self.main_orchestrator)
# レベル2:サブオーケストレータ
self.graph.add_node("analysis_manager", self.analysis_manager)
self.graph.add_node("generation_manager", self.generation_manager)
self.graph.add_node("publishing_manager", self.publishing_manager)
# レベル3:実行エージェント
self.graph.add_node("research_agent", self.research_agent)
self.graph.add_node("evaluation_agent", self.evaluation_agent)
self.graph.add_node("writer_agent", self.writer_agent)
self.graph.add_node("seo_agent", self.seo_agent)
self.graph.add_node("review_agent", self.review_agent)
self.graph.add_node("distribution_agent", self.distribution_agent)
# エッジ設定(階層構造)
self.graph.add_edge(START, "main_orchestrator")
# メイン → サブマネージャ
self.graph.add_edge("main_orchestrator", "analysis_manager")
self.graph.add_edge("main_orchestrator", "generation_manager")
self.graph.add_edge("generation_manager", "publishing_manager")
# 分析レイヤー
self.graph.add_edge("analysis_manager", "research_agent")
self.graph.add_edge("analysis_manager", "evaluation_agent")
# 生成レイヤー
self.graph.add_edge("generation_manager", "writer_agent")
self.graph.add_edge("generation_manager", "seo_agent")
self.graph.add_edge("generation_manager", "review_agent")
# 発行レイヤー
self.graph.add_edge("publishing_manager", "distribution_agent")
self.graph.add_edge("distribution_agent", END)
async def main_orchestrator(self, state):
"""最上位の制御エージェント"""
prompt = """
You are the main orchestrator.
Break down this task into analysis, generation, and publishing phases.
"""
response = await llm.agenerate(prompt)
return {"orchestration_plan": response.text}
async def analysis_manager(self, state):
"""分析フェーズの管理エージェント"""
# 調査と評価エージェントを並列実行
research_result = await self.research_agent(state)
evaluation_result = await self.evaluation_agent(state)
return {
"research_data": research_result["data"],
"evaluation": evaluation_result["evaluation"]
}
async def generation_manager(self, state):
"""生成フェーズの管理エージェント"""
# 複数の生成エージェントを並列実行
writer_result = await self.writer_agent(state)
seo_result = await self.seo_agent(writer_result)
review_result = await self.review_agent(seo_result)
return {"final_content": review_result["content"]}
async def publishing_manager(self, state):
"""発行フェーズの管理エージェント"""
content = state.get("final_content")
distribution = await self.distribution_agent({
"content": content,
"channels": ["blog", "social", "newsletter"]
})
return {"published": distribution}
# 実行
workflow = HierarchicalAgentWorkflow()
compiled = workflow.graph.compile()
result = await compiled.ainvoke({
"topic": "AI トレンド 2026",
"target_audience": "技術者"
})
特性:
- 大規模システムに適している
- 各層で独立した制御可能
- スケーラビリティ高い
フレームワーク比較:2026年版
| フレームワーク | 推奨度 | 特徴 | 学習曲線 |
|---|---|---|---|
| LangChain | ⭐⭐⭐⭐ | 最多機能、Ecosystem豊富 | 緩い |
| CrewAI | ⭐⭐⭐⭐⭐ | エージェント設計に特化 | 中程度 |
| AutoGen | ⭐⭐⭐ | Microsoft製、多目的 | 急 |
| LangGraph | ⭐⭐⭐⭐⭐ | グラフベース制御 | 中程度 |
CrewAI:最実践的なマルチエージェントフレームワーク
from crewai import Agent, Task, Crew, Process
from crewai_tools import tool
# カスタムツール定義
@tool
def search_web(query: str) -> str:
"""ウェブ検索を実行"""
# 実装
return f"検索結果: {query}"
@tool
def analyze_seo(article: str) -> dict:
"""SEO分析を実行"""
return {
"keyword_density": 2.5,
"readability_score": 85,
"optimization_suggestions": ["..."]
}
# エージェント定義
researcher = Agent(
role="リサーチャー",
goal="最新の技術トレンドを調査し、信頼できるデータを収集する",
backstory="あなたは業界の専門家で、常に最新情報を追跡しています",
tools=[search_web],
verbose=True
)
writer = Agent(
role="テクニカルライター",
goal="調査データを基に、高品質なブログ記事を執筆する",
backstory="複雑な技術を明確に説明することが得意です",
llm="claude-opus-4-6", # 高品質
verbose=True
)
seo_optimizer = Agent(
role="SEOスペシャリスト",
goal="記事をSEO最適化し、検索ランキング向上を実現する",
backstory="SEOとキーワード戦略の専門家です",
tools=[analyze_seo],
llm="claude-3.5-sonnet", # 軽量モデル(コスト最適)
verbose=True
)
quality_reviewer = Agent(
role="品質管理者",
goal="最終的な品質チェックを行い、公開基準を満たすかを確認",
backstory="完璧なコンテンツのみを許可する厳格なレビュアー",
verbose=True
)
# タスク定義
research_task = Task(
description="『2026年のAI技術トレンド』について最新の情報を調査してください",
agent=researcher,
expected_output="3-5個の主要トレンド、信頼できるソース付き"
)
writing_task = Task(
description=f"調査データを基に、3000字のブログ記事を執筆してください",
agent=writer,
expected_output="完全なブログ記事(HTML形式)",
context=[research_task] # 前のタスク結果を入力
)
seo_task = Task(
description="記事をSEO最適化してください。対象キーワード: 『AI』『2026』『技術』",
agent=seo_optimizer,
expected_output="最適化済みの記事とSEOスコア",
context=[writing_task]
)
review_task = Task(
description="最終品質チェックを実施してください",
agent=quality_reviewer,
expected_output="品質評価とフィードバック",
context=[seo_task]
)
# クルー構成
crew = Crew(
agents=[researcher, writer, seo_optimizer, quality_reviewer],
tasks=[research_task, writing_task, seo_task, review_task],
process=Process.SEQUENTIAL, # or HIERARCHICAL
verbose=True
)
# 実行
result = crew.kickoff(inputs={
"topic": "2026年のAI技術トレンド",
"tone": "professional",
"target_audience": "技術系ブロガー"
})
print(f"完成記事:\n{result}")
CrewAIの優位性:
- エージェントの「人格」設定で品質向上
- タスク依存関係の明確な記述
- エラー自動回復機能
MCP統合:エージェント × 外部ツール
A2Aプロトコルとは異なり、MCPはエージェントと外部ツールの通信規格です。2つを組み合わせる威力は絶大:
┌─────────────────────────────┐
│ マルチエージェント層 │
│ (CrewAI/LangChain) │
└────────────┬────────────────┘
│ A2A Protocol
↓
┌─────────────────────────────┐
│ MCPサーバー層 │
│ ・データベース │
│ ・社内ツール │
│ ・外部API │
└─────────────────────────────┘
実装例:
from mcp.server import Server
import anthropic
# MCP サーバー定義(知識ベースアクセス)
class KnowledgeBaseMCP:
@staticmethod
async def search_knowledge_base(query: str) -> str:
"""社内知識ベースを検索"""
# Notion/Confluence との連携
results = await notion_client.search(query)
return json.dumps(results)
@staticmethod
async def get_company_guidelines() -> str:
"""企業ガイドライン取得"""
return """
ブログ記事ガイドライン:
1. 最小2000字以上
2. 画像は3枚以上
3. 最低3つのコード例
4. 企業ブランドガイドに準拠
"""
# CrewAI エージェントで MCP ツールを利用
mcp_tools = [
{
"name": "search_knowledge_base",
"description": "社内知識ベースから情報を検索",
"function": KnowledgeBaseMCP.search_knowledge_base
},
{
"name": "get_guidelines",
"description": "ブログ記事ガイドラインを取得",
"function": KnowledgeBaseMCP.get_company_guidelines
}
]
writer_with_mcp = Agent(
role="ガイドライン準拠ライター",
goal="企業ガイドに完全準拠した、高品質ブログ記事を執筆する",
backstory="企業の価値観とガイドラインを深く理解しています",
tools=mcp_tools, # MCP ツール統合
llm="claude-opus-4-6",
)
# 実行時に MCP ツールが自動的に呼び出される
実践的ユースケース:カスタマーサポート自動化
複雑なマルチエージェントシステムの実例:
from crewai import Agent, Task, Crew, Process
class CustomerSupportMultiAgent:
def __init__(self):
# エージェント1:顧客意図の理解
self.intent_classifier = Agent(
role="意図分類エージェント",
goal="顧客の問題を適切なカテゴリに分類する",
backstory="カスタマーサービスの専門家",
)
# エージェント2:問題解決(一般的な質問)
self.problem_solver = Agent(
role="問題解決エージェント",
goal="FAQレベルの問題を解決する",
backstory="技術的な問題とFAQに詳しい",
)
# エージェント3:エスカレーション判定
self.escalation_judge = Agent(
role="エスカレーション判定エージェント",
goal="人間対応が必要な問題を判定する",
backstory="複雑な問題と人間対応タイミングを理解",
)
# エージェント4:サマリー作成(人間ハンドオフ用)
self.summarizer = Agent(
role="サマリーエージェント",
goal="問題を簡潔に要約し、人間対応用にまとめる",
backstory="コミュニケーションが明確で簡潔",
)
async def handle_customer_inquiry(self, customer_message: str):
"""顧客問い合わせの完全な処理フロー"""
# タスク定義
classify_task = Task(
description=f"この顧客メッセージを分類してください:\n{customer_message}",
agent=self.intent_classifier,
expected_output="問題カテゴリと優先度"
)
solve_task = Task(
description="分類された問題を解決してください",
agent=self.problem_solver,
expected_output="解決方案またはFAQ参照",
context=[classify_task]
)
escalation_task = Task(
description="この問題は人間対応が必要か判定してください",
agent=self.escalation_judge,
expected_output="エスカレーション: Yes/No + 理由",
context=[classify_task, solve_task]
)
# エスカレーション判定に基づき条件分岐
escalation_result = await escalation_task.execute()
if "Yes" in escalation_result:
# 人間ハンドオフが必要
summary_task = Task(
description="サポートスタッフ用にこの問題をまとめてください",
agent=self.summarizer,
expected_output="完全なカスタマーコンテキスト",
context=[classify_task, solve_task, escalation_task]
)
crew = Crew(
agents=[
self.intent_classifier,
self.problem_solver,
self.escalation_judge,
self.summarizer
],
tasks=[
classify_task,
solve_task,
escalation_task,
summary_task
],
process=Process.SEQUENTIAL
)
result = crew.kickoff()
return {
"escalated": True,
"summary": result,
"target_team": "human_support"
}
else:
# AI自動解決可能
return {
"escalated": False,
"solution": escalation_result,
"target_team": "closed"
}
# 使用例
support_system = CustomerSupportMultiAgent()
response = await support_system.handle_customer_inquiry(
"MacBook Proで動作しないアプリがあります。何をしたらいいですか?"
)
print(response)
エラーハンドリング:マルチエージェント特有の課題
単一エージェントと異なり、複数エージェント間でのエラー処理は複雑です:
from typing import Optional
import asyncio
class RobustMultiAgentOrchestrator:
"""エラーリカバリー機能付きオーケストレータ"""
async def execute_with_retry(
self,
agent_task,
max_retries=3,
timeout_seconds=30
):
"""タイムアウト&リトライ対応"""
for attempt in range(max_retries):
try:
# タイムアウト設定付き実行
result = await asyncio.wait_for(
agent_task.execute(),
timeout=timeout_seconds
)
return result
except asyncio.TimeoutError:
print(f"⏱️ Agent timeout (attempt {attempt+1})")
if attempt < max_retries - 1:
# リトライ:プロンプト簡略化
agent_task.description = self.simplify_prompt(
agent_task.description
)
await asyncio.sleep(2 ** attempt) # 指数バックオフ
else:
raise TimeoutError(f"Max retries exceeded for {agent_task}")
except Exception as e:
print(f"× Agent error: {str(e)}")
if isinstance(e, ValueError):
# 入力値エラーは修正して再実行
agent_task.description = self.fix_prompt_format(
agent_task.description
)
await asyncio.sleep(1)
else:
# その他のエラーはスキップして次へ
return {"error": str(e), "status": "failed"}
def simplify_prompt(self, prompt: str) -> str:
"""プロンプトを簡略化(複雑さが原因の場合)"""
return prompt[:500] + "\n\n簡潔に回答してください。"
def fix_prompt_format(self, prompt: str) -> str:
"""プロンプトの形式を修正"""
# JSON形式の検証など
return prompt
async def execute_with_fallback(
self,
primary_agent,
fallback_agent,
task
):
"""フォールバック機能付き実行"""
try:
result = await primary_agent.execute(task)
return result
except Exception as e:
print(f"Primary agent failed: {e}, using fallback")
# より小さい/シンプルなエージェントにフォールバック
result = await fallback_agent.execute(task)
return {
**result,
"executed_by": "fallback_agent",
"primary_error": str(e)
}
async def execute_with_voting(
self,
task,
agents: list,
voting_threshold=0.66
):
"""複数エージェントの多数決(品質向上)"""
results = await asyncio.gather(
*[agent.execute(task) for agent in agents]
)
# 結果の一致度を計算
agreement_score = self.calculate_agreement(results)
if agreement_score >= voting_threshold:
# 合意が得られた場合、その結果を採用
return {
"result": results[0],
"agreement": agreement_score,
"method": "consensus"
}
else:
# 合意が得られない場合は最高品質エージェント結果を採用
best_result = max(
results,
key=lambda r: r.get("quality_score", 0)
)
return {
"result": best_result,
"agreement": agreement_score,
"method": "best_of_multiple",
"warning": "Low consensus on result"
}
パフォーマンス最適化:2026年版ベストプラクティス
1. モデル選択の最適化
# 最適なモデルマッピング
model_selection = {
"research": "claude-opus-4-6", # 高精度必須
"writing": "claude-3.5-sonnet", # バランス型
"seo_optimization": "claude-3-sonnet", # 軽量で十分
"review": "claude-opus-4-6", # 最終品質チェック
"summarization": "claude-3-haiku" # 最軽量
}
# コスト削減効果:
# Opus only: $0.10/タスク
# 混合選択: $0.035/タスク (65%削減)
2. キャッシング戦略
from functools import lru_cache
import hashlib
class CachingMultiAgent:
def __init__(self, cache_size=1000):
self.cache = {}
self.cache_size = cache_size
def cache_key(self, agent_name: str, input_data: str) -> str:
"""入力データのハッシュキー生成"""
combined = f"{agent_name}:{input_data}"
return hashlib.md5(combined.encode()).hexdigest()
async def execute_cached(self, agent, task_input):
"""キャッシュを利用した実行"""
key = self.cache_key(agent.role, task_input)
# キャッシュヒット
if key in self.cache:
print(f"✓ Cache hit for {agent.role}")
return self.cache[key]
# キャッシュミス:実行
result = await agent.execute(task_input)
# キャッシュに保存
if len(self.cache) >= self.cache_size:
# LRU削除
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[key] = result
return result
まとめ
マルチエージェントAIの実装チェックリスト:
- アーキテクチャ決定(シーケンシャル/パラレル/階層的のいずれか)
- フレームワーク選択(CrewAI推奨)
- エージェント設計(各エージェントの「人格」を明確に定義)
- タスク依存関係(入力-出力の流れを図式化)
- エラーハンドリング(リトライ、フォールバック、投票メカニズム実装)
- MCP統合(外部ツール/API接続設計)
- モデル選択(タスク別に最適モデル割当)
- キャッシング(重複推論の削減戦略)
- 監視・ログ(エージェント間通信のトレーシング)
- 本番テスト(実データで全フロー検証)
マルチエージェントAIはもはや実験的技術ではなく、エンタープライズレベルのスタンダードになっています。複雑なタスク自動化の実装は、今や現実的な選択肢です。
参考リソース
- CrewAI Documentation: https://docs.crewai.com
- LangGraph: https://langchain-ai.github.io/langgraph
- Google Cloud A2A Protocol: https://cloud.google.com/docs/agents
Discussion