🛠️

Amazon OpenSearch Serverless で Delete by query がサポートされてないので実装してみる

2024/12/17に公開

https://qiita.com/advent-calendar/2024/lambda

🗓 AWS Lambda と Serverless Advent Calendar 2024 の 17 日目 🗓

🔰 はじめに

対象

以下の基礎知識がある方

前提条件

  • 2024 年 12 月時点の情報です
  • 例外処理は入れていない参考程度の実装例としてご了承ください

🔖 要約

  • クエリの条件に合致するすべてのドキュメントを削除できる Delete by query の API が Amazon OpenSearch Serverless でサポートされていないため、SearchBulk の API を組み合わせて OpenSearch Python Client で実装した
  • Search の API で 1 回に取得できる上限があることや、インデックス内のドキュメントを更新した場合に反映されるまでに時間がかかる仕様を考慮した実装にする必要があった
  • 削除対象の件数が多い場合は sleep を入れる必要があるため、削除処理が完了するまでそれなりに時間がかかる

🧑‍🏫 OpenSearch Serverless と Delete by query について

OpenSearch Serverless を選択する場合の注意点

OpenSearch Serverless は有効活用できれば運用面の手間を減らしてくれる一方で、OpenSearch で使用できる API の全てをサポートしていない点や Serverless 特有の Limitation がある点は注意が必要です。

公式ドキュメントに OpenSearch Serverless でサポートされている機能 が記載されていますが、例えば Delete by query (POST <index>/_delete_by_query) の API は記載がないため現時点では使用できません。

OpenSearch の Delete by query の使い方

Delete by query は削除したいドキュメントに合致する条件を、Search の API でも使用する Query DSL で表現できます。そのため大量にあるドキュメントの中から「YYYY 年より昔に登録されたドキュメントは削除する」など、柔軟にドキュメント削除ができて便利です。

OpenSearch Python Client で Delete by query を行いたい場合、OpenSearch クライアントに用意されている delete_by_query メソッドを使って body 引数に検索条件を渡すだけです。例えば評価の低い映画の情報をインデックスから削除する場合、以下のようなコードで実現できます。

from opensearchpy import OpenSearch

# 接続先などは必要に応じて設定
opensearch_client = OpenSearch(hosts=['localhost'])

# 削除対象の検索条件
index_name = "movies"
query = {
    "query": {
        "match": {
            "rate": 3
        }
    }
}
# すでにドキュメントが存在する場合に、該当するドキュメントをまとめて削除できる
response = opensearch_client.delete_by_query(index=index_name, body=query)

🧪 OpenSearch Serverless で Delete by query のようなことをしたい場合

考慮した点

Delete by query を使用しない場合、OpenSearch Serverless でドキュメントを削除する場合は以下のいずれかです。

  • Delete document の API でドキュメントを 1 つ削除
  • Bulk の API で複数のドキュメントをまとめて削除

そのため Search のレスポンスに含まれるドキュメント ID (_id) の値を使用して Bulk でまとめて削除すれば Delete by query を実現できるかと思っていたのですが、いくつかの考慮点が必要でした。

OpenSearch の Search API の制約 (Serverless は関係ない部分)

  • 最大取得件数が設定されているため、削除対象が多い場合に「Search でドキュメント ID を取得 → Bulk で削除」を何度か繰り返さないといけない
    • 例えば削除対象が 35,000 件の場合、「Search で 10,000 件のドキュメント ID を取得 → Bulk で削除」を 4 回繰り返す必要がある
  • クエリに合致するドキュメントの正確な数を取得したい場合は track_total_hitsTrue で指定する必要がある

OpenSearch Serverless の制約

  • ドキュメント削除などの変更をしても、OpenSearch Serverless ではインデックスのリフレッシュインターバルがベクトル検索では約 60 秒、通常の検索と時系列では約 10 秒である
    • そのため「Search で合致するドキュメント ID 取得 → Bulk で削除」を繰り返す場合、ドキュメント削除後にすぐ Search をすると既に削除リクエストをしているドキュメントが取得されてしまため削除後に次の Search をするまで待機する必要がある

実装例

考慮点を踏まえて def bulk_delete_using_search(...) というメソッドを定義して Delete by query のようなことをできるようにしてみました。引数には Delete by querybody 引数に渡すのと同じく Query DSL を渡せるようにしています。

その他工夫した点を挙げます。

  • Delete by query と同様に OpenSearch クライアントと index_name は必須で、それ以外は指定しなくても良いがオプションでパラメーターを変更できる
  • 削除対象のドキュメントが Search の最大取得数を超える場合は、任意の秒数だけ sleep してインデックス更新を待った後に残りのドキュメントの検索と削除をすることで確実に全てのドキュメントに対応できる
  • 処理を実行するマシンや OpenSearch のスペックを考慮して Search の最大取得数や Bulk でまとめるドキュメント数を任意の値に設定できる
import time

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth


def main():
    # TODO 使用する host と region を設定する
    opensearch_client = get_opensearch_client(host="my-test-domain.ap-northeast-1.aoss.amazonaws.com", region="ap-northeast-1")

    # TODO 削除対象の検索条件を必要に応じて設定する
    index_name = "movie"
    query = {
        "query": {
            "match": {
                "rate": 3
            }
        }
    }

    # Delete by query の代わりの処理 (例外処理は省略しています)
    bulk_delete_using_search(
        opensearch_client=opensearch_client,
        index_name=index_name,
        search_body=query,
        # ここから下は指定しなければデフォルト値を使う
        search_size=5000,  # 1 回の Search リクエストで取得するドキュメント数 (default では最大 10,000)
        search_timeout=90,  # Search リクエストのタイムアウト
        bulk_delete_batch_size=1000,  # 1 回の Bulk リクエストに含めるドキュメント数
        # sleep_seconds はベクトル検索では 60 秒より大きく、通常の検索と時系列では 10 秒より大きくしておく
        sleep_seconds=80,  # 複数回 Bulk リクエストが必要な場合、次の Search をするまでのインデックス更新を待機する秒数
    )


def bulk_delete_using_search(
    opensearch_client,
    index_name: str,
    search_body: dict,
    search_size: int = 10000,
    search_timeout: int = 180,
    bulk_delete_batch_size: int = 10000,
    sleep_seconds: int = 80
):
    # 最大で search_size の件数のドキュメントを削除
    hits_total = _bulk_delete_using_search(opensearch_client, index_name, search_body, search_size, search_timeout, bulk_delete_batch_size)
    # 検索条件に該当するドキュメント数が 1 回の Search 取得件数より多い場合、残りを取得して削除する
    while hits_total > search_size:
        # Serverless は結果の反映に時間がかかるので、削除した内容が検索でヒットしないようになるまでしばらく待ってから残りの検索と削除
        time.sleep(sleep_seconds)
        hits_total = _bulk_delete_using_search(opensearch_client, index_name, search_body, search_size, search_timeout, bulk_delete_batch_size)


def _bulk_delete_using_search(
    opensearch_client,
    index_name: str,
    search_body: dict,
    search_size: int,
    search_timeout: int,
    bulk_delete_batch_size: int
):
    search_response = opensearch_client.search(
        index=index_name,
        body=search_body,
        size=search_size,
        track_total_hits=True,  # パフォーマンスに多少影響あるが正確な件数を取得する
        params={"timeout": search_timeout},
    )
    hits_total = search_response["hits"]["total"].get("value", 0)
    print(f"{hits_total=}")
    if hits_total == 0:
        return 0

    # 削除対象のドキュメント ID を入れる配列
    body = []
    count = 0
    # 検索結果からドキュメント ID を取り出し、bulk_delete_batch_size の数を 1 回の Bulk リクエストで処理する
    for doc in search_response["hits"]["hits"]:
        count += 1
        body.append({"delete": {"_id": doc["_id"]}})
        if count >= bulk_delete_batch_size:
            count = 0
            opensearch_client.bulk(body, index=index_name)
            # 削除したドキュメント ID はクリア
            body = []
    if len(body) > 0:
        opensearch_client.bulk(body, index=index_name)

    return hits_total


def get_opensearch_client(
    host: str,  # ex: "my-test-domain.ap-northeast-1.aoss.amazonaws.com"
    region: str,  # ex: "ap-northeast-1"
    service: str = "aoss"
):
    credentials = boto3.Session().get_credentials()
    auth = AWSV4SignerAuth(credentials, region, service)

    return OpenSearch(
        hosts = [{'host': host, 'port': 443}],
        http_auth = auth,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection,
        pool_maxsize = 20
    )


if __name__ == "__main__":
    main()

✅ まとめ

  • サポートされていない API を実装することで、OpenSearch の仕様や OpenSearch Serverless の制限事項についても学べた
  • 削除対象の件数が多い場合、Delete by query のように 1 回のリクエストで対象を全て削除することは Search の制約などがあり実現できない
  • OpenSearch Serverless を選択する場合は制限事項を確認して自身の要件を満たせるかを考慮しておかないと、マネージドなメリットのトレードオフとして自前で追加実装が必要になることもあるので注意

⚙️ おまけ

使用する Python パッケージの準備

  • Python が入っているコードの実行環境で必要なパッケージをインストール
python -m pip install boto3 opensearch-py

OpenSearch の準備

  • movie インデックスを作成し、ドキュメントを大量に作成する
import time

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth


def main():
    # TODO 使用する host と region を設定する
    opensearch_client = get_opensearch_client(host="my-test-domain.ap-northeast-1.aoss.amazonaws.com", region="ap-northeast-1")

    # TODO インデックス名を設定
    index_name = "movie"
    index_exists = opensearch_client.indices.exists(index_name)
    print(f"{index_exists=}")
    if not index_exists:
        create_index(opensearch_client, index_name)
        time.sleep(60)

    # ドキュメントを作成
    create_document(opensearch_client, index_name)

    # インデックスに作成されているドキュメント件数を確認
    query = {
        "query": {
            "match_all": {}
        }
    }
    time.sleep(80)
    count_document(opensearch_client, index_name, query)


# https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-vector-search.html#serverless-vector-tutorial
def create_index(opensearch_client, index_name):
    index_body = {
        "settings": {
            "index.knn": True
        },
        "mappings": {
            "properties": {
                "housing-vector": {
                    "type": "knn_vector",
                    "dimension": 2
                },
                "title": {
                    "type": "text"
                },
                "rate": {
                    "type": "long"
                }
            }
        }
    }
    create_index_response = opensearch_client.indices.create(index_name, body=index_body)
    print(f"{create_index_response=}")


def create_document(opensearch_client, index_name):
    # 適当に 99 x 1000 で 99000 件のドキュメント追加
    for i in range (99):
        bulk_body = []
        for j in range(1000):
            bulk_body.append({"index": {"_index": index_name}})
            bulk_body.append({
                "housing-vector": [
                    i + j,
                    i * j,
                ],
                "title": "dark night" + str(i * 1000 + j),
                "rate": j % 3 + 3,
            })
        bulk_response = opensearch_client.bulk(index=index_name, body=bulk_body)
        print(f"{bulk_response}")


def count_document(opensearch_client, index_name, query):
    count_response = opensearch_client.count(index=index_name, body=query)
    print(f"{count_response=}")


def get_opensearch_client(
    host: str,  # ex: "my-test-domain.ap-northeast-1.aoss.amazonaws.com"
    region: str,  # ex: "ap-northeast-1"
    service: str = "aoss"
):
    credentials = boto3.Session().get_credentials()
    auth = AWSV4SignerAuth(credentials, region, service)

    return OpenSearch(
        hosts = [{'host': host, 'port': 443}],
        http_auth = auth,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection,
        pool_maxsize = 20
    )


if __name__ == "__main__":
    main()

OpenSearch の後片付け

  • movie インデックスを削除
import time

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth


def main():
    # TODO 使用する host と region を設定する
    opensearch_client = get_opensearch_client(host="my-test-domain.ap-northeast-1.aoss.amazonaws.com", region="ap-northeast-1")

    # TODO インデックス名を設定
    index_name = "movie"
    index_delete_response = opensearch_client.indices.delete(index_name)
    print(f"{index_delete_response=}")


def get_opensearch_client(
    host: str,  # ex: "my-test-domain.ap-northeast-1.aoss.amazonaws.com"
    region: str,  # ex: "ap-northeast-1"
    service: str = "aoss"
):
    credentials = boto3.Session().get_credentials()
    auth = AWSV4SignerAuth(credentials, region, service)

    return OpenSearch(
        hosts = [{'host': host, 'port': 443}],
        http_auth = auth,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection,
        pool_maxsize = 20
    )


if __name__ == "__main__":
    main()
GitHubで編集を提案

Discussion