💽

FastMCPサーバとQdrantを使ったRAGシステムについて

に公開

背景

ベクトル埋め込みと検索のプロトタイプを基にし、QdrantベクトルDBを使ったプロトタイプを実装してみました。


対象読者

  • ベクトルの基本概念を理解している方
  • RAG導入を検討している方

環境

項目 バージョン
OS Ubuntu 20.04.6 LTS
ランタイム Python 3.10.12
主要ライブラリ fastapi、fastmcp、langchain、numpy、openai、python-dotenv、qdrant_client、requests、uvicorn
モデル Azure OpenAI text-embedding-3-small

※ 2025/08/05現在
参照:Azure OpenAIテキスト埋め込みモデル

パソコンの仕様

項目 仕様
CPU 12th Gen Intel(R) Core(TM) i7-1255U 1.70 GHz
RAM 16.0 GB

事前準備

「ベクトル埋め込みと検索のプロトタイプ」の事前準備で用意したCopilotに生成してもらった10件のmarkdownファイルを今度はmongoDBに登録しました。


プロジェクトの全体像

現在、MCPサーバ内で、検索全件取得の役割に応じて、ツールリソースとして2つに分けられています。データ操作の為はFastAPI経由で行っています。


この図はデータ流れを表示します。

プロジェクト構造

今度のプロジェクト構造は、「ベクトル埋め込みと検索のプロトタイプ」の構造とほぼ同じですが、追加および変更した分のみを記述します。

  1. モデルの定義
  2. mongoDBからのデータ取得API ※
  3. QdrantベクトルDBの操作
  4. ページの操作
  5. QdrantベクトルDB操作用のFastAPI
  6. QdrantベクトルDB操作用のMCPサーバ
./
├── model モデルの定義
    ├── metadata.py #各ページメタデータのモデルクラス
    ├── section.py #分割されたセクションのモデルクラス
    └── text_embed.py #テキスト埋め込みのモデルクラス
├── .env #環境変数の設定ファイル
├── fastapi_qdrant.py #データ操作用のFastAPI
├── mongoDB_api.py #mongoDBからのデータ取得API
├── mcp_qdrant_server.py #QdrantベクトルDB検索と全件取得用のMCPサーバ
├── page_handler.py #ページの操作
├── qdrant.py #QdrantベクトルDBの操作
├── requirements.txt #必須のパッケージ
├── split_markdown.py #mdファイルの分割
├── split_text.py #チャンクの分割
└── vector_embed.py #ベクトル埋め込み

※ こちらの記事はQdrantベクトルDBの操作を強調するため、mongoDBからのデータ取得APIの内容を省略します。
※ mongoDBからのデータ取得APIには、以下の2つのメソッドが存在しています。

  1. get_page_list → mongoDBのあるパス以下の全ページを取得するメソッド
  2. get_page_data → 各ページデータを取得するメソッド

手順

1. pythonパッケージのインストール

  • スクリプトと同じ階層に、依存パッケージを記載した requirements.txt ファイルを用意しました。
  • requirements.txtファイルの中は以下のようです。
requirements.txt
fastapi
fastmcp
langchain
numpy
openai
python-dotenv
qdrant_client 
requests
uvicorn
  • 下記のコマンドを実行することで、requirements.txtに記載された依存パッケージをインストールできます。
pip install -r requirements.txt
  • ローカルでインストールされた全てのpythonパッケージをpip listコマンドで確認出来ます。
C:\Users\...\...> pip list
Package                 Version
...
fastapi
fastmcp
langchain
numpy
openai
python-dotenv
qdrant_client      …
requests       
uvicorn

2. .envの設定

AZURE_OPENAI_API_KEY = '' # テキストモデルのAPIキー
AZURE_OPENAI_ENDPOINT = '' # Azure OpenAIのエンドポイント
MONGODB_URL = '' # mongoDBのURL
MONGODB_API_TOKEN = '' # mongoDBのAPIトークン
MONGODB_PATH = "" # このパス以下で全ページを保存している
QDRANT_LOCAL_URL = 'http://localhost:6333' #Qdrant_DBのポートフォワードしたポート番号

3. QdrantDBの起動

以下のコマンドで行います。

docker run -p 6333:6333 -p 6334:6334 \
    -v "$(pwd)/qdrant_storage:/qdrant/storage:z" \
    qdrant/qdrant

4. FastMCPサーバの起動

  1. mcp_qdrant_server.pyを実行し、起動したMCPサーバの URLをコピーします。
    ![][mcp_qdrant_server]

  2. mcp inspectorを以下のコマンドで実行します。

npx @modelcontextprotocol/inspector

※ 参照:MCP Inspectorの実行

  1. 下のスクリーンショットでハイライトしたリンクをブラウザで開きます。
    ![][mcp_link]

  2. ブラウザで立ち上がった MCP inspector のページで、URL にMCPサーバの URL を記入し、Connectボタンを押下します。
    ![][mcp_server_url]

  3. そうすると、ブラウザ上でMCPサーバのツールなどが使えるようになります。
    ![][mcp_server_running]

サンプルコード

モデルの定義

各ページメタデータのモデルクラス

Metadataクラスに下記のフィールドがあります。

フィールド(データ型) 説明
pg_id (str) ページid
creator (str) 作成者
created_at (str) 作成日時
last_update_user (str) 更新した人
updated_at (str) 更新日時
metadata.py
from typing import TypedDict


class Metadata(TypedDict):
    pg_id: str
    creator: str
    created_at: str
    last_update_user: str
    updated_at: str

見出しごとに分割されるセクションのモデルクラス

Sectionクラスに下記のフィールドがあります。

フィールド(データ型) 説明
metadata (Metadata) ページのメタデータ
h1 (TextEmbedding) 見出し1のテキスト埋め込み
h2 (TextEmbedding) 見出し2のテキスト埋め込み
h3 (TextEmbedding) 見出し3のテキスト埋め込み
p (TextEmbedding) 本文のテキスト埋め込み
section.py
from model.metadata import Metadata
from model.text_embed import TextEmbedding


class Section:
    def __init__(
        self,
        metadata: Metadata,
        h1: TextEmbedding,
        h2: TextEmbedding,
        h3: TextEmbedding,
        p: TextEmbedding,
    ):
        self.metadata = metadata
        self.h1 = h1
        self.h2 = h2
        self.h3 = h3
        self.p = p

テキスト埋め込みのモデルクラス

TextEmbeddingクラスに下記のフィールドがあります。

フィールド(データ型) 説明
text (str) 見出し、本文のテキストデータ
embeddings (List[List[float]]) ベクトル埋め込み配列

QdrantベクトルDBには、一つのデータまたはペイロードに対して複数のベクトルを持てるマルチベクトルという機能があります。今度のプロジェクトで複数の見出し本文が属している一つのセクションに対して、その複数の見出しと本文のベクトルをマルチベクトルとしてまとめています。

そのため、text_embed.py ファイルには、見出しや本文などのすべてのベクトルをベクトルDBに保存するときに1つの配列に結合する関数concatenate_embeddingsも用意しています。

参照:Qdrantのマルチベクトル

text_embed.py
from typing import TypedDict, List, Optional

class TextEmbedding(TypedDict):
    text: str 
    embeddings: List[List[float]]


def concatenate_embeddings(*text_embeds: TextEmbedding) -> List[List[float]]:
    """全てのベクトル埋め込みを一つのベクトル埋め込み配列にする
    Args:
        *text_embeds: いくつかのTextEmbeddingインスタンス
    Returns:
        合わせたベクトル埋め込みの配列
    """
    return [vec for textEmbed in text_embeds if textEmbed["text"] for vec in textEmbed["embeddings"]]

QdrantDBの操作

以下のサンプルコードで下記の操作ができます。

  1. データ登録
  2. 全件取得
  3. 全件数取得
  4. 検索
  5. フィルタリング「ページidで絞り込む」
  6. ページ存在の確認
  7. ページidによる削除
  8. 全件削除
qdrant.py
# 標準ライブラリ
import os
import time
import uuid

# サードパーティライブラリ
from dotenv import load_dotenv
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    FieldCondition,
    Filter,
    FilterSelector,
    MatchValue,
    MultiVectorComparator,
    MultiVectorConfig,
    PointStruct,
    VectorParams,
)

# ローカルモジュール
import vector_embed
from model import text_embed
from model.section import Section

# .envファイルをロードする
qdrant_url = os.getenv("QDRANT_LOCAL_URL")  # qdrantのURL

load_dotenv()

client = QdrantClient(url=qdrant_url)

collection = "md_collection"

# Qdrantコレクションの作成「既存してない場合」
if not client.collection_exists(collection_name=collection):
    client.create_collection(
        collection_name=collection,
        vectors_config=VectorParams(
            size=1536,
            distance=Distance.COSINE,
            multivector_config=MultiVectorConfig(comparator=MultiVectorComparator.MAX_SIM),
        ),
    )


def upload_section(section: Section):
    """区切れたセクションをQdrantベクトルDBにポイントとして登録する"""
    # ランダムなUUID(バージョン4)文字列の生成
    unique_id_str = str(uuid.uuid4())

    # 複数の見出しと本文をまとめたマルチベクトル
    vectors = text_embed.concatenate_embeddings(section.h1, section.h2, section.h3, section.p)

    payload_data = {"metadata": section.metadata, "p": section.p["text"]}

    if section.h1["text"]:
        payload_data["h1"] = section.h1["text"]

    if section.h2["text"]:
        payload_data["h2"] = section.h2["text"]

    if section.h3["text"]:
        payload_data["h3"] = section.h3["text"]

    point = PointStruct(id=unique_id_str, vector=vectors, payload=payload_data)

    client.upsert(collection_name=collection, points=[point])


def get_all_points():
    """全てのポイントを取得する"""
    start_time = time.time()
    all_points = client.scroll(
        collection_name=collection,
        scroll_filter=None,  # No filter = get all
        limit=1000,  # Adjust limit as needed
    )
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"全ポイント取得の実行時間: {execution_time:.4f} 秒")
    return all_points[0]
    # all_points is a tuple: (list_of_points, next_offset)
    # for point in all_points[0]:
    #     print(point)


def get_number_of_points():
    """全てのポイント数を取得する"""
    collection_info = client.get_collection(collection_name=collection)
    return collection_info.points_count


def search_by_query(query: str, limit: int, with_vector=False):
    """クエリで検索する
    Args:
        query: ユーザークエリ
        limit: 結果件数の制限
    Returns:
        out: 類似度高い辞書型のリスト
    """
    query_embed_start_time = time.time()
    # ユーザークエリのベクトル化
    query_embedding = vector_embed.create_embeddings(query)

    query_embed_end_time = time.time()
    query_embed__time = query_embed_end_time - query_embed_start_time

    print(f"クエリ埋め込みの実行時間: {query_embed__time:.4f} 秒")

    start_time = time.time()
    scored_points = client.query_points(
        collection_name=collection, query=query_embedding, with_payload=True, limit=limit, with_vectors=with_vector
    ).points
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"検索実行時間: {execution_time:.4f} 秒")
    return scored_points


def create_filter_by_pg_id(pg_id: str) -> Filter:
    return Filter(must=[FieldCondition(key="metadata.pg_id", match=MatchValue(value=pg_id))])


def check_if_page_exists_by_filter(filter: Filter) -> bool:
    """ページが存在するかを確認する"""
    scroll_result, _ = client.scroll(
        collection_name=collection,
        scroll_filter=filter,
        limit=1,  # 1件のみのチェックでいける
    )
    return len(scroll_result) != 0


def delete_points_by_pg_id(pg_id: str):
    """ページIDでポイントを削除する
    与えられたページIDに基づいてフィルタを作成し、
    該当するデータが存在するかを確認した上で、Qdrantコレクションから
    該当するポイントを削除します。
    削除処理の実行時間も計測され、標準出力に表示されます。
    Args:
        pg_id (str): 削除対象のページID。
    Returns:
        標準出力に削除処理の実行時間(秒)を表示します。
    Raises:
        Exception: 指定されたページIDに関連するデータが存在しない場合に発生します。
    """
    start_time = time.time()
    filter = create_filter_by_pg_id(pg_id=pg_id)
    if not check_if_page_exists_by_filter(filter=filter):
        raise Exception(f"ID:{pg_id}のページに関連するデータは見つかりませんでした。")
    client.delete(collection_name=collection, points_selector=FilterSelector(filter=filter))
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"ページID「{pg_id}」の削除実行時間: {execution_time:.4f} 秒")


def delete_all_points():
    """全てのポイントを削除する"""
    start_time = time.time()
    if client.collection_exists(collection_name=collection):
        client.delete_collection(collection_name=collection)
        print(f"コレクション'{collection}'の削除が完了しました。")
        # Recreate the collection after deletion
        client.create_collection(
            collection_name=collection,
            vectors_config=VectorParams(
                size=1536,
                distance=Distance.COSINE,
                multivector_config=MultiVectorConfig(comparator=MultiVectorComparator.MAX_SIM),
            ),
        )
        print(f"コレクション'{collection}'の再作成が完了しました。")
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"全件の削除実行時間: {execution_time:.4f} 秒")

ページの操作

こちらのpage_handler.pyスクリプトは、モデルの定義SectionTextEmbeddingモデルを使用し、見出しごとに分割し、分割されたセクションをQdrantベクトルDBに登録や更新の役割を果たします。

page_handler.py
# 標準ライブラリ
import time

# サードパーティライブラリ
from langchain.schema import Document

# ローカルモジュール
import mongoDB_api
import qdrant
import split_markdown
import split_text
import vector_embed
from model.metadata import Metadata
from model.section import Section
from model.text_embed import TextEmbedding


class PageHandlerAPIError(Exception):
    """mongoDBAPIのエラーを表す例外クラス"""

    def __init__(self, description):
        super().__init__()
        self.description = description

    def __repr__(self):
        return f"PageHandlerAPIError({self.description})"

    def __str__(self):
        return f"エラー:{self.description}"


def generate_json_data(text: str) -> TextEmbedding:
    """JSONデータを生成する
    Args:
        text: テキスト
    Returns:
        テキスト、ベクトル埋め込み配列のJSONデータ
    """
    # テキストをチャンクに分ける
    text_chunks = split_text.split(text)

    # テキストとトークン数のJSONデータ
    result = {
        "text": text,
        "embeddings": [vector_embed.create_embeddings(chunk) for chunk in text_chunks],
    }

    return result


def split_sections(metatada: Metadata, docs: list[Document]) -> list[Section]:
    """各ドキュメントごとにセクション化する"""
    section_list = []

    for doc in docs:
        h1 = {"text": "", "embeddings": []}
        h2 = {"text": "", "embeddings": []}
        h3 = {"text": "", "embeddings": []}
        p = {"text": "", "embeddings": []}

        if doc.metadata.get("header1"):
            h1["text"] = doc.metadata["header1"]
        if doc.metadata.get("header2"):
            h2["text"] = doc.metadata["header2"]
        if doc.metadata.get("header3"):
            h3["text"] = doc.metadata["header3"]
        if doc.page_content:
            p["text"] = doc.page_content

        if (p["text"]) and (not p["text"].startswith("---")):
            if h1["text"]:
                h1 = generate_json_data(h1["text"])

            if h2["text"]:
                h2 = generate_json_data(h2["text"])

            if h3["text"]:
                h3 = generate_json_data(h3["text"])

            if p["text"]:
                p = generate_json_data(p["text"])

            section = Section(metadata=metatada, h1=h1, h2=h2, h3=h3, p=p)
            section_list.append(section)

    return section_list


def split_page_and_upload_sections_to_qdrant(page_data: dict):
    pg_id = page_data["_id"]
    body_length = page_data["bodyLength"]

    if body_length == 0:
        print(f"ID:{pg_id}のページは空です!!!")
        return

    print(f"ページの_id: {pg_id}")
    print(f"文書の長さ: {body_length}")

    # メーターデータの定義
    creator = page_data["creator"]["username"]
    created_at = page_data["createdAt"]
    last_update_user = page_data["lastUpdateUser"]["username"]
    updated_at = page_data["updatedAt"]

    # メーターデータの初期化
    metadata: Metadata = {
        "pg_id": pg_id,
        "creator": creator,
        "created_at": created_at,
        "last_update_user": last_update_user,
        "updated_at": updated_at,
    }

    page_body = page_data["body"]
    # マークダウンの分割
    splitted_docs = split_markdown.split(page_body)
    print(f"分割されたドキュメント数: {len(splitted_docs)}")

    # セクションの分割
    sections = split_sections(metadata, splitted_docs)

    if len(sections) == 0:
        print(f"id{pg_id}の中身は空です。")
        return

    print(f"分割されたセクション数: {len(sections)}")

    # if (has_diff_to_prev == True) and (body_length > 0): #should be False, but temporarily set True
    for section in sections:
        section_index = sections.index(section)
        print(f"section index: {section_index}")
        qdrant.upload_section(section)


def insert_all_pages_to_qdrant(url: str, access_token: str, all_pages: list):
    """全ページの分割されたセクションをqdrantに登録する"""
    start_time = time.time()
    for page in all_pages:
        page_data = mongoDB_api.get_page_data(url, access_token, page["_id"])
        if page_data is not None: 
            # 各ページを複数のセクションに分割し、QdrantDBにアップロードする
            split_page_and_upload_sections_to_qdrant(page_data)

    end_time = time.time()
    execution_time = end_time - start_time
    print(f"全ページの登録実行時間: {execution_time:.4f} 秒")


def insert_by_pg_id(url: str, access_token: str, page_id: str):
    """指定ページの分割されたセクションをqdrantに登録する"""
    start_time = time.time()
    page_data = mongoDB_api.get_page_data(url=url, access_token=access_token, page_id=page_id)
    if page_data is None:
        raise PageHandlerAPIError(f"ID {page_id} のページはありません。")
    split_page_and_upload_sections_to_qdrant(page_data=page_data)
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"ID:{page_id}のページ登録実行時間: {execution_time:.4f} 秒")


def update_by_pg_id(url: str, access_token: str, page_id: str):
    """ページIDで更新する
    この関数は以下の処理を行います
    1. 指定したページIDに紐づく既存のベクトルポイントをQdrantから削除します。
        ⇒ page_idに関連するデータがなかった場合、そのページに関連するデータがないという例外が発生します。
    2. mongoDB APIから最新のページデータを取得します。
    3. ページをセクションに分割し、更新されたセクションをQdrantにアップロードします。
    Args:
        page_id (str): 更新対象ページの一意なID
    """
    start_time = time.time()
    page_data_to_update = mongoDB_api.get_page_data(url=url, access_token=access_token, page_id=page_id)

    # 更新したいページがあるかの確認
    if page_data_to_update is None:
        raise PageHandlerAPIError(f"ID {page_id} のページはありません。")

    qdrant.delete_points_by_pg_id(pg_id=page_id)

    split_page_and_upload_sections_to_qdrant(page_data=page_data_to_update)
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"ページID「{page_id}」の更新時間: {execution_time:.4f} 秒")

FastAPI

FastAPI経由でQdrantDBの以下の操作ができます。

  1. 全ページ登録
  2. 全件数取得
  3. 全件取得
  4. 全件削除
  5. ページidによる登録
  6. ページidによる更新
  7. ページidによる削除
fastapi_qdrant.py
import os

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

import mongoDB_api
import page_handler
import qdrant

# .envファイルをロードする
load_dotenv()

# mongoDBのURLとAPIトークン
wiki_url = os.getenv("MONGODB_URL")  # mongoDBのURL
wiki_api_token = os.getenv("MONGODB_API_TOKEN")  ##mongoDBのAPIトークン

# FastAPIアプリケーションの初期化
app = FastAPI()


# クエリリクエストボディのモデル
class QueryRequest(BaseModel):
    query: str
    top_k: int = 5


# Wikiリクエストボディのモデル
class WikiRequest(BaseModel):
    path: str


@app.post("/search", tags=["Find"])
async def search_similar_data(request: QueryRequest):
    """検索エンドポイント"""
    try:
        results = qdrant.search_by_query(query=request.query, limit=request.top_k)
        if not results:
            raise HTTPException(status_code=404, detail="No results found")

        print(f"結果の数: {len(results)}")
        print(f"クエリ: {request.query}, 件数制限: {request.top_k}")
        print(f"結果: {results}")
        # 結果のペイロードリストを返す
        return [hit.payload for hit in results]

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/insert_all_pages", tags=["DB_ALL"])
async def insert_all_pages_from_path(request: WikiRequest):
    """Qdrantに全ページのデータを登録するエンドポイント"""
    # Wikiのすべてのベージを取得する
    if not wiki_url or not wiki_api_token:
        raise HTTPException(status_code=500, detail="Wiki URL または API トークンが設定されていません。")

    loaded_pages = mongoDB_api.get_page_list(wiki_url, wiki_api_token, request.path)
    if not loaded_pages:
        raise HTTPException(status_code=404, detail="指定されたパスにページが見つかりませんでした。")
    print(f"取得したページ数: {len(loaded_pages)}")
    try:
        print(f"登録するページのパス: {request.path}")
        # qdrantにデータ登録
        page_handler.insert_all_pages_to_qdrant(url=wiki_url, access_token=wiki_api_token, all_pages=loaded_pages)

        # 成功メッセージを返す
        return {
            "message": "登録が成功しました。",
            "page_count": len(loaded_pages),
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/number_of_points", tags=["DB_ALL"])
async def get_number_of_points():
    """全ポイントの件数を取得するエンドポイント"""
    try:
        number_of_points = qdrant.get_number_of_points()
        return {
            "total_number_of_points": number_of_points,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/points", tags=["DB_ALL"])
async def get_all_points():
    """全てのポイントを取得するエンドポイント"""
    try:
        all_points = qdrant.get_all_points()
        if not all_points:
            raise HTTPException(status_code=404, detail="No points found")
        return all_points

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    

@app.delete("/delete_all", tags=["DB_ALL"])
async def delete_all_points():
    """全てのポイントを削除するエンドポイント"""
    try:
        qdrant.delete_all_points()
        return {"message": "全てのポイントが削除されました。"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/insert_by_page_id", tags=["DB_ID"])
async def insert_by_pg_id(page_id: str):
    """
    指定したページIDのベクトル情報をQdrantに登録するエンドポイント
    Args:
        page_id (str): 登録対象ページの一意なID

    Raises:
        QdrantやmongoDB API操作中に発生した例外をそのまま送出します。
    """
    try:
        if not wiki_url or not wiki_api_token:
            raise HTTPException(status_code=500, detail="Wiki URL or API token is not set in environment variables.")

        page_handler.insert_by_pg_id(url=wiki_url, access_token=wiki_api_token, page_id=page_id)
        return {"message": f"ページID {page_id} に基づくベクトル情報の登録が完了しました。"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.put("/update_by_pg_id", tags=["DB_ID"])
async def update_vector_by_pg_id(page_id: str):
    """
    指定したページIDのベクトル情報をQdrantで更新するエンドポイント
    Args:
        page_id (str): 更新対象ページの一意なID

    Raises:
        QdrantやmongoDB API操作中に発生した例外をそのまま送出します。
    """
    try:
        if not wiki_url or not wiki_api_token:
            raise HTTPException(status_code=500, detail="Wiki URL or API token is not set in environment variables.")

        page_handler.update_by_pg_id(url=wiki_url, access_token=wiki_api_token, page_id=page_id)
        return {"message": f"ページID{page_id}に基づくベクトル情報の更新が完了しました。"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/delete_by_page_id", tags=["DB_ID"])
async def delete_by_page_id(page_id: str):
    """ページIDでポイントを削除するエンドポイント"""
    try:
        qdrant.delete_points_by_pg_id(pg_id=page_id)
        return {"message": f"ページID:{page_id}のすべてのデータが削除されました。"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# このスクリプトが直接実行されたときのみアプリを起動する
if __name__ == "__main__":
    # Uvicorn を使って FastAPI アプリを起動する
    uvicorn.run("fastapi_qdrant:app", host="127.0.0.1", port=8001, reload=True)

FastMCP

こちらは、FastMCP経由でQdrantベクトルDBの操作ができるソースコードです。
FastMCPの二つの機能であるツール(Tool)とリソース(Resource)を使用しました。

  • ツール(Tool)に検索機能があります。
  • リソース(Resource)に全件取得機能があります。
mcp_qdrant_server.py
import os

from dotenv import load_dotenv
from fastapi import HTTPException
from fastmcp import FastMCP
from pydantic import BaseModel

import qdrant

# .envファイルをロードする
load_dotenv()

# mcp
mcp = FastMCP(name="McpQdrantServer")


# クエリリクエストボディのモデル
class QueryRequest(BaseModel):
    query: str
    top_k: int = 5


@mcp.tool(tags={"find", "public"})
async def search_similar_data(request: QueryRequest):
    """
    セリオのナレッジ検索ツール
    Args:
        request (QueryRequest): 検索クエリと返す件数を含むリクエストオブジェクト
        {
            "query": ユーザからの検索クエリ
            "limit": 検索件数(何も指定がなければ5件)
        }
    Returns:
        List[dict]: 検索結果のペイロードのリスト
    Raises:
        HTTPException:
            - 結果が見つからない場合は404を返します。
            - 検索処理中に予期しないエラーが発生した場合は500を返します。
    """

    try:
        results = qdrant.search_by_query(query=request.query, limit=request.top_k)
        if not results:
            raise HTTPException(status_code=404, detail="No results found")

        print(f"結果の数: {len(results)}")
        print(f"クエリ: {request.query}, 件数制限: {request.top_k}")
        print(f"結果: {results}")
        # 結果のペイロードリストを返す
        return [hit.payload for hit in results]

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@mcp.resource(uri="resource://all-points", mime_type="application/json", tags={"store", "private", "get_all"})
async def get_all_points():
    """全てのポイントを取得するツール"""
    try:
        all_points = qdrant.get_all_points()
        if not all_points:
            raise HTTPException(status_code=404, detail="No points found")
        return all_points

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# このスクリプトが直接実行されたときのみアプリを起動する
if __name__ == "__main__":
    mcp.run(transport="streamable-http")

結果

  • 10件のmarkdownファイルの登録実行時間179.9831 秒かかりました。
  • よく使うプログラミング言語というクエリに対する結果は、以下のようです。
    • クエリの埋め込み時間1.2550 秒
    • 検索実行時間0.0215 秒
  • よく使うプログラミング言語にたいする上位5件の取得したペイロードまたはデータのみのJSON結果は以下のようです。
[
  {
    "pg_id": "683d3ea1a9cb231ee7e80a1d",
    "p": "プログラミング言語は以下のような目的で使用されます:  \n- **ソフトウェア開発**:アプリケーションやゲーム、Web サービスなどを作成。\n- **データ処理**:大量のデータを分析・処理。\n- **自動化**:繰り返し作業の効率化。\n- **教育・研究**:アルゴリズムや論理的思考の学習。  \n---",
    "h1": "プログラミング言語とは",
    "h2": "💡 プログラミング言語の目的"
  },
  {
    "pg_id": "683d3ea1a9cb231ee7e80a1d",
    "p": "プログラミング言語は、いくつかの観点で分類されます:  \n- **コンパイル型 vs インタプリタ型**  \n- コンパイル型:C, C++(事前に機械語に変換)\n- インタプリタ型:Python, JavaScript(逐次実行)  \n- **手続き型 vs オブジェクト指向型**  \n- 手続き型:C(命令の順序で処理)\n- オブジェクト指向型:Java, Python(データと処理を一体化)  \n- **静的型付け vs 動的型付け**\n- 静的型付け:Java, C++(変数の型を明示)\n- 動的型付け:Python, Ruby(実行時に型が決定)  \n---",
    "h1": "プログラミング言語とは",
    "h2": "🛠️ プログラミング言語の分類"
  },
  {
    "pg_id": "683d3ea1a9cb231ee7e80a1d",
    "p": "AI や IoT、量子コンピューティングの発展により、新しいプログラミング言語やパラダイムが登場しています。今後も、より効率的で安全な開発を支える技術として進化し続けるでしょう。  \n---",
    "h1": "プログラミング言語とは",
    "h2": "📈 今後の展望"
  },
  {
    "pg_id": "683d3ea1a9cb231ee7e80a1d",
    "p": "AI や IoT、量子コンピューティングの発展により、新しいプログラミング言語やパラダイムが登場しています。今後も、より効率的で安全な開発を支える技術として進化し続けるでしょう。  \n---",
    "h1": "プログラミング言語とは",
    "h2": "📈 今後の展望"
  },
  {
    "pg_id": "683d3ea1a9cb231ee7e80a1d",
    "p": "目的や用途に応じて、適切な言語を選ぶことが重要です:  \n- **初心者向け**:Python, Scratch\n- **Web 開発**:HTML/CSS + JavaScript + フレームワーク(React, Vue など)\n- **モバイルアプリ**:Swift(iOS)、Kotlin(Android)\n- **ゲーム開発**:C++, C#, Unity  \n---",
    "h1": "プログラミング言語とは",
    "h2": "🚀 プログラミング言語の選び方"
  }
]

その上位5件スコアポイントは以下のようです。

[
    ScoredPoint(id='14e8633f-b5a8-467b-984e-892a05ec4026', version=46, score=0.71252066, payload={'pg_id': '683d3ea1a9cb231ee7e80a1d', 'p': 'プログラミング言語は以下のような目的で使用されます:  \n- **ソフトウェア開発**:アプリケーションやゲーム、Web サービスなどを作成。\n- **データ処理**:大量のデータを分析・処理。\n- **自動化**:繰り返し作業の効率化。\n- **教育・研究**:アルゴリズムや論理的思考の学習。  \n---', 'h1': 'プログラミング言語とは', 'h2': '💡 プログラミング言語の目的'}, vector=None, shard_key=None, order_value=None), 
    ScoredPoint(id='d06aefe6-c3fd-48e8-9de7-40bfd85962cf', version=48, score=0.71252066, payload={'pg_id': '683d3ea1a9cb231ee7e80a1d', 'p': 'プログラミング言語は、いくつかの観点で分類されます:  \n- **コンパイル型 vs インタプリタ型**  \n- コンパイル型:C, C++(事前に機械語に変換)\n- インタプリタ型:Python, JavaScript(逐次実行)  \n- **手続き型 vs オブジェクト指向型**  \n- 手続き型:C(命令の順序で処理)\n- オブジェクト指向型:Java, Python(データと処理を一体化)  \n- **静的型付け vs 動的型付け**\n- 静的型付け:Java, C++(変数の型を明示)\n- 動的型付け:Python, Ruby(実行時に型が決定)  \n---', 'h1': 'プログラミング言語とは', 'h2': '🛠️ プログラミング言語の分類'}, vector=None, shard_key=None, order_value=None), 
    ScoredPoint(id='422288cd-ccf5-4226-b39e-f6986652c123', version=50, score=0.71252066, payload={'pg_id': '683d3ea1a9cb231ee7e80a1d', 'p': 'AI や IoT、量子コンピューティングの発展により、新しいプログラミング言語やパラダイムが登場しています。今後も、より効率的で安全な開発を支える技術として進化し続けるでしょう。  \n---', 'h1': 'プログラミング言語とは', 'h2': '📈 今後の展望'}, vector=None, shard_key=None, order_value=None), 
    ScoredPoint(id='750a4891-4522-472b-8c2a-77e1c800af02', version=51, score=0.71252066, payload={'pg_id': '683d3ea1a9cb231ee7e80a1d', 'p': 'AI や IoT、量子コンピューティングの発展により、新しいプログラミング言語やパラダイムが登場しています。今後も、より効率的で安全な開発を支える技術として進化し続けるでしょう。  \n---', 'h1': 'プログラミング言語とは', 'h2': '📈 今後の展望'}, vector=None, shard_key=None, order_value=None), 
    ScoredPoint(id='91de99fc-f0e9-4c51-8c9b-fee06e245bf8', version=49, score=0.71252066, payload={'pg_id': '683d3ea1a9cb231ee7e80a1d', 'p': '目的や用途に応じて、適切な言語を選ぶことが重要です:  \n- **初心者向け**:Python, Scratch\n- **Web 開発**:HTML/CSS + JavaScript + フレームワーク(React, Vue など)\n- **モバイルアプリ**:Swift(iOS)、Kotlin(Android)\n- **ゲーム開発**:C++, C#, Unity  \n---', 'h1': 'プログラミング言語とは', 'h2': '🚀 プログラミング言語の選び方'}, vector=None, shard_key=None, order_value=None)
]

考察

導入のしやすさという観点から見ると、QdrantのベクトルDBにはマルチベクトル機能があるため、今回のプロジェクトでは、複数の見出し本文が属する一つのセクションに対して、それぞれのベクトルを1つの配列にまとめることができました。その結果、ベクトル検索の際には、ユーザークエリに対してすべてのベクトルを考慮し、類似度を照らし合わせることができるため、非常に有用です。

パフォーマンス面では、同じ10件のMarkdownファイルをクラウド上のMongoDBからローカルホストのQdrantベクトルDBに登録する処理時間は、ローカルフォルダーからJSONファイルとして保存する場合より約30秒遅くなりました。ただし、これはネットワークの影響も考えられるため、特に問題はないと考えています。一方、同じ質問に対する検索処理時間は、QdrantベクトルDBからの検索(0.0215秒)の方が、ローカルJSONファイルからの検索(0.1075秒)より約0.08秒速く、パフォーマンス面でも優れていることが確認できました。


まとめ & 次のステップ

  • 今度のプロジェクトでは、QdrantベクトルDBを使って、登録と検索を行いました。登録と検索実行時間はそれぞれ179.9831 秒0.0215 秒でした。
  • 今回は10件のみのMarkdownファイルで検証した結果ですが、実際には今後データ量が何倍にも増えていくことが想定されるため、その際に結果がどう変化するかは、実際に試してみないと分かりません。
  • 今後は、QdrantベクトルDBの更新機能も追加し、ファイル保存型プロトタイプとQdrantDB用のプロトタイプの登録および検索の結果を比較検証していく予定です。

参考リンク

セリオ株式会社 テックブログ

Discussion