🧠

AOSS と Bedrock で AI 検索

に公開

はじめに

AOSS とは、Amazon OpenSearch Serverless である。弊社では、製品内の全文検索エンジンとして、AOSS を使用している。やはり、運用的にも保守的にも、リクエスト量の増減やAZ障害を気にしなくて良い「真にフルマネージド」の世界を経験してしまうと、もう元には戻れない。

世の中は、やはり AI である。そろそろなんとかしようと Bedrock Knowledgebase などを調査してみたのであるが、別に Chat なんかしなくても、AI 検索ができればいいんだよなぁ…と思っていたところ、

https://aws.amazon.com/jp/about-aws/whats-new/2025/08/amazon-opensearch-serverless-ai-connectors-hybrid-search/

である。2025年8月7日。これかもしれない…。

やってみた

とはいえ、何の知識もない状態で始められるほど、甘くはないことは承知してるので、スタート地点を探すことにした。

スタート地点

あった。

https://aws.amazon.com/jp/blogs/news/introduction-to-amazon-opensearch-service-workshop-jp

2025年3月21日。いいかもしれない…。

真にフルマネージド

OpenSearch Service を OpenSearch Serverless に置き換え、SageMager を Bedrock に置き換えよう。これが今回の縛りである。

結論

置き換えた。

https://github.com/take0a/aoss-bedrock-workshop

以上。であるが、ここでも説明してみる。ここでは、 4-ai-search.ipynb だけにするが、他の内容も大変勉強になった。ありがとう AWS。

準備

環境は、こんな感じで、VSCode から EC2 に Remote Development で Jupyter Notebook を開いて、EC2 の IAM ロールで AOSS と Bedrock にアクセスする。(これだけ。大量のリソースを CloudFormation で作成するのに抵抗のある方も安心w)

EC2

最近は Python といえば、uv なので、ここみて、uv をインストールしてください。

上の置き換えた Workshop のリポジトリを使うなら、

git clone https://github.com/take0a/aoss-bedrock-workshop.git
cd aoss-bedrock-workshop
uv sync

VSCode には、以下の Extension をインストールしてください。

IAM

以下の IAM Role に AOSS と Bedrock のアクセス権限を付与してください。

  • 上記の踏み台にする EC2 を作るときに紐づけた IAM Role
  • 管理コンソールを操作するユーザー

これとは別に、AOSS が Bedrock にアクセスするための IAM Role を作成する。ここで「信頼するサービス」は、ml.opensearchservice.amazonaws.com なので注意が必要である。というか結構無理ゲーだと思う。ここに書いてあるが…。ポリシーは、このブログの範囲であれば、InvokeModel ができれば OK。

AOSS

ベクトル検索コレクションを作るのであるが、コレクションを作る前に、AOSS のポリシーを作る。卵と鶏のようであるが、ポリシーを作る前に、コレクションの名前を決めておくのが吉。ここでは、workshop-vector で予定する。これは、以下のポリシーを割り当てるリソースを特定するためである。

既に、AOSS の検索コレクションを使っていて、開発環境だから OCU は最小の 2 に設定している場合、ベクトル検索コレクションを追加すると 4 に上げないとコレクションが作れない。また、AOSS を使っていない場合、だいぶ安くはなったが、それなりに費用がかかるので、事前に確認することをお勧めする。

Encryption policy

リソースを workshop* のように、コレクション名をカバーするように設定する。暗号化は、他のキーでも良いですが、AWS所有キーを使用するで良い。

Network policy

workshop* のエンドポイントとアクセスを有効にする。テスト用のデータでテストするだけなら、パブリックでも良いかも。

Data access policy

プリンシパルは、上の IAM で AOSS へのアクセス権限を付与した EC2 とユーザーの Role にする。リソースは、collection/workshop*index/workshop*/*model/workshop*/* の3つであるが、小さい画面では、最後の model が見えない&スクロールバーも出ない状態になる。また、3つ許可したいのに、1ルールのアイテムは2つまでと怒られる。ぶちぎれるところではあるが、ここは、グッと堪えて、ルールを2つに分けて登録する。

Bedrock

Amazon Titan Text Embeddings V2 を使いたいので、管理コンソールの モデルアクセスリクエスト可能 の場合には、モデルアクセスをリクエスト して、アクセスが付与されました にする。

ベクトル検索

ベクトル検索は、Bedrock Knowledgebase でも利用可能でしたので、2025年8月7日の発表分ではありませんが、今回のブログの範囲全体の基本なので、ここからやりましょう。

Bedrock 呼び出しのチェック

ここのコピペであるが、以下のコードを使って、Bedrock の設定や IAM の設定を確認しておく。

# Generate and print an embedding with Amazon Titan Text Embeddings V2.

import boto3
import json

# Create a Bedrock Runtime client in the AWS Region of your choice.
client = boto3.client("bedrock-runtime", region_name="us-east-1")

# Set the model ID, e.g., Titan Text Embeddings V2.
model_id = "amazon.titan-embed-text-v2:0"

# The text to convert to an embedding.
input_text = "Please recommend books with a theme similar to the movie 'Inception'."

# Create the request for the model.
native_request = {"inputText": input_text}

# Convert the native request to JSON.
request = json.dumps(native_request)

# Invoke the model with the request.
response = client.invoke_model(modelId=model_id, body=request)

# Decode the model's native response body.
model_response = json.loads(response["body"].read())

# Extract and print the generated embedding and the input text token count.
embedding = model_response["embedding"]
input_token_count = model_response["inputTextTokenCount"]

print("\nYour input:")
print(input_text)
print(f"Number of input tokens: {input_token_count}")
print(f"Size of the generated embedding: {len(embedding)}")
print("Embedding:")
print(embedding)

AOSS インデックスの作成

以下のように、knn_vector タイプのフィールドを用意したインデックスを作成する。

payload = {
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "question": {"type": "text", "analyzer": "custom_kuromoji_analyzer"},
      "context":  {"type": "text", "analyzer": "custom_kuromoji_analyzer"},
      "answers":  {"type": "text", "analyzer": "custom_kuromoji_analyzer"},
      "question_embedding": {
        "type": "knn_vector",
        "dimension": 1024,
        "space_type": "l2",
        "method": {
          "name": "hnsw",
          "engine": "faiss",
        }
      },
      "context_embedding": {
        "type": "knn_vector",
        "dimension": 1024,
        "space_type": "l2",
        "method": {
          "name": "hnsw",
          "engine": "faiss",
        },
      }
    }
  },
  "settings": {
    "index.knn": True,
    "analysis": {
      "analyzer": {
        "custom_kuromoji_analyzer": {
          "char_filter": ["icu_normalizer"],
          "filter": [
              "kuromoji_baseform",
              "custom_kuromoji_part_of_speech"
          ],
          "tokenizer": "kuromoji_tokenizer",
          "type": "custom"
        }
      },
      "filter": {
        "custom_kuromoji_part_of_speech": {
          "type": "kuromoji_part_of_speech",
          "stoptags": ["感動詞,フィラー","接頭辞","代名詞","副詞","助詞","助動詞","動詞,一般,*,*,*,終止形-一般","名詞,普通名詞,副詞可能"]
        }
      }
    }
  }
}
# インデックス名を指定
index_name = "jsquad-knn"

try:
    # 既に同名のインデックスが存在する場合、いったん削除を行う
    print("# delete index")
    response = opensearch_client.indices.delete(index=index_name)
    print(json.dumps(response, indent=2))
except Exception as e:
    print(e)

# インデックスを作成
response = opensearch_client.indices.create(index_name, body=payload)
response

ベクトルデータの生成

ベクトル検索の場合、ベクトルデータの準備は、AOSS 内部からではなく、以下のように、AOSS へアクセスする前に行うことになる。

def get_df_with_embeddings(input_df, field_mappings, model_id, bedrock_region, batch_size):
    output_df = pd.DataFrame([]) #create empty dataframe
    df_list = np.array_split(input_df, input_df.shape[0]/batch_size)
    for df in tqdm(df_list):
        index = df.index #backup index number
        df_with_embeddings = df
        for field_mapping in field_mappings:
            input_field_name = field_mapping["InputFieldName"]
            embedding_field_name = field_mapping["EmbeddingFieldName"]
            payload = {
                "inputText": df_with_embeddings[input_field_name].values.tolist()[0]
            }
            body = json.dumps(payload)
            bedrock_runtime_client = boto3.client("bedrock-runtime", region_name=bedrock_region)
            response = bedrock_runtime_client.invoke_model(modelId=model_id, body=body)
            model_response = json.loads(response["body"].read())
            embeddings = [model_response["embedding"]]

            df_with_embeddings = pd.concat([df_with_embeddings.reset_index(drop=True), pd.Series(embeddings,name=embedding_field_name).reset_index(drop=True)],axis=1) #join embedding results to source dataframe
            df_with_embeddings = df_with_embeddings.set_index(index) #restore index number

        output_df = pd.concat([output_df, df_with_embeddings])
    return output_df

valid_df_with_embeddings = get_df_with_embeddings(
    input_df=valid_df,
    field_mappings=[
        {"InputFieldName": "question", "EmbeddingFieldName": "question_embedding"},
        {"InputFieldName": "context", "EmbeddingFieldName": "context_embedding"},
    ],
    model_id=model_id,
    bedrock_region=default_region,
    batch_size=1
)

ドキュメントのロード

前項で、Pandas の DataFrame 上に準備したデータを AWS SDK for Pandas を使用してロードする。

index_name = "jsquad-knn"
response = wr.opensearch.index_df(
    client=opensearch_client,
    df=valid_df_with_embeddings,
    use_threads=True,
    index=index_name,
    bulk_size=200, # 200 件ずつ書き込み
    refresh=False,
)

ベクトル検索

ドキュメントのベクトルも外部で作成してロードしたが、検索の入力パラメータの query も外部でベクトル化して検索する。

index_name = "jsquad-knn"
model_id = "amazon.titan-embed-text-v2:0"
query = "日本で梅雨がない場所は?"

def text_to_embedding(text, region_name, model_id):
    payload = {
        "inputText": text
    }
    body = json.dumps(payload)
    bedrock_runtime_client = boto3.client("bedrock-runtime", region_name)
    response = bedrock_runtime_client.invoke_model(modelId = model_id, body=body)
    model_response = json.loads(response["body"].read())
    return model_response["embedding"]

vector = text_to_embedding(text=query, region_name=default_region, model_id=model_id)
k = 10

payload = {
  "query": {
    "knn": {
      "question_embedding": {
        "vector": vector,
        "k": k
      }
    }
  },
  "_source": False,
  "fields": ["question", "answers", "context"],
  "size": k
}
response = opensearch_client.search(
    index=index_name,
    body=payload
)
pd.json_normalize(response["hits"]["hits"])

ニューラル検索

ここからは、やっと 2025年8月7日の発表分。ベクトル検索と何が違うのか?
ドキュメント登録時のベクトルの作成も、検索時のベクトルの作成も AOSS 側でやってくれるのが、ニューラル検索ということらしい。

インデックスの作成

ベクトル検索と同じである。Workshop では、インデックスの名前だけ jsquad-neural-search に変更した。

コネクタの作成

これも 2025年8月7日の発表分の新機能。
AOSS が Bedrock へアクセスするためのコネクタを作成する。

embedding_model_name = "amazon.titan-embed-text-v2:0"

payload = {
  "name": embedding_model_name, 
  "description": "Remote connector for " + embedding_model_name,
  "version": 1, 
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": opensearch_connector_role_arn
  },
  "parameters": {
    "region": default_region,
    "service_name": "bedrock",
    "model": embedding_model_name,
    "dimensions": 1024,
    "normalize": True,
    "embeddingTypes": ["float"],    
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "headers": {
          "content-type": "application/json",
          "x-amz-content-sha256": "required",
      },
      "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/${parameters.model}/invoke",
      "pre_process_function": "connector.pre_process.bedrock.embedding",
      "request_body": '{ "inputText": "${parameters.inputText}", "dimensions": ${parameters.dimensions}, "normalize": ${parameters.normalize}, "embeddingTypes": ${parameters.embeddingTypes} }',
      "post_process_function": "connector.post_process.bedrock.embedding",
    }
  ]
}

# API の実行
response = opensearch_client.http.post("/_plugins/_ml/connectors/_create", body=payload)
# 結果からコネクタ ID を取得
opensearch_embedding_connector_id = response["connector_id"]

モデルの登録

前項のコネクタを指定して、モデルを登録する。
ここから先は、コネクタのIDではなく、モデルのIDで参照されることになる。

payload = {
    "name": embedding_model_name,
    "description": embedding_model_name,
    "function_name": "remote",
    "connector_id": opensearch_embedding_connector_id
}
response = opensearch_client.http.post("/_plugins/_ml/models/_register?deploy=true", body=payload)
opensearch_embedding_model_id = response['model_id']

(ぐるぐるしてるのは、カットしました。)

Ingestion pipeline の作成

Ingestion pipeline は、データ登録時にベクトル埋め込みを行う。
その際、text_embedding processor として、上で登録した model_id を保持している。

payload = {
  "processors": [
    {
      "text_embedding": {
        "model_id": opensearch_embedding_model_id,
        "field_map": {
            "question": "question_embedding",
            "context": "context_embedding"
        }
      }
    }
  ]
}

ingestion_pipeline_id = f"{embedding_model_name}_neural_search_ingestion"

response = opensearch_client.http.put("/_ingest/pipeline/" + ingestion_pipeline_id, body=payload)
print(response)

Search pipeline の作成

Search pipeline は、クライアントから入力されたテキストのクエリをベクトルのクエリに変換する。その際、neural_query_enricher request_processor として、前項の Ingestion pipeline と同じく model_id を保持している。

payload={
  "request_processors": [
    {
      "neural_query_enricher" : {
        "default_model_id": opensearch_embedding_model_id
      }
    }
  ]
}
# パイプライン ID の指定
search_pipeline_id = f"{embedding_model_name}_neural_search_query"
# パイプライン作成 API の呼び出し
response = opensearch_client.http.put("/_search/pipeline/" + search_pipeline_id, body=payload)
print(response)

ニューラル検索のデータロード

上の ingestion_pipeline_id を指定してデータのロードをすることで、ベクトルデータを生成しながら、登録が行われる。
オリジナルは、use_threads=Truebulk_size=10 であったが、ここまで減らさないと途中で死んでしまうので、シングルスレッドで1件ずつの逐次処理になった。結果、15分前後かかる。(ので、本当はなんとかしたい)

index_name = "jsquad-neural-search"
response = wr.opensearch.index_df(
    client=opensearch_client,
    df=valid_df,
    use_threads=False,
    index=index_name,
    bulk_size=1, # 1 件ずつ書き込み
    refresh=False,
    pipeline=ingestion_pipeline_id
)

ニューラル検索

上の search_pipeline_id を指定して、検索を行うことで、クエリをベクトルデータに変換して、検索が行われる。

index_name = "jsquad-neural-search"
query = "日本で梅雨がない場所は?"
payload = {
  "size": 10,
  "query": {
    "neural": {
      "question_embedding": {
        "query_text": query, 
        # model_id の指定は行わない
        "k": 10
      }
    }
  },
  "_source" : False,
  "fields": ["question", "answers",  "context"]
}
# 検索 API を実行
response = opensearch_client.search(
    body = payload,
    index = index_name,
    filter_path = "hits.hits",
    search_pipeline = search_pipeline_id # 新たに追加
)

# 結果を表示
pd.json_normalize(response["hits"]["hits"])

ハイブリッド検索

これも 2025年8月7日の発表分。
ハイブリッド検索は、テキスト検索とベクトル検索を組み合わせた検索である。

index_name = "jsquad-neural-search"
query = "日本で梅雨がない地域は?"

payload = {
  "size": 10,
  "query": {
    "hybrid": {
      "queries": [
        {
          "match": {
            "question": {
              "query": query,
              "operator": "and"
            }
          }
        },
        {
          "neural": {
            "question_embedding": {
              "query_text": query, # テキストをベクトルに変換し
              "k": 10 # クエリベクトルに近いベクトルのうち上位 10 件を返却
            }
          }
        }
      ]
    }
  },
  "_source" : False,
  "fields": ["question", "answers",  "context"]
}
# 検索 API を実行
response = opensearch_client.search(
    body = payload,
    index = index_name,
    filter_path = "hits.hits",
    search_pipeline = hybrid_search_pipeline_id 
)

# 結果を表示
pd.json_normalize(response["hits"]["hits"])

おわりに

これまでのテキストの全文検索にベクトル検索を組み合わせたハイブリッド検索までくると、AI 検索感があるよね?

Workshop には、スパース検索もあったのだが、東京リージョンの Bedrock では、日本語スパース検索が可能なモデルは使用できないようなので、Bedrock の update 待ちということにしたいと思う。

また、Workshop には、セマンティックリランキングもあったのだが、こちらは、Bedrock 上の amazon.rerank-v1:0 へのコネクションとモデル登録と実行まではできたが、rerank response_processor として pipeline に登録する際に、Invalid processor type rerank になってしまったので、たぶん、AOSS の update 待ち。

GitHubで編集を提案
株式会社ROBONの技術ブログ

Discussion