🏎️

【Azure Open AI】トークン超の大容量PDFをLLMに高速で読み込ませたい

2024/10/03に公開

要約

エンベディングモデルを複数展開し、PDFをチャンクに分割して並列処理を行うことで、ベクトルDBインデックス作成の時間を大幅に短縮します。5回の実験結果から、モデル数を増やすと処理時間が短縮されるものの、並行度が高いほど一貫性が低下する傾向が見られました。

課題の概要

コンテキストウィンドウを超過したドキュメントをLLMに読み込ませるベストプラクティスとしてベクトルDBを使用する方法がありますが、

Azure AI Searchを使用して有価証券報告書のような大容量PDFをインデックス作成する際、処理に数分間の時間がかかってしまいます。

これは、大量のデータを一度に処理するため、時間がかかることが原因と考えられます。

仮説と検証方法

仮説

エンベディングモデルを複数展開し、並列処理を行うことでインデックス作成の時間を短縮できるのではないか。

検証方法

  1. 別リージョンにエンベディングモデルを複数作成: Azure OpenAIのエンベディングモデルを複数のリージョンに展開します。
  2. ドキュメントの分割と並列処理: 大容量のPDFをチャンクに分割し、複数のエンベディングモデルで並行してベクトル生成処理を行います。
  3. インデックスへのアップロード: 生成したベクトルをAzure AI Searchにアップロードします。

環境設定と必要なライブラリ

以下のライブラリを使用します。事前にインストールしておいてください。

pip install langchain langchain_community langchain_text_splitters azure-search-documents azure-core

必要なライブラリの一覧:

  • os: 環境変数の管理
  • time: 処理時間の計測
  • concurrent.futures.ThreadPoolExecutor: 並列処理の実装
  • uuid: 一意なIDの生成
  • langchain_community.document_loaders.PyPDFLoader: PDFの読み込み
  • langchain_openai.AzureOpenAIEmbeddings: Azure OpenAIのエンベディング生成
  • azure.search.documents: Azure Searchのクライアント
  • azure.search.documents.indexes: Azure Searchのインデックス管理
  • azure.core.credentials.AzureKeyCredential: 認証情報の管理
  • langchain_text_splitters.RecursiveCharacterTextSplitter: テキストの分割

1. 設定ファイル

まず、必要な設定を定義します。Azure Searchサービス名、APIキー、インデックス名、処理するPDFのパス、およびエンベディングモデルの設定を行います。

# config.py

import os

# Azure Searchの設定
AZURE_SEARCH_SERVICE_NAME = "AZURE_SEARCH_SERVICE_NAME"
AZURE_SEARCH_API_KEY = "AZURE_SEARCH_API_KEY"
AZURE_SEARCH_INDEX_NAME = "AZURE_SEARCH_INDEX_NAME"

# 処理するPDFのパス
PDF_PATH = "PDF_PATH"

# エンベディングモデルの設定
class ConfigAzureOpenAI:
    def __init__(self, api_key, endpoint, deployment) -> None:
        self.api_key = api_key
        self.endpoint = endpoint
        self.deployment = deployment

# 複数のエンベディングモデルを定義
EMBEDDING_MODELS = [
    ConfigAzureOpenAI("api_key_1", "endpoint_1", "deployment_name_1"),
    ConfigAzureOpenAI("api_key_2", "endpoint_2", "deployment_name_2"),
    # 必要に応じて追加
    # ...
    ConfigAzureOpenAI("api_key_10", "endpoint_10", "deployment_name_10"),
]

解説:

  • ConfigAzureOpenAI クラスは、Azure OpenAIのエンベディングモデルの設定を保持します。
  • EMBEDDING_MODELS リストには、使用する複数のエンベディングモデルを定義します。今回は最大10モデルを想定しています。

2. PDFの読み込みと分割

PDFファイルを読み込み、テキストをチャンクに分割します。これにより、各チャンクを個別に処理できるようになります。

# pdf_loader.py

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

def load_and_split_pdf(pdf_path, chunk_size=2000, chunk_overlap=200):
    loader = PyPDFLoader(pdf_path)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    documents = loader.load_and_split(text_splitter=text_splitter)
    return documents

解説:

  • PyPDFLoader を使用してPDFを読み込みます。
  • RecursiveCharacterTextSplitter でテキストをチャンクに分割します。chunk_sizeは1チャンクあたりの文字数、chunk_overlapはチャンク間の重複部分です。

3. エンベディング生成

テキストチャンクからベクトル(エンベディング)を生成します。

# embeddings.py

from langchain_openai import AzureOpenAIEmbeddings

def generate_embeddings(documents, config_azure_openai, chunk_size=2000):
    embeddings = AzureOpenAIEmbeddings(
        azure_deployment=config_azure_openai.deployment,
        api_key=config_azure_openai.api_key,
        azure_endpoint=config_azure_openai.endpoint,
        chunk_size=chunk_size
    )
    return [embeddings.embed_query(doc.page_content) for doc in documents]

解説:

  • AzureOpenAIEmbeddings を使用してエンベディングを生成します。
  • 各ドキュメントの内容 (doc.page_content) をエンベディングに変換します。

4. Azure Searchインデックスのセットアップ

Azure Searchのインデックスをセットアップします。既存のインデックスがない場合は新規作成します。

# search_setup.py

from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import SearchIndex, SimpleField, SearchFieldDataType
from azure.core.credentials import AzureKeyCredential

def setup_search_index(service_name, api_key, index_name):
    index_client = SearchIndexClient(
        endpoint=f"https://{service_name}.search.windows.net",
        credential=AzureKeyCredential(api_key)
    )

    # インデックスが存在しない場合に作成
    try:
        index_client.get_index(index_name)
        print(f"Index '{index_name}' already exists.")
    except:
        fields = [
            SimpleField(name="id", type=SearchFieldDataType.String, key=True),
            SimpleField(name="content", type=SearchFieldDataType.String),
            SimpleField(name="embedding", type=SearchFieldDataType.Collection(SearchFieldDataType.Double))
        ]
        index = SearchIndex(name=index_name, fields=fields)
        index_client.create_index(index)
        print(f"Index '{index_name}' created.")

解説:

  • SearchIndexClient を使用してAzure Searchのインデックスクライアントを作成します。
  • インデックスが存在しない場合、新規に作成します。インデックスには idcontentembedding フィールドを含めます。

5. インデックスへのアップロード

生成したエンベディングをAzure Searchのインデックスにアップロードします。

# upload_search.py

import uuid
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential

def upload_to_search(service_name, api_key, index_name, documents, embeddings):
    search_client = SearchClient(
        endpoint=f"https://{service_name}.search.windows.net",
        index_name=index_name,
        credential=AzureKeyCredential(api_key)
    )

    upload_documents = []
    for doc, emb in zip(documents, embeddings):
        upload_documents.append({
            "id": str(uuid.uuid1()),
            "content": doc.page_content,
            "embedding": emb
        })

    search_client.upload_documents(documents=upload_documents)
    print(f"Uploaded {len(upload_documents)} documents to search index.")

解説:

  • SearchClient を使用してAzure Searchの検索クライアントを作成します。
  • 各ドキュメントに一意なIDを付与し、コンテンツとエンベディングをインデックスにアップロードします。

6. 並列処理の実装

エンベディング生成を並行して実行するための処理を実装します。モデルの数に応じて並行度を変えます。

# parallel_processing.py

from concurrent.futures import ThreadPoolExecutor

def single_model_process(documents, models):
    # 単一モデルでエンベディングを生成
    embeddings = generate_embeddings(documents, models[0])
    return embeddings

def dual_model_process(documents, models):
    split_index = len(documents) // 2
    documents1 = documents[:split_index]
    documents2 = documents[split_index:]

    with ThreadPoolExecutor(max_workers=2) as executor:
        future1 = executor.submit(generate_embeddings, documents1, models[0])
        future2 = executor.submit(generate_embeddings, documents2, models[1])
        embeddings1 = future1.result()
        embeddings2 = future2.result()

    combined_embeddings = embeddings1 + embeddings2
    return combined_embeddings

def five_model_process(documents, models):
    length = len(documents)
    split_size = length // 5
    remainder = length % 5

    indices = []
    start = 0
    for i in range(5):
        end = start + split_size + (1 if i < remainder else 0)
        indices.append((start, end))
        start = end

    parts = [documents[start:end] for start, end in indices]

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(generate_embeddings, part, models[i]) for i, part in enumerate(parts)]
        embeddings = [future.result() for future in futures]

    combined_embeddings = sum(embeddings, [])
    return combined_embeddings

def ten_model_process(documents, models):
    length = len(documents)
    split_size = length // 10
    remainder = length % 10

    indices = []
    start = 0
    for i in range(10):
        end = start + split_size + (1 if i < remainder else 0)
        indices.append((start, end))
        start = end

    parts = [documents[start:end] for start, end in indices]

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(generate_embeddings, part, models[i]) for i, part in enumerate(parts)]
        embeddings = [future.result() for future in futures]

    combined_embeddings = sum(embeddings, [])
    return combined_embeddings

解説:

  • ThreadPoolExecutor を使用して、複数のエンベディングモデルで並列処理を行います。
  • single_model_process は単一モデルで処理を行います。
  • dual_model_processfive_model_processten_model_process はそれぞれ2モデル、5モデル、10モデルで並列処理を行います。
  • 各プロセスでは、ドキュメントを均等に分割し、各モデルでエンベディングを生成します。最後にすべてのエンベディングを結合します。

7. メイン処理

全体の流れを管理するメインスクリプトです。インデックスのセットアップ、PDFの読み込み、エンベディング生成、インデックスへのアップロードを行います。

# main.py

import time
from config import AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, PDF_PATH, EMBEDDING_MODELS
from search_setup import setup_search_index
from pdf_loader import load_and_split_pdf
from upload_search import upload_to_search
from parallel_processing import single_model_process, dual_model_process, five_model_process, ten_model_process

def main():
    # インデックスのセットアップ
    setup_search_index(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME)

    # PDFの読み込みとチャンク分割
    documents = load_and_split_pdf(PDF_PATH)
    print(f"チャンク数: {len(documents)}")

    # 単一モデルでの処理
    start_time = time.time()
    embeddings_single = single_model_process(documents, EMBEDDING_MODELS)
    upload_to_search(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, documents, embeddings_single)
    single_duration = time.time() - start_time
    print(f"単一モデルでの処理: {single_duration:.2f} 秒")
    time.sleep(60)

    # 2モデル並行での処理
    start_time = time.time()
    embeddings_dual = dual_model_process(documents, EMBEDDING_MODELS[:2])
    upload_to_search(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, documents, embeddings_dual)
    dual_duration = time.time() - start_time
    print(f"2モデル並行での処理: {dual_duration:.2f} 秒")
    time.sleep(60)

    # 5モデル並行での処理
    start_time = time.time()
    embeddings_five = five_model_process(documents, EMBEDDING_MODELS[:5])
    upload_to_search(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, documents, embeddings_five)
    five_duration = time.time() - start_time
    print(f"5モデル並行での処理: {five_duration:.2f} 秒")
    time.sleep(60)

    # 10モデル並行での処理
    start_time = time.time()
    embeddings_ten = ten_model_process(documents, EMBEDDING_MODELS[:10])
    upload_to_search(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, documents, embeddings_ten)
    ten_duration = time.time() - start_time
    print(f"10モデル並行での処理: {ten_duration:.2f} 秒")

if __name__ == "__main__":
    main()

解説:

  • setup_search_index を呼び出して、Azure Searchのインデックスをセットアップします。
  • load_and_split_pdf でPDFを読み込み、チャンクに分割します。
  • 各モデル数(1, 2, 5, 10)に応じて並列処理を行い、エンベディングを生成します。
  • 生成したエンベディングを upload_to_search でAzure Searchにアップロードします。
  • 各処理の実行時間を計測し、結果を表示します。
  • 処理間に time.sleep(60) を挟んでいますが、これはAzureのAPI制限を避けるためです。必要に応じて調整してください。

実験結果と考察

以下は、5回の実験結果です。

1回目

チャンク数: 236
単一モデルでの処理: 109.08 秒
2モデル並行での処理: 55.75 秒
5モデル並行での処理: 32.99 秒
10モデル並行での処理: 21.95 秒

2回目

チャンク数: 236
単一モデルでの処理: 108.43 秒
2モデル並行での処理: 57.67 秒
5モデル並行での処理: 33.68 秒
10モデル並行での処理: 21.30 秒

3回目

チャンク数: 236
単一モデルでの処理: 105.97 秒
2モデル並行での処理: 54.64 秒
5モデル並行での処理: 45.70 秒
10モデル並行での処理: 46.47 秒

4回目

チャンク数: 236
単一モデルでの処理: 105.83 秒
2モデル並行での処理: 65.25 秒
5モデル並行での処理: 77.24 秒
10モデル並行での処理: 48.06 秒

5回目

チャンク数: 236
単一モデルでの処理: 113.19 秒
2モデル並行での処理: 60.91 秒
5モデル並行での処理: 62.33 秒
10モデル並行での処理: 27.95 秒

考察:

  • 単一モデルでの処理時間は約105~113秒と一定の時間がかかります。
  • 2モデル並行では約55~65秒に短縮されました。
  • 5モデル並行では、初回は約33秒、2回目以降は約45~77秒とばらつきがあります。
  • 10モデル並行では、約21~48秒と最も短い処理時間を記録しましたが、3回目以降は一部の回で時間が延びるケースも見られました。

全体的に、モデル数を増やすことで処理時間は短縮されましたが、並行度が高くなるにつれて一貫性が低下する傾向が見られました。これは、AzureのAPI制限やネットワークの遅延が影響している可能性があります。


結論

エンベディングモデルを複数展開し、並列処理を行うことでAzure AI Searchのインデックス作成時間を大幅に短縮できることが確認できました。具体的には、10モデルを並行して使用することで、処理時間を約21秒まで短縮することができました。

また、Azure AI Searchは処理トークン数で課金されるため、エンベディングモデルを増やしても総コストは変わりません。このため、処理時間を短縮するためにモデルを増やすことは、コストパフォーマンスの面でも有効な手段であると言えます。

ただし、並行度が高くなるにつれて一貫性が低下する可能性があるため、最適なモデル数を見極める必要があります。実際の運用環境においては、処理速度と安定性のバランスを考慮してモデル数を調整してください。


参考コード

以下に、今回の検証で使用した全コードをまとめます。
これらはあくまで検証用のスクリプトです。

config.py

import os

# Azure Searchの設定
AZURE_SEARCH_SERVICE_NAME = "AZURE_SEARCH_SERVICE_NAME"
AZURE_SEARCH_API_KEY = "AZURE_SEARCH_API_KEY"
AZURE_SEARCH_INDEX_NAME = "AZURE_SEARCH_INDEX_NAME"

# 処理するPDFのパス
PDF_PATH = "PDF_PATH"

# エンベディングモデルの設定
class ConfigAzureOpenAI:
    def __init__(self, api_key, endpoint, deployment) -> None:
        self.api_key = api_key
        self.endpoint = endpoint
        self.deployment = deployment

# 複数のエンベディングモデルを定義
EMBEDDING_MODELS = [
    ConfigAzureOpenAI("api_key_1", "endpoint_1", "deployment_name_1"),
    ConfigAzureOpenAI("api_key_2", "endpoint_2", "deployment_name_2"),
    ConfigAzureOpenAI("api_key_3", "endpoint_3", "deployment_name_3"),
    ConfigAzureOpenAI("api_key_4", "endpoint_4", "deployment_name_4"),
    ConfigAzureOpenAI("api_key_5", "endpoint_5", "deployment_name_5"),
    ConfigAzureOpenAI("api_key_6", "endpoint_6", "deployment_name_6"),
    ConfigAzureOpenAI("api_key_7", "endpoint_7", "deployment_name_7"),
    ConfigAzureOpenAI("api_key_8", "endpoint_8", "deployment_name_8"),
    ConfigAzureOpenAI("api_key_9", "endpoint_9", "deployment_name_9"),
    ConfigAzureOpenAI("api_key_10", "endpoint_10", "deployment_name_10"),
]

pdf_loader.py

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

def load_and_split_pdf(pdf_path, chunk_size=2000, chunk_overlap=200):
    loader = PyPDFLoader(pdf_path)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    documents = loader.load_and_split(text_splitter=text_splitter)
    return documents

embeddings.py

from langchain_openai import AzureOpenAIEmbeddings

def generate_embeddings(documents, config_azure_openai, chunk_size=2000):
    embeddings = AzureOpenAIEmbeddings(
        azure_deployment=config_azure_openai.deployment,
        api_key=config_azure_openai.api_key,
        azure_endpoint=config_azure_openai.endpoint,
        chunk_size=chunk_size
    )
    return [embeddings.embed_query(doc.page_content) for doc in documents]

search_setup.py

from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import SearchIndex, SimpleField, SearchFieldDataType
from azure.core.credentials import AzureKeyCredential

def setup_search_index(service_name, api_key, index_name):
    index_client = SearchIndexClient(
        endpoint=f"https://{service_name}.search.windows.net",
        credential=AzureKeyCredential(api_key)
    )

    # インデックスが存在しない場合に作成
    try:
        index_client.get_index(index_name)
        print(f"Index '{index_name}' already exists.")
    except:
        fields = [
            SimpleField(name="id", type=SearchFieldDataType.String, key=True),
            SimpleField(name="content", type=SearchFieldDataType.String),
            SimpleField(name="embedding", type=SearchFieldDataType.Collection(SearchFieldDataType.Double))
        ]
        index = SearchIndex(name=index_name, fields=fields)
        index_client.create_index(index)
        print(f"Index '{index_name}' created.")

upload_search.py

import uuid
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential

def upload_to_search(service_name, api_key, index_name, documents, embeddings):
    search_client = SearchClient(
        endpoint=f"https://{service_name}.search.windows.net",
        index_name=index_name,
        credential=AzureKeyCredential(api_key)
    )

    upload_documents = []
    for doc, emb in zip(documents, embeddings):
        upload_documents.append({
            "id": str(uuid.uuid1()),
            "content": doc.page_content,
            "embedding": emb
        })

    search_client.upload_documents(documents=upload_documents)
    print(f"Uploaded {len(upload_documents)} documents to search index.")

parallel_processing.py

from concurrent.futures import ThreadPoolExecutor
from embeddings import generate_embeddings

def single_model_process(documents, models):
    # 単一モデルでエンベディングを生成
    embeddings = generate_embeddings(documents, models[0])
    return embeddings

def dual_model_process(documents, models):
    split_index = len(documents) // 2
    documents1 = documents[:split_index]
    documents2 = documents[split_index:]

    with ThreadPoolExecutor(max_workers=2) as executor:
        future1 = executor.submit(generate_embeddings, documents1, models[0])
        future2 = executor.submit(generate_embeddings, documents2, models[1])
        embeddings1 = future1.result()
        embeddings2 = future2.result()

    combined_embeddings = embeddings1 + embeddings2
    return combined_embeddings

def five_model_process(documents, models):
    length = len(documents)
    split_size = length // 5
    remainder = length % 5

    indices = []
    start = 0
    for i in range(5):
        end = start + split_size + (1 if i < remainder else 0)
        indices.append((start, end))
        start = end

    parts = [documents[start:end] for start, end in indices]

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(generate_embeddings, part, models[i]) for i, part in enumerate(parts)]
        embeddings = [future.result() for future in futures]

    combined_embeddings = sum(embeddings, [])
    return combined_embeddings

def ten_model_process(documents, models):
    length = len(documents)
    split_size = length // 10
    remainder = length % 10

    indices = []
    start = 0
    for i in range(10):
        end = start + split_size + (1 if i < remainder else 0)
        indices.append((start, end))
        start = end

    parts = [documents[start:end] for start, end in indices]

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(generate_embeddings, part, models[i]) for i, part in enumerate(parts)]
        embeddings = [future.result() for future in futures]

    combined_embeddings = sum(embeddings, [])
    return combined_embeddings

main.py

import time
from config import AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, PDF_PATH, EMBEDDING_MODELS
from search_setup import setup_search_index
from pdf_loader import load_and_split_pdf
from upload_search import upload_to_search
from parallel_processing import single_model_process, dual_model_process, five_model_process, ten_model_process

def main():
    # インデックスのセットアップ
    setup_search_index(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME)

    # PDFの読み込みとチャンク分割
    documents = load_and_split_pdf(PDF_PATH)
    print(f"チャンク数: {len(documents)}")

    # 単一モデルでの処理
    start_time = time.time()
    embeddings_single = single_model_process(documents, EMBEDDING_MODELS)
    upload_to_search(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, documents, embeddings_single)
    single_duration = time.time() - start_time
    print(f"単一モデルでの処理: {single_duration:.2f} 秒")
    time.sleep(60)

    # 2モデル並行での処理
    start_time = time.time()
    embeddings_dual = dual_model_process(documents, EMBEDDING_MODELS[:2])
    upload_to_search(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, documents, embeddings_dual)
    dual_duration = time.time() - start_time
    print(f"2モデル並行での処理: {dual_duration:.2f} 秒")
    time.sleep(60)

    # 5モデル並行での処理
    start_time = time.time()
    embeddings_five = five_model_process(documents, EMBEDDING_MODELS[:5])
    upload_to_search(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, documents, embeddings_five)
    five_duration = time.time() - start_time
    print(f"5モデル並行での処理: {five_duration:.2f} 秒")
    time.sleep(60)

    # 10モデル並行での処理
    start_time = time.time()
    embeddings_ten = ten_model_process(documents, EMBEDDING_MODELS[:10])
    upload_to_search(AZURE_SEARCH_SERVICE_NAME, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME, documents, embeddings_ten)
    ten_duration = time.time() - start_time
    print(f"10モデル並行での処理: {ten_duration:.2f} 秒")

if __name__ == "__main__":
    main()

Discussion