OpenSearchコードリーディング 検索編 (part 1)
OpenSearchの検索処理は、クライアントのRESTリクエストから始まり、複雑な分散処理を経て検索結果を返すまでに、多層的なアーキテクチャを通過します。
本記事では、OpenSearchのコードベースを解析し、検索リクエストがどのように処理されるのか、その内部メカニズムを解明していきます。
Part1では、検索リクエストの受信からパース、クエリリライトまでの流れを追っていきます。
検索受信処理の全体像
OpenSearchが検索リクエストを受け取ると、リクエスト内容をパースし、SearchRequestオブジェクトに変換します。次に検索パイプラインの適用とクエリリライトを実行します。最後に実際の検索処理を各シャードに対して実行します。本記事では検索処理をシャードに対して実行するまでの流れを扱います。
主要なクラスとデータ構造は以下の通りです。
ステップ1: RestControllerでのルーティング
クライアントからHTTPリクエストが到着すると、RestControllerが処理を受け持ちます。
RestController.tryAllHandlersメソッドでは、リクエストのrawPathとHTTPメソッドから適切なハンドラーを検索します。内部ではPathTrieというトライ木構造を使用しており、URLパスから対応するハンドラーを検索します。PathTrieは、URLパスの各セグメントをノードとして格納し、プレースホルダー({index}など)も処理します。
getAllRestMethodHandlersメソッドでは、PathTrie.retrieveAllを使ってパスに一致するすべてのハンドラーを取得します。/_searchや/{index}/_searchといったパスはRestSearchActionにマップされており、このマッピングはノード起動時にActionModuleによって設定されます。
registerHandlerメソッドは、URLパス(例:"/_search")とRESTハンドラー(例:RestSearchAction)のマッピングをPathTrieに登録します。ノード起動時には、ActionModuleが各RESTハンドラーのregisterHandlerを呼び出してこれらのマッピングを設定します。
またActionModuleは、アクションタイプ(例:SearchAction.INSTANCE、識別子: "indices:data/read/search")とトランスポートアクション(TransportSearchAction)のマッピングも別途管理します。これらのマッピングにより、RESTリクエストが適切なトランスポートアクションに到達できます。
ハンドラーが見つかると、dispatchRequestメソッドでRestHandlerインターフェースのhandleRequestメソッドが呼び出されます。RestSearchActionはBaseRestHandlerを継承しており、BaseRestHandlerがRestHandlerインターフェースを実装しています。BaseRestHandler.handleRequestは抽象メソッドprepareRequestを呼び出し、これがRestSearchActionで実装されています。
ステップ2: RestSearchActionでのリクエストパース
続いてRestSearchActionによるリクエストのパースの流れを見ていきます。
RestSearchAction.prepareRequestメソッドでは、まずSearchRequestオブジェクトを作成し、withContentOrSourceParamParserOrNullを使ってリクエストボディまたはsourceパラメータからパーサーを取得します。その後parseSearchRequestメソッドを呼び出して、HTTPリクエストをSearchRequestオブジェクトに変換します。
parseSearchRequestメソッドはリクエストボディをsearchRequest.source().parseXContentでパースします。parseXContentはトークンベースでJSONなどを処理し、各フィールド(query、aggregations、from、sizeなど)を対応する内部表現に変換します。
parseSearchSourceメソッドでは、URLパラメータが処理されます。まずRestActions.urlParamsToQueryBuilderでURLパラメータからクエリを構築し、既存のクエリがあれば上書きします。その後、from、size、timeoutなどの個別パラメータを処理します。リクエストボディが先に処理され、その後URLパラメータが処理されるため、両者が同じフィールドを設定していた場合は、URLパラメータがボディの設定を上書きする形になります。
ステップ3: NodeClientの呼び出し
SearchRequestの構築が完了すると、RestSearchActionはRestCancellableNodeClientを通じてリクエストを実行します。
prepareRequestメソッドの最後で、RestCancellableNodeClientを作成してSearchAction.INSTANCEを実行します。RestCancellableNodeClientはHTTPチャンネルの状態を監視し、クライアントが接続を切断した場合に検索タスクをキャンセルする機能を提供します。
RestCancellableNodeClientは内部的にNodeClientに処理を委譲します。NodeClient内部では、DynamicActionRegistryがアクションタイプから対応するトランスポートアクションを解決します。SearchAction.INSTANCEは"indices:data/read/search"という識別子を持ち、ActionModuleによってTransportSearchActionにマップされています。DynamicActionRegistryからTransportSearchActionが返されると、そのdoExecuteメソッドが呼び出され、実際の検索処理へと移行します。
ステップ4: TransportSearchActionでの検索処理
NodeClientがDynamicActionRegistryからTransportSearchActionを解決すると、検索処理の本格的な実行に入ります。
TransportSearchAction.doExecuteメソッドでは、タスクがCancellableTaskの場合にTimeoutTaskCancellationUtility.wrapWithCancellationListenerを使ってリスナーをラップします。これにより、search.cancel_after_time_interval設定で指定された時間を超えた検索を自動的にキャンセルできます。その後executeRequestメソッドを呼び出して実際の検索処理を開始します。
executeRequestメソッドでは、まずSearchTimeProviderを作成します。これは2種類の時刻を管理します。absoluteStartMillisはインデックス名の日付数式解決に必要で、relativeStartNanosは高精度な処理時間測定に使用されます。
トレーシングのためのOpenTelemetryのSpanを作成した後、SearchRequestOperationsListenerを構築します。次にsearchPipelineService.resolvePipelineでパイプラインを解決します。パイプラインが指定されている場合、PipelinedRequestが返され、リクエストとレスポンスの両方に変換を適用できます。その後、transformRequestでリクエストの変換を実行し、変換されたリクエストで次の処理に進みます。
buildRewriteListenerメソッドは、クエリのリライト処理を行うリスナーを構築します。このリスナーはActionListener.wrapを使って構築され、成功時と失敗時のハンドラーを持ちます。リクエストのsourceがnullの場合はそのままonResponseを呼び出し、nullでない場合はRewriteable.rewriteAndFetchでクエリのリライトを実行します。
リライトが完了すると、リスナーの成功ハンドラーが呼び出されます。リライトによってクエリが変更された場合(新しいSearchSourceBuilderオブジェクトが作成された場合)、それをsearchRequestに設定し直します。その後、クラスター状態から検索対象のインデックス情報を抽出します。リモートクラスターへの検索が含まれる場合はccsRemoteReduce、ローカルのみの場合はexecuteLocalSearchが呼び出され、実際の検索処理に移行します。
リライト処理の例として、Termsクエリのルックアップがあります。termsクエリで別のインデックスを参照する場合、リライト時に実際の値を取得し、具体的な値のリストに置換されます。また、日付の"now"は実行時のタイムスタンプに解決されます。例えば"gte": "now-1h"は、実行時刻の1時間前の具体的なタイムスタンプに変換されます。
リライト完了後、クラスター状態から情報を抽出します。extractRequestedIndicesメソッドでは、Point-in-Time検索の場合はSearchContextIdからインデックス情報を取得し、通常検索の場合はremoteClusterService.groupIndicesでローカルとリモートのインデックスを分類します。
resolveLocalIndicesメソッドは、ローカルインデックスの解決を行います。localIndicesがnullの場合は、リモートクラスターのみを検索対象とすることを意味し、空の結果を返します。そうでない場合は、indexNameExpressionResolver.concreteResolvedIndicesを呼び出して具体的なインデックス名に解決します。
buildPerIndexAliasFilterメソッドは、各インデックスに対するエイリアスフィルターを構築します。まずindexNameExpressionResolver.resolveExpressionsで指定されたインデックスとエイリアスのセットを取得します。次に、各具体的なインデックスに対してループし、読み取りがブロックされていないかチェックした後、searchService.buildAliasFilterでエイリアスフィルターを構築します。最終的にインデックスのUUIDをキーとしたマップに格納します。
executeSearchメソッドでは、クラスター状態からシャードの情報を取得します。Point-in-Time検索の場合はSearchContextIdから、通常検索の場合はoperationRouting.searchShardsでシャードルーティングを決定します。preferenceパラメータにより、プライマリシャードまたはレプリカシャードの選択を制御できます。nodeSearchCountsで各ノードの現在の検索負荷を追跡し、負荷分散を実現します。
最後に、searchAsyncActionProvider.asyncSearchActionを呼び出して、実際の非同期検索処理を開始します。これにより、複数のシャードに対する並列検索が実行され、結果が集約されます。
まとめ
Part1では、検索リクエストがOpenSearchに到着してから、実際の検索実行の準備が整うまでの流れを追いました。
HTTPリクエストはRestControllerで受信され、PathTrieによってRestSearchActionにルーティングされます。RestSearchActionでHTTPリクエストをSearchRequestオブジェクトに変換した後、NodeClientを経由してTransportSearchActionに処理が移譲されます。TransportSearchActionでは、検索パイプラインの適用、クエリリライト、インデックス解決、シャードルーティングの決定を順次実行し、実際の検索処理をデータノードへ引き渡すまでの準備を行います。
次回のPart2では、この準備された検索リクエストが実際にどのように各シャードで実行され、結果が集約されるのかを見ていきます。
参考
- OpenSearch Deep Dive - Search (part 1)
- 本記事の内容はOpenSearch Communityの動画シリーズ「OpenSearch Deep Dive - Search (part 1)」にインスパイアされています
- https://www.youtube.com/watch?v=MJPqYT7QPXA
OpenSearch Project(OSS) の Publicationです。 OpenSearch Tokyo User Group : meetup.com/opensearch-project-tokyo/
Discussion