🚀

LangChain の OpenSearchVectorSearch に Routing サポートを追加した話

に公開

TL;DR
LangChain の OpenSearchVectorSearch に包括的な routing 機能 を実装しました。
これにより、OpenSearch クラスタでの検索・削除を効率化し、データ局所性を保てるようになりました。
PR: feat: Add comprehensive routing support to OpenSearchVectorSearch #349


💡 背景:なぜ Routing が必要なのか?

LangChain の OpenSearchVectorSearch は、
OpenSearch クラスタ上にベクトルデータを格納・検索するためのバックエンド実装です。

しかしこれまで、routing パラメータ がサポートされていませんでした。
OpenSearch における routing は、どのシャードにドキュメントを保存するかを決めるキーです。

routing が無いと、検索や削除時にクラスタ全体へクエリがブロードキャストされてしまいます。


🧩 Routing なしの問題点

  • 全シャードに問い合わせが発生
  • ノード数が増えるほど検索コストが上昇
  • データ局所性が失われる

✅ Routing ありのデータ配置

  • routing=user123 のみを対象にクエリ送信
  • データ局所性を保ちつつ、検索・削除が高速化

🏗️ 実装内容(PR #349)

今回のPRでは、
LangChain の OpenSearchVectorSearch包括的な routing サポート を追加しました。

📦 PR: feat: Add comprehensive routing support to OpenSearchVectorSearch #349


🧱 対応範囲

機能カテゴリ 対象メソッド 対応内容
バルク挿入 _bulk_ingest_embeddings, _abulk_ingest_embeddings _routing サポート
インデックス作成 _default_text_mapping, _default_scripting_text_mapping _routing.required 設定可能
検索 _raw_similarity_search_with_score_by_vector routing パラメータを付与
削除 delete, adelete _routing による対象絞り込み
ファクトリメソッド from_embeddings, afrom_embeddings routing 付き index を作成可能
型安全性 各 mapping 関数 cast(Dict[str, Any], ...) による mypy 安定化

🔍 コードハイライト

1️⃣ routing パラメータの導入

class OpenSearchVectorSearch:
    def __init__(self, opensearch_url: str, index_name: str, embedding: Embeddings, **kwargs):
        self.routing = kwargs.get("routing")  # ✅ routing を保持

2️⃣ バルク挿入時の _routing 付与

def _bulk_ingest_embeddings(..., routing: Optional[str] = None) -> List[str]:
    for doc, vector in zip(texts, embeddings):
        request = {"_op_type": "index", "_index": index_name, "vector_field": vector}
        if routing:
            request["_routing"] = routing  # ✅ routing 設定
        requests.append(request)

3️⃣ マッピングに routing 要件を追加

def _default_text_mapping(..., routing_required: bool = False):
    mapping = {
        "settings": {"index": {"knn": True}},
        "mappings": {"properties": {...}},
    }

    mappings_section = cast(Dict[str, Any], mapping["mappings"])
    if routing_required:
        mappings_section["_routing"] = {"required": True}  # ✅
    return mapping

4️⃣ 検索時の routing パラメータ伝搬

def _raw_similarity_search_with_score_by_vector(...):
    search_params = {"index": index_name, "body": search_query}
    if self.routing:
        search_params["routing"] = self.routing  # ✅
    response = self.client.search(**search_params)

⚙️ 使用例:routing 指定付きのベクトルストア作成

from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_openai import OpenAIEmbeddings

embedding = OpenAIEmbeddings()

vectorstore = OpenSearchVectorSearch.from_embeddings(
    embedding=embedding,
    opensearch_url="http://localhost:9200",
    index_name="docs_with_routing",
    routing="user123",  # ✅ ルーティングキーを指定
)

検索時も routing が自動的に付与されます👇

docs = vectorstore.similarity_search("LangChain routing example")
# 内部では search(..., routing="user123") が実行される

📈 効果の比較(定性的)

指標 Routingなし Routingあり
検索レイテンシ 全シャードにアクセス 対象シャードのみアクセス
削除効率 全体ブロードキャスト 局所的削除が可能
スケーラビリティ ノード数増加に比例して低下 シャード単位で安定

🧠 実装時の工夫ポイント

  • 型安全性の担保
    cast(Dict[str, Any], mapping["mappings"]) により mypy を通過し、CI で型エラーゼロを実現。
  • 非同期APIとの整合性
    async/sync 両方の削除メソッドを _op_type: "delete" 形式に統一。
  • 柔軟なマッピング生成
    routing が必要な場合のみ _routing.required=True を動的追加。

🔮 今後の展望

  • 現在は async search メソッドが未実装
    → 将来的には araw_similarity_search_with_score_by_vector() などへ routing 伝搬を拡張予定。
  • routing キーの自動生成(例:ユーザーIDハッシュ)も検討中。

🧭 Routingのリクエストフロー


🏁 まとめ

LangChain の OpenSearchVectorSearch に包括的な routing サポートを導入することで、
クラスタ環境でも高効率なデータ分散・検索・削除が可能になりました。


📘 PRリンク:
feat: Add comprehensive routing support to OpenSearchVectorSearch #349

✍️ Author: @yukiharada1228


💬 最後に

LangChain の OpenSearchVectorSearch は、RAG 構築や大規模検索アプリの精度・スピードを支える強力なコンポーネントです。
今回紹介した「包括的ルーティング対応」は、今後の検索戦略の幅を広げる基盤となるはずです。

興味のある方は、ぜひ PR #349 の実装詳細も覗いてみてください。
実際に試してみた結果や改善案なども、コメントで共有してもらえると嬉しいです!

Discussion