🏃

サーバーレスマルチモーダルRAG構築:学習ログ

に公開

やったこと

  • AWSサーバーレス(Lambda、S3、Step Functions、Bedrockなど)だけでRAGを構築
  • KendraやOpenSearchは使わず、OSSベースで動作を確認
  • 目的は、RAGを動作させるためにどのサービスをどのように組み合わせればよいかを自分で理解すること
  • 月1万ファイルに対して約15ドルで稼働

完成品

https://github.com/okdryk/serverless-multimodal-rag

画面キャプチャ

  • フォルダアップロード画面
    フォルダアップロード

  • フォルダアップロード後、インデックス作成
    インデックス作成

  • チャットで質問
    チャットで質問

  • 回答と根拠の提示
    回答と根拠の提示

全体アーキテクチャ

全体アーキテクチャ

主な特徴

  • フォルダアップロードで一括処理可能
  • 対応ファイル形式:txt, docx, pptx, pdf, excel, md
  • 画像データも解析対象に含むマルチモーダル対応
  • チャンキング・埋め込み・検索処理を並列化し高速化
  • FAISSをベクトルデータベースとして使用し、そのインデックスはS3に保存
  • AWS Amplify, Lambda, SQS, DynamoDB, Step Functions, Bedrockで構成
  • イベント駆動型で低コスト運用

技術詳細

  • ファイル解析:Lambda関数で各種ファイルをパース
  • チャンキングと埋め込み:Step Functionsで並列実行
  • 画像対応:マルチモーダルモデルに生成させた画像の説明文を埋め込みして対応
  • ベクトル検索:FAISSをS3に保存し高速検索
  • RAG:Bedrockを使い生成モデルで回答生成
  • イベント駆動:S3イベント → SQS → Lambda連携

参考リポジトリ

https://github.com/aws-samples/serverless-pdf-chat

  • 特徴:
    • KendraもOpenSearchも未使用
    • LambdaでPDF分割 → Bedrockでベクトル化 → FAISSでインデックス → S3保存
    • 検索もLambda経由 → コストがオンデマンド

参考にしたリポジトリは現状PDF1つのみに対応。
ここから以下の機能を追加実装する。また、実装に伴う課題への対策も記述する。

  • 複数ファイル対応
    • Lambda制限と並列処理
    • 検索処理の並列化
  • ドキュメント内の画像を検索対象に含める

機能の設計と実装

複数ファイル対応

課題: 複数ファイルをアップロードしたことを検知してインデックス作成を実行したい。1ファイルだけならS3トリガーで可能だが、複数ファイルのアップロードを検知する仕組みがない。
対策: 個別アップロードをカウントして完了判定を行う

設計の特徴

  • DynamoDBでアップロード数管理 + Streamsで完了検知
  • 状態管理が可能で可視化できる

処理の流れ

  1. クライアントがローカルのフォルダを指定して、Lambdaに署名付きURL発行をリクエスト
  2. LambdaがDynamoDBにアップロード想定数を記録する
  3. 署名付きURLを発行してクライアントにレスポンス
  4. クライアントが署名付きURLを使用してファイルをアップロードしていく
  5. ファイルアップロードの度にS3アップロードトリガーでカウンタLambdaを実行
  6. カウンタLambdaはDynamoDBのアップロード数を更新する
  7. レコードの更新がある度にDynamoDB Streamsでアップロード数確認Lambdaを実行する
  8. 想定アップロード数に達していたらインデックス作成マシンを実行する

Lambda制限と並列処理

課題: Lambdaには最大メモリ10GB、最大実行時間15分の制限あり
対策: ファイル単位・チャンク単位でLambdaを並列処理実行する

インデックス作成の並列化

イメージ

実装

StepFunctionsにファイル一覧をjsonとして入力し、Distributed Mapで1ファイルずつ処理する。ファイルの処理を並列で行い、さらにチャンクの処理を並列で行う。

  1. S3からファイルを取得してチャンキングを行う
  2. チャンクをSQSキューに送信
  3. ベクトル埋め込みの完了チェック用にチャンク数の合計数を送信しておく。
  4. チャンクをLambdaへ送信
  5. LambdaでBedrockのAPIを利用してベクトル化を行いS3に保存する
  6. ベクトル埋め込みが完了したことをDynamoDBへ通知する
  7. すべてのベクトル埋め込み処理が完了しているかどうか1秒おきにチェックする
  8. すべてのベクトル埋め込みが完了したらインデックス作成を行う
  9. S3からベクトルを取得してインデックス作成を行う。

サーバーレスでFan-out/Fan-inを実現する方法は以下の記事を参考
https://theburningmonk.com/2024/08/whats-the-best-way-to-do-fan-out-fan-in-serverlessly-in-2024/

チャンキングと埋め込み

  • ドキュメントに対して行うチャンキングと埋め込み処理をLambdaで実装
  • PDF / Word / Excel / PowerPoint / Markdownに対応
  • 文末で分割し、チャンク単位でベクトル化
  • FAISSインデックスを作成しS3に保存
# 文末で区切る
sentences = re.findall(r'[^。!?.!?]*[。!?.!?]?', text)
sentences = [s.strip() for s in sentences if s.strip()]

# 1チャンクあたり500文字以内にする
chunks = []
current_chunk = []
max_chunk_size = 500

for sent in sentences:
    current_chunk.append(sent)
    if sum(len(s) for s in current_chunk) > max_chunk_size:
        chunks.append("".join(current_chunk[:-1]))
        current_chunk = [current_chunk[-1]]

if current_chunk:
    chunks.append("".join(current_chunk))
# bedrockでベクトル埋め込み
response = bedrock_runtime.invoke_model(
    modelId=MODEL_ID,
    body=json.dumps({"inputText": text.strip()}),
    contentType='application/json',
    accept='application/json'
)

検索処理の並列化

  • ファイル単位で検索 → 上位結果を集約 → リランキング

ドキュメント内の画像を検索対象に含める

  • ドキュメントのチャンキング処理と同時に画像を抽出する
  • AIに画像の説明文を生成させる
  • 説明文はチャンクテキストと同じベクトル空間に埋め込む

ドキュメントのチャンキング処理と同時に画像を抽出する

def extract_images_from_xlsx(excel_path: str, output_dir: str) -> bool:
    seen_hashes = set()
    wb = load_workbook(excel_path)
    image_saved = False

    for sheet_name in wb.sheetnames:
        sheet = wb[sheet_name]
        for idx, image in enumerate(sheet._images, start=1):
            if isinstance(image, XLImage):
                try:
                    if hasattr(image, "_data") and image._data:
                        img_data = image._data()
                        img = Image.open(io.BytesIO(img_data))
                        filename = f"{os.path.splitext(os.path.basename(excel_path))[0]}_{sheet_name}_img{idx}.png"

マルチモーダルモデルに画像の説明文を生成させる

def lambda_handler(event: Dict[str, Any], context):
    for record in event.get("Records", []):
        try:
            body = record.get("body")
            task = json.loads(body) if isinstance(body, str) else body
            uri = f"s3://{BUCKET}/{task.get('s3ImageObjectKey')}" # 画像を保存しているS3バケットのURIを指定

            request_body = build_request_body(task, uri)

            response = invoke_bedrock_with_retries(request_body, task['s3ImageObjectKey'])
            caption = extract_caption_from_response(response)

            send_caption_to_queue(task, caption) # ベクトル埋め込み処理用SQSに送信

    return {'status': 'success', 'message': 'Caption generated and sent to TEXT_QUEUE'}


def build_request_body(task: Dict[str, Any], uri: str) -> Dict[str, Any]:
    user_prompt = (
        f"\n\nDocument file name: {task['subfolderPath']}{task['fileName']}\n"
        "This image is extracted from a document and may contain text, figures, tables, or screenshots of the interface. "
        "Please create a clear and concise Japanese caption describing the content, no more than 500 characters."
    )

    return {
        "schemaVersion": "messages-v1",
        "system": [
            {
                "text": (
                    'You are a professional image caption creator. Please create descriptive and natural Japanese captions based on the images and related information provided by users. '
                    'Captions should include the following: '
                    '- String displayed in the image, '
                    '- A description of what you see in the image, '
                    '- Relevant background information (if available). '
                    'Please do not guess and omit any unknown parts.'
                )
            }
        ],
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "image": {
                            "format": "png",
                            "source": {
                                "s3Location": {
                                    "uri": uri,
                                    "bucketOwner": ACCOUNT
                                }
                            }
                        }
                    },
                    {"text": user_prompt}
                ]
            }
        ],
        "inferenceConfig": {
            "max_new_tokens": 300,
            "top_p": 0.1,
            "top_k": 20,
            "temperature": 0.3
        }
    }

生成した説明文はドキュメントのチャンクテキストと同じ処理を辿っていき、最終的に同じベクトル空間へ埋め込まれる。

最終アーキテクチャ図


コスト

大まかな計算で以下の通り
月1万ファイルのPDFをアップロード:約9ドル
月1万回質問:約6ドル
合計:15ドル

1万ファイル規模でもかなり低コストで実現できることがわかった

コスト試算詳細

条件

  • PDF(2MB、50チャンク)をアップロード
  • 無料利用枠は考慮しない
  • 東京リージョン

単価

サービス 単価
Lambda GB-秒あたり $0.0000166667、100万リクエストあたり $0.20
SQS 100万リクエストあたり $0.40
DynamoDB(書き込み) 100 万あたり $0.715
DynamoDB(読み出し) 100 万あたり $0.1425
Bedrock (Titan Text V2) 1,000トークンあたり $0.000029

ファイルアップロードのコスト

項目 詳細 金額 (USD)
Lambda(署名付きURL) 512MB × 10000ms $0.0000833335
DynamoDB(書き込み) 500(ファイル) $0.0003575
DynamoDB(読み出し) 500(ファイル) $0.00007125
合計 $0.0005120835

埋め込み処理のコスト

項目 詳細 金額 (USD)
Lambda(埋め込み) 512MB × 150ms × 25,000回 $0.03625
Lambda(インデックス) 1024MB × 3000ms × 500回 $0.0251
SQS 25,000メッセージ(500ファイル × 50チャンク) $0.01
DynamoDB(書き込み) 25,000(500ファイル × 50チャンク) $0.017875
Bedrock (Titan Text V2) 12,500,000トークン(25,000チャンク × 500トークン) $0.3625
合計 $0.451725

検索処理1回あたりのコスト

処理項目 詳細 金額 (USD)
Lambda(検索) 1024MB × 500ms × 1回 $0.00000833
SQS 500メッセージ(1メッセージ/ファイル) $0.0002
DynamoDB(書き込み) 500 $0.0003575
DynamoDB(読み出し) 500 $0.00007125
合計(検索1回) $0.00063708

学び・感想

  • イベント駆動(S3トリガー・DynamoDB Streams・SQS)はベストプラクティスを取り入れ学びになった
  • Lambda制限を考慮した分散処理設計は必須
  • サーバーレスは楽しいが構成が複雑になる

Discussion