📝

DSPyでDBとVectorStoreのRAGパイプラインを構築する

2024/05/20に公開

DSPyとは

DSPyって知っていますか?
RAG(Retrieval-Augmented Generation)アプリケーションを構築・最適化するためのフレームワークで、プロンプトの自動最適化、自動推論機能、組み込みの評価などの機能があります。

できることが色々あり、強力なフレームワークなので大いに期待されています。
先週、WeaviateというベクトルストアベンダーがDSPyでプロンプトをSQLに変換(Text to SQL)するサンプルを書いてくれました。

https://twitter.com/ecardenas300/status/1790432812337709279

英語読めないので翻訳します。

翻訳
データが多数のデータベースに分散していても、
すべてのデータをRAGアプリケーションで利用することができます🍱
このノートブックでは、DSPyを使って、
GoogleのBig QueryとWeaviateを活用するエンドツーエンドのRAGパイプラインを構築します!
エージェントを使ったコンテキスト融合では、最初にクエリをルーターに送って関連するコンテキストを持つデータベースを見つけます。
次に、各データベースから関連するコンテキストを取得して、最終的な答えにまとめます。
エージェントがもっとコンテキストを必要とする場合は、十分な情報が得られるまでルーターに戻って繰り返します。
BigQueryにはWeaviateブログの著者に関する構造化情報があり、
Weaviateにはブログ記事のチャンクと埋め込みが保存されています。
これにより、「Weaviateブログの最頻出著者は誰か?」や
「Weaviateにおけるハイブリッド検索とは何か?」といった質問に答えることができます。

こちらのノートブックに従ってください:

https://github.com/weaviate/recipes/blob/main/integrations/bigquery/BigQuery-Weaviate-DSPy-RAG.ipynb

この訳で理解できたあなたはスゴイ!
宇宙語?と思ったあなたは正常です。
やってることはシンプルで

「Weaviateブログ投稿の最も更新頻度の高いユーザは誰ですか?」のようなクエリを投げる

ルーターがそれをベクトルストアから検索するかデータベースから検索するか決める

エージェントがコンテキストが取得されたと判断したら回答を生成する。
コンテキストが不充分だと判断されたらクエリを変換してルーティングからやり直す。

という流れです。

LangGraphと近いかと思いますが、DSPyではもう少しシステマチックにRAGパイプラインを作れるみたいです。
ツイート中のノートブックを開いて、実際にコードを見ていきましょう。

……うーん。英語ばっかりで読めない!というわけで翻訳&動くように諸修正しました。
以下のノートブックから私が修正したノートブックにアクセスできます。
https://colab.research.google.com/drive/1zZjrwLlx0jOY4CupVr__cj7IPcuFHjAF

使うドキュメントは以下の通り

  • 技術ブログの本文(ベクトルストアに入れる)
  • 技術ブログのメタデータ(データベースに入れる)
  • おそらくWeaviateに関する質問回答のペア(ベクトルストアに入れる)
  • おそらくWeaviateに関する質問回答のペア(データベースに入れる)

おそらくと言ったのはメタデータ以外のドキュメントが詳細不明だからです。
コードの流れからいってWeaviate関連のクエリを受けて、質問回答RAGから簡潔なコンテキストを、次にベクトルストアから詳細なコンテキストを得て回答を作るのではないかと思うのですが、憶測です。

( ´・д・)エ-

いえいえ、今回知りたいのはあくまでDSPyの使い方。
ドキュメントはそれっぽいのを適当に作って進めていきましょう。

コード解説

ライブラリのインストール

まず最初のコードはライブラリのインストールです。(notebook)
使ってないライブラリも入ってるかも。
エラーが出たら随時削除してください。

!pip install langchain_community db-sqlite3 openai sentence_transformers chromadb faiss-cpu dspy-ai weaviate-client  boto3 watchtower python-dotenv typing pydrive flask pydantic openai langchain langgraph gpt4all langchain-anthropic pandas google-api-core==2.17.1 google-api-python-client==2.122.0 google-auth==2.28.2 google-auth-httplib2==0.2.0 google-auth-oauthlib==1.2.0 googleapis-common-protos==1.63.0 google ThreadPoolExecutorPlus chromadb unstructured langsmith langchainhub langchain_community langchain_core langchain_openai langchain_community langchainhub langchain-anthropic umap-learn scikit-learn tiktoken chromadb matplotlib

DSPyをGemini APIに接続する

DSPyで使うLLMのセッティングです。(notebook)
元コードではGeminiを使っていましたが、Googleには課金してないのでOpenAIに変更しました。

llm = dspy.OpenAI(model='gpt-4-1106-preview', api_key=userdata.get('OPENAI_API_KEY'))
dspy.settings.configure(lm=dspy.OpenAI(model='gpt-4-1106-preview'))

非構造化テキストデータをメモリにロードする

次は技術ブログをチャンクに分解してベクトルストアに入れるのですが、技術ブログのデータがないので代わりに怖い話の投稿データを使います。
深夜に読んでる方は背後にお気をつけください。

実際に試す方はあまりいないと思いますが、怖い話投稿データ もアップしました。
これをgoogle corabのsample data以下にアップすれば動作確認できるので、よかったらお使いください。

Weaviateスキーマを作成し、データをインポートする

次は技術ブログ(怪談データ)をベクトルストアに入れるのですが、Weaviateを使おうとしたところエラーが出ました。

コード
client = weaviate.connect_to_local()
エラー
UnexpectedStatusCodeError: Meta endpoint! Unexpected status code: 404, with response body: None.

しかたなくWeaviateのAPIキーを取得して再挑戦したのですが、やっぱり初期化のところでエラーが出ます。

コード
weaviate_client = weaviate.connect_to_wcs(
    cluster_url="http://localhost",
    auth_credentials=weaviate.auth.AuthApiKey(userdata.get("WCS_API_KEY")),
    headers={
        "X-OpenAI-Api-Key": userdata.get('OPENAI_API_KEY') 
    }
)
エラー
ConnectError: [Errno 99] Cannot assign requested address

もう少し調べようかと思ったのですが、別のベクトルストアで代替することにしました。
Chromaを使います。(notebook)

chromaによる怪談チャンクの追加
COLLECTION_NAME="Horror_Posts"
chroma_client = chromadb.Client()
embedding_func = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="multi-qa-MiniLM-L6-cos-v1"
)
horror_collection = chroma_client.get_or_create_collection(name=COLLECTION_NAME, embedding_function=embedding_func)
horror_collection.add(documents=blog_chunks,
   ids=[f"id{i}" for i in range(len(blog_chunks))],
)

クエリのテスト

次はベクトルストアにコンテキストが入ったかテストします。
元コードでは「ガーベッジコレクションはどう動作しますか?」というクエリに対して該当のコンテキストがしっかり取得できています。

こちらも負けずに「幼稚園の頃仲良かった友達に起きたことは?」と聞いたところバッチリコンテキストが取得できました。
回答が気になる方はぜひノートブックを。

次はデータベース(BigQuery)に一問一答ペアのデータを入れるのですが、これも元データがないので怪談投稿データから作ります。
一問一答の作り方が気になる方はこちら

出来上がったペア
質問1: 掲示板の投稿によると、天国と天へ昇ることはどのように異なるとされていますか?
回答1: 掲示板の投稿によれば、天国は輪廻転生の上がりが来た人しか行けない特別な場所で、
次元が異なるとされています。一方で天へ昇ることは、守護霊になった魂や輪廻転生を待つ魂、
修行中の魂などがそれぞれ別の場所にいる状態を指しているようです。

質問2: 故人が天国に行ったか、この世にとどまってしまったかを見分ける方法はあるとされていますか?
回答2: 投稿によると、霊感がないと故人が天国に行ったかこの世に留まっているかを見分けるのは難しいとされています。
しかし、嗅覚や感覚が鋭い人は故人の匂いや雰囲気を感じ取ることができるかもしれません。
また、目や第三の眼で視える人で霊格が高い場合は、故人が昇ったかどうかを知ることができるとされています。

質問3: 掲示板の投稿によると、自殺した魂についてどのような見解が示されていますか?
回答3: 掲示板の投稿では直接的に自殺した魂についての見解は示されていませんが、
参加者の一人が「自殺した魂は浮かばれない?」という記事を見たことに言及しており、
その内容についての意見を求めています。
この投稿からは、自殺した魂に関する具体的な見解や信念は読み取れませんが、
参加者間でこのトピックについての議論があることが示唆されています。

うーん。なかなか深いですね。

pip を使用して DBクライアントをダウンロードする

次はデータベースのセットアップですが、BigQueryも有料なのでCSVライクに使える無料のsqlite3を使います。
技術ブログのメタデータだけは元のコードにベタ貼りされてたので、それをsqlite3に挿入します。(notebook)

import sqlite3
# SQLiteデータベースファイルを作成または接続
conn = sqlite3.connect('example.db')
# カーソルを作成
sql_client = conn.cursor()
# テーブルを作成
sql_client.execute('''CREATE TABLE IF NOT EXISTS BlogInfo (id INTEGER PRIMARY KEY, Name TEXT, Team  TEXT,  Blogs_Written INTEGER,   Active_Weaviate_Team_Member BOOLEAN)''')
rows_to_insert = [
    ("Abdel Rodriguez", "Applied Research", 5, True),
    ("Adam Chan", "Developer Growth", 1, True),
...
for row in rows_to_insert:
    sql_client.execute("INSERT INTO BlogInfo (Name, Team, Blogs_Written, Active_Weaviate_Team_Member) VALUES (?, ?, ?, ?)", row)

# 変更を保存
conn.commit()

VectorStore と sqlite3 に監視ログを保存する

先ほど作った怪談一問一答もsqlite3とベクトルストアに入れましょう。(notebook)

# SQL
sql_client.execute('''CREATE TABLE IF NOT EXISTS RAGLogs (id INTEGER PRIMARY KEY, query TEXT, answer TEXT)''')

# 前回はanonimousで区切ってたので句読点で区切れるように上書きする
def split_into_sentences(text):
    sentences = re.split(r'(?<!\w。)(?<=。|\?|\、)\s*', text)
    # print(f"split_into_sentences:sentences:s//// {sentences} ////end")
    return [sentence.strip() for sentence in sentences if sentence.strip()]


rag_chunks = []
rag_log_collection = chroma_client.get_or_create_collection(name="RAGLogs", embedding_function=embedding_func)
with open("kowa/Rag.csv", 'r', encoding='utf-8') as csv_file:
    reader = csv.DictReader(csv_file)
    for row in reader:
        sql_client.execute('''
        INSERT INTO RAGLogs (query, answer) VALUES (?, ?)
        ''', (row['query'], row['answer']))

        sentences = split_into_sentences(row['query'] + row['answer'])
        sentence_chunks = chunk_list(sentences, 1)
        sentence_chunks = [' '.join(chunk) for chunk in sentence_chunks]
        rag_chunks.extend(sentence_chunks)

        sample_embedding = embedding_func(sentence_chunks)
        print(f"{sentence_chunks} Embedding dimension: {len(sample_embedding)}")


# Chroma
rag_log_collection.add(documents=rag_chunks,
  ids=[f"id{i}" for i in range(len(rag_chunks))],
)


# 変更を保存
conn.commit()

これでRAGの準備ができました。

  • 技術ブログ…のデータがないので怪談投稿の本文(ベクトルストアに入れる)
  • 技術ブログのメタデータ(データベースに入れる)
  • Weaviateに関する質問回答のペア…がないので怪談のペア(ベクトルストアに入れる)
  • Weaviateに関する質問回答のペア…がないので怪談のペア(データベースに入れる)

もうこの時点で何を作ろうとしてるのかわからなくなってきましたが、気にせず進みましょう。
いよいよ仕上げのDSPyです。

DSPy Signatures と ルーティングのEnum設定

dspy.Signatureというメソッドを使って入出力を管理します。(notebook)

例えばTextToSQLのクラスでいえば、SQLのテーブル構造と自然言語のクエリを入力して、SQLクエリを出力します。

class TextToSQL(dspy.Signature):
    """自然言語クエリを、指定されたスキーマの有効な SQL クエリに変換します"""
    sql_schema_with_description: str = dspy.InputField()
    natural_language_query: str = dspy.InputField()
    sql_query: str = dspy.OutputField(desc="改行文字を含まない SQL クエリ文字列のみを出力します。")

TextToSQL、ルーター、コンテキストが充分かチェッカー、回答生成の入出力を定義します

from pydantic import BaseModel
from enum import Enum

class Route(Enum):
    Author_Info_BigQuery = "Author_Info_BigQuery"
    RAG_Log_BigQuery = "RAG_Log_BigQuery"
    RAG_Log_Weaviate = "RAG_Log_Weaviate"
    Blogs_Weaviate = "Horrror_Post"

class TextToSQL(dspy.Signature):
    """自然言語クエリを、指定されたスキーマの有効な SQL クエリに変換します"""
    sql_schema_with_description: str = dspy.InputField()
    natural_language_query: str = dspy.InputField()
    sql_query: str = dspy.OutputField(desc="改行文字を含まない SQL クエリ文字列のみを出力します。")

class QueryRouter(dspy.Signature):
    """クエリとデータ ソースのリストが与えられた場合、クエリに答えるのに最適なデータ ソースを出力します。"""

    query: str = dspy.InputField(desc="データ ソースの 1 つからの情報を使用して応答するクエリ.")
    data_sources: str = dspy.InputField(desc="各データ ソースの説明。")
    route: Route = dspy.OutputField()

class AgentLoopCondition(dspy.Signature):
    """コンテキストと検索履歴を評価し、質問に答えるのに十分なコンテキストが収集されたかどうか、または情報ソースからさらにコンテキストを取得する必要があるかどうかを判断します。."""

    query: str = dspy.InputField(desc="コンテキストからの情報を使用して応答するクエリ。")
    data_sources: str = dspy.InputField(desc="各データ ソースの説明。")
    contexts: str = dspy.InputField(desc="これまでに取得したコンテキスト.")
    more_info_needed: bool = dspy.OutputField(desc="提供されたコンテキストに基づいて質問に回答できるかどうか.")

class GenerateAnswer(dspy.Signature):
    """文脈を評価して質問に答えます。
質問に答えるためにクエリルーターが必要と判断した情報ソースによっては、一部のコンテキストが欠落している可能性があります。."""

    question: str = dspy.InputField()
    contexts: str = dspy.InputField(desc="複数のデータソースを検索して取得した情報.")
    data_sources: str = dspy.InputField(desc="コンテキストが取得されたデータソースの説明.")
    answer: str = dspy.OutputField()

続いてデータベースを検索するエージェントとベクトルストアを検索するエージェントをセットアップします。

from typing import List

class BigQuerySearcher():
    def __init__(self, sql_schema_with_description: str,
                 sqlclient: sqlite3.Connection):
        self.text_to_sql = dspy.TypedPredictor(TextToSQL)
        self.sql_schema_with_description = sql_schema_with_description
        self.sqlclient = sqlclient

    def sql_results_to_text(self,rows: List) -> str:
        results = []
        print(f"sql_results_to_text: rows:{rows}")
        for row in rows:
            result_string = ''.join(item[0] for item in rows)
            results.append(result_string)

        return "\n".join(results)

    def forward(self, query: str):
        sql_query = self.text_to_sql(natural_language_query=query,
                            sql_schema_with_description=self.sql_schema_with_description).sql_query
        query_job = self.sqlclient.execute(sql_query)
        text_sql_results = self.sql_results_to_text(query_job.fetchall())
        return text_sql_results

class WeaviateSearcher():
    def __init__(self, collection,
                 collection_name: str):
        self.collection = collection
    def forward(self, query: str):
        # ベクターストアをクエリ検索
        response = self.collection.query(
            query=query,
            limit=3
        )
        return  "\n\n".join(response)

構造化スキーマ情報とデータソースのメタデータ

LLMに渡すエージェントのメタデータとルーティングを設定します。

author_schema_with_description = """
技術スキーマ情報:

テーブル: BlogInfo
属性:
`Name` STRING
`Team` STRING
`Blogs_Written` INTEGER
`Active_Weaviate_Team_Member` BOOLEAN

テーブルの説明:

この表には、Weaviate ブログ投稿の作成者に関する情報が含まれています。
`Name` 属性は作成者の名前です。
`Team`  属性は、著者が Weaviate で取り組んでいる特定のチームです。
`Blogs_Written` 属性は、著者が書いたブログの数です。
`Active_Weaviate_Team_Member` 属性は、作成者が現在 Weaviate チームのメンバーであるかどうかを示します。
"""

rag_log_schema_with_description = """
技術スキーマ情報:

テーブル: RAGLogs
属性:
`query` STRING
`answer` STRING

テーブルの説明:

このテーブルには、質問応答システムに送信された質問と、その結果として得られるシステムからの応答が含まれています。
「query」属性は、システムに送信されるクエリです。
「answer」属性は、クエリに対するシステムの応答です。
"""


route_config = {
    "data_sources": {
        "Author_Info_BigQuery": BigQuerySearcher(author_schema_with_description, sql_client),
        "RAG_Log_BigQuery": BigQuerySearcher(rag_log_schema_with_description, sql_client),
        "RAG_Log_Weaviate": WeaviateSearcher(rag_log_collection, "RAGLog"),
        "Horror_Posts": WeaviateSearcher(horror_collection, "Horror_Posts")
    },
    "description": """
        Author_Info_BigQuery: Weaviate ブログ投稿の作成者に関する情報 (「名前」、Weaviate で取り組んでいる「チーム」、作成者の「Blogs_Written」の数、「Active_Weaviate_Team_Member」かどうかなど) が含まれる BigQuery の構造化 SQL テーブル `。
        RAG_Log_BigQuery: 質問応答システムに送信された質問とシステムの応答を含む BigQuery の構造化 SQL テーブル。
        RAG_Log_Weaviate: 質問応答システムに送信された質問とシステムの応答を含むベクトル インデックス。
        Horror_Posts: 怪談投稿ベクター インデックス。
        """
}

RAGwithContextFusion

dspy.Moduleを継承してRAGパイプラインを構築します。

class RAGwithContextFusion(dspy.Module):
    def __init__(self, route_config):
        self.route_config = route_config
        self.query_router = dspy.TypedPredictor(QueryRouter)
        self.agent_loop_condition = dspy.TypedPredictor(AgentLoopCondition)
        self.generate_answer = dspy.TypedPredictor(GenerateAnswer)

    def forward(self, query):
        enough_context = False
        contexts, queries = [], []
        while not enough_context:
            query_route = self.query_router(query=query, data_sources=self.route_config["description"]).route.name
            context = self.route_config["data_sources"][query_route].forward(query=query)
            contexts.append(context)
            queries.append(query)
            query_history = "\n".join(f"query {i+1}: {query}" for i, query in enumerate(queries))
            contexts_str = "\n".join(f"context {i+1}: {item}" for i, item in enumerate(contexts))
            enough_context = self.agent_loop_condition(query=query,
                                                       data_sources=self.route_config["description"],
                                                       contexts=contexts_str).more_info_needed
        answer = self.generate_answer(question=query,
                                      contexts=contexts_str, data_sources=self.route_config["description"]).answer
        return dspy.Prediction(answer=answer)

実行

さあ、準備は整いました。
データベースを使う質問とベクトルストアを使う質問、両方やってみましょう。

rag_with_context_fusion = RAGwithContextFusion(route_config)
データベースを使う質問
rag_with_context_fusion(query="Weaviate のブログ投稿を最も頻繁に投稿するのは誰ですか?")
結果
Zain Hasan と Erika Cardenas は Weaviate ブログ投稿の最も頻繁な著者であり、それぞれ 20 件の投稿があります

おお、うまくいきましたね。BigQueryじゃなくsqlite3でも大丈夫そうです。
次はベクトルストア用の質問をしてみましょう。

ベクトルストアを使う質問
rag_with_context_fusion(query="幼稚園の頃仲良かった友達に起きた怪談話とは?")
結果
質問に対する具体的な答えを提供するための十分な情報がデータソースからは見つかりませんでした。

( ਊдਊ)なんで!?

取得されたコンテキストを見ると別の怪談ポストが取得されていました。
検索精度の問題なのでしょうか…

終わりに

結果は微妙でしたが、DSPyではこんな風にパイプラインを構築できるという話でした。
入出力の指定といった機能はLangGraphにはないですし、公式ドキュメントを見たところ結構抽象度が高くて応用がききそうです。
もし覇権フレームワークにならないとしても、この思想を勉強して損はないかもしれません!

Discussion