😳

Snowflake Cortex SearchでRAGを構築してみた

に公開

はじめに

企業のデータウェアハウスとして広く利用されているSnowflakeですが、近年のAI活用需要に応えて、RAGの構築基盤としても注目を集めているのをご存知でしょうか?

Snowflake Cortex Searchを活用することで、既存のデータウェアハウスの利用を拡張し、RAG(Retrieval-Augmented Generation)システムを効率的に構築することが可能です。

本記事では、実際のシステム構築とその際の注意点を記載いたします。

この記事のGoal

  • RAGとはなにかわかること
  • Snowflake Cortex Searchを活用したRAGの実装ができること
  • 大規模な文章の場合のチャンク化の効果と実装方法がわかること

既存のSnowflake環境を活用した実用的なRAGシステムの構築が可能にできれば幸いです。

対象読者

  • Snowflakeを用いたRAGシステム構築を検討しているかた
  • RAGを聞いたことはあるが、実際に構築がしたことがない方
  • 企業における生成AI活用に課題を抱いている方

Snowflakeアーキテクチャの概要

Snowflake概要図

Snowflakeは、クラウドネイティブなデータプラットフォームとして設計されたデータウェアハウスソリューションです。

特徴

  • 完全管理型: サーバー管理不要
  • 自動スケーリング: その際の処理量に応じて自動拡張
  • マルチクラウド対応: AWS、Azure、GCPで利用可能
  • SQL標準対応: 慣れ親しんだSQLでデータ操作が可能

RAGを構築する際のSnowflakeの良さ

  1. Cortex Search機能: 外部サービスを使用せずにRAG構築が可能
  2. スケーラブルな処理能力: 大規模なデータに対して並列処理とオートスケーリングが可能
  3. 既存アセットの活用: 既存の蓄積されたデータを追加的な移行コストなしで活用可能

RAG(Retrieval-Augmented Generation)とは?

RAGは、検索(Retrieval)生成(Generation) を組み合わせたAI手法です。

従来のLLMにおける制約

  • 知識の不足の問題: 訓練データの時点以降の情報への対応が不可(ユーザ入力によって学習するケースもあり)
  • ハルシネーション: 事実に基づかない回答生成(ハルシネーション)のリスクがある
  • 企業固有情報の欠如: 組織内データへのアクセス制限がある

RAGによって解決できること

  • 動的情報取得: 最新のドキュメントベースからの情報検索
  • 根拠の透明性: 検索されたソース文書に基づく回答生成
  • プライベートデータ活用: 企業内部データの安全な統合

最近では企業における生成AI活用において、RAGを用いることは一般的な解決策となってきました。

システム実装

今回用いたデータ

今回は生成AIにダミーのデータとして以下のようにDummy PCという架空の製品のマニュアルを作成してもらいました。

dummy_pc.md
# DummyPC

# DummyPC ユーザーマニュアル

## はじめに

本マニュアルは、DummyPC(ダミー・ピーシー)をご利用いただく上で必要となる情報をまとめたものです。本機は、初心者から上級者まで幅広いユーザーに対応した高性能かつカスタマイズ性に優れたパーソナルコンピュータです。ハードウェアの取り扱いからソフトウェアの基本操作、トラブルシューティング、定期メンテナンス、さらにはセキュリティ対策まで、できる限り詳細なガイドを提供します。

---

## 安全上のご注意

- **感電防止**:電源プラグを抜く際は、コードではなくプラグ本体を持って抜いてください。
- **火災予防**:機器の上に物を置かないでください。また通気口を塞がないよう注意してください。
- **物理的損傷防止**:衝撃を与えたり、水濡れや湿度の高い場所での使用は避けてください。
- **自己修理禁止**:分解や改造は行わないでください。保証対象外となり、火災や感電の原因にもなります。

これらの注意事項に従わない場合、故障や事故につながる可能性があります。安全な使用のため、必ずご確認ください。

---

## DummyPCの特長

### ハードウェア概要

- **CPU**:DummyChip製 8コア16スレッドプロセッサ
- **GPU**:DummyGraphics G3000シリーズ(VR対応)
- **メモリ**:標準16GB(最大64GBまで拡張可能)
- **ストレージ**:512GB NVMe SSD(追加HDD/SSDスロットあり)
- **電源ユニット**:500W 80Plus認証取得
- **拡張性**:PCIeスロット x2、M.2スロット x2、USBポート多数搭載
- **冷却機構**:高効率ヒートシンク+静音ファンシステム

### 標準搭載OS・ソフトウェア

- **OS**:DummyOS 1.0 (64-bit)
- **標準アプリ**:DummyBrowser(Webブラウザ)、DummyOffice Lite(文書・表計算・スライド)、DummyMail(メールクライアント)、DummyMediaPlayer(メディアプレーヤー)、DummyDefender(セキュリティツール)

### 対応アクセサリ・周辺機器

- **キーボード/マウス**:有線・無線両対応
- **モニタ**:HDMI、DisplayPort、USB-C映像出力対応
- **外部ストレージ**:USB3.2 Gen2, Thunderbolt 3 対応
- **VRヘッドセット**:DummyVR対応機器

---

## パッケージ内容物

- DummyPC 本体
- 電源コード
- ユーザーマニュアル(本書)およびクイックスタートガイド(印刷版)
- 保証書・サポート情報カード
- インストールUSBメディア(OS再インストール用)
- キーボード、マウス(オプション品の場合は別途記載)

開封時には内容物に欠品がないかご確認ください。万一不足があればサポートにお問い合わせください。

---

## 設置方法と初期設定

### 設置環境の確認

- **温度・湿度**:常温(20~25℃)、湿度40~60%程度が理想
- **振動・埃**:振動のない平坦な場所、埃の少ない環境を推奨
- **電源環境**:安定したAC100~240V、50/60Hz電源
- **通気性**:本体背面・側面通気口を塞がないようスペースを確保

### 本体および周辺機器の接続

1. 本体背面の電源コネクタに電源コードを接続し、コンセントに差し込みます。
2. モニタをHDMIまたはDisplayPortで接続します。
3. キーボードとマウスをUSBポートまたはBluetoothでペアリングします。
4. 有線LANの場合はLANケーブルを接続、Wi-Fi使用の場合は後で設定します。

### 初回電源オンと初期セットアップウィザード

1. 電源ボタンを押し、本体を起動します。
2. OS初回起動時に言語や地域、タイムゾーンなどの基本設定を行うセットアップウィザードが表示されます。画面の指示に従い設定を進めます。
3. ユーザーアカウント(管理者アカウント)を作成し、パスワードを設定します。
4. ネットワーク設定(Wi-Fiの場合はSSIDとパスワードの入力)を行います。
5. 必要なアップデートやセキュリティパッチがあれば適用してください。

---

## ユーザーインターフェイスの基本操作

### デスクトップ画面の構成

- **タスクバー**:画面下部に配置され、スタートメニュー、開いているアプリケーション、通知エリアなどが表示
- **アイコン**:デスクトップには「コンピュータ」「ドキュメント」「ゴミ箱」などの基本アイコンが配置
- **右クリックメニュー**:背景を右クリックすると表示されるメニューからディスプレイ設定や新規フォルダ作成が可能

### スタートメニューとアプリケーションランチャーの使い方

- **スタートメニュー**:左下のDummyOSアイコンをクリックで展開
- インストール済みアプリケーション一覧や、頻繁に使うアプリのピン留めが可能
- 検索バーからアプリ、ファイル、設定項目を検索可能

### ウィンドウ操作・マルチデスクトップ機能

- **最小化/最大化/閉じる**:ウィンドウ右上ボタンで操作
- **ウィンドウスナップ機能**:ウィンドウを画面端にドラッグすると自動的に半分または四分割で配置
- **マルチデスクトップ**:タスクビューから新しいデスクトップを追加し、作業環境を分離可能

### ショートカットキー一覧

- **`Ctrl + C`**:コピー
- **`Ctrl + V`**:貼り付け
- **`Ctrl + X`**:切り取り
- **`Alt + Tab`**:ウィンドウ切替
- **`Ctrl + Shift + Esc`**:タスクマネージャ起動
- **`Win + D`**:デスクトップ表示
- **`Win + L`**:画面ロック

---

## ネットワークとインターネット接続

### 有線LAN接続方法

1. LANケーブルを本体背面のLANポートに接続
2. OSが自動でIPアドレス取得(DHCP)を実行
3. ブラウザを開き、インターネットに接続できるか確認

### Wi-Fi設定方法

1. タスクバーのネットワークアイコンをクリック
2. 利用可能なWi-FiのSSID一覧から自宅またはオフィスのネットワークを選択
3. セキュリティキー(パスワード)を入力して接続

### VPN/プロキシ設定

- **VPN**:ネットワーク設定から「VPN」を選択し、サーバーアドレス、ユーザー名、パスワードを入力
- **プロキシ**:ブラウザまたはシステム設定でプロキシサーバーアドレスを指定

### ネットワークトラブルシューティング

- LANケーブルの断線チェック
- ルーター/モデムの再起動
- OSのネットワーク診断ツールを使用
- ドライバ更新、ファイアウォール設定確認

---

## アカウント管理とセキュリティ

### ユーザーアカウントの種類と作成方法

- **管理者アカウント**:システム全般の変更権限
- **標準アカウント**:日常使用に最適。重要な設定変更には管理者権限が必要
- **ゲストアカウント**:一時利用者向け、変更権限が制限

ユーザーアカウントは、「設定 > アカウント」から新規追加可能。

### パスワードポリシーとリカバリ手順

- **パスワードポリシー**:8文字以上、英数字記号混在推奨
- **パスワードリセット**:リカバリツールまたは事前に設定したセキュリティ質問を用いてリセット可能

### セキュリティソフトウェアの構成と使用方法

- **DummyDefender**:標準搭載のウイルス対策ソフト
- 定期的なスキャンスケジュール、リアルタイム保護有効化、隔離ファイル管理が可能

### OSアップデートとセキュリティパッチ適用

- 定期的に「設定 > 更新とセキュリティ」から最新アップデートをチェック
- 自動更新設定も可能

---

## アプリケーションとソフトウェア

### 標準搭載アプリケーション一覧

- **DummyBrowser**:Webブラウジング
- **DummyOffice Lite**:簡易的な文書作成、表計算、スライド資料作成
- **DummyMail**:メール送受信
- **DummyMediaPlayer**:音楽・動画再生
- **DummyDefender**:セキュリティ対策

### アプリケーションのインストール/アンインストール

- **インストール**:DummyStore(アプリストア)または公式サイトからダウンロードしたインストーラを実行
- **アンインストール**:「設定 > アプリ」から対象アプリを選択し、「アンインストール」をクリック

### ソフトウェアライセンス管理

- 有料アプリケーションはライセンスキー登録が必要
- DummyOSアカウントに紐づけてクラウド上でライセンスを管理可能

### アプリケーション設定バックアップ・復元方法

- 「設定 > バックアップ」からアプリ設定をクラウド同期
- 復元時はDummyOSアカウントでログインし同期を実行

---

## データ管理

### ファイルエクスプローラの使い方

- **エクスプローラ起動**:タスクバーのフォルダアイコンをクリック、または`Win + E`
- 左ペインでドライブ、フォルダを選択し、右ペインにファイル一覧が表示
- ファイルコピー・移動はドラッグ&ドロップや右クリックメニューから実行

### フォルダ構成と推奨管理方法

- **ドキュメント**、**ピクチャ**、**ビデオ**、**ダウンロード**など標準フォルダを活用
- 分類やプロジェクトごとにフォルダ分けし、定期的な整理を推奨

### バックアップ戦略とツール

- **ローカルバックアップ**:外付けHDD/SSDへ定期的にファイルコピー
- **クラウドバックアップ**:DummyCloudやサードパーティーサービスを利用
- 自動バックアップスケジュールを設定可能

### クラウドストレージの連携

- DummyCloudアプリでDummyOSアカウントにログイン
- ファイルをクラウドと同期することが可能
- 他の端末からもアクセス可能

---

## ハードウェアの拡張・交換

### RAM/メモリ増設方法

1. 電源を切り、電源コードを抜く
2. ケースを開く(背面ネジまたはクイックリリースボタン)
3. 空きメモリスロットにRAMモジュールを挿入
4. ケースを閉じ、電源を入れてBIOS/OSで認識を確認

### SSD/HDD交換および追加ストレージの取り付け

1. 本体カバーを開き、ストレージベイまたはM.2スロットを確認
2. 既存ドライブを取り外し、新しいSSD/HDDを取り付け
3. 必要ならOSでフォーマットし、ドライブレターを割り当て

### GPU/グラフィックカード交換ガイド

1. 電源を切り、電源コードを抜く
2. 拡張スロットの固定ネジを外し、旧GPUを外す
3. 新しいGPUをスロットに挿入し、固定ネジと補助電源コネクタを接続
4. ケースを閉じ、電源投入後ドライバを更新

### 冷却ファンの交換とエアフロー改善

- 交換用ファンを用意(同径・同電圧)
- 吹き出し方向を確認し、ホコリを除去
- 正しい回転方向で取り付け、負圧・正圧環境をバランス良く調整

---

## トラブルシューティング

### 起動しない場合

- 電源ケーブルとコンセントを確認
- 電源ボタン長押しで再試行
- BIOSビープコード(ピープ音)の意味をマニュアルで確認
- メモリや拡張カードの接触不良をチェック

### 画面が表示されない場合

- モニタ電源・ケーブル接続確認
- 別のポート(HDMI⇔DisplayPort)で試す
- 内蔵GPUに切り替えて表示テスト
- GPUドライバ再インストール

### 音が出ない場合

- ミュート設定解除、音量ミキサー確認
- スピーカー・ヘッドフォンが正しく接続されているか確認
- オーディオドライバ更新

### ハングアップやフリーズが頻発する場合

- 不要なバックグラウンドプロセスを終了
- メモリ診断ツールでRAMエラーをチェック
- ディスク使用率や発熱状況を確認
- ソフトウェアアップデート、セーフモード起動

### ネットワークが不安定な場合

- ルーター再起動・ファームウェア更新
- ネットワークアダプタドライバ更新
- 他のデバイスで問題がないか確認

---

## 定期メンテナンス

### OSクリーニング・キャッシュ削除

- 内蔵のクリーニングツールやサードパーティー製ソフトでテンポラリファイル削除
- ブラウザキャッシュや履歴を定期的にクリア

### 不要ファイル・ログファイルの整理

- 定期的にダウンロードフォルダや一時ファイルフォルダを確認
- 古いログファイルを削除してディスクスペースを確保

### ウイルススキャン・マルウェアチェック

- DummyDefenderで定期的にフルスキャン
- 定期的な定義ファイル更新

### ファン掃除・埃除去

- ケースを開け、エアダスターで内部の埃を除去
- ヒートシンク、ファンブレードの清掃
- 熱対策で定期的なメンテナンスを実施

---

## 高度な設定

### BIOS/UEFI設定

- 起動時`F2`または`Delete`キーでBIOS/UEFIにアクセス
- 起動順序、クロック調整、メモリ設定、セキュリティオプションの変更が可能
- 設定変更は慎重に行い、変更前の設定を記録

### 電源管理・省電力モード

- OS側で電源プランを「バランス」「省電力」「高パフォーマンス」から選択
- スリープ、休止状態、ディスプレイオフまでの時間を調整

### 高度なネットワーク構成(VLAN, Static Route)

- ネットワーク設定からVLAN IDの設定
- 静的ルーティングをコマンドラインまたはGUIツールで追加
- 管理者権限が必要

### 仮想マシン環境の構築

- DummyHypervisorまたはサードパーティー製ツールで仮想環境を作成
- ISOイメージからゲストOSをインストール
- ネットワーク、ストレージ割り当てを調整

---

## カスタマーサポートと保証

### サポートへの問い合わせ方法

- **電話サポート**:XX-XXXX-XXXX(平日9:00-18:00)
- **メールサポート**:support@dummypc.example
- **公式サイトチャットサポート**:アカウントログイン後、チャットウィンドウから問い合わせ可能

### 保証期間と補償内容

- 購入日から1年間の製品保証(ハードウェア不良に対する無償修理)
- 消耗品(バッテリ、ファンなど)は保証対象外

### 修理依頼の手順

1. サポートに連絡し、故障症状や購入情報を伝える
2. RMA番号(修理受付番号)が発行される
3. 本体を梱包し、指定住所へ発送
4. 修理完了後、返送

---

## 法的事項

### ライセンス条項

- DummyOSおよび標準ソフトウェアはライセンス契約に基づき提供
- 無断複製、改変、再配布禁止

### プライバシーポリシー

- 個人情報はDummyPC社のプライバシーポリシーに従い適切に保護
- 利用状況データは統計的目的で匿名化処理後使用

### 免責事項

- 本マニュアルの情報は予告なく変更される場合がある
- 間接的損害、データ消失に関する補償は行わない

---

## FAQ

**Q1. 起動が遅いのですが?**

A1. 自動起動アプリを減らす、ストレージの空き容量確保、メモリ増設を検討してください。

**Q2. OS再インストール方法は?**

A2. 同梱のインストールUSBメディアからブートして再インストールウィザードを実行します。

**Q3. ノイズが気になるのですが?**

A3. 内部清掃、ファン速度設定の見直し、静音ファンへの交換をお試しください。

---

## 用語集

- **BIOS/UEFI**:基本入出力システム。ハードウェア初期化やブート順序設定を行う
- **LAN/Wi-Fi**:有線/無線ネットワーク接続
- **VPN**:Virtual Private Network、リモートネットワークへの安全な接続
- **アカウント**:ユーザー個人設定やアクセス権限を管理するためのID
- **バックアップ**:データを保全するための複製をとる行為

---

## アップデート履歴

- **v1.0 (2024/12/11)**:初版リリース
- **v1.1 (2024/12/12)**:ネットワークトラブルシューティング項目追記
- **v1.2 (2024/12/13)**:FAQにOS再インストール手順を追加

---

開発環境の構築

必要な依存関係をインストールします。

pip install snowflake-snowpark-python openai python-dotenv

データスキーマの作成

ファイル管理とベクトル検索用のテーブルを定義します。
※SnowflakeのUIにて操作を行ってください

-- メイン文書テーブル
CREATE OR REPLACE TABLE docs (
  doc_id STRING,
  content TEXT,
  url STRING,
  embedding VECTOR(FLOAT, 768)
);

-- チャンク文書テーブル(後述の改善版)
CREATE OR REPLACE TABLEdocs_chunks (
  chunk_id STRING,
  original_doc_id STRING,
  content TEXT,
  url STRING,
  embedding VECTOR(FLOAT, 768)
);

基本的なRAGの実装

import os
from snowflake.snowpark import Session
from snowflake.cortex import embed_text_768
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import VectorType
import openai

def get_session():
    """Snowflakeセッションを作成"""
    return Session.builder.configs({
        "account": os.environ["SNOWFLAKE_ACCOUNT"],
        "user": os.environ["SNOWFLAKE_USER"],
        "password": os.environ["SNOWFLAKE_PASSWORD"],
        "database": "rag_demo",
        "schema": "rag",
        "warehouse": "rag_wh"
    }).create()

def ask_question(session, question: str, k: int = 5):
    """質問に対してRAG回答を生成"""
    # 1. 質問の埋め込みベクトルを生成
    q_vec = embed_text_768("snowflake-arctic-embed-m", question)
    
    # 2. ベクトル検索で関連文書を取得
    df = session.table("docs").filter(F.col("embedding").is_not_null())
    df_with_similarity = df.with_column(
        "similarity",
        F.call_function(
            "VECTOR_COSINE_SIMILARITY",
            F.col("embedding").cast(VectorType(float, 768)),
            F.lit(q_vec).cast(VectorType(float, 768)),
        ),
    )
    
    results = (
        df_with_similarity.select("content", "similarity")
        .order_by(F.col("similarity").desc())
        .limit(k)
        .collect()
    )
    
    # 3. 関連文書をコンテキストとして結合
    context = "\n---\n".join([row["CONTENT"] for row in results])
    
    # 4. OpenAI APIで回答生成
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-4.1",
        messages=[
            {"role": "system", "content": "参考文書をもとに日本語で回答してください。"},
            {"role": "user", "content": f"質問: {question}\n\n参考文書:\n{context}"}
        ],
        temperature=0.0
    )
    
    return response.choices[0].message.content

処理の流れ

今回の実装では、以下の3段階の処理を実行してRAG回答を生成しています

  1. 質問の埋め込みベクトル化:Snowflake Cortex Searchのembed_text_768関数を使用して、ユーザーの質問をベクトル表現に変換します。この768次元のベクトルが検索クエリとして機能します。

  2. ベクトル類似度検索:事前に格納された文書の埋め込みベクトルと質問ベクトルとの間でコサイン類似度を計算し、最も関連性の高い上位k件の文書を抽出します。VECTOR_COSINE_SIMILARITY関数により、Snowflake内でベクトル演算が実行されます。

  3. LLMによる回答生成:取得した参考情報と元の質問をOpenAI GPT-4.1に送信し、参考文書に基づいた回答を生成します。

大規模な文書処理における課題

RAGシステムにおいて、文書の分割は検索精度とシステムパフォーマンスに大きな影響を与えます。
特に大規模な文書の処理においては、文章の分割(チャンク化)が必須となってきます。

チャンク化による検索精度への影響

以下のような例を用いて、文書サイズが検索結果に与える影響を検証しました

# テスト実行
question = "DummyPCの電源がつきません"
answer = ask_question(session, question)
print(f"answer : {answer}")
answer : DummyPCの電源がつかない場合、以下を順番にご確認ください...

一見適切な回答が生成されているように見えますが、詳細に調査すると重大な問題がありました、、、

処理の詳細

取得された文書:
文書1: 13,548文字
内容: # DummyPC ユーザーマニュアル
## はじめに
本マニュアルは、DummyPC(ダミー・ピーシー)を...
類似度: 0.8153

結果からわかること

先ほどの検証から以下のことがわかります。

  1. 過大な情報粒度: 13,548文字の文書全体が単一のベクトル表現として処理される
  2. 情報の過多: 関連性の高い「トラブルシューティング」セクションが、ほかのセクションと一緒に帰ってきてしまっているために、回答の精度が低下する可能性がある。

このような大規模の文書の処理においては、そのままベクトル化してしまうとシステム全体の精度に大きな影響を与えることがわかりました。

文書を分割して格納する実装

チャンク化を行う

大規模な文書を分割し、検索精度を向上させる手法(チャンク化)を実装していきます。

import re

def split_markdown_document(content: str, doc_id: str):
    """マークダウン文書をセクション単位で分割"""
    chunks = []
    
    # セクションヘッダーで分割(## レベル)
    sections = re.split(r'\n(?=##\s)', content)
    
    for i, section in enumerate(sections):
        section = section.strip()
        if not section or len(section) < 50:  
            continue
            
        chunk_id = f"{doc_id}_chunk_{i+1}"
        chunks.append({
            "chunk_id": chunk_id,
            "original_doc_id": doc_id,
            "content": section
        })
    
    return chunks

def rechunk_documents():
    """既存文書をチャンク分割して新テーブルに保存"""
    session = get_session()
    
    # 元の文書を取得
    documents = session.table("docs").collect()
    
    for doc in documents:
        # チャンク分割
        chunks = split_markdown_document(doc["CONTENT"], doc["DOC_ID"])
        
        # 各チャンクを新テーブルに挿入
        for chunk in chunks:
            # エスケープ処理
            escaped_content = chunk["content"].replace("'", "''")
            
            # 埋め込みベクトル付きで挿入
            insert_sql = f"""
                INSERT INTO docs_chunks (chunk_id, original_doc_id, content, url, embedding)
                SELECT 
                    '{chunk["chunk_id"]}',
                    '{chunk["original_doc_id"]}',
                    '{escaped_content}',
                    '{doc["URL"]}',
                    SNOWFLAKE.CORTEX.EMBED_TEXT_768('snowflake-arctic-embed-m', '{escaped_content}')
            """
            session.sql(insert_sql).collect()
    
    session.close()

チャンク化の効果

チャンク化前:

  • 処理単位: 13,548文字の一体文書
  • 検索粒度: 文書全体レベル

チャンク後:

  • 処理単位: 20チャンク(平均484文字)
  • 検索粒度: セクションレベル

最適化されたRAGシステムの実装

チャンク化を入れた際の実装

def ask_question_chunked(session, question: str, k: int = 5, show_details: bool = False):
    """チャンク化されたデータでRAG回答を生成"""
    # 質問の埋め込みベクトル生成
    q_vec = embed_text_768("snowflake-arctic-embed-m", question)
    
    # チャンクテーブルで検索
    df = session.table("docs_chunks").filter(F.col("embedding").is_not_null())
    df_with_similarity = df.with_column(
        "similarity",
        F.call_function(
            "VECTOR_COSINE_SIMILARITY",
            F.col("embedding").cast(VectorType(float, 768)),
            F.lit(q_vec).cast(VectorType(float, 768)),
        ),
    )
    
    results = (
        df_with_similarity.select("chunk_id", "content", "similarity")
        .order_by(F.col("similarity").desc())
        .limit(k)
        .collect()
    )
    
    # 詳細表示用
    if show_details:
        print("取得されたチャンク:")
        for i, row in enumerate(results, 1):
            print(f"チャンク{i}: {row['CHUNK_ID']}")
            print(f"類似度: {row['SIMILARITY']:.4f}")
            print(f"内容: {row['CONTENT'][:150]}...")
            print("-" * 40)
    
    # 回答生成
    context = "\n---\n".join([row["CONTENT"] for row in results])
    
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-4.1",
        messages=[
            {"role": "system", "content": "参考文書をもとに日本語で簡潔に回答してください。"},
            {"role": "user", "content": f"質問: {question}\n\n参考文書:\n{context}"}
        ],
        temperature=0.0
    )
    
    return response.choices[0].message.content

チャンク化の効果

従来の文書単位での検索:

文書1: 13,548文字 - "# DummyPC ユーザーマニュアル ## はじめに..."
類似度: 0.8153

チャンク化後の検索:

取得されたチャンク:
チャンク1: DummyPC_chunk_14
類似度: 0.9162
内容: ## トラブルシューティング
### 起動しない場合
- 電源ケーブルとコンセントを確認
- 電源ボタン長押しで再試行...

この実装により、クエリに対して関連性の高い情報のみを抽出することができます。

SnowflakeでRAGを構築する際のメリット

エンタープライズRAGシステムの構築基盤としてSnowflakeを選択することで得られる技術的優位性について分析します。

1. 自動スケーリングによる柔軟な運用

-- ウェアハウスサイズを動的に変更
ALTER WAREHOUSE rag_wh SET WAREHOUSE_SIZE = 'LARGE';

技術的優位性:

  • リソース最適化: 処理負荷に応じてその場に応じたリソースの調整を行ってくれるのでコストを削減
  • インフラ管理の簡易化: インフラの負荷や新しく構築をする必要がないので簡易に

2. 大量データの並列処理

# 大量チャンクの並列埋め込み生成
def batch_embed_chunks(session, chunks, batch_size=100):
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i+batch_size]
        
        # バッチ処理でSnowflakeに送信
        batch_df = session.create_dataframe(batch)
        batch_df.write.save_as_table("temp_chunks", mode="append")
        
        # Cortexで一括埋め込み生成
        session.sql("""
            UPDATE temp_chunks 
            SET embedding = SNOWFLAKE.CORTEX.EMBED_TEXT_768('snowflake-arctic-embed-m', content)
            WHERE embedding IS NULL
        """).collect()

並列処理を行うメリット:

  • 高速化: 数万から数十万のチャンクに対する埋め込みベクトル生成の高速化
  • 実装の簡素化: SQLベースの操作なので、複雑な並列処理ロジックの記述が不要

おわりに

Snowflake Cortex Searchを活用したRAGシステムの構築により、既存のデータウェアハウス環境を活用した企業ユースに最適なのAIシステムを効率的に実装することが可能です。

特に大規模なファイルにおいてチャンク化は、回答の精度に大きな影響を与える重要な要素であることが確認できました。

本記事で紹介した実装手法が、RAGシステム構築において参考になりますと幸いです。


Solvio株式会社

Discussion