LangChain の OpenSearchVectorSearch に Routing サポートを追加した話
TL;DR
LangChain のOpenSearchVectorSearchに包括的な routing 機能 を実装しました。
これにより、OpenSearch クラスタでの検索・削除を効率化し、データ局所性を保てるようになりました。
PR: feat: Add comprehensive routing support to OpenSearchVectorSearch #349
💡 背景:なぜ Routing が必要なのか?
LangChain の OpenSearchVectorSearch は、
OpenSearch クラスタ上にベクトルデータを格納・検索するためのバックエンド実装です。
しかしこれまで、routing パラメータ がサポートされていませんでした。
OpenSearch における routing は、どのシャードにドキュメントを保存するかを決めるキーです。
routing が無いと、検索や削除時にクラスタ全体へクエリがブロードキャストされてしまいます。
🧩 Routing なしの問題点
- 全シャードに問い合わせが発生
- ノード数が増えるほど検索コストが上昇
- データ局所性が失われる
✅ Routing ありのデータ配置
-
routing=user123のみを対象にクエリ送信 - データ局所性を保ちつつ、検索・削除が高速化
🏗️ 実装内容(PR #349)
今回のPRでは、
LangChain の OpenSearchVectorSearch に 包括的な routing サポート を追加しました。
📦 PR: feat: Add comprehensive routing support to OpenSearchVectorSearch #349
🧱 対応範囲
| 機能カテゴリ | 対象メソッド | 対応内容 |
|---|---|---|
| バルク挿入 |
_bulk_ingest_embeddings, _abulk_ingest_embeddings
|
_routing サポート |
| インデックス作成 |
_default_text_mapping, _default_scripting_text_mapping
|
_routing.required 設定可能 |
| 検索 | _raw_similarity_search_with_score_by_vector |
routing パラメータを付与 |
| 削除 |
delete, adelete
|
_routing による対象絞り込み |
| ファクトリメソッド |
from_embeddings, afrom_embeddings
|
routing 付き index を作成可能 |
| 型安全性 | 各 mapping 関数 |
cast(Dict[str, Any], ...) による mypy 安定化 |
🔍 コードハイライト
1️⃣ routing パラメータの導入
class OpenSearchVectorSearch:
def __init__(self, opensearch_url: str, index_name: str, embedding: Embeddings, **kwargs):
self.routing = kwargs.get("routing") # ✅ routing を保持
2️⃣ バルク挿入時の _routing 付与
def _bulk_ingest_embeddings(..., routing: Optional[str] = None) -> List[str]:
for doc, vector in zip(texts, embeddings):
request = {"_op_type": "index", "_index": index_name, "vector_field": vector}
if routing:
request["_routing"] = routing # ✅ routing 設定
requests.append(request)
3️⃣ マッピングに routing 要件を追加
def _default_text_mapping(..., routing_required: bool = False):
mapping = {
"settings": {"index": {"knn": True}},
"mappings": {"properties": {...}},
}
mappings_section = cast(Dict[str, Any], mapping["mappings"])
if routing_required:
mappings_section["_routing"] = {"required": True} # ✅
return mapping
4️⃣ 検索時の routing パラメータ伝搬
def _raw_similarity_search_with_score_by_vector(...):
search_params = {"index": index_name, "body": search_query}
if self.routing:
search_params["routing"] = self.routing # ✅
response = self.client.search(**search_params)
⚙️ 使用例:routing 指定付きのベクトルストア作成
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings()
vectorstore = OpenSearchVectorSearch.from_embeddings(
embedding=embedding,
opensearch_url="http://localhost:9200",
index_name="docs_with_routing",
routing="user123", # ✅ ルーティングキーを指定
)
検索時も routing が自動的に付与されます👇
docs = vectorstore.similarity_search("LangChain routing example")
# 内部では search(..., routing="user123") が実行される
📈 効果の比較(定性的)
| 指標 | Routingなし | Routingあり |
|---|---|---|
| 検索レイテンシ | 全シャードにアクセス | 対象シャードのみアクセス |
| 削除効率 | 全体ブロードキャスト | 局所的削除が可能 |
| スケーラビリティ | ノード数増加に比例して低下 | シャード単位で安定 |
🧠 実装時の工夫ポイント
-
型安全性の担保
cast(Dict[str, Any], mapping["mappings"])により mypy を通過し、CI で型エラーゼロを実現。 -
非同期APIとの整合性
async/sync 両方の削除メソッドを_op_type: "delete"形式に統一。 -
柔軟なマッピング生成
routing が必要な場合のみ_routing.required=Trueを動的追加。
🔮 今後の展望
- 現在は
async searchメソッドが未実装
→ 将来的にはaraw_similarity_search_with_score_by_vector()などへ routing 伝搬を拡張予定。 - routing キーの自動生成(例:ユーザーIDハッシュ)も検討中。
🧭 Routingのリクエストフロー
🏁 まとめ
LangChain の
OpenSearchVectorSearchに包括的な routing サポートを導入することで、
クラスタ環境でも高効率なデータ分散・検索・削除が可能になりました。
📘 PRリンク:
feat: Add comprehensive routing support to OpenSearchVectorSearch #349
✍️ Author: @yukiharada1228
💬 最後に
LangChain の OpenSearchVectorSearch は、RAG 構築や大規模検索アプリの精度・スピードを支える強力なコンポーネントです。
今回紹介した「包括的ルーティング対応」は、今後の検索戦略の幅を広げる基盤となるはずです。
興味のある方は、ぜひ PR #349 の実装詳細も覗いてみてください。
実際に試してみた結果や改善案なども、コメントで共有してもらえると嬉しいです!
Discussion