Open9

Amazon OpenSearch Serviceのワークショップをやりつつ「ベクトル検索に関連するワード群」と「ベクトル検索用DBとしてのOpenSearchの価値」について整理する

Shogo TasaiShogo Tasai

本スクラッチの目的

ベクトルデータベースとしてのOpenSearchの価値ってどういうところにあるんだろう・そもそも使い勝手どんなもんだろうというところを手を動かしながら掴もうと思い、以下のワークショップに取り組んでいたところ
https://catalog.us-east-1.prod.workshops.aws/workshops/df655552-1e61-4a6b-9dc4-c03eb94c6f75/en-US

概念的な説明もある程度のボリュームがあるワークショップで今までのセマンティック検索に関するふんわりした理解を整理するのにいい機会だと思ったので、解説を読みつつ手を動かしつつ(+理解を深めたい点があれば調べつつ)自分なりに解釈した内容をメモしていくことにした

Shogo TasaiShogo Tasai

OpenSearchで利用しうる検索関連の技術の変遷(1)

従来型のテキスト検索

テキストの検索処理において、OpenSearchでは兼ねてからOkapi BM25という検索結果の順位付け用アルゴリズムをデフォルトのアルゴリズムとして用いてきた

BM25の前身のTF-IDFについて

BM25はTF-IDFというアルゴリズムの改良アルゴリズムであるため
その基本的な挙動の理解についてはTF-IDFについて知っておいた方がよい
TF-IDFはTerm Frequency-Inverse Document Frequencyの略だ
昔のVersionのLucene・Elasticsearchで用いられていたアルゴリズムである

語が表す通り、検索条件のワードをinputとして
TFとIDFの両方の仕組みでもって、どのドキュメントを重要とみなすかを
判定するアルゴリズムである

  • TF:検索条件のワードが「あるドキュメントに出現する頻度(Term Frequency)」すなわち「ワードが文書内でたくさん使われていればいるほど」そのドキュメントの優先順位を高めに算定するロジック
  • IDF:検索条件のワードが「全文書中で出現する珍しさの度合い(Inverse Document Frequency)」すなわち「ワードが全ドキュメント中であまり普遍的に使われていないものである場合に、そのワードを収録する数少ないドキュメント」について、より高い優先順位を置くロジック

検索における具体挙動

TF-IDFが採用されている頃のLuceneか何かで
「亜麻色の髪」というワードで検索をかけたと仮定しよう
(なぜ例が「亜麻色の髪」なのかはツッコミ無用。パッと思いついたのがこれだったからだけ)

そのシステム内では以下のような挙動をたどることが予想される

  • クエリ解析:検索ワードがトークナイズされて、「亜麻色」「の」「髪」というような形で分割・トークン化される
    • 「の」は実際のワークロードではストップワード(検索対象外ワード)とされる可能性が高そうだが、今回は説明の便宜上そのようには仮定しない
  • 文書検索:トークン化されたワードを含むドキュメントがインデックスから検索の上抽出される
  • スコアリング:TF-IDFアルゴリズムが抽出された各ドキュメントにスコアを割り当てる
    • この際「の」はTFアルゴリズムだけであれば、おそらくダントツで出現頻度が高くなり過大評価されて、逆に「亜麻色」や「髪」の比重は薄れてしまうであろうと予測される
    • 一方でIDFアルゴリズムがあることで「の」の重要性の著しい低さ・「亜麻色」のような語句の希少さがきちんと加味された上でスコアリングが期待できるというわけだ
  • ランキング & 返却:計算されたスコアに基づいて、ドキュメントは関連性の高い順にランク付けされて、スコアが高いドキュメントのいくつかを検索結果としてユーザーに返却する

改めてBM25について

基本的な挙動・コンセプトについてはTF-IDFを踏襲しつつ
以下のような点に関する改良を加えたことで
より現実的な検索ワークロードにより即した形となったアルゴリズムである
BMはBest Matchの略

  • 文書の長さの正規化: 文書の長さが異なれば、当然長い文書の方がより多くの単語が出現する傾向がある。BM25では文書の長さを考慮に入れて、より公平な比較を試みるようにしている
  • 飽和度 (Saturation): BM25では、ある単語の出現回数が多くなるにつれてその単語の重要性が増す度合いが徐々に低くなる。つまり、単語がある回数以上ドキュメント内で出現しても、単純に比例して重要性が高く見積もられるということがなくなる

備考

ハンズオンの解説では以下のように記述があるが、記事執筆時点でOpenSearchの最新バージョンは2.11である

Relevance scoring is the backbone of any search engine, It is really important to understand how it works while creating a search engine using Amazon OpenSearch Service. Amazon OpenSearch Service uses 2 kinds of similarity scoring function. Until version 5.0 it was using Term-Frequency and Inverse Document Frequency (TF-IDF) and after version 5.0 it uses Okapi BM25. In this section we will learn how these are different and how they play a vital role in determining relevance of a document.

超瑣末な点だが、↑の話はfolk元のElasticsearchの話で、OpenSearchについてはv1.0リリース当初からBM25をアルゴリズムとして採用していたことと思う

Shogo TasaiShogo Tasai

OpenSearchで利用しうる検索関連の技術の変遷(2)

セマンティック検索が必要とされる背景

BM25・TF-IDFのような従来型のキーワード検索では、デフォルトではあるワード(ex.憤怒)に対する検索で類義語(ex.忿懣)を引っ掛けることはできない。シノニムのような機構でカバーは可能であるものの、多種のワードに関するシノニムの管理はなかなかしんどいものがある。また、シノニムでもってしても文脈を考慮した検索は難しい。

有用なヘルプデスク用のChatやナレッジデータベースを構築するにあたっては、キーワード検索だけではユーザに有益な知識を提供することは難しくなってきており、自然言語を入力として受け付けてそのニュアンスをうまくとらまえて趣旨に沿った検索結果を提供するセマンティック検索が求められるようになってきている

ベクトル化

セマンティック検索を行うためには「検索対象となるドキュメント」の特徴量を比較できる形で数値化して表現・保存しておく必要がある。その主要な方式の1つがベクトル化である。ベクトル = 数ⅡBで学んだあのベクトルであり、ベクトル化の結果は [10 20 30 40 50]といった座標形式である。ただし、現実にセマンティック検索で用いられるベクトル化は数次元程度のレベルではなく数十から数百、ときには数千といったレベルの非常に高次で複雑な形で表現される

そして、ベクトル化された諸々のデータを検索するにあたり、ユーザとしては自然言語で検索しているつもりでいても、システムの裏側ではその検索条件の自然言語もベクトル化される。その上で、検索条件のベクトル化結果と近しい座標(特徴量)のデータが候補として提示されるというわけだ

備考

厳密に言えばTF-IDFのようなアルゴリズムもデータのベクトル化を実施していたようである。ただし、そこで生成されるベクトルの要素の多くはスパース(0)な単純構造であり(ただしベクトル化の次元数自体は多いらしい)、ベクトルデータベースのようなベクトル化データの保存・管理に特化しないものでも格納・検索に対処することは可能であった・・・ということのようだ

この点は自身の理解に対する自信は6割なので識者の方で「それは違うぞ」という方がいればぜひ教えてやってください

Word Embedding

セマンティック検索用のデータの保存・検索時のワークロードはもちろんのこと、語句・文章の類似性を機械学習モデルに学習させるワークロードにおいても、非構造化データである文字列のままでは色々と処理上苦しいところがある。

ということで、よりエンリッチされて互いに比較可能な構造化されたベクトル空間の世界に、文字列を組み込む・埋め込む(Embedding)手法が必要とされるわけだが、その手法のことをWord Embeddingと表現したりするようだ。今日ではベクトル化 ≒ Word Embeddingと捉えて差し支えないと考えている

Word Embeddingを実装するにあたっては複数のアルゴリズムの候補がある

Word2Vec

セマンティック検索が今日のレベルにまで進化を遂げる一歩前の段階のアルゴリズムとしてはWord2Vecがある。Word2Vecとはその名のごとく、単語レベルでのベクトル化を実現する。これにより、先ほど例に挙げた1語も被らない憤怒と忿懣が非常に似たニュアンスを持つ語句であることがベクトルで表現しうるようになった。

しかしWord2Vecでは単語より広い「文脈」を考慮することはできなかった

例えば以下のような同じbankでありながら、それぞれの文章内で大きく意味が異なる用法で使われていたとしてもWord2Vecにおいて両者のbankは全く同じベクトル表現がなされてしまうことだろう

  • The man was accused of robbing a bank.
    • その男は「銀行」強盗の嫌疑で告訴された
  • The man went fishing by the bank of the river.
    • その男は川の「土手」に釣りに行った

このような弱点を克服し、セマンティック検索の精度・有用性を向上させる大きな画期となったアルゴリズムがBERTである

BERT

BERTとはBidirectional Encoder Representations from Transformersの略。Bidirectionalすなわち双方向の学習を取り入れていることがこのアルゴリズムの味噌となる。

どういうことかというと、文の一方向のみ(左から右、または右から左)の文脈のみならず、BERTでは両方向からの文脈を考慮できるようになった。もう少しいうとTransformerというアーキテクチャの存在により、文章中に存在する各単語の他のすべての単語との関係性の強弱を同時に計算できるようになったのである。のみならず、先ほどの例で挙げたような「bank」のような語についても文章中で組み合わられている単語の種別によっては異なる意味を持つことも学習しうるようになった
また、BERTではCLSトークンという機構があり、各文章の先頭にこのトークンが挿入されるが学習の過程を通じて文章全体の文脈を表すように調整されてもいる(このCLSトークンについてもベクトルでもって表現される)。単語レベルの文脈理解の仕組みだけではなく、文章全体の特徴量も図る機構を有しているわけである

上述のような計算・学習は、1つの文章を読み込ませただけで有効に機能するはずがない。多種多様なデータセットの比較ありきで機能する仕組みである。そこで膨大な文章と高スペックなGPUマシン等を用いて事前学習(大量の文章の読み込み・比較)を行いそうした強弱を計算・判定できるモデルを作り推論処理・検索処理に利用するわけだ。ハイスペックな物理マシンが開発され、それらのマシンを所有せずに必要な時だけ利活用できるクラウドの技術が土台にあればこそのある種 力技の技術と言えるだろう

Shogo TasaiShogo Tasai

OpenSearchで利用しうる検索関連の技術の変遷(3)

BERTのようなアルゴリズム・モデルの登場により、単語レベルと文章レベルの両方で文脈に基づくベクトル表現を生成することができるようになった。ただ、ベクトル表現で保存されたデータ群を「どのような検索するか/検索するにあたってどのような手段の候補があるか」についてはまた別個のテーマである

k-NNについて

ベクトル化されたデータを検索するにあたって、有力な手法の一つにk-NNがある。
k-NNはk-Nearest Neighborsの略で、kは可変の変数を表している。k-NNはkで指定された数だけ「『あるデータ』に最も近いとみなせるデータ」をピックアップするためのアルゴリズムである

1つ前のスクラッチで触れた通り、ベクトルデータに対する検索においては保存データだけではなく検索条件もベクトル化される。

k-NNでは以下のようなアルゴリズムでもって、検索条件と各保存済みベクトルデータ間の関連性を計算する

  • ユークリッド距離:2点のベクトル間の直線距離を計算する
  • コサイン類似度:2点のベクトルに関して方向の類似性を計算する(ベクトルの大きさ・長さについては考慮しない)

上記の関連度計算に基づいて、検索条件ベクトルに最も近しいk個のベクトルデータを識別するというわけである

k-NNにはさらに大別してExact kNNとApproximate kNNの2種類が存在する

Exact kNNについて

Exact kNNは、愚直に全ベクトルデータを対象にして、検索条件ベクトルに最も近いk個のポイントを正確に見つけ出す方法だ。正確性は極めて高い一方で、高次元データや大規模データセットに対してこの方法を採用する場合に非常に高コストになるであろうことは想像に難くないだろう

Approximate kNNについて

そこで、近似値で構わないからより効率的な形でk個のデータを探し出せないかというアプローチが当然出てくる。それがApproximate kNNである。

HNSWについて

Approximate kNNのアプローチのうち、もっとも有名な手法の一つがHNSW(Hierarchical Navigable Small Worlds)である

Hierarchicalという形容詞の通り、DB上のベクトルデータが異なるレイヤーに階層的に分けられることになる。上層には少数の代表的なベクトルが、下層にはより多くの詳細なベクトルが配置される。上層の一部のベクトルデータAから一番近いデータBをピックアップして、さらにその後より密度高くデータが分布する下層レイヤーに降りデータB付近に存在するAに近いデータCをさらにピックアップして・・・というように芋蔓式に近しい点を辿っていくのである

文章だけで説明するのがちょっと苦しいレベルの話のように感じるが、実際問題イメージがつかない方はLIFULLのエンジニアの宮崎さんの図解等を参照する理解の助けになるかもしれない
https://www.docswell.com/s/LIFULL/Z1JQ88-2023-09-05-100726#p6

この方式では、一部のデータセットのみを走査すればよいため検索コストはExact kNNよりも低くなり高次元データや大規模データセットに対しても応用が効くようになる。その代わり厳密に最も近しいベクトルデータを探すことは難しくなるほか、多層のレイヤー構造の構築においてExact kNNと比べて大きなメモリ消費を必要として、さらに新規データの投入時も当該データをこのレイヤーに組み込む処理が発生するため投入にも時間がかかるようになることがデメリットと言える

ベクトルデータと従来式のindexの相性の悪さ

ここまで整理してきて、私が意外に思ったのは「ベクトルの座標についてあらかじめindexを構成しておいて、検索条件のベクトルに近しいベクトルについて即座にあたりをつけて引っ張ってくるのでなしに、全部のデータを走査したりHNSWみたいなまどろっこしいことをやらざるを得ない」ということである

ただ、よく考えればこれはもっともなことかもしれない

まず、RDBで商品分類のようなカラムにindexを貼る場合と異なり、多分ベクトルデータのような高次元なデータに関しては各データ間は(比較的近しいデータであっても)おそらくかなりの距離の隔たりが生まれるであろうから効率の良い類型化はできずおそらくは索引は膨大な数にわたってしまうだろう

その上、高次元データでは距離計算が複雑であるから索引のうちどれが検索条件のベクトルと一番近いかは即座に計算・判断できないことと思う。数が多ければ尚更だ

だから、単純な座標に基づくindexを作成したところで、結局はExact kNNをやっているのとあまり変わらない・・・という状態になってしまうような気がする

多分「せめてできること」として落ち着いたのが、データの分布をもとに比較的近しいデータ同士のクラスタリング・分類を検索の都度行うのではなく事前にざっくりと実施しておき、HNSWのようなアプローチで多層のレイヤーを構築した上で、最上層には各クラスタ内の代表的なデータポイントを配しておきそこから徐々に近しい座標を辿っていく・・・というHNSW的なアプローチなのだろうと思う

Shogo TasaiShogo Tasai

Module3(Semantic Search)・Module4(Fullstack semantic search)について

これからは実際に手を動かしていくパート

メモ・雑感

Mean pooling

Mean poolingというワードが事前説明なく出てくるが結構重要な処理の気配

  • テキストデータ(通常は文や文章)は、まずトークン(単語やサブワード)に分割され、それぞれに対してベクトル埋め込み(embedding)が生成される
  • これら個々のトークンの埋め込みを集約して、文全体の代表的なベクトルを作成するMean poolingという手法が存在する

以下例

import torch
from transformers import AutoTokenizer, AutoModel
from transformers import DistilBertTokenizer, DistilBertModel

#model_name = "distilbert-base-uncased"
#model_name = "sentence-transformers/msmarco-distilbert-base-dot-prod-v3"
model_name = "sentence-transformers/distilbert-base-nli-stsb-mean-tokens"


#Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask):
    token_embeddings = model_output[0] #First element of model_output contains all token embeddings
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    return sum_embeddings / sum_mask


def sentence_to_vector(raw_inputs):
    tokenizer = DistilBertTokenizer.from_pretrained(model_name)
    model = DistilBertModel.from_pretrained(model_name)
    inputs_tokens = tokenizer(raw_inputs, padding=True, return_tensors="pt")
    
    with torch.no_grad():
        outputs = model(**inputs_tokens)

    sentence_embeddings = mean_pooling(outputs, inputs_tokens['attention_mask'])
    return sentence_embeddings

ベクトル化データをOpenSearch Serviceに投入する具体要領

以下のようにk-NN検索に対応するindexであることを明示した上で、素の文章も保存しつつ、ベクトル化の結果をその文章と同じdocumentに保存する形式をとる

ぼんやりベクトルデータ用indexとそうでないindexは分けないといけないかと思っていたがそうではないらしい。キーワード検索もセマンティック検索も1つのデータストアで手間なく実現できそうなのは便利かも

knn_index = {
    "settings": {
        "index.knn": True,
        "index.knn.space_type": "cosinesimil",
        "analysis": {
          "analyzer": {
            "default": {
              "type": "standard",
              "stopwords": "_english_"
            }
          }
        }
    },
    "mappings": {
        "properties": {
            "question_vector": {
                "type": "knn_vector",
                "dimension": 768,
                "store": True
            },
            "question": {
                "type": "text",
                "store": True
            },
            "answer": {
                "type": "text",
                "store": True
            }
        }
    }
}

ベクトル化データを検索する具体要領

以下のような具合で文章のベクトル化を実施した上で

query_raw_sentences = ['does this work with xbox?']
search_vector = sentence_to_vector(query_raw_sentences)[0].tolist()

希望する返却QA数(=k)を指定した上で、ベクトル化済みソースをinputとして検索を実施する

query={
    "size": 30,
    "query": {
        "knn": {
            "question_vector":{
                "vector":search_vector,
                "k":30
            }
        }
    }
}

res = aos_client.search(index="nlp_pqa", 
                       body=query,
                       stored_fields=["question","answer"])
#print("Got %d Hits:" % res['hits']['total']['value'])
query_result=[]
for hit in res['hits']['hits']:
    row=[hit['_id'],hit['_score'],hit['fields']['question'][0],hit['fields']['answer'][0]]
    query_result.append(row)

query_result_df = pd.DataFrame(data=query_result,columns=["_id","_score","question","answer"])
display(query_result_df)

Webアプリ用の構成

Lambda等でSagaMakerのエンドポイントの推論等を使って、検索ワードをベクトル化した上で
そのベクトル化データをinputにして直前のOpenSearchのリクエストのようなものを実施する

Module 5: Fine tuning semantic search (Optional) についてはFine tuningの原理・プログラミングの具体要領を掴んでからでないと、やってもよくわからなそうなのでskip

Shogo TasaiShogo Tasai

Module 6(Neural Search)について

メモ・雑感

OpenSearchのNeural Searchプラグイン

今回のハンズオンの一番の「ほげぇぇぇ」ポイントになるかもしれない。
OpenSearchではLambdaとかSageMakerを要さずに、投入されたdocumentや検索文を
ベクトル化する仕組みを有している

ベクトル化についてはカスタムモデルを組み込むことも可能なようであるが
事前に用意されたモデルを利用することも可能である
https://opensearch.org/docs/latest/ml-commons-plugin/pretrained-models/

OpenSearch上でpre-trainedモデルを活用できるようにするまでのステップ

通常の要領でAOSクライアントを生成した上で

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3
import json

kms = boto3.client('secretsmanager')
aos_credentials = json.loads(kms.get_secret_value(SecretId=outputs['OpenSearchSecret'])['SecretString'])

region = 'us-east-1' 

#credentials = boto3.Session().get_credentials()
#auth = AWSV4SignerAuth(credentials, region)
auth = (aos_credentials['username'], aos_credentials['password'])

index_name = 'nlp_pqa'

aos_client = OpenSearch(
    hosts = [{'host': aos_host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

Neural Search専用のクライアントを一般のAOSクライアントを引数に生成。適切なモデル名・モデルバージョンを選択した上で

from opensearch_py_ml.ml_models import SentenceTransformerModel
from opensearch_py_ml.ml_commons import MLCommonClient

ml_client = MLCommonClient(aos_client)
model_id = ml_client.register_pretrained_model(model_name = "huggingface/sentence-transformers/all-MiniLM-L12-v2", model_version = "1.0.1", model_format = "TORCH_SCRIPT", deploy_model=False, wait_until_deployed=False)
print(model_id)

実機にそのモデルを食わせる

load_model_output = ml_client.deploy_model(model_id)

print(load_model_output)

pipelineの構成

モデルをOpenSearchにデプロイしただけではモデルは動くようにならず、pipelineという機構の構成が必要な模様。pipelineは説明なしに出てきたため後でちょっと勉強して説明を補いたい

こんな感じで構成するらしい

pipeline={
  "description": "An example neural search pipeline",
  "processors" : [
    {
      "text_embedding": {
        "model_id": model_id,
        "field_map": {
           "question": "question_vector"
        }
      }
    }
  ]
}
pipeline_id = 'nlp_pipeline'
aos_client.ingest.put_pipeline(id=pipeline_id,body=pipeline)

サンプルのindexは以下の通りなので、おそらくは新規indexが投入された際にquestion fieldの値をもとにquestion_vectorというベクトル値を自動で埋め込んでくれるのが↑のpipelineの役割なのだろう

knn_index = {
    "settings": {
        "index.knn": True,
        "index.knn.space_type": "cosinesimil",
        "default_pipeline": pipeline_id,
        "analysis": {
          "analyzer": {
            "default": {
              "type": "standard",
              "stopwords": "_english_"
            }
          }
        }
    },
    "mappings": {
        "properties": {
            "question_vector": {
                "type": "knn_vector",
                "dimension": 384,
                "method": {
                    "name": "hnsw",
                    "space_type": "l2",
                    "engine": "faiss"
                },
                "store": True
            },
            "question": {
                "type": "text",
                "store": True
            },
            "answer": {
                "type": "text",
                "store": True
            }
        }
    }
}

検索の要領

やはりquestion fieldに値が入っているdocumentを投入するだけでpipelineが駆動して
ベクトル化が実施されてquestion_vectorに値が入っていく模様

その後以下のようなbodyを用いて検索を実施すると、Neural Searchが問題なく実施可能

query={
  "_source": {
        "exclude": [ "question_vector" ]
    },
  "size": 30,
  "query": {
    "neural": {
      "question_vector": {
        "query_text": "does this work with xbox?",
        "model_id": model_id,
        "k": 30
      }
    }
  }
}

なお従来型の全文検索のbodyを対比のために付すと以下の通り

query={
    "size": 30,
    "query": {
        "match": {
            "question":"does this work with xbox?"
        }
    }
}
Shogo TasaiShogo Tasai

Module8(Conversational Search)について

Module7は印象に残る箇所がなかったので特に何も書き残さない

LangChainについて

LangChainについて生成AI周りのコーディングをする際の便利ツールという印象は元々もっていたが想像以上に数多のリソースと連携でき便利である印象を受けた

Webページの読み込み

こんな感じでラフにページを読み込める

from langchain.document_loaders import UnstructuredURLLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(chunk_size = 1000, chunk_overlap = 100)

urls = ["https://docs.aws.amazon.com/opensearch-service/latest/developerguide/bp.html",
        "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/sizing-domains.html", 
        "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/petabyte-scale.html",
        "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-dedicatedmasternodes.html",
        "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/cloudwatch-alarms.html"]
url_loader = UnstructuredURLLoader(urls=urls)
url_texts = url_loader.load_and_split(text_splitter=text_splitter)
all_splits = url_texts
print(all_splits[0])

最後のprintの出力例

page_content='AWS\n\nDocumentation\n\nAmazon OpenSearch Service\n\nDeveloper Guide\n\nMonitoring and alerting\n\nShard strategy\n\nStability\n\nPerformance\n\nSecurity\n\nCost optimization\n\nOperational best practices for Amazon OpenSearch Service\n\nThis chapter provides best practices for operating Amazon OpenSearch Service domains and includes general\n        guidelines that apply to many use cases. Each workload is unique, with unique\n        characteristics, so no generic recommendation is exactly right for every use case. The most\n        important best practice is to deploy, test, and tune your domains in a continuous cycle to\n        find the optimal configuration, stability, and cost for your workload.\n\nTopics\n\nMonitoring and alerting\n\nShard strategy\n\nStability\n\nPerformance\n\nSecurity\n\nCost optimization\n\nSizing Amazon OpenSearch Service domains\n\nPetabyte scale in Amazon OpenSearch Service\n\nDedicated master nodes in\n                Amazon OpenSearch Service' metadata={'source': 'https://docs.aws.amazon.com/opensearch-service/latest/developerguide/bp.html'}

SagaMaker連携

例えばSageMaker上にWord Embedding用の推論エンドポイントを用意しておき、さくっと推論結果を得ることができる

from typing import Any, Dict, Iterable, List, Optional, Tuple, Callable
import json
from langchain.embeddings import SagemakerEndpointEmbeddings
from langchain.embeddings.sagemaker_endpoint import EmbeddingsContentHandler
from langchain.schema import Document

class BulkSagemakerEndpointEmbeddings(SagemakerEndpointEmbeddings):
        def embed_documents(
            self, texts: List[str], chunk_size: int = 5
        ) -> List[List[float]]:
            results = []
            _chunk_size = len(texts) if chunk_size > len(texts) else chunk_size

            for i in range(0, len(texts), _chunk_size):
                response = self._embedding_func(texts[i:i + _chunk_size])
                results.extend(response)
            return results
        
class EmbeddingContentHandler(EmbeddingsContentHandler):
        content_type = "application/json"
        accepts = "application/json"

        def transform_input(self, prompt: str, model_kwargs={}) -> bytes:

            input_str = json.dumps({"text_inputs": prompt, **model_kwargs})
            return input_str.encode('utf-8') 

        def transform_output(self, output: bytes) -> str:

            response_json = json.loads(output.read().decode("utf-8"))
            embeddings = response_json["embedding"]
            if len(embeddings) == 1:
                return [embeddings[0]]
            return embeddings

print(embedding_endpoint_name)
embeddings = BulkSagemakerEndpointEmbeddings( 
            endpoint_name=embedding_endpoint_name,
            region_name=aws_region, 
            content_handler=EmbeddingContentHandler())

OpenSearch連携

OpenSearchをベクトルデータベースとして扱って便利にデータを投入できる機能もある
以下のコードは「SagaMaker連携」の紹介コードの続きと思って見てほしい(「SagaMaker連携」の紹介コードのembeddingsという変数を処理上で使っている)

from langchain.vectorstores import OpenSearchVectorSearch

os_domain_ep = 'https://'+aos_host

embedding_index_name = 'opensearch_kb_vector'

if len(all_splits) > 500:
    for i in range(0, len(all_splits), 500):
        start = i
        end = i+500
        if end > len(all_splits):
            end = len(all_splits)-1
        docs = all_splits[start:end]
        OpenSearchVectorSearch.from_documents(
            index_name = embedding_index_name,
            documents=docs,
            embedding=embeddings,
            opensearch_url=os_domain_ep,
            http_auth=auth
        )
        print(f"ingest documents from {start} to {end}", start, end)
else:
    OpenSearchVectorSearch.from_documents(
            index_name = embedding_index_name,
            documents=all_splits,
            embedding=embeddings,
            opensearch_url=os_domain_ep,
            http_auth=auth
        )
    print(f"ingest documents")

RAGのretriverとしての役割を持たせることもできる

retriever = open_search_vector_store.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={
        'k': 5,
        'score_threshold': 0.62
    }
)

RAGのソースの提示

Kendraみたいな「よしなにやる」系のサービスでないと実装が難しいとぼんやり思い込んでいたがLangChainにかかればそんなことはなさそうな雰囲気

qa_with_source = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    return_source_documents=True,
    chain_type_kwargs={"prompt": prompt_template2}
)

print("Question is:" + question)
result = qa_with_source({"query": question})

print("result:" + result["result"])
print("\n\n===========================")
print("\nsource documents:")
for doc in result["source_documents"]:
    print(doc)
    print("---------------------------\n")

以下最後のresult["source_documents"]をfor文に回した際のprint例
return_source_documents=Trueが機能する条件はよくわからないので後で調べたい

source documents:
page_content="If your minimum storage requirement exceeds 1 PB, see Petabyte scale in Amazon OpenSearch Service.\n\nIf you have rolling indexes and want to use a hot-warm architecture, see\n                            UltraWarm storage for Amazon OpenSearch Service.\n\nChoosing the number of shards\n\nAfter you understand your storage requirements, you can investigate your indexing\n                strategy. By default in OpenSearch Service, each index is divided into five primary shards and one\n                replica (total of 10 shards). This behavior differs from open source OpenSearch,\n                which defaults to one primary and one replica shard. Because you can't easily change\n                the number of primary shards for an existing index, you should decide about shard\n                count before indexing your first document." metadata={'source': 'https://docs.aws.amazon.com/opensearch-service/latest/developerguide/sizing-domains.html'}
---------------------------

DynamoDB連携

DynamoDBをチャットの履歴管理に用いることができるDynamoDBChatMessageHistoryというクラスが用意されている。ドキュメント読む暇がなかったので後で具体的な使い方を調べたい。BedrockやSagaMakerのような機械学習系サービス以外のサービスと連携できる機能・クラスに目を向けると面白い掘り出し物がありそう

from uuid import uuid4
from typing import Dict
from langchain.memory import ConversationBufferMemory
from langchain.memory import DynamoDBChatMessageHistory
from langchain.memory import ConversationBufferWindowMemory
from langchain import PromptTemplate, SagemakerEndpoint
from langchain.llms.sagemaker_endpoint import LLMContentHandler
from langchain.chains import RetrievalQA

(中略)

ddb_table_name = "conversation-history-memory"
session_id = str(uuid4())
chat_memory = DynamoDBChatMessageHistory(
        table_name=ddb_table_name,
        session_id=session_id
    )

messages = chat_memory.messages

# Maintains immutable sessions
# If previous session was present, create
# a new session and copy messages, and 
# generate a new session_id 
if messages:
    session_id = str(uuid4())
    chat_memory = DynamoDBChatMessageHistory(
        table_name=ddb_table_name,
        session_id=session_id
    )
    # This is a workaround at the moment. Ideally, this should
    # be added to the DynamoDBChatMessageHistory class
    try:
        messages = messages_to_dict(messages)
        chat_memory.table.put_item(
            Item={"SessionId": session_id, "History": messages}
        )
    except Exception as e:
        print(e)

memory = ConversationBufferMemory(chat_memory=chat_memory, memory_key="chat_history", return_messages=True)
Shogo TasaiShogo Tasai

現在のOpenSearch Serviceで利用できるベクトルデータの検索技術

handsonに突入する前の座学編の最後に書いておけばよかった

保存できるベクトルデータについて

おそらくどのモデル・アルゴリズムに基づいて行われたベクトル化の結果も格納可能

扱えるベクトルデータの検索技術について

基本的にはk-NNがメインとなる
Approximate kNNについては3つのエンジンを指定可能である

  • Non-Metric Space Library (NMSLIB) – NMSLIB implements the HNSW ANN algorithm
  • Facebook AI Similarity Search (FAISS) – FAISS implements both HNSW and IVF ANN algorithms
  • Lucene – Lucene implements the HNSW algorithm

また、FAISSについては先述のHNSW以外にもIVFというアプローチを使いうる(FAISS HNSW、FAISS IVFおよびLucene HNSWについては後ほど調べて追記しておく)
https://aws.amazon.com/jp/blogs/big-data/amazon-opensearch-services-vector-database-capabilities-explained/

Exact K-NNも扱える
https://opensearch.org/docs/latest/search-plugins/knn/knn-score-script/

Shogo TasaiShogo Tasai

v2.11の注目機能

handson用のAOSクラスターを覗いていた際にリコメンドされて気づいたのだが、11/17にv2.11がAmazon OpenSearch Serviceでデプロイできるようになっていた
https://docs.aws.amazon.com/opensearch-service/latest/developerguide/release-notes.html

具体的に何ができるようになったかをつらつら見ると今回のhandson絡みで気になるアップデートを発見。RAG pipeline ???
https://opensearch.org/blog/get-started-opensearch-2-11/

Customize pipelines for retrieval augmented generation
Enhancing the conversational search functionality introduced in OpenSearch 2.10 as an experimental toolkit, this release introduces several new parameters that can be used to customize retrieval augmented generation (RAG) pipelines. These optional parameters provide core logic that allows you to adapt the way OpenSearch interacts with large language models (LLMs) as part of generative artificial intelligence (GenAI) applications. See the documentation for this feature to explore the available parameters.

RAG pipeline

このページに記されているその他の機能も気になるのだけれども一旦脇に置いて、RAG pipelineの欄だけを見ると、OpenSearchに対して質問(Query)を投げることで、OpenSearch内のベクトルデータから適当と思われるdocumentsをピックアップ & Bedrock等のAI基盤とシームレスに接続・引き渡して、OpenSearchから回答を返却できる機能っぽい。つまりLambdaとかなしでRAGが成立する。なかなかなクール機能では・・・?(ちょっとこれは実機を触りたいなぁ)

https://opensearch.org/docs/latest/ml-commons-plugin/conversational-search/#rag-pipeline