🐬

タスク計画型AIエージェントの実装方法

に公開

はじめに

本記事では、LangGraphを使用したタスク計画型エージェントの実装方法を解説します。このエージェントはユーザーの質問を複数の独立したサブタスクに分割し、各タスクをWeb検索で並列実行し、結果を統合して包括的な回答を生成します。

LangGraphとは

LangGraphは、LLMアプリケーションのワークフローをグラフ構造で表現・実行するためのフレームワークです。

LangGraphを利用した実装フローの概要について知りたい方は、以前に解説した自己反省型エージェントの記事を参考にしてください。本記事では、計画型エージェントの実装を注目して解説していきます。

今回作成するシステムの特徴

このエージェントには以下の特徴があります:

  • タスク分割: ユーザーの質問を実行可能な独立したサブタスクに自動分割
  • 並列実行: 複数のタスクを並列実行することで処理時間を短縮
  • 複数検索: 各タスク内でも複数の検索クエリを並列実行し、多角的に情報収集
  • 自己反省: 検索結果とタスク結果の品質を自動評価し、必要に応じて改善
  • 結果統合: 全タスクの結果を統合して、わかりやすい最終回答を生成

エージェント構成

このシステムは2つの主要なエージェントで構成されています:

  1. Plannerエージェント: ユーザーの質問をサブタスクに分割し、最終回答を生成
  2. WebSearchエージェント: 各サブタスクを並列実行し、Web検索で情報収集
ユーザーの質問
    ↓
[Plannerエージェント]
 ├─ タスク計画
 ├─ [WebSearchエージェント×N] ← 並列実行
 │   ├─ 検索クエリ生成
 │   ├─ 検索実行(複数並列)
 │   ├─ タスク結果生成
 │   └─ 品質評価(自己反省)
 └─ 最終回答生成
    ↓
 最終回答

エージェントの実装

ここからは、LangGraphの基本的な実装フローに従って、実際にエージェントシステムを構築していきます。

1. 環境変数の設定

事前に以下の環境変数を設定してください:

GOOGLE_API_KEY="your-google-api-key"
GOOGLE_CSE_ID="your-custom-search-engine-id"

GOOGLE_CSE_IDを取得するには、Googleカスタム検索エンジンを作成してAPIを発行する必要があります。詳しい手順については、以下のブログ記事を参考にしてください:

2. ステートの定義

このシステムは2階層のステート構造を持ちます。

2.1 共通ステート

全エージェントで共有される基本的な状態を定義します:

from typing import Annotated, Optional, TypedDict
from langgraph.graph.message import AnyMessage, add_messages

class Task(TypedDict):
    task_id: str
    task_description: str
    task_result: str

def update_task(existing: list[Task], new: list[Task]) -> list[Task]:
    """タスクリストを更新"""
    if not new:
        return existing
    
    new_task_ids = {task['task_id'] for task in new}
    
    # 既存タスクのうち、更新されないものを保持
    updated = [task for task in existing if task['task_id'] not in new_task_ids]
    
    # 新しいタスクを追加
    updated.extend(new)
    
    return updated

class BaseState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    tasks: Annotated[list[Task], update_task]
    final_answer: Optional[str]

2.2 WebSearchステート

WebSearchエージェント専用の状態を定義します:

import operator

class SearchResult(TypedDict):
    query: str
    title: str
    url: str
    snippet: str
    content: Optional[str]

def merge_search_results(left: list[SearchResult] | None, right: list[SearchResult] | None) -> list[SearchResult]:
    """並列更新を許可しつつ、空リストでクリアするリデューサー"""
    if right is None:
        return left or []
    if not right:
        return []
    if not left:
        return right
    return left + right  # 両方ある場合は連結

class PrivateState(TypedDict):
    task_id: str
    task_description: str
    task_result: Optional[str]
    search_queries: Annotated[list[str], operator.add]
    search_results: Annotated[list[SearchResult], merge_search_results]
    feedback: Optional[str]
    attempt: int
    completed: bool

class WebSearchState(BaseState, PrivateState):
    pass

3. Plannerエージェントの実装

Plannerエージェントは、ユーザーの質問をサブタスクに分割し、全タスクの結果を統合します。

3.1 タスク計画ノード

ユーザーの質問を実行可能な独立したサブタスクに分割します:

from langchain_core.messages import SystemMessage
from langgraph.types import Command, Send
from pydantic import BaseModel, Field

async def plan_tasks(state: BaseState) -> Command:
    """ユーザーのリクエストを分析し、実行可能な独立したサブタスクに分割するノード"""
    
    class Task(BaseModel):
        task_description: str = Field(description="タスクの内容を記述してください。")
    
    class TaskPlan(BaseModel):
        tasks: list[Task] = Field(description="実行するタスクのリスト(最低1つ以上)")
        reason: str = Field(description="タスク分割の戦略と根拠を説明してください。")
    
    system_message = SystemMessage(
        content="""
ユーザーのリクエストを実行可能な独立したサブタスクに分割してください。

## システムアーキテクチャの理解:
このシステムは以下の3段階で動作します:
1. **タスク計画(あなたの役割)**: ユーザーのリクエストを複数のタスクに分割する
2. **タスク実行**: 各タスクをwebsearchエージェントが並列実行し、Web検索で情報収集を行って結果を返す
3. **回答生成**: すべてのタスク結果を統合してユーザーに最終回答を提示

**重要**:
- 各タスクはwebsearchエージェントが独立して実行します
- 複数のタスクは並列実行されます
- あなたが作成したタスクの内容が、websearchエージェントへの指示になります

## サブタスクの作成ルール

### 基本方針
- **並列実行を活用**: websearchエージェントは複数のタスクを同時に実行できるため、独立したタスクは分割してください
- **検索の効率化**: 異なる対象(場所、期間、項目など)は別々のタスクにすることで、検索精度が向上します

### 必須要件

1. **必ず1つ以上のサブタスクを作成してください**
    - 単一の質問でも、最低1つのサブタスクを作成します

2. **各サブタスクは完全に独立している必要があります**
    - タスク間に依存関係を持たせないでください
    - あるタスクの結果が別のタスクの入力になるような分割は避けてください
    - 各タスクは単独で実行・完了できる内容にしてください
    - 各タスクは並列実行されるため、順序に依存しない設計にしてください

3. **タスクの内容は具体的で明確にしてください**
    - websearchエージェントへの指示として機能するよう、タスクの内容を明確に記述してください
    - Web検索で見つけられる情報に焦点を当ててください
""")
    
    try:
        model = get_model()
        plan = await model.with_structured_output(TaskPlan).ainvoke(
            [system_message] + state["messages"]
        )
        
        if not plan.tasks:
            logger.error("plan_tasksでタスクが空です")
            raise
        
        # 各タスクをWebSearchエージェントに並列送信
        sends = [
            Send(
                "websearch",
                {
                    "task_id": str(idx),
                    "task_description": task.task_description,
                    "attempt": 0,
                    "completed": False
                }
            )
            for idx, task in enumerate(plan.tasks)
        ]
        
        initial_tasks = [
            {
                "task_id": str(idx),
                "task_description": task.task_description,
            }
            for idx, task in enumerate(plan.tasks)
        ]
        
        return Command(
            update={"tasks": initial_tasks},
            goto=sends
        )
    except Exception as e:
        logger.error(f"plan_tasksでエラーが発生しました: {str(e)}", exc_info=True)
        raise

3.2 最終回答生成ノード

全タスクの結果を統合して、ユーザーへの最終回答を生成します:

from langgraph.graph import END

async def generate_final_answer(state: BaseState) -> Command:
    """完了したタスクの結果を統合して、ユーザーへの最終回答を生成"""
    
    tasks = state.get("tasks", [])
    
    if not tasks:
        logger.error(f"generate_final_answerでタスクが空です")
        raise
    
    task_results_text = "\n\n".join([
        f"【タスク{idx+1}{task['task_description']}\n結果: {task['task_result']}"
        for idx, task in enumerate(tasks)
    ])
    
    messages = state.get("messages", [])
    user_query = messages[0].content
    
    system_message = SystemMessage(content=f"""
複数のタスクの実行結果を統合し、ユーザーの質問に対する包括的で分かりやすい回答を生成してください。

## 回答のルール:

1. **統合と一貫性**:
    - 各タスクの結果を適切に統合し、全体として一貫性のある回答にする
    - タスクの結果を単純に羅列するのではなく、自然な文章として統合する

2. **わかりやすさ**:
    - 簡潔で分かりやすい日本語で記述
    - ユーザーの質問に直接答える形式にする
    - 必要に応じて箇条書きや見出しを使って構造化

3. **完全性**:
    - 全てのタスク結果から重要な情報を漏らさず含める
    - ユーザーの質問に対して包括的に答える

4. **情報源の記載形式(必須)**:
    - タスク結果で実際に利用した情報源のURLのみを記載する
    - 利用していないURLは記載しない
    - 同じドメインのURLが複数ある場合は、代表的なURL1つのみを記載する
    - フォーマット:
    ```
    (回答の本文)

    【参考情報】
    • https://example.com/article
    • https://another.com/page

## 重要な注意事項:
- タスク結果に含まれる情報のみを使用してください
- タスク結果にない情報は推測しないでください
- 情報が不足している場合は、その旨を明記してください
""")
    
    human_message = HumanMessage(content=f"""
## ユーザーの質問:
{user_query}

## タスクの実行結果:
{task_results_text}

上記のタスク結果を統合して、ユーザーの質問に対する包括的な回答を生成してください。
""")
    
    try:
        model = get_model()
        response = await model.ainvoke([system_message, human_message])
        
        return Command(
            update={
                "messages": [AIMessage(content=response.content)],
                "final_answer": response.content
            },
            goto=END
        )
    
    except Exception as e:
        logger.error(f"generate_final_answerでエラーが発生しました: {str(e)}", exc_info=True)
        raise

3.3 Plannerグラフの構築

定義したノードを接続してグラフを構築します:

from langgraph.graph import START, StateGraph

planner_graph = StateGraph(BaseState)
planner_graph.add_node(plan_tasks)
planner_graph.add_node(generate_final_answer)
planner_graph.add_edge(START, "plan_tasks")

4. WebSearchエージェントの実装

WebSearchエージェントは、各サブタスクを実行し、Web検索で情報を収集します。

4.1 検索クエリ生成ノード

タスク内容から最適な検索クエリを生成します(最大2個):

async def generate_search_queries(state: WebSearchState) -> WebSearchState:
    """ユーザーの質問から最適な検索クエリを生成するノード(最大2個)"""
    
    class SearchQueries(BaseModel):
        queries: list[str] = Field(description="生成された検索クエリのリスト(最大2個)", max_length=2)
        reason: str = Field(description="これらのクエリを選んだ理由")
    
    task_description = state.get("task_description", "")
    previous_queries = state.get("search_queries", [])
    feedback = state.get("feedback")
    
    system_message = SystemMessage(
        content=f"""
あなたは検索クエリ生成の専門家です。割り当てられたタスクに答えるために最適な検索クエリを生成してください。

## 現在の日付:
{datetime.now().strftime("%Y年%m月%d日")}

## クエリ生成のルール:

1. **複数の視点から検索**:
    - 異なる角度から情報を集めるため、1-2個のクエリを生成
    - 重複する内容のクエリは避ける

2. **具体的で明確なクエリ**:
    - 曖昧な表現を避け、固有名詞を使う

3. **時間的文脈の考慮**:
    - タスクが「本日」「今日」を含む場合 → 必ず日付を含める
    - 過去の情報が必要な場合 → 具体的な期間を指定
    - 最新情報が必要な場合 → "最新"や年月を含める

4. **タスク内容の活用**:
    - タスクの要求を正確に理解する
    - 代名詞(「それ」「この」など)がある場合は具体的な名詞に変換
    - 文脈から暗黙の情報を補完

5. **検索エンジン最適化**:
    - 自然な日本語で、検索エンジンが理解しやすい形式
    - キーワードの組み合わせを工夫

## 重要な注意事項:
- 必ず1-2個のクエリを生成してください(1個でも2個でも可)
- タスクの要求を正確に理解してクエリを生成してください
- 前回のクエリと異なる角度からの検索を心がけてください
""")
    
    human_content_parts = [f"## 割り当てられたタスク:\n{task_description}"]
    
    if previous_queries:
        queries_text = "\n".join([f"- {q}" for q in previous_queries])
        human_content_parts.append(f"\n## すでに利用した検索クエリ:\n{queries_text}")
        human_content_parts.append("\n**重要**: 前回の検索で十分な結果が得られなかったため、異なる角度からの新しいクエリを生成してください。")
    
    if feedback:
        human_content_parts.append(f"\n## 改善フィードバック:\n{feedback}")
        human_content_parts.append("\n上記のフィードバックを参考にしてください。")
    
    human_message = HumanMessage(content="".join(human_content_parts))
    
    try:
        model = get_model()
        search_queries_result = await model.with_structured_output(SearchQueries).ainvoke(
            [system_message, human_message]
        )
        
        if not search_queries_result.queries:
            logger.warning("generate_search_queriesでクエリが生成されませんでした")
            return Command(
                update={"task_result": "適切な検索クエリを生成できませんでした。"},
                goto="evaluate_task_result"
            )
        
        # 複数の検索クエリを並列実行
        sends = [
            Send("execute_search", {"query": query})
            for query in search_queries_result.queries
        ]
        
        return Command(
            update={
                "search_queries": search_queries_result.queries,
            },
            goto=sends
        )
    except Exception as e:
        logger.error(f"generate_search_queriesでエラーが発生しました: {str(e)}", exc_info=True)
        raise

4.2 検索実行ノード

個別の検索クエリを並列実行します:

from langchain_community.document_loaders import WebBaseLoader
from langchain_google_community import GoogleSearchAPIWrapper
import asyncio
import re

async def execute_search(arg: dict) -> dict:
    """単一の検索クエリを実行するノード"""
    
    def clean_text(text: str) -> str:
        text = re.sub(r'\n\s*\n+', '\n\n', text)
        lines = [line.strip() for line in text.split('\n')]
        lines = [line for line in lines if line]
        return '\n'.join(lines)
    
    query = arg.get("query", "")
    if not query:
        return Command(update={"search_results": []}, goto="generate_task_result")
    
    try:
        num_results = 2
        
        search = GoogleSearchAPIWrapper(
            google_api_key=GOOGLE_API_KEY,
            google_cse_id=GOOGLE_CX
        )
        
        results = search.results(query, num_results=num_results)
        
        if not results:
            return Command(update={"search_results": []}, goto="generate_task_result")
        
        search_results = []
        for result in results:
            url = result['link']
            title = result['title']
            snippet = result.get('snippet', '')
            
            try:
                # Webページの内容を取得
                loader = WebBaseLoader(url)
                load_task = asyncio.to_thread(loader.load)
                docs = await asyncio.wait_for(load_task, timeout=15.0)
                
                raw_content = docs[0].page_content
                cleaned_content = clean_text(raw_content)
                content = cleaned_content
                search_results.append({
                    "query": query,
                    "title": title,
                    "url": url,
                    "content": content[:2500],
                    "snippet": snippet
                })
            except Exception as e:
                logger.warning(f"execute_searchでWebページ取得エラー: {str(e)}", exc_info=True)
                # エラー時はsnippetのみ使用
                search_results.append({
                    "query": query,
                    "title": title,
                    "url": url,
                    "snippet": snippet
                })
        
        return Command(
            update={"search_results": search_results},
            goto="generate_task_result"
        )
    except Exception as e:
        logger.error(f"execute_searchでエラーが発生しました: {str(e)}", exc_info=True)
        return Command(update={"search_results": []}, goto="generate_task_result")

4.3 タスク結果生成ノード

検索結果を基にタスク結果を生成します:

async def generate_task_result(state: WebSearchState) -> WebSearchState:
    """検索結果を元にタスク結果を生成するノード"""
    
    task_description = state.get("task_description", "")
    search_results = state.get("search_results", [])
    feedback = state.get("feedback")
    
    system_message = SystemMessage(
        content=f"""
あなたはタスク実行エージェントです。以下の検索結果を元に、割り当てられたタスクの結果をまとめてください。

## 現在の日付:
{datetime.now().strftime("%Y年%m月%d日")}

## システムアーキテクチャの理解:
このシステムは以下の3段階で動作します:
1. **タスク計画**: ユーザーの質問を複数のタスクに分割
2. **タスク実行(あなたの役割)**: 各タスクについて検索を実行し、結果をまとめる
3. **回答生成**: すべてのタスク結果を統合してユーザーに最終回答を提示

**重要**: 回答生成エージェントは検索結果を直接見ることができません。あなたのタスク結果のみを参照します。
そのため、次のエージェントが適切に理解・利用できる内容にする必要があります。

## タスク結果作成のルール:

1. **検索結果のみを使用**:
    - 検索結果に含まれる情報のみを使ってタスク結果をまとめる
    - 検索結果にない情報は推測しない

2. **次のエージェントが理解できる内容にする**:
    - **数字、日付、固有名詞、統計データなど具体的な情報を含める**
    - **専門用語や略語がある場合、検索結果に説明があれば簡潔に補足する**
    - **文脈理解に必要な背景情報があれば含める**
    - タスクに関連する重要な情報を漏らさず含める

3. **タスク結果の構成**:
    - タスクの要求に対する直接的な答えを述べる
    - 必要に応じて補足情報を追加
    - 自然な文章で記述する

4. **情報源の記載形式(必須)**:
    - タスク結果で実際に利用した情報源のURLのみを記載する
    - 利用していないURLは記載しない
    - 同じドメインのURLが複数ある場合は、代表的なURL1つのみを記載する
    - フォーマット:
        ```
        (回答の本文)
    
        【参考情報】
        • https://example.com/article
        • https://another.com/page

5. **不足情報への対応**:
    - 検索結果が不完全な場合は、その旨を明記
    - 得られた情報の範囲で最大限タスク結果をまとめる

## 重要な注意事項:
- **回答生成エージェントが検索結果を見ずに理解できるよう、文脈を含めて記述してください**
- このタスク結果は最終的に他のタスク結果と統合されてユーザーに提示されます
- タスクの要求に直接関係する情報のみを含めてください
- 検索結果にない情報は「検索結果には含まれていません」と明記
""")
    
    human_content_parts = [f"## 割り当てられたタスク:\n{task_description}"]
    
    if search_results:
        human_content_parts.append("\n## 取得した検索結果:")
        for i, result in enumerate(search_results, 1):
            title = result.get("title", "")
            url = result.get("url", "")
            content = result.get("content", result.get("snippet", ""))
            query = result.get("query", "")
            
            human_content_parts.append(f"\n### 検索結果 {i}")
            human_content_parts.append(f"\n**検索クエリ**: {query}")
            human_content_parts.append(f"\n**タイトル**: {title}")
            human_content_parts.append(f"\n**URL**: {url}")
            human_content_parts.append(f"\n**内容**:\n{content}\n")
    
    if feedback:
        previous_result = state.get("task_result", "")
        human_content_parts.append(f"\n## 改善フィードバック:\n{feedback}")
        if previous_result:
            human_content_parts.append(f"\n## 以前のタスク結果:\n{previous_result}")
        human_content_parts.append("\n**重要**: 上記のフィードバックを参考にして、より良いタスク結果を作成してください。")
    
    human_message = HumanMessage(content="".join(human_content_parts))
    
    try:
        model = get_model()
        answer = await model.ainvoke([system_message, human_message])
        
        return Command(
            update={
                "task_result": answer.content
            },
            goto="evaluate_task_result"
        )
    except Exception as e:
        logger.error(f"generate_task_resultでエラーが発生しました: {str(e)}", exc_info=True)
        raise

4.4 タスク結果評価ノード

生成されたタスク結果の品質を評価し、必要に応じて改善を指示します:

async def evaluate_task_result(state: WebSearchState) -> dict:
    """生成されたタスク結果を評価し、再検索や結果改善が必要か判断するノード"""
    
    class TaskEvaluation(BaseModel):
        is_satisfactory: bool = Field(description="タスク結果がタスクの要求に十分答えているかどうか")
        need: Optional[Literal["search", "generate"]] = Field(
            description="改善が必要な場合、どの部分の改善が必要か。search: 検索クエリや検索結果の改善、generate: タスク結果の改善。改善不要ならNone。"
        )
        reason: str = Field(description="上記の判断理由。is_satisfactoryの判断理由、または改善が必要な場合はその理由を記述。")
        feedback: Optional[str] = Field(
            description="改善が必要な場合(needがNoneでない場合)の具体的なフィードバック。searchならクエリに関するアドバイス、generateならタスク結果に関するアドバイス。"
        )
    
    task_description = state.get("task_description", "")
    task_result = state.get("task_result", "")
    search_queries = state.get("search_queries", [])
    search_results = state.get("search_results", [])
    
    attempt = state.get("attempt", 0)
    attempt += 1
    
    system_message = SystemMessage(
        content=f"""
あなたはタスク結果品質を評価する専門家です。検索結果と生成されたタスク結果を比較し、2つの観点から評価してください。

## 現在の日付:
{datetime.now().strftime("%Y年%m月%d日")}

## 評価の流れ:
以下の順序で評価を行ってください:

### 1. 検索結果の確認 (need = "search" かどうか)
まず検索結果を詳細に確認し、タスクに答えるための情報が含まれているかを判断してください。

**need = "search" (検索改善が必要):**
- **検索結果にタスクに答えるための情報が全く含まれていない**
- 検索クエリが明らかに不適切(タスクと無関係なクエリ)
- 検索結果がタスクと全く関連性がない
- 重要な情報が検索できていない(異なる角度からの検索で改善できそう)

### 2. タスク結果の確認 (need = "generate" かどうか)
検索結果が十分な場合、次に検索結果とタスク結果を比較し、適切に活用されているかを判断してください。

**need = "generate" (タスク結果改善が必要):**
- **検索結果にはタスクに答える情報があるのに、タスク結果でその情報を活用できていない**
- **検索結果の重要な情報がタスク結果に含まれていない**
- タスク結果の構成や表現が分かりにくい
- 検索結果を羅列しているだけで、自然な文章になっていない
- タスクに直接関係ない情報が大量に含まれている

### 3. 全体的な満足度 (is_satisfactory)
検索とタスク結果の両方が適切な場合、最終的に満足できるかを判断してください。

**need = None (改善不要):**
- **検索結果に含まれる重要な情報がタスク結果に適切に反映されている**
- タスク結果が自然な文章で構成されている
- タスクの要求に焦点を絞り、簡潔にまとめている

**is_satisfactory:**
- need が None の場合のみ True
- need が "search" または "generate" の場合は False

## 重要な注意事項:
- **優先順位**: 検索結果に問題がある場合は need = "search"、検索結果は十分だがタスク結果に問題がある場合は need = "generate"
- **is_satisfactory は need が None の場合のみ True にしてください**
- **reasonとfeedbackは具体的で実行可能な内容にしてください**
""")
    
    # HumanMessageで動的な値を渡す
    human_content_parts = [f"## 割り当てられたタスク:\n{task_description}"]
    
    # 検索クエリを追加
    if search_queries:
        queries_text = "\n".join([f"- {q}" for q in search_queries])
        human_content_parts.append(f"\n## 実行した検索クエリ:\n{queries_text}")
    
    # 検索結果を追加
    if search_results:
        human_content_parts.append("\n## 取得した検索結果:")
        for i, result in enumerate(search_results, 1):
            title = result.get("title", "")
            url = result.get("url", "")
            content = result.get("content", result.get("snippet", ""))
            query = result.get("query", "")
            
            human_content_parts.append(f"\n### 検索結果 {i}")
            human_content_parts.append(f"\n**検索クエリ**: {query}")
            human_content_parts.append(f"\n**タイトル**: {title}")
            human_content_parts.append(f"\n**URL**: {url}")
            human_content_parts.append(f"\n**内容**:\n{content}\n")
    
    # タスク結果を追加
    human_content_parts.append(f"\n## 生成されたタスク結果:\n{task_result}")
    
    human_message = HumanMessage(content="".join(human_content_parts))
    
    try:
        model = get_model()
        evaluation = await model.with_structured_output(TaskEvaluation).ainvoke(
            [system_message, human_message]
        )
        
        # 最大2回まで改善を試みる
        if evaluation.is_satisfactory or attempt >= 2:
            task_id = state.get("task_id", "")
            return Command(
                update={
                    "attempt": attempt,
                    "completed": True,
                    "tasks": [{
                        "task_id": task_id,
                        "task_description": task_description,
                        "task_result": task_result
                    }]
                },
                goto=END
            )
        
        # 検索改善が必要な場合
        if evaluation.need == "search":
            return Command(
                update={
                    "attempt": attempt,
                    "search_results": [],  # 検索結果をクリア
                    "feedback": evaluation.feedback
                },
                goto="generate_search_queries"
            )
        # タスク結果のみ改善が必要な場合
        elif evaluation.need == "generate":
            return Command(
                update={
                    "attempt": attempt,
                    "feedback": evaluation.feedback
                },
                goto="generate_task_result"
            )
        else:
            # 念の為(is_satisfactoryがFalseなのにneedがNoneの場合)
            logger.warning(f"evaluate_task_result: is_satisfactory=False but need=None. Completing task anyway.")
            task_id = state.get("task_id", "")
            return Command(
                update={
                    "attempt": attempt,
                    "completed": True,
                    "tasks": [{
                        "task_id": task_id,
                        "task_description": task_description,
                        "task_result": task_result
                    }]
                },
                goto=END
            )
    except Exception as e:
        logger.error(f"evaluate_task_resultでエラーが発生しました: {str(e)}", exc_info=True)
        raise

4.5 WebSearchグラフの構築

定義したノードを接続してグラフを構築します:

websearch_graph = StateGraph(WebSearchState)

websearch_graph.add_node(generate_search_queries)
websearch_graph.add_node(execute_search)
websearch_graph.add_node(generate_task_result)
websearch_graph.add_node(evaluate_task_result)

websearch_graph.add_edge(START, "generate_search_queries")

5. グラフの統合とコンパイル

2つのエージェントを統合して、最終的なグラフを構築します:

def create_graph():
    # WebSearchエージェントをPlannerエージェントに組み込む
    planner_graph.add_node("websearch", websearch_graph.compile())
    planner_graph.add_edge("websearch", "generate_final_answer")
    
    # グラフをコンパイル
    graph = planner_graph.compile()
    return graph

6. エージェントの実行

コンパイル済みのグラフを使用して、実際に処理を実行します:

async def main():
    # グラフの作成
    app = create_graph()
    
    # 初期状態の準備
    initial_state = {
        "messages": [HumanMessage(content="明日から3日間の東京旅行のプランを考えて")],
    }
    
    # エージェントの実行
    try:
        result = await app.ainvoke(initial_state)
        print(result["final_answer"])
    except Exception as e:
        print(f"\nエージェント実行中にエラーが発生しました: {e}")


if __name__ == "__main__":
    asyncio.run(main())

実行結果の例

上記のコードを実行すると、エージェントは以下のような処理を行います:

1. Plannerエージェントによるタスク計画:

reason: ユーザーの「明日から3日間の東京旅行のプラン」というリクエストに対し、旅行プランの主要な要素(観光スポット、モデルコース、天気、グルメ、イベント)を網羅的に調査するため、これらを独立したサブタスクに分割しました。各タスクは並列で実行可能であり、効率的に情報を収集できます。

tasks:
- task_description: 東京の3日間旅行でおすすめの観光スポットやモデルコースを調査
- task_description: 東京の明日から3日間の天気予報を調査
- task_description: 東京旅行でおすすめのグルメや食事処を調査
- task_description: 東京で明日から3日間の期間中に開催されるイベントや季節のアクティビティを調査

2. 実行フロー:

  1. タスク分割 → 4つの独立したサブタスク(観光スポット・天気・グルメ・イベント)に分割
  2. 並列実行 → 4つのタスクを同時に実行
  3. 各タスク内での処理:
    • 検索クエリ生成 → 「東京 3日間 観光 モデルコース」「東京 天気予報 3日間」などのクエリを生成
    • 検索実行(並列) → 複数のクエリを同時に検索
    • タスク結果生成 → 検索結果を基に各タスクの結果をまとめる
    • 品質評価 → 結果が適切であることを確認(不十分なら改善)
  4. 結果統合 → 全タスクの結果を統合して、包括的な旅行プランを生成

生成された回答(例):

明日から3日間の東京旅行のプランをご提案します。伝統文化から最新トレンド、ショッピング、ナイトライフ、そして秋のイベントまで、東京の多様な魅力を体験できるコースです。

---

### 東京3日間旅行プラン(2025年11月16日~11月18日)

#### 【天気予報の概要】
旅行期間中は、最高気温が15℃~20℃、最低気温が7℃~11℃と、比較的過ごしやすい秋の気候が予想されます。降水確率は10%~20%と低めですが、念のため羽織るものや雨具の準備をおすすめします。

* 11月16日(日): 最高気温17℃、最低気温9℃、降水確率10%
* 11月17日(月): 最高気温20℃、最低気温11℃、降水確率20%
* 11月18日(火): 最高気温15℃、最低気温7℃、降水確率10%

---

#### 1日目:歴史とショッピング、ナイトライフを満喫(11月16日・日曜日)

* 午前:皇居参観または皇居東御苑散策
  - 江戸城の跡地に建つ皇居は、歴史的に深い意味を持つ場所です
  - 皇居東御苑:自由に散策でき、景観や日本ならではの植物を鑑賞できます

* 午後:銀座でショッピング
  - GINZA SIX、東急プラザ銀座、銀座三越などでショッピングやグルメを楽しめます

* 夜:六本木でナイトライフ
  - 居酒屋からナイトクラブまで、東京のナイトライフを満喫できます

【この日のイベント】
* デザインフェスタvol.62(東京ビッグサイト、10:00~18:00)
* 国営昭和記念公園 黄葉・紅葉まつり&秋の夜散歩2025

---

#### 2日目:若者文化と自然、都会のオアシス(11月17日・月曜日)

* 午前:渋谷・原宿で散策
  - 渋谷スクランブル交差点:東京のシンボルの一つ
  - 明治神宮:静かな参道を通り、参拝や本殿の見学ができます
  - 竹下通り:ユニークなファッション店、カフェが並び、若者文化を体験できます

* 午後:新宿の庭園を散策
  - 新宿御苑:都会のオアシスで、広大な庭園で静かな時間を過ごせます

---

#### 3日目:下町情緒と絶景、動物たちとの出会い(11月18日・火曜日)

* 午前:浅草・上野コース
  - 仲見世通り・雷門・浅草寺:東京の定番観光スポット
  - 上野動物園:ジャイアントパンダをはじめ約350種の動物に出会えます

* 午後:東京スカイツリー
  - 展望回廊:地上450mから東京の街並みを一望
  - すみだ水族館:幻想的なクラゲ水槽が楽しめます

---

### 東京ならではの伝統グルメ「江戸の食の四天王」

* うなぎ(蒲焼): 醤油ベースの甘辛いタレで炭火焼き
* 寿司: 江戸前寿司の本場
* そば: 江戸時代のファストフード
* 天ぷら: 薄い衣で素材の味を活かす江戸天ぷら

【参考情報】
- https://www.agoda.com/ja-jp/travel-guides/japan/tokyo/
- https://www.toshin.com/weather/detail?id=66124
- https://haraheri.net/article/1569/tokyo-gourmet2

最後に

今回はLangGraphを利用したタスク計画型AIエージェントの実装方法を紹介しました。ぜひ、このサンプルコードを基に、独自のエージェントを構築してみてください。

Discussion