❄️

Snowflake Cortex Search Boosts & Decays で RAG チャットアプリを強化する

に公開

はじめに

2025/4/28に、Snowflake で RAG フレームワークなどを構成できる Cortex SearchBoosts / Decays 機能が追加されました!この機能により検索スコアを 「数値メタデータ (例: いいね数や閲覧数など)」「経過時間」 に応じて柔軟に調整できるようになり、RAG アプリなどにおけるドキュメント検索精度をさらに高めることが可能になりました。

今回はこの機能を使って、より高度な検索機能を備えた RAG チャットボットを作ってみましたので是非使ってみてください。クイックに試してカスタムしていただけるよう シンプルなソースコードかつ Streamlit in Snowflake で機能を完結させています。

Cortex Search の基本的な概要については以前の記事「Snowflake Cortex Search で RAG チャットアプリを試す」に記載がありますのでご確認ください。

https://zenn.dev/tsubasa_tech/articles/200e72d6039acf

Boosts / Decays 機能の概要

機能 目的 主なパラメータ
Numeric Boosts 数値列を重み付けして検索スコアを強める column, weight
Time Decays タイムスタンプや日付列を参照し時間経過でより最新データの検索スコアを強める column, weight, limit_hours

公式ドキュメント: https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-search/boosts-decays

設定は scoring_config パラメータで JSON 形式で渡します。

scoring_config = {
    "functions": {
        "numeric_boosts": [
            {"column": "likes", "weight": 1}  # いいね数をそのまま加算
        ],
        "time_decays": [
            {"column": "created_at", "weight": 1, "limit_hours": 240}  # 10 日以内であれば検索結果に出やすくなる
        ]
    }
}

複数のブースト / ディケイを同時に設定でき、weight を調整することで「より人気のあるドキュメントを検索結果に出やすくする」「鮮度の高いドキュメントを優先して検索結果に出す」など様々な検索体験を作り出すことが可能となるわけです。

Boosts / Decays がもたらすビジネス的インパクト

テーマ 従来の RAG Boosts / Decays 適用後
検索結果の品質 「あまり使われないドキュメント」などが検索結果に利用されてしまう いいね数や閲覧数を重み付けすることで、ユーザーが本当に価値を感じている情報が検索結果に使用される
情報の鮮度 「陳腐化したドキュメント」や「古いドキュメント」などが検索結果に利用されてしまう 古いドキュメントの検索スコアを自動減衰し、最新情報が検索結果に使用される
ユーザー体験 ユーザーのフィードバックが検索結果に反映されない いいねボタンなどの集計結果を Cortex Search に反映させることで継続的に検索精度が向上

期待できる効果例

  • FAQ ボットの解決率向上 ― 「解決したらいいねボタンを押せる」機能を入れるだけで回答精度を継続向上
  • 社内ナレッジの鮮度維持 ― 1年以上前の手順書は自動でランクダウンし誤った手順の利用を抑止
  • マーケティング用途 ― 「閲覧数」「クリック率」などをブースト対象にすれば、関心の高い検索結果を常に提示可能

ROI 観点では 検索 > クリック > フィードバック のループを Snowflake 内で完結できるため、複雑なシステム構築や外部サービス連携を省略し追加のコストを抑えつつ ユーザーへの提供速度を速められる点が魅力になるのではないかと思います。

実装するアプリケーションの機能概要

では早速 Boosts / Decays を用いたアプリを作ってみましょう!今回のアプリは Streamlit in Snowflake (SiS) 上で動作する RAG チャットボットアプリとなります。ファイルのステージへのアップロード、チャンク化、テーブルへの保存、Cortex Search を用いた Embedding とハイブリッド検索、そして COMPLETE 関数を用いた回答生成までを一気通貫で実現するアプリとなります。

  1. ファイルのステージへのアップロード: st.file_uploader を用いてローカル PC からのファイルを Snowflake のステージへアップロードします。テキストファイルだけではなく PDF ファイルにも対応させています。
  2. チャンク化: SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER 関数を使用し、大きいサイズのテキストをチャンク化します。チャンクサイズやオーバーラップサイズは画面上でユーザーが設定することが可能です。
  3. バルクインサート: 生成したチャンクをテーブルにバルクインサートします。
  4. Cortex Search Service の作成: テーブル情報に基づいて Cortex Search Service を作成します。
  5. Boosts / Decays を適用した検索: likescreated_at をスコアリングに利用して Cortex Search Service での検索を行います。
  6. COMPLETE 関数を用いた回答生成: COMPLETE 関数を用いてCortex Search Service の検索結果からチャットボットの回答を生成して表示します。
  7. 参考ドキュメント UI: 利用したチャンクについて参考ドキュメントとして表示します。
  8. いいねボタン: 参考ドキュメントに対していいねボタンを押すことで、likes をカウントアップさせることが可能です。このカウント数は Cortex Search Service が更新されたタイミングで反映されます。
  9. ダウンロード機能: 参考ドキュメントでダウンロードボタンを押下すると、元ファイルを Snowflake ステージから取得しダウンロードすることが可能です。

完成イメージ

アプリの全体像
アプリの全体像

対応ファイル
テキストだけではなく、HTMLやPDFにも対応させました

参考ドキュメント
検索結果画面 いいねボタンとダウンロードボタンを備えています

前提条件

  • Snowflake アカウント
    • Cortex LLM が利用できる Snowflake アカウント (クロスリージョン推論のリリースによりクラウドやリージョンの制約がほぼ無くなりました)
  • Streamlit in Snowflake のインストールパッケージ
    • python 3.11 以降
    • snowflake-ml-python 1.8.1 以降
    • snowflake.core 1.2.1 以降
    • pdfplumber 0.11.4 以降

Cortex Search のリージョン対応表 (Snowflake 公式ドキュメント)

手順

新規で Streamlit in Snowflake のアプリを作成

Snowsight の左ペインから『Streamlit』をクリックし、『+ Streamlit』ボタンをクリックし SiS アプリを作成します。

Streamlit in Snowflake のアプリを実行

Streamlit in Snowflake アプリの編集画面で snowflake-ml-pythonsnowflake.corepdfplumber をインストールし、以下コードをコピー&ペーストで貼り付けてください。(コードが長くなってしまったためアコーディオンを開いて表示してください)

その後12行目 - 16行目のパラメータを環境に合わせて設定し実行してください。

ソースコード
import streamlit as st
import uuid
from datetime import datetime
from snowflake.snowpark.context import get_active_session
from snowflake.cortex import Complete as CompleteText
from snowflake.core import Root
import io

# ------------------------------------------------------------
# 各種設定 (環境に合わせて修正してください)
# ------------------------------------------------------------
STAGE_NAME = "<ドキュメントを保存するステージ名>"
TABLE_NAME = "<ドキュメントのデータを保存するテーブル名>"
SEARCH_SERVICE = "<Cortex Search Serviceの名前>"
WAREHOUSE = "<Warehouse名>"        # Cortex Searchで使用するWarehouse
MAX_RESULTS = 5                   # Cortex Searchで検索時に取得するドキュメント数

# ------------------------------------------------------------
# Snowflakeセッションの初期化とデータベース/スキーマ/テーブルの設定
# ------------------------------------------------------------
st.set_page_config(layout="wide")
session = get_active_session()
root = Root(session)
current_db = session.sql("SELECT CURRENT_DATABASE()").collect()[0][0]
current_schema = session.sql("SELECT CURRENT_SCHEMA()").collect()[0][0]


def init_objects() -> None:
    """ステージ・テーブル・サーチサービスが存在しない場合に作成する"""
    # ステージの作成
    session.sql(f"CREATE STAGE IF NOT EXISTS {STAGE_NAME}").collect()

    # テーブルの作成
    session.sql(
        f"""
        CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
            doc_id      STRING PRIMARY KEY,
            file_name   STRING,
            content     STRING,
            likes       INT,
            created_at  TIMESTAMP
        )
        """
    ).collect()

    # Cortex Search Serviceの作成
    svc_exists = session.sql(
        f"SHOW CORTEX SEARCH SERVICES LIKE '{SEARCH_SERVICE}'"
    ).collect()

    if not svc_exists:
        create_search_service()


def create_search_service() -> None:
    """ドキュメントテーブルを対象として Cortex Search Service を再作成"""
    session.sql(
        f"""
        CREATE OR REPLACE CORTEX SEARCH SERVICE {SEARCH_SERVICE}
        ON content
        ATTRIBUTES file_name, likes, created_at
        WAREHOUSE = {WAREHOUSE}
        TARGET_LAG = '1 day'
        AS (
            SELECT file_name, content, likes, created_at
            FROM {TABLE_NAME}
        )
        """
    ).collect()


# ------------------------------------------------------------
# ファイルアップロード関数
# ------------------------------------------------------------

def upload_document(uploaded_file, chunk_size: int, overlap: int) -> None:
    """ローカルファイルをステージへ格納後、テキストをチャンク化してテーブルへ登録 (likes は 0 で初期化)"""
    # ファイルをステージに格納
    stage_path = f"@{STAGE_NAME}/{uploaded_file.name}"
    session.file.put_stream(uploaded_file, stage_path, auto_compress=False)

    file_ext = uploaded_file.name.split(".")[-1].lower()
    # ファイルポインタを先頭に戻す
    uploaded_file.seek(0)

    if file_ext == "pdf":
        # -------- PDF テキスト抽出 --------
        try:
            import pdfplumber

            pdf_bytes = uploaded_file.read()
            with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
                pages = [p.extract_text() or "" for p in pdf.pages]
            content = "\n".join(pages)
        except Exception as e:
            content = f"[PDFからのテキスト抽出失敗]: {e}"
    else:
        # -------- 通常テキスト抽出 --------
        raw_bytes = uploaded_file.read()
        try:
            content = raw_bytes.decode("utf-8")
        except UnicodeDecodeError:
            try:
                content = raw_bytes.decode("shift_jis")
            except Exception:
                content = str(raw_bytes)

    # ---------- Snowflake 関数 SPLIT_TEXT_RECURSIVE_CHARACTER でチャンク化 ----------
    # 'none' フォーマットを指定し、Python から引数をバインドして呼び出す
    chunks_rows = session.sql(
        """
        SELECT value
        FROM LATERAL FLATTEN(
            input => SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(?, 'none', ?, ?)  
        )
        """,
        params=[content, chunk_size, overlap]
    ).collect()

    # 登録日時を datetime 型で保持
    ts = datetime.utcnow()

    # ---------- バルクインサート ----------
    # すべてのチャンクをリストに格納し、DataFrame として一括で書き込む
    rows_to_insert = [
        (str(uuid.uuid4()), uploaded_file.name, row["VALUE"], 0, ts)
        for row in chunks_rows
    ]

    if rows_to_insert:
        df = session.create_dataframe(
            rows_to_insert,
            schema=["doc_id", "file_name", "content", "likes", "created_at"],
        )
        # append モードで既存テーブルへ追加
        df.write.mode("append").save_as_table(TABLE_NAME)


# ------------------------------------------------------------
# Cortex Searchを用いた検索関数
# ------------------------------------------------------------

def search_documents(question: str):
    """Cortex Search でブースト/ディケイを適用し関連ドキュメントを取得"""
    rag_svc = (
        root.databases[current_db]
            .schemas[current_schema]
            .cortex_search_services[SEARCH_SERVICE]
    )

    scoring = {
        "functions": {
            "numeric_boosts": [
                {"column": "likes", "weight": 1}
            ],
            "time_decays": [
                {"column": "created_at", "weight": 1, "limit_hours": 240}
            ]
        }
    }

    resp = rag_svc.search(
        query=question,
        columns=["file_name", "content", "likes", "created_at"],
        limit=MAX_RESULTS,
        scoring_config=scoring,
    )

    return resp.results


# ------------------------------------------------------------
# COMPLETE関数を用いた回答生成関数
# ------------------------------------------------------------

def generate_answer(question: str, context_blocks, model: str) -> str:
    """コンテキストと質問を結合し COMPLETE 関数で回答を生成"""
    context = "\n---\n".join(
        [f"[likes: {d['likes']}, date: {d['created_at']}]\n{d['content']}" for d in context_blocks]
    )

    prompt = f"""
    あなたはアップロードされたドキュメントを検索して回答するアシスタントです。
    以下のコンテキストを参考に、ユーザーの質問に対して日本語で簡潔に回答してください。
    コンテキストが不十分な場合は、その旨を伝えてください。

    ### コンテキスト
    {context}

    ### 質問
    {question}

    ### 回答
    """

    return CompleteText(model, prompt)


# ------------------------------------------------------------
# Streamlit UI
# ------------------------------------------------------------

def main():
    st.title("📖 Simple RAG アプリ – Cortex Search Boosts & Decays")

    # --- Sidebar: settings & upload -------------------------------------
    st.sidebar.header("🗂️ ドキュメントアップロード")
    # ---------- チャンク設定 ----------
    st.sidebar.markdown("### 🔧 チャンク設定")
    chunk_size = st.sidebar.number_input("チャンクサイズ (文字数)", min_value=100, max_value=2000, value=300, step=50)
    overlap_size = st.sidebar.number_input("オーバーラップ (文字数)", min_value=0, max_value=500, value=30, step=10)

    # ---------- ファイルアップロード ----------
    st.sidebar.markdown("### 📂 ファイルアップロード")
    uploaded_file = st.sidebar.file_uploader(
        "対応フォーマット: txt, md, csv, json, log, html, xml, yaml, yml, pdf",
        type=[
            "txt", "md", "csv", "json", "log", "html", "xml", "yaml", "yml", "pdf",
        ],
    )

    if st.sidebar.button("ステージとテーブルへ保存") and uploaded_file is not None:
        upload_document(uploaded_file, chunk_size, overlap_size)
        st.sidebar.success("アップロード完了! ステージとテーブルに保存されました。")

    # ---------- Cortex Search 更新 ----------
    if st.sidebar.button("Cortex Search Service 更新"):
        create_search_service()
        st.sidebar.success("Cortex Search Service を再構築しました。")

    # --- Sidebar: model selection --------------------------------------
    st.sidebar.header("⚙️ モデル設定")
    model_name = st.sidebar.selectbox(
        "LLMモデルを選択",
        (
            "claude-3-7-sonnet",
            "mistral-large2",
            "llama3.3-70b",
            "reka-flash"
        ),
    )

    # --- Chat history state --------------------------------------------
    if "messages" not in st.session_state:
        st.session_state["messages"] = []

    # ---------- 参考ドキュメント描画用 fragment ----------
    @st.fragment
    def render_docs(docs):
        """参考ドキュメント一覧を表示"""
        with st.expander("🔍 参考ドキュメント", expanded=True):
            for idx, d in enumerate(docs):
                title = d.get("file_name", "(タイトルなし)")
                meta = f"👍 {d['likes']} | {d['created_at']}"
                preview = d["content"][:150].replace("\n", " ") + "…"

                col1, col2, col3 = st.columns([7, 1, 2])
                with col1:
                    st.markdown(f"**{title}** — {meta}\n\n> {preview}")

                # いいねボタン
                def _like(file_name= d['file_name']):
                    session.sql(
                        f"UPDATE {TABLE_NAME} SET likes = likes + 1 WHERE file_name = ?",
                        params=[file_name]
                    ).collect()
                col2.button("👍", key=f"like_{idx}_{title}", on_click=_like)

                # ダウンロードボタン (ステージ上の元ファイルを取得)
                stage_path = f"@{STAGE_NAME}/{d['file_name']}"
                try:
                    stream = session.file.get_stream(stage_path)
                    file_bytes = stream.read()
                except Exception:
                    # フォールバック: チャンクを連結してテキストとして提供
                    file_rows = session.sql(
                        f"SELECT content FROM {TABLE_NAME} WHERE file_name = ? ORDER BY created_at",
                        params=[d['file_name']]
                    ).collect()
                    file_bytes = "\n".join([r['CONTENT'] for r in file_rows]).encode("utf-8")

                # MIME タイプ判定
                ext = d['file_name'].split('.')[-1].lower()
                mime_map = {
                    "txt": "text/plain",
                    "md": "text/markdown",
                    "csv": "text/csv",
                    "json": "application/json",
                    "log": "text/plain",
                    "html": "text/html",
                    "xml": "application/xml",
                    "yaml": "text/yaml",
                    "yml": "text/yaml",
                    "pdf": "application/pdf",
                }
                mime_type = mime_map.get(ext, "application/octet-stream")

                col3.download_button(
                    label="📄 DL",
                    data=file_bytes,
                    file_name=d['file_name'],
                    mime=mime_type,
                    key=f"dl_{idx}_{title}"
                )

    # ---------- 過去メッセージ表示 ----------
    for msg in st.session_state["messages"]:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])
            if "docs" in msg:
                render_docs(msg["docs"])

    # ---------- チャット履歴クリア ----------
    if st.sidebar.button("チャット履歴をクリア"):
        st.session_state["messages"] = []
        st.rerun()

    # --- User input -----------------------------------------------------
    if question := st.chat_input("質問を入力してください"):
        # Store user message
        st.session_state["messages"].append({"role": "user", "content": question})
        with st.chat_message("user"):
            st.markdown(question)

        # Retrieve relevant docs (boosts & decays applied)
        docs = search_documents(question)

        # Generate answer
        answer = generate_answer(question, docs, model_name)

        # Assistant message
        with st.chat_message("assistant"):
            st.markdown(answer)
            render_docs(docs)

        # Save assistant message
        st.session_state["messages"].append({"role": "assistant", "content": answer, "docs": docs})


# ------------------------------------------------------------
# Initialize DB objects then launch Streamlit
# ------------------------------------------------------------
if __name__ == "__main__":
    init_objects()
    main()

最後に

いかがでしたでしょうか?Cortex Search Boosts / Decays を用いることでメタデータを活かしたより高度な検索システムや RAG チャットボットを作ることができるようになりました。今回はなるべくシンプルなアプリにしましたが、例えば「filter を使って検索対象データを部署で絞り込む」「ドキュメントの管理画面を追加して、管理者がブーストの値を制御する」など工夫次第で更に便利なアプリケーションにすることも可能です。是非皆さんも Snowflake でデータ活用をより身近にしていただければ幸いです!

宣伝

SNOWFLAKE DISCOVER で登壇しました!

2025/4/24-25に開催されました Snowflake のエンジニア向け大規模ウェビナー『SNOWFLAKE DISCOVER』において『ゼロから始めるSnowflake:モダンなデータ&AIプラットフォームの構築』という一番最初のセッションで登壇しました。Snowflake の概要から最新状況まで可能な限り分かりやすく説明しておりますので是非キャッチアップにご活用いただければ嬉しいです!

以下リンクでご登録いただけるとオンデマンドですぐにご視聴いただくことが可能です。

生成AI Conf 様の Webinar で登壇しました!

『生成AI時代を支えるプラットフォーム』というテーマの Webinar で NVIDIA 様、古巣の AWS 様と共に Snowflake 社員としてデータ*AI をテーマに LTをしました!以下が動画アーカイブとなりますので是非ご視聴いただければ幸いです!

https://www.youtube.com/live/no9WYeLFNaI?si=2r0TVWLkv1F5d4Gs

X で Snowflake の What's new の配信してます

X で Snowflake の What's new の更新情報を配信しておりますので、是非お気軽にフォローしていただければ嬉しいです。

日本語版

Snowflake の What's New Bot (日本語版)
https://x.com/snow_new_jp

English Version

Snowflake What's New Bot (English Version)
https://x.com/snow_new_en

変更履歴

(20250501) 新規投稿
(20250508) 宣伝文修正
(20250510) Cortex LLM のモデルの claude-3-5-sonnetclaude-3-7-sonnet に変更

Discussion