🦜

LangChain の Vectorstore として Azure Cache for Redis を使ってベクトルの格納と検索を行う

2023/05/29に公開

はじめに

LangChainVectorestore として Azure Cache for Redis を使おうとしたときに LangChain のドキュメントを読むだけでは一筋縄ではいかなかったため、一連の手順と参考情報へのリンクをまとめました。

全体として以下の絵のようなことを行います。

方法

1. Python

以下のパッケージを Pyhton 実行環境にインストールします。バージョンは執筆時点での最新版です。tensorflow-datasets は今回使用するサンプルデータの準備に使います。

2. Azrue Cache for Redis Enterprise

ドキュメントの手順に従って Azure Cache for Redis のリソースを作成します。今回使用する langchain.vectorstores.Redisredis-py のラッパーとして内部的には RediSearch モジュールに含まれる Vector similarity を使っています。そのため、リソース作成時には Enterprise レベルを選択したうえで RediSearch モジュールを追加しておく必要があります。

参考

3. Azure OpenAI Service

Embedding のために Azure OpenAI Service を準備します。

3.1. リソース作成

Azure OpenAI Service を使うには 利用申請 が必要です。利用申請が承認されたら、ドキュメントの手順に従ってリソースを作成します。

参考

3.2. モデルデプロイ

Vectorstore への格納時にテキストをベクトルに変換する Embedding モデルを指定する必要があるため、Azure OpenAI Service の Embedding シリーズのモデルをデプロイします。
OpenAI の Embedding モデルには複数のシリーズが存在していますが、執筆時点で最新の text-embedding-ada-002 シリーズを除く V1 モデルは既に全て非推奨になっています。よって、今回は text-embedding-ada-002 シリーズの中で最も新しい text-embedding-ada-002 (Version 2) をデプロイします。

モデル名 最大トークン トレーニングデータ 米国東部リージョン 米国中南部リージョン 西ヨーロッパリージョン
text-embedding-ada-002 (Version 2) 8,191 2021 年 9 月 まで
text-embedding-ada-002 (Version 1) 4,095 2021 年 9 月 まで

参考

4. 実装

4.1. データ準備

サンプルとして Wiki-40B日本語データセットを使います。今回はタイトルに Microsoftマイクロソフト が含まれる記事を取得します。以下のスクリプトを実行すると、146 のテキストファイルが作成されます。
なお、記事の内容はデータセットが作成された 2020 年時点のものですので、現在の記事とは異なる可能性があります。

import os
import re
import tensorflow_datasets as tfds

output_dir = "./ms_articles"
os.makedirs(output_dir, exist_ok=True)
for split in ["train", "validation", "test"]:
    ds = tfds.load("wiki40b/ja", split=split)
    for idx, record in enumerate(ds):
        text = record["text"].numpy().decode("utf-8")
        wiki_id = record["wikidata_id"].numpy().decode("utf-8")
        m = re.search("_START_ARTICLE_\n(.+)\n", text)
        if m:
            article_title = m.group(1).strip()
            if "マイクロソフト" in article_title or "Microsoft" in article_title:
                filename = f"{output_dir}/{wiki_id}_{article_title}.txt"
                with open(filename, "w") as f:
                    f.write(text)

[補足] Wiki-40B データのフォーマット

データセットには特殊なマーカー文字列が含まれています。

  • _START_ARTICLE_ (記事のタイトル)
  • _START_SECTION_ (セクションの開始)
  • _START_PARAGRAPH_ (段落の開始)
  • _NEWLINE_ (段落内の改行)

生データは以下のようなフォーマットです。

_START_ARTICLE_
<タイトル>
_START_SECTION_
<セクション名>
_START_PARAGRAPH_
<文>_NEWLINE_<文>_NEWLINE_...
_START_SECTION_
<セクション名>
_START_PARAGRAPH_
<文>_NEWLINE_<文>_NEWLINE_...

4.2. 環境変数の設定

Azure OpenAI Service のキーとエンドポイントAzure Cache for Redis のアクセスキーおよびリソース名を事前に環境変数に設定しておきます。

import os

os.environ["OPENAI_API_TYPE"] = "azure" # 固定値
os.environ["OPENAI_API_BASE"] = "https://<your-aoai-resource-name>.openai.azure.com/"
os.environ["OPENAI_API_KEY"] = "<your-aoai-api-key>"
os.environ["OPENAI_API_VERSION"] = "2023-05-15" # 執筆時点で最新

os.environ["REDIS_NAME"] = "<your-redis-resource-name>"
os.environ["REDIS_KEY"] = "<your-redis-access-key>"

4.3. テキスト読み込みとチャンク化

以下のスクリプトを実行すると、事前に準備しておいたマイクロソフト関連の記事を読み込んで長い記事のチャンク化 (分割) を行います。前述のとおり Wiki-40B データセットにはマーカー文字列が含まれていますので、今回はその中から _START_SECTION_ をセパレーターとして使い、とりあえず 2,000 トークンを目安にチャンク化を行います。

あわせて、Vectorestore にベクトルと一緒に格納するメタデータも作成します。

  • Wikidata ID (Wiki-40B データセットで付与されている記事 ID)
  • 記事タイトル
  • 記事テキストファイルのパス
  • 記事の合計チャンク数 (1 つの記事がいくつのチャンクに分割されたか)
  • チャンク通し番号 (このチャンクはその中で何番目か)
import os
import re

from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores.redis import Redis as redis_vectorstore

dir_path = "./ms_articles"
files = [f for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, f))]

texts = []
metadatas = []

for file in files:
    # 記事IDと記事タイトルを取得 (ファイル名のフォーマット: <id>_<title>.txt)
    article_id_title = os.path.splitext(file)[0].split("_")
    article_id = article_id_title[0] 
    article_title = article_id_title[1]
    file_path = os.path.join(dir_path, file)
    
    # テキスト読み込み
    with open(file_path, 'r') as f:
        raw_text = f.read()
    
    # 読み込んだ記事のタイトル部分を削除
    pattern = "_START_ARTICLE_\n(.+)\n"
    raw_text_wo_title = re.sub(pattern, "", raw_text)
    
    # チャンクに分割
    splitter = CharacterTextSplitter.from_tiktoken_encoder(
        encoding_name="cl100k_base", # text-embedding-ada-002のエンコーディング
        separator="_START_SECTION_", # セクション単位で分割
        chunk_size=2000,             # なるべく2,000トークン
        chunk_overlap=0              # チャンク分割の際のオーバーラップなし
    )
    chunks = splitter.split_text(raw_text_wo_title)
    chunk_count = len(chunks)
    
    # メタデータを作成
    metadata = [
        {
            "article_id": article_id,       # 記事ID
            "article_title": article_title, # 記事タイトル
            "file_path": file_path,         # ファイルパス
            "chunk_count": chunk_count,     # 合計チャンク数
            "chunk_number": i+1             # チャンク通し番号
        } for i in range(0, chunk_count)
    ]
    
    # リストに追加
    texts.extend(chunks)
    metadatas.extend(metadata

# wiki40bデータセットのマーカー文字列を削除 
texts = [
    t.replace("_START_ARTICLE_\n","")          # 行ごと削除
    .replace("_START_PARAGRAPH_\n","")         # 行ごと削除
    .replace("_START_SECTION_\n","")           # 行ごと削除
    .replace("_NEWLINE_","\n") for t in texts] # 改行コードに置き換え

上記の設定でチャンク化を行うと 222 のチャンクに分割されますが、一部チャンクは 2,000 トークンを超えてしまいます。(つまり、単一のセクションで 2,000 トークンを超えているものがあります。)
しかし、今回使用する text-embedding-ada-002 (Version 2) の最大トークンは 8,191 で問題なく処理できるためこれ以上の分割は行いません。

Created a chunk of size 2428, which is longer than the specified 2000
Created a chunk of size 2096, which is longer than the specified 2000
Created a chunk of size 2188, which is longer than the specified 2000
Created a chunk of size 2056, which is longer than the specified 2000
Created a chunk of size 2662, which is longer than the specified 2000

参考

4.4. Embedding と Vectorstore (Redis) への格納

続けて以下のスクリプトを実行すると、チャンクの Embedding が行われ、メタデータとともに Vectorstore (Redis) へ格納されます。

# Embeddingに使うAOAIのモデルを指定(接続情報は事前に設定した環境変数から読み込まれる)
embeddings = OpenAIEmbeddings(
    deployment="text-embedding-ada-002", # デプロイ名(≠モデル名)
    chunk_size=1                         # Embeddingのバッチサイズは1にする
)
index_name = "ms_articles"
redis_url = "rediss://:" + os.getenv("REDIS_KEY") + "@" + os.getenv("REDIS_NAME") + ".eastus.redisenterprise.cache.azure.net:10000"

# EmbeddingとVectorestoreへの格納
vec_store = redis_vectorstore.from_texts(
    texts=texts,           # チャンク(文字列)のリスト
    metadatas=metadatas,   # メタデータ(辞書)のリスト
    embedding=embeddings,  # Embeddingモデル
    index_name=index_name, # インデックス名
    redis_url=redis_url    # Redis接続情報
)

参考

[補足] Redis URL の構文について

Azure Cache for Redis Enterprise への接続を行う際の URL は以下の構文で記述します。また、Azure Cache for Redis への通信は SSL が必須となるため、URL スキームに rediss:// を選択します。

"rediss://:<your-redis-key>@<your-redis-resource-name>.eastus.redisenterprise.cache.azure.net:10000"

※ 米国東部リージョンにリソースを作成した場合の例

参考

[補足] langchain.embeddings.OpenAIEmbeddings の chunk_size

OpenAIEmbeddings (OpenAI embedding モデルのラッパー) の chunk_size (ここでの chunk_size はバッチサイズのこと) はデフォルトで 1,000 に設定されていて、そのままリクエストを行うと Azure OpenAI Service の制限にかかり Embedding が行えない場合があります。そのため、chunk_size を 1 など小さな値に設定して制限にかからないようにします。

制限にかかった場合のエラーメッセージ:

InvalidRequestError: Too many inputs for model None. The max number of inputs is 1. We hope to increase the number of inputs per request soon. Please contact us through an Azure support request at: xxx for further questions.

参考

5. 検索

ここまでの一連のコードを実行してきている場合 vec_store インスタンスをそのまま使うことで検索が行えますが、現実的には Vectorstore へのベクトル格納と検索は別の文脈で行う場合が多いと思われるため、改めて既存のインデックスに接続する書き方をします。
検索を実行すると、クエリの文字列がベクトル化されて Vectorestore 中で類似性が高いチャンクが指定した件数だけ返されます。

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores.redis import Redis as redis_vectorstore

# Embeddingに使うAOAIのモデルを指定(接続情報は事前に設定した環境変数から読み込まれる)
embeddings = OpenAIEmbeddings(
    deployment="text-embedding-ada-002", # OAIのデプロイ名(≠モデル名)を指定する
    chunk_size=1                         # Embeddingの同時実行数は1にする
)
index_name = "ms_articles"
redis_url = "rediss://:" + os.getenv("REDIS_KEY") + "@" + os.getenv("REDIS_NAME") + ".eastus.redisenterprise.cache.azure.net:10000"

# 既存のインデックスに接続
vec_store_fei =  redis_vectorstore.from_existing_index(
    embedding=embeddings,  # Embeddingモデル
    index_name=index_name, # インデックス名
    redis_url=redis_url    # Redis接続情報
)

query = "マイクロソフトのクラウド"
results = vec_store_fei.similarity_search(query, k=3)

for r in results:
    print(r.metadata)
    print(r.page_content)
    print("\n")

この例では "マイクロソフトのクラウド" というクエリに対して、類似性が高い 3 つ の記事 (Microsoft Online ServicesMicrosoft Office 365Microsoft Azure) のチャンクが返されます。

{'article_id': 'Q3124991', 'article_title': 'Microsoft Online Services', 'file_path': './ms_articles/Q3124991_Microsoft Online Services.txt', 'chunk_count': 1, 'chunk_number': 1}
Microsoft Online Servicesのサービス一覧
Microsoft Online Servicesには、以下のサービスがある。

{'article_id': 'Q775811', 'article_title': 'Microsoft Office 365', 'file_path': './ms_articles/Q775811_Microsoft Office 365.txt', 'chunk_count': 2, 'chunk_number': 1}
Microsoft Exchange Online
最新版の Exchange Server をマイクロソフトがホスティングして提供するクラウドサービス。Microsoft Exchange Online は50GBの電子メールボックス、および予定表共有・連絡先・仕事リストを管理するPIM機能を提供する。また、メールボックスに配信されるボイスメール、標準搭載されたスパム対策・ウイルス対策機能、アーカイブ機能、スレッドビュー、メールヒントなどの機能を提供する。デスクトップからはMicrosoft Outlookを通じてアクセスが可能であり(一部廉価プランを除く)、WebブラウザならOutlook Web Appを通じて、また、Windows PhoneからはスマートフォンからはOutlook Mobileを通じてアクセスすることができる。Exchange ActiveSyncにより、様々なモバイルデバイスからExchangeサービスへのアクセスが可能。iPhone/iPad では、OWA for iPhone/iPad というアプリを利用して、Exchange ActiveSyncだけでは利用できない機能も利用可能。
Microsoft SharePoint Online
マイクロソフトがホスティングして提供するクラウドサービスの一つであり、ビジネスコラボレーション プラットフォームである。つまり業務で使用する共同作業の場を提供し、生産性を向上させるために支援していく。Microsoft SharePoint Onlineによって、組織内外に様々なポータルサイト(個人用サイト、チームサイト、イントラネットサイト、エクストラネットサイトなど)を作成できるようになっており、ドキュメントの公開や同時編集なども可能になっている。また、リストと呼ばれる簡易データベース的に利用できる機能などを持っていたり、ニュースフィードと呼ばれるマイクロブログ機能を備えているなど、業務チームメンバー間での情報共有とコミュニケーションを包括的に支援できるようになっている(ただし、Office 365 ではコミュニケーションツールとして Yammer に置き換えることが可能になってるほか、Microsoft Teams といった新しいコミュニケーションツールが登場しており、コミュニケーションをすべて SharePoint で賄うというながれではなくなってきている)。ちなみに、以前は、インターネット公開用のWebサイトを作成することも可能であったが現在は廃止されており、クローズドな企業内または企業間での情報共有に利用されている。特に SharePoint Onlineでは設定によっては社外の利用者とのファイル共有などもできるようになっており(もちろん、組織内の管理者による許可が必要)、社外のパートナー企業間での情報のやり取りをセキュアに行えるようになっている。個人向けのファイルストレージとしてOffice 365 にはOneDrive for Business(旧SkyDrive Pro)と呼ばれるサービスが提供されているが、これの基盤は SharePoint となっている。既定では1TBまで利用できるようになっており、Office 365 のサブスクリプションによっては無制限に利用できるようになっている。SharePoint や OneDrive for Business はPCとの同期ツールが提供されており、オンラインやオフライン問わずファイルを閲覧したり編集したりすることも可能になっている。また SharePoint および OneDrive for Business はWindowsストアアプリやiPhone/iPadからアクセスできるアプリも提供されている。なお、オンプレミス(社内設置型) では最新版は SharePoint Server 2016が提供されており、SharePoint Online で提供されている機能を一定の時点で切り出して社内設置型用に提供するといった形態となっており、最新機能は SharePoint Onlineに先に投入される形になっている(クラウド ファースト)。場合によっては SharePoint Onlineでしか利用できない機能もあり、オンプレミス版の SharePoint と SharePoint Onlineでは利用できる機能差が年々大きくなっている。
Skype for Business Online
Skype for Business Server 2015 をマイクロソフトがホスティングして提供するクラウドサービス。Skype for Business Onlineはプレゼンス情報、インスタントメッセージ、PC同士の音声・ビデオ通話と、アプリケーション共有・デスクトップ共有・ホワイトボードなどを駆使したオンライン会議の機能を提供する。Skype for Business Onlineの機能には、Skype for Businessクライアントからアクセスする。Skype for Business Onlineを使うと、Officeアプリケーション内にプレゼンス情報を表示したり、プレゼンスアイコンからワンクリックでコミュニケーションを開始したりすることができる。WindowsストアアプリやiPhone/iPad、Androidからアクセスできるアプリも提供されている。2015年4月にLync Onlineから名称変更。

{'article_id': 'Q725967', 'article_title': 'Microsoft Azure', 'file_path': './ms_articles/Q725967_Microsoft Azure.txt', 'chunk_count': 1, 'chunk_number': 1}
概要
Microsoft Azureはマイクロソフトのデータセンターにあるクラウド プラットフォームで、アプリケーションとデータをホストしている。アプリケーションの動作環境(Microsoft Azure)と、Windows Azure AppFabric(旧名称は.NET Services。クラウドのミドルウェア サービス)、SQL Database(クラウドのRDB)を提供する。
Microsoft Azureは、Compute、Storage、Fabricという3つを核に構成する。Computeは計算資源を提供するサービス、Storageはスケーラブルなストレージ サービス(BLOB、テーブル、キュー)を示唆する。Microsoft Azureのホスティング環境は、Fabric Controllerと呼び、個々のシステムのリソース、ロード バランシング、複製、アプリケーションのライフ サイクルをホストしているアプリケーションが明示的に対処することなく自動的にネットワーク中に共同で使用するようにする。加えて、ストレージ サービスのような大部分のアプリケーションが必要とする他のサービスを提供する様になっている。Fabric上は、Windows Azure Platformの一部のサービスをアプリケーションは利用できる様になっている。商用サービス開始当初は公式機能以外のインストールが必要なコンポーネント(たとえば帳票作成やPDFファイル生成用コンポーネント)を利用することはできなかった。しかし、VMロール(Windows Azure Virtual Machine Role)の提供(2012年現在ベータリリース)や、起動前処理を行えるように機能拡張された為、これまでWindows Server OS上で動作していたアプリケーションの多くをMicrosoft Azure上でも稼働できるようになっている。
開発者はREST、HTTP、XMLを組み合わせたAPIを使用して Microsoft Azure が提供するサービスと対話を行う。サービスと対話を行うライブラリはクライアントサイドにもマネージド クラス ライブラリで提供される。Microsoft Azureでホストされるアプリケーションの開発と発行はVisual StudioといったIDEを使用して行う。
Microsoft Azure 実装
Windows Azure Platformは特化されたオペレーティングシステム Windows Azureを用い"fabric layer"を実行する。"fabric layer"とは、マイクロソフトにてホスティングされたクラスタであり、Microsoft Azure上にて動作するアプリケーションへの計算資源およびストレージの割り当て、および管理を行う。Microsoft Azureは開発中は"Red Dog"というコードネームで知られ、Windows Server 2008とカスタム化されたHyper-VであるWindows Azure Hypervisor上の"cloud layer"として説明されており、サービスの仮想化を行うものである。
SQL Database
SQL Database(旧称SQL Azure)は、SQL Server 2008をベースに開発されたクラウド上の関係データベースエンジンである。SQL Server 2008をベースに開発が行われているため、SQL Serverに対応したアプリケーションやツールとの親和性が高く、SQL Serverで使用していたツールがそのまま使用でき、SQL Databaseへの移行が比較的簡単におこなうことができる。
Microsoft Azure Marketplace
Microsoft Azure Marketplaceは、多様な商用・非商用データ(人口統計や金融情報など)といったコンテンツを保持しているプロバイダ(パブリッシャ)が容易に販売することができるオンライン上のマーケットプレースである。利用者はMicrosoft Azure Marketplaceを通じ、検索・共有・購入等を行うことができる。また通常データはODataを通じて提供され、アプリケーションからの利用も可能となっている。
Microsoft Azure Marketplaceにはデータを提供するMicrosoft Azure Marketplace Dataセクションと、Windows Azure Platformに関連する多様なカテゴリのサービスやアプリケーションを提供するWindows Azure Marketplace Applicationsセクションの2つが存在する。Windows Azure Marketplace Dataは2010年に米国で開催されたMicrosoft PDC2010にて正式リリースされた。またWindows Azure Marketplace Applicationsは、2011年7月10日に米国で開催されたMicrosoft Worldwide Partner Conference 2011にて正式リリースがアナウンスされた。
セキュリティについて
サービス開始当初、Microsoft Azureのデータはアメリカ合衆国など海外のデータセンターに置かれる事になり、情報漏えいや、海外にデータを置くことによる問題などが指摘されていた(当然のことながら、Amazon EC2、Google App Engineなどの競合サービスも同様である)。海外にデータを置く問題とは、例えば何らかの事件があってFBI(=米国の連邦捜査局)にデータをすべて押収される可能性などである。
2010年7月には、富士通がMicrosoft Azureの技術を用いた自社ブランドのクラウドサービスを提供している。
マイクロソフトによると、Microsoft Azure は アプリケーション開発者およびサービス管理者向けのセキュリティを強化する仕組みがあるとしており、公式サイトにて関連情報を公開している。
2014年2月26日からはマイクロソフト自身が埼玉県に日本(東)リージョン、大阪府に日本(西)リージョンを開設し運用している。

参考

[補足] インデックス作成時と検索時に行われていること

redis-py のラッパーである langchain.vectorstores.Redis を使った処理は、内部的には以前書いた記事の後半で行ったこととほぼ同じことをしています。

インデックス作成時:

    def _create_index(
        self, dim: int = 1536, distance_metric: REDIS_DISTANCE_METRICS = "COSINE"
    ) -> None:
        try:
            from redis.commands.search.field import TextField, VectorField
            from redis.commands.search.indexDefinition import IndexDefinition, IndexType
        except ImportError:
            raise ValueError(
                "Could not import redis python package. "
                "Please install it with `pip install redis`."
            )

        # Check if index exists
        if not _check_index_exists(self.client, self.index_name):
            # Define schema
            schema = (
                TextField(name=self.content_key),
                TextField(name=self.metadata_key),
                VectorField(
                    self.vector_key,
                    "FLAT",
                    {
                        "TYPE": "FLOAT32",
                        "DIM": dim,
                        "DISTANCE_METRIC": distance_metric,
                    },
                ),
            )
            prefix = _redis_prefix(self.index_name)

            # Create Redis Index
            self.client.ft(self.index_name).create_index(
                fields=schema,
                definition=IndexDefinition(prefix=[prefix], index_type=IndexType.HASH),
            )
  • VectorField を持つインデックスを作成。
  • indexing method としては FLAT (Brute-force index) が使われる。(固定値)
  • 検索時の distance metrics としてはデフォルトで cosine distance が使われる。

検索時:

    def _prepare_query(self, k: int) -> Query:
        try:
            from redis.commands.search.query import Query
        except ImportError:
            raise ValueError(
                "Could not import redis python package. "
                "Please install it with `pip install redis`."
            )
        # Prepare the Query
        hybrid_fields = "*"
        base_query = (
            f"{hybrid_fields}=>[KNN {k} @{self.vector_key} $vector AS vector_score]"
        )
        return_fields = [self.metadata_key, self.content_key, "vector_score"]
        return (
            Query(base_query)
            .return_fields(*return_fields)
            .sort_by("vector_score")
            .paging(0, k)
            .dialect(2)
        )
  • Vector similarity の KNN query に変換して検索される。

参考

おわりに

LangChain + Azure Cache for Redis + Azure OpenAI Service の組み合わせでベクトル検索を行うことができました。LangChain のようなフレームワークは大規模言語モデル (LLM) をアプリケーションに組み込む際に汎用的に使える一方で、内側で行われている処理が見えづらかったり、細かい制御が効かないといった側面もありますので、適材適所で使うことが大切だと感じました。

以上です。🍵

Microsoft (有志)

Discussion