🍣

ベクトル検索のプログラムの書き方比較 ~Databricks Vector Search、FAISS、ChromaDB~

2023/12/06に公開

Databricksのサーバーレスなベクトル検索機能として、Databricks Vector Searchが2023年11月30日よりPublic Previewとなりました。

https://docs.databricks.com/ja/generative-ai/vector-search.html

これを記念して、これから数回に渡って、本機能のご紹介を関連機能や類似製品(主にOSSなど)を含めてご紹介していこうと思います。
まず今回の第一回目は、これまでDatabricks上でベクトル検索の常套ツールであったFAISSおよびChromaDBとのプログラムの書き方を比較します。既存のコードをVector Searchベースに移行する際のご参考にしてください。

環境

ランタイムとノードタイプ

Databricks Runtime: 14.1 ML
ノードタイプ: i3.2xlarge(シングルノード)

ライブラリー

ランタイムにプリインストールされているものに加えて以下のものをインストールします。

%pip install -U databricks-vectorsearch mlflow databricks-sdk faiss-cpu==1.7.4 chromadb==0.3.21
dbutils.library.restartPython()

Embeddingモデル

今回は、intfloat/multilingual-e5-smallを使用して、テキストを埋め込みベクトルに変換します。

from sentence_transformers import SentenceTransformer

model = SentenceTransformer(
    "intfloat/multilingual-e5-small", 
    cache_folder='/tmp'
)

サンプルデータ

弊社といえばこれ!ということで、弊社社員が作成したこちらの日本語QAデータを使用します。
https://huggingface.co/datasets/yulanfmy/databricks-qa-ja

なお、データは事前にDelta Table化しておき、上記モデルを使用して得られた埋め込みベクトルデータを列として追加しておきます。

結果以下のようなテーブル作成されるので、これを起点として処理を進めていきます。

dbqaTableEmbedDF = spark.table(f'{YOUR_CATALOG_NAME}.{YOUR_SCHEMA_NAME}.{YOUR_TABLE_NAME}')
display(dbqaTableEmbedDF)

各処理のプログラムの書き方を比較

では、ここから以下の5つの処理を、それぞれFAISS、ChromaDB、Databrick Vector Searchで実装する場合にどのようなコードになるか具体的に見ていきます。

  1. インデックスの作成
  2. 埋め込みベクトルをインデックスへロード
  3. インデックスの永続化
  4. クエリー
  5. インデックスの削除

1. インデックスの作成

まず、インデックスとはベクトルデータベースにおけるテーブルのような概念です。ベクトルデータベースによって呼び名が変わることがあるので気をつけましょう。なお、FAISSとDatabricks Vector Searchではインデックスと呼ばれますが、ChromaDBではコレクションと呼ばれます。

FAISS

import faiss

vector_length = 384
faiss_index = faiss.IndexIDMap(faiss.IndexFlatIP(vector_length))

ChromaDB

import chromadb
from chromadb.config import Settings

# まずChromaDBのクライアントを作成
chroma_client = chromadb.Client(
    Settings(
        chroma_db_impl="duckdb+parquet",
        persist_directory='/tmp',  # オプション:データ保存先のディレクトリパス
    )
)

# 続いてコレクション(インデックス相当)を作成
collection_name = "my_index"
collection = chroma_client.create_collection(name=collection_name)
from databricks.vector_search.client import VectorSearchClient

vs_endpoint_name="my_vs_endpoint"
vs_index_fullname = "hiroshi.sampledata.dbqa_table_vs_index"
vector_length = 384

# まずVector Searchのクライアントを作成
vsc = VectorSearchClient()

# 続いてVector Searchのエンドポイントを作成
vsc.create_endpoint(
  name=vs_endpoint_name, 
  endpoint_type="STANDARD"
)

# Vector Searchのインデックスを作成
vsc.create_direct_access_index(
  index_name=vs_index_fullname,
  endpoint_name=vs_endpoint_name,
  primary_key="id",
  embedding_dimension=vector_length,
  embedding_vector_column="response_vector",
  schema={
    "id": "string", 
    "category": "string", 
    "context":"string", 
    "instruction":"string", 
    "response":"string", 
    "source":"string", 
    "response_vector": "array<float>"}
)

2. 埋め込みベクトルをインデックスへロード

続いて、埋め込みベクトルデータや関連データをインデックスへロードしていきます。序盤で説明した通り、すでに対象テキストデータから埋め込みベクトルデータを取得しているので、それをそのままロードします。

FAISS

import numpy as np
import faiss

faiss_response_embedding = np.array(dbqaTableEmbedDF.toPandas()['response_vector'].values.tolist())
id_index = np.array(dbqaTableEmbedDF.toPandas()['id'].values).flatten().astype("int")

content_encoded_normalized = faiss_response_embedding.copy()
faiss.normalize_L2(content_encoded_normalized)

faiss_index.add_with_ids(content_encoded_normalized, id_index)

ChromaDB

dbqaTableEmbedPandasDF = dbqaTableEmbedDF.orderBy("id").toPandas()

collection.add(
    embeddings=dbqaTableEmbedPandasDF["response_vector"].tolist(),
    documents=dbqaTableEmbedPandasDF["response"].tolist(),
    metadatas=[{"source": source} for source in dbqaTableEmbedPandasDF["source"].tolist()],
    ids=dbqaTableEmbedPandasDF["id"].tolist(),
)

なお、ChromaDBは内部でデフォルトのEmbeddingモデルとしてall-MiniLM-L6-v2を使用しているため、プレーンテキストのみを指定すると、自動で埋め込みベクトルに変換して、コレクションに格納してくれます。

dbqaTableEmbedPandasDF = dbqaTableEmbedDF.toPandas()

collection.add(
    documents=dbqaTableEmbedPandasDF["response"].tolist(), 
    metadatas=[{"source": source} for source in dbqaTableEmbedPandasDF["source"].tolist()],
    ids=dbqaTableEmbedPandasDF["id"].tolist(),
)

Databricks Vector Search


# インデックスへの参照を取得
vs_index = vsc.get_index(vs_endpoint_name, vs_index_fullname)

# オリジナルデータをロード可能な形式に加工
newDbqaTablePandas = dbqaTableEmbedDF.toPandas()
cols = newDbqaTablePandas['response_vector']
for index in range(len(cols)):
  cols[index] = list(cols[index].astype(float))
records = newDbqaTablePandas.to_dict('records')

# インデックスへロード(Upsert)
vs_index.upsert(records)

3. インデックスの永続化

続いて、ややおまけ的な内容ですが、インデックスの永続化です。

FAISS

faiss.write_index(faiss_index, "/tmp/faiss_index.index")

faiss_index = faiss.read_index("/tmp/faiss_index.index")

ChromaDB

Chroma Clientの作成時にpersistent_directoryを指定するとその場所にデータが保存されます。

Databricks Vector Search

インデックス作成時に指定したvs_index_fullname(Unity Catalog内)にDelta Tableとしてデータが保存されます。

4. クエリー

続いて、インデックスにクエリーを発行し、類似度の高いベクトル(およびのその元ネタのテキストなど)を検索します。今回は以下の質問文とTop_Kを共通で使用します。

  • 質問文: 「k-meansについて教えてください」
  • TOP_K: 2

FAISS

query = 'k-meansについて教えてください'
k = 2

# クエリーテキストを埋め込みベクトルへ変換
query_vector = model.encode([query])
faiss.normalize_L2(query_vector)

# 検索実行
top_k = faiss_index.search(query_vector, k)

# 以降は結果表示用の後処理(なのでそんなに重要ではない)
ids = top_k[1][0].tolist()
similarities = top_k[0][0].tolist()

results = []
count = 0
for id in ids:
    record = dbqaTableEmbedDF.select("id", "source", "response").filter(dbqaTableEmbedDF.id == id).first()
    results.append({'id': id, 'source': record.source, 'response': record.response, 'similarity': similarities[count]})
    count = count + 1

print(results)

ChromaDB

import json

query = 'k-meansについて教えてください'
k = 2

# クエリーテキストを埋め込みベクトルへ変換
query_vector = model.encode(query)

# 検索実行
results = collection.query(
    query_embeddings=[query_vector], 
    n_results=k
)

# 以降は結果表示用の後処理(なのでそんなに重要ではない)
for index in range(len(results["ids"][0])):
  print(f'{results["ids"][0][index]} : {results["distances"][0][index]} : {results["documents"][0][index]} : {results["metadatas"][0][index]["source"]}')

Databricks Vector Search

query = "k-meansについて教えてください"
k = 2

# インデックスへの参照を取得
vs_index = vsc.get_index(vs_endpoint_name, vs_index_fullname)

# クエリーテキストを埋め込みベクトルへ変換
query_vector = model.encode(question)

# 検索実行
results = vs_index.similarity_search(
  query_vector=list(query_vector.astype(float)),
  columns=["source", "response"],
  num_results=k)

# 以降は結果表示用の後処理(なのでそんなに重要ではない)
docs = results.get('result', {}).get('data_array', [])
print(docs)

5. インデックスの削除

最後はインデックスの削除です。

FAISS

インデックス削除用のコマンドは特に存在しません。データはシステムメモリーの内容をクリアすると削除可能です。また、永続化している場合は、ディスク上の当該ファイルを削除します。

ChromaDB

# インデックスの削除
chroma_client.delete_collection(name=collection_name)

Databricks Vector Search

# インデックスの削除
vsc.delete_index(vs_endpoint_name, vs_index_fullname)

# Vector Searchエンドポイントの削除
vsc.delete_endpoint(vs_endpoint_name)

まとめ

Databricks Vector SearchのPublic Previewを記念して、FAISS、ChromaDBとのプログラムの書き方を比較してみました。本記事の趣旨が書き方の比較なので、それ以外の比較は行なっておりませんが、Databricks Vector Searchはサーバーレスであるという特性を活かして、LLMのRAG用の本番環境などで非常に有効に働くことが期待されます。
今回は、比較のためにVector Searchの非常に単純な実装にとどめましたが、本機能が面白いのは、ソース Delta テーブルが変更されたときに自動的に更新される 差分同期インデックス を作成できる点です。Databricksの強みを致した機能でもあると思いますので、次回はこちらの機能にフォーカスしようと思います。

BFN!

Databricks無料トライアル

https://databricks.com/jp/try-databricks

Discussion