🌍

LangGraphによる自己修正RAGを実行してみよう!

2024/08/01に公開

皆さん、LangGraphは使用していますか?
LangGraphはLangChainと比べて双方向なやり取りができるのでより複雑なアプリケーションが可能になります!
例えばRAGもLangGraphで実装することにより、より正確性のあるアプリケーションが可能になります!

LangGraph

LangGraphの基本的な概念は他の方のリンクを参考にしてみて下さい!ここでは特に説明はしないです。
https://langchain-ai.github.io/langgraph/
https://zenn.dev/pharmax/articles/8796b892eed183

RAG

RAG(Retrieval-Augmented Generation)は、一般的な情報に加えて、特定のドキュメントやデータベースを参照しながら回答を生成する技術です。このアプローチにより、例えば組織内部の情報を反映した回答が得られたりします。
しかし、RAGには以下のデメリットがあります。まず、LLMの回答精度が不十分な場合、誤った回答が生成されるリスクがあります。次に、ベクトル検索の精度が低いと、関連性の低いドキュメントが取得され、回答の品質に影響を与える可能性があります。

Adaptive RAG

この記事は基本的に以下リンクを日本語にしたものです。
https://langchain-ai.github.io/langgraph/tutorials/rag/langgraph_adaptive_rag/
Adaptive RAGは(1)クエリ分析と(2)アクティブ/自己修正RAGの機能を有しています。
これは前述したRAGのデメリットを補うことができます。

これから紹介するAdaptive RAGは以下の機能を有しています。

  1. クエリ分析
    クエリを分析してそのクエリに応じて場合分けできます。例えばRAGの機能を有してるチャットbotがあって、「こんにちは」などの挨拶に対してベクトル検索する必要ないですよね?
  2. 取得したドキュメントの分析
    ドキュメントはベクトル検索して取得することになりますが、必ずしもそのすべてのデータすべてがクエリに対して有用であるとは限りません。例えば4個ドキュメントを取得して1個のみ有用な回答が含まれているという場合、他の3個はLLMの回答精度に影響を与える可能性があります。取得ドキュメントを分析して不要と判断できれば、それはLLMへのコンテキストに混ぜないに越したことはありません。
  3. 回答のハルシネーションチェック
    回答がドキュメントに即しているかどうかを検証します。即していないと判断した場合には再度解答をやり直させるみたいな分岐が可能になります。
  4. 回答の有用性チェック
    生成された回答がユーザーの求める情報として有用であるかどうかを評価します。これも即していないと判断した場合には再度解答をやり直させるみたいな分岐が可能になります。
  5. クエリの調整
    初期のクエリが不十分な場合に、クエリを最適化してより適切なベクトル検索をするための調整機能を備えています。チャットボットを公開した場合にはAIリテラシーがそこまで高くないユーザーが使用することも想定されます。回答とクエリを総合的に判断して必要であればこの機能を使用します。

基本的には、
質問→質問分析→RAG→回答チェック→回答提示
みたいな流れになっていて、必要に応じ前述のワークフローのように条件分岐していくことになります。

コードの実践!

Google colobで項目毎に実行するのを前提に記述していきます。
今回例ではLLMに関連する質問ではRAGを使って、それ以外はネット検索ツールを使うというチャットボットです。

必要なライブラリのインストール

pip install -U langchain_community tiktoken langchain-openai langchainhub chromadb langchain langgraph  tavily-python

環境変数の設定

まずは環境変数の設定です。
LLMのOpenAIとネット検索用にTavilyのAPI key。
LangChain、LangGraph関連のコードを実行する際はLangSmithでどのように動いているか確認することをお勧めします。
API keyの取得方法はそれぞれ別の方の記事を参考にして下さい。

import os
os.environ["OPENAI_API_KEY"] = "<your-api-key>"
os.environ["TAVILY_API_KEY"] = "<your-api-key>"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = "<your-api-key>"

ドキュメントをベクトル化

まずドキュメントをベクトル化します。今回の例ではLLMに関する3つのURLをベクトル化しています。

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embd = OpenAIEmbeddings()

# LLMに関連するドキュメント
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# ロード
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

# テキスト分割
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=500, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

# ベクトルストアへ収納
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chroma",
    embedding=embd,
)
retriever = vectorstore.as_retriever()

LLM

ここからはLLMに関連する設定になります。Adaptive RAGの項で挙げた機能はそれぞれLLMにプロンプトを設定して必要に応じて使用していく形になります。

クエリ分析

クエリを分析して、次に何を使うか分岐させます。ここではベクトルストアかウェブ検索のいずれかを選ぶようにしています。

from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

# 出力モデルを定義
class RouteQuery(BaseModel):
    """ユーザーのクエリを最も関連性の高いデータソースにルーティングします。"""

    datasource: Literal["vectorstore", "web_search"] = Field(
        ...,
        description="ユーザーの質問に応じて、ウェブ検索またはベクターストアにルーティングします。",
    )

# LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm_router = llm.with_structured_output(RouteQuery)

# プロンプト
system = """あなたはユーザーの質問をベクターストアまたはウェブ検索にルーティングする専門家です。
ベクターストアにはエージェント、プロンプトエンジニアリング、アドバーサリアルアタックに関連する文書が含まれています。
これらのトピックに関する質問にはベクターストアを使用し、それ以外の場合はウェブ検索を使用してください。"""
route_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)

question_router = route_prompt | structured_llm_router

お試し

question_routerはinvokeで実行することが出来ます。questionは実際に想定されるクエリですね。

print(
    question_router.invoke(
        {"question": "NFLドラフトでベアーズが最初に指名するのは誰ですか?"}
    )
)
print(question_router.invoke({"question": "エージェントメモリの種類は何ですか?"}))

出力

datasource='web_search'
datasource='vectorstore'

実行結果は、うんちゃんと分類できていますね!

取得したドキュメントの分析

ベクトル検索で得られたドキュメントがクエリと関連しているかチェックします。

# 出力モデルを定義
class GradeDocuments(BaseModel):
    """取得された文書の関連性をチェックするためのバイナリスコア。"""

    binary_score: str = Field(
        description="文書が質問に関連しているかどうかを判定するバイナリスコア。'yes' または 'no' で回答してください。"
    )

# LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeDocuments)

# プロンプト
system = """あなたは取得された文書がユーザーの質問に関連しているかどうかを評価するグレーダーです。\n
    文書にユーザーの質問に関連するキーワードや意味が含まれている場合、それを関連性があると評価してください。\n
    厳密なテストである必要はありません。目的は誤った取得を排除することです。\n
    文書が質問に関連しているかどうかを示すために、バイナリスコア 'yes' または 'no' を与えてください。"""
grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "取得された文書: \n\n {document} \n\n ユーザーの質問: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader

お試し

こちらもinvokeで実行することが出来ます。試しにベクトル検索してそのうちの1個のドキュメントがクエリに関連しているか確認してみましょう

question = "エージェントメモリ"
docs = retriever.get_relevant_documents(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

出力

binary_score='yes'

関連あるみたいですね!

回答の生成

上記でOKが出たドキュメントを参考にクエリに対する解答を生成します。

from langchain_core.output_parsers import StrOutputParser

# プロンプト
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "ユーザーから与えられたコンテキストを参考に質問に対し答えて下さい。"),
        ("human", "コンテキスト: \n\n {context} \n\n ユーザーの質問: {question}"),
    ]
)

# LLM
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

# ドキュメントをひとまとめに
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# チェーン
rag_chain = prompt | llm | StrOutputParser()

お試し

ちゃんと生成できるか試してみましょう!

generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

出力

エージェントメモリは、LLM(大規模言語モデル)を活用した自律エージェントシステムにおいて重要な役割を果たします。エージェントメモリは、以下のように分類されます:

1. **感覚記憶**: 生の入力(テキスト、画像、その他のモダリティ)に対する学習埋め込み表現を保持します。

2. **短期記憶**: コンテキスト内学習を行うためのもので、トランスフォーマーの有限なコンテキストウィンドウの長さに制約されているため、短く有限です。

3. **長期記憶**: エージェントがクエリ時にアクセスできる外部ベクトルストアとして機能し、迅速な検索を可能にします。

長期記憶は、情報の埋め込み表現をベクトルストアデータベースに保存し、最大内積検索(MIPS)をサポートすることで、有限な注意範囲の制約を緩和します。これにより、エージェントは過去の経験を基に行動を決定し、他のエージェントと相互作用することができます。

それっぽい回答していますね

回答のハルシネーションチェック

生成された回答がちゃんとドキュメントに即しているか、ハルシネーションチェックします。

# 出力モデルを定義
class GradeHallucinations(BaseModel):
    """生成された回答にハルシネーションが含まれているかを判定するためのバイナリスコア。"""

    binary_score: str = Field(
        description="回答が事実に基づいているかどうかを示すバイナリスコア。'yes' または 'no' で回答してください。"
    )

# LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeHallucinations)

# プロンプト
system = """あなたは、LLMが生成した回答が取得された事実のセットに基づいているかどうかを評価するグレーダーです。\n
     バイナリスコア 'yes' または 'no' を与えてください。'Yes' は回答が事実に基づいていることを意味します。"""
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "事実のセット: \n\n {documents} \n\n LLM生成物: {generation}"),
    ]
)

hallucination_grader = hallucination_prompt | structured_llm_grader

お試し

hallucination_grader.invoke({"documents": docs, "generation": generation})

出力

GradeHallucinations(binary_score='yes')

ハルシネーションなしの判定です。

回答の有用性チェック

生成された回答がちゃんと質問に答えられているかチェックします。

# 出力モデルを定義
class GradeAnswer(BaseModel):
    """回答が質問に対処しているかどうかを評価するためのバイナリスコア。"""

    binary_score: str = Field(
        description="回答が質問に対処しているかどうかを示すバイナリスコア。'yes' または 'no' で回答してください。"
    )

# LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeAnswer)

# プロンプト
system = """あなたは回答が質問に対処しているかどうかを評価するグレーダーです。\n
     バイナリスコア 'yes' または 'no' を与えてください。'yes' は回答が質問を解決していることを意味します。"""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "ユーザーの質問: \n\n {question} \n\n LLM生成物: {generation}"),
    ]
)
answer_grader = answer_prompt | structured_llm_grader

お試し

answer_grader.invoke({"question": question, "generation": generation})

出力

GradeAnswer(binary_score='yes')

回答の有用性OKの判定です。

クエリの調整

質問の最適化を行います。
上述の回答チェックでNGなった場合にクエリの見直しをします。
(*結構雑な質問している人多いですよね)

# LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# プロンプト
system = """あなたは、入力された質問をベクターストアの検索に最適化されたより良いバージョンに変換する質問の再作成者です。\n
     入力を見て、基礎となる意味的な意図や意味について考察してください。"""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "ここに初期の質問があります: \n\n {question} \n 改善された質問を作成してください。",
        ),
    ]
)

question_rewriter = re_write_prompt | llm | StrOutputParser()

お試し

question_rewriter.invoke({"question": question})

出力

エージェントメモリに関する具体的な情報や機能について教えてください。

最初の質問は「エージェントメモリ」でしたのでだいぶいい質問になりましたね!

ネット検索

web検索に関するツールを定義します。

from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(k=3)

ステート

グラフに渡されるオブジェクトです。
これでノード間で変数を扱うことが出来ます。

from typing import List
from typing_extensions import TypedDict

class GraphState(TypedDict):
    """
    グラフの状態を表します。

    属性:
        question: 質問
        generation: LLM生成物
        documents: 文書のリスト
    """

    question: str
    generation: str
    documents: List[str]

ノード、エッジ

ノード、エッジを定義します。

from langchain.schema import Document

def retrieve(state):
    """
    文書を取得する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): 取得された文書を含む新しいキー、documentsをstateに追加
    """
    print("---RETRIEVE---")
    question = state["question"]

    # 取得
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}

def generate(state):
    """
    回答を生成する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): 生成されたLLMの回答を含む新しいキー、generationをstateに追加
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG生成
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}

def grade_documents(state):
    """
    取得された文書が質問に関連しているかを判断する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): フィルタリングされた関連文書のみを含むようにdocumentsキーを更新
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # 各文書を評価
    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents": filtered_docs, "question": question}

def transform_query(state):
    """
    クエリを変換してより良い質問を作成する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): 再構成された質問でquestionキーを更新
    """
    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    # 質問を再構成
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": better_question}

def web_search(state):
    """
    質問に基づいてウェブ検索を行う

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        state (dict): 追加されたウェブ検索結果を含むようにdocumentsキーを更新
    """
    print("---WEB SEARCH---")
    question = state["question"]

    # ウェブ検索
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)

    return {"documents": web_results, "question": question}

### Edges ###

def route_question(state):
    """
    質問をウェブ検索またはRAGにルーティングする

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        str: 次に呼び出すノード
    """
    print("---ROUTE QUESTION---")
    question = state["question"]
    source = question_router.invoke({"question": question})
    if source.datasource == "web_search":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "web_search"
    elif source.datasource == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"

def decide_to_generate(state):
    """
    回答を生成するか、質問を再生成するかを判断する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        str: 次に呼び出すノードのバイナリ決定
    """
    print("---ASSESS GRADED DOCUMENTS---")
    state["question"]
    filtered_documents = state["documents"]

    if not filtered_documents:
        # check_relevanceですべての文書がフィルタリングされた
        # 新しいクエリを再生成する
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
        )
        return "transform_query"
    else:
        # 関連する文書があるので、回答を生成する
        print("---DECISION: GENERATE---")
        return "generate"

def grade_generation_v_documents_and_question(state):
    """
    生成物が文書に基づいているかどうか、および質問に対処しているかを判断する

    Args:
        state (dict): 現在のグラフの状態

    Returns:
        str: 次に呼び出すノードの決定
    """
    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score.binary_score

    # ハルシネーションをチェック
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # 質問回答をチェック
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score.binary_score
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

グラフの作成

以下のようになるよう繋げ合わせます。

from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

# ノードを定義する
workflow.add_node("web_search", web_search)  # ウェブ検索
workflow.add_node("retrieve", retrieve)  # 取得
workflow.add_node("grade_documents", grade_documents)  # 文書評価
workflow.add_node("generate", generate)  # 生成
workflow.add_node("transform_query", transform_query)  # クエリ変換

# グラフを構築する
workflow.add_conditional_edges(
    START,
    route_question,
    {
        "web_search": "web_search",
        "vectorstore": "retrieve",
    },
)
workflow.add_edge("web_search", "generate")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    },
)

# コンパイル
app = workflow.compile()

グラフの可視化

from IPython.display import Image, display
display(Image(app.get_graph(xray=1).draw_mermaid_png()))

実行!

# Run
inputs = {"question": "エージェントメモリのはどんな種類がある?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

実行はそこそこ長いので中間ステップを出力してくれるとわかりやすいですね。

出力
---ROUTE QUESTION---
---ROUTE QUESTION TO RAG---
---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
"Node 'grade_documents':"
'\n---\n'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION DOES NOT ADDRESS QUESTION---
"Node 'generate':"
'\n---\n'
---TRANSFORM QUERY---
"Node 'transform_query':"
'\n---\n'
---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
"Node 'grade_documents':"
'\n---\n'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
"Node 'generate':"
'\n---\n'
('エージェントメモリには、以下のような種類が存在します。\n'
 '\n'
 '1. **感覚記憶 (Sensory Memory)**:\n'
 '   - '
 'これは、視覚や聴覚などの感覚情報の印象を保持する最初の段階の記憶です。例えば、目の前で何かが一瞬見えた後、そのイメージを数秒間保持することができます。\n'
 '\n'
 '2. **短期記憶 (Short-Term Memory) または 作業記憶 (Working Memory)**:\n'
 '   - '
 '現在意識している情報を保持し、学習や推論などの複雑な認知タスクを実行するために必要な記憶です。例えば、電話番号を一時的に覚えておくことが短期記憶の一例です。\n'
 '\n'
 '3. **長期記憶 (Long-Term Memory)**:\n'
 '   - 情報を長期間保存することができ、数日から数十年にわたって保持される記憶です。長期記憶には以下の2つのサブタイプがあります:\n'
 '     - **明示的記憶 (Explicit / Declarative Memory)**: '
 '事実や出来事を意識的に思い出すことができる記憶。例えば、特定の歴史的出来事や自分の誕生日を思い出すことが含まれます。\n'
 '     - **暗黙的記憶 (Implicit / Procedural Memory)**: '
 '無意識的に行われるスキルやルーチンに関する記憶。例えば、自転車に乗ることやタイピングのスキルがこれに該当します。\n'
 '\n'
 'これらのメモリの種類は、エージェントが過去の経験を基に行動を決定し、他のエージェントと相互作用するために重要な役割を果たします。')

最後に

いかがでしたでしょうか?LangGraphを用いたAdaptive RAGの実装方法について説明しました。
RAGは実務の面で非常に重要なテクニックですが、まだまだ発展途上の技術でさまざまな手法が生み出されている最中です。Adaptive RAGはハルシネーションの観点から非常に重要なテクニックです。
是非、皆さんのプロジェクトでも試してみてください!!また面白いLangChainの実装例があれば紹介しますね!!

Discussion