🌷

Cortex Analyst と Streamlit を用いた Text2SQL アプリ作成

に公開

記事の概要

こんにちは。シンプルフォーム株式会社でインターンをしています、長井です。
この記事では、Cortex Analyst [1] を用いたチャットアプリケーションを Streamlit [2] で構築した事例を紹介します。具体的には本アプリケーションで実装した以下の機能を説明します。

  • チャット機能
    • チャット
    • リクエスト数のカウント
    • Semantic View の選択
    • 直前までの質問を参考にした回答の生成
  • 補助機能
    • フィードバックフォーム
    • チャット履歴ページ
    • Semantic View の定義確認ページ

背景・課題

当社では社内向けのデータ分析基盤として Snowflake 基盤を提供しています。Redash を主な BI ツールとして非エンジニアを含む社内メンバーにより SQL を用いた分析が行われてきました。

データ分析基盤の利用は以前より普及していたものの、SQL の文法や当社固有のデータ構造に関する知識が分析時のハードルになっていました。また、今後、扱うデータの種類や新規参画メンバーが増えていくことも見込まれており、このような問題がより顕著になっていくことが想定されました。

一部のメンバーは ChatGPT のようなツールを SQL クエリ開発の補助として利用していましたが、当社のデータ構造に関する知識を持たないため、活用には限界がありました。

アーキテクチャ

本アプリケーションのチャット部分は、Snowflake が提供する Cortex Analyst の API、アプリケーション部分は Streamlit を用いて構築しました。

architecture

インフラアーキテクチャに関してはこちらの記事をご覧ください。
https://zenn.dev/dataheroes/articles/20250608-streamlit-in-aws

実装(チャット機能)

まず、チャット機能の実装について説明していきます。
一般的なチャットアプリケーション同様、画面メイン下部の「質問を入力してください」と書かれた入力欄から質問を入力します。また、Cortex Analyst が生成した SQL やその実行結果を同じページに表示します。

chat

主要な実装に関して、以下の記事などを参考にさせていただきました。

https://zenn.dev/tree_and_tree/articles/e1c9dc57cdde28

https://zenn.dev/jay_explolar/articles/df9bf248ab40a9

Cortex Analyst API の呼び出し

ユーザーによるテキスト入力として受け取った質問を元に Cortex Analyst API に対するリクエストボディを構成し、POST リクエストとして送信します。[1:1]

def send_prompt(prompt: str) -> dict[str, any]:
    """
    APIの呼び出しとレスポンスの取得
    """

    request_body = {
        "messages": [{
            "role": "user",
            "content": [{"type": "text", "text": prompt}]
        }],
        "semantic_view": f"<データベース名>.<スキーマ名>.{st.session_state.semantic_view}"
    }
    st.session_state.current_request = request_body

    # ※ build_prompt_text の詳細は後述
    request_body["messages"][0]["content"][0]["text"] = build_prompt_text(prompt)
    
    # Cortex Analyst API にリクエスト送信
    resp = requests.post(
        url=f"https://{st.session_state.conn.host}/api/v2/cortex/analyst/message",
        json=request_body,
        headers={
            "Authorization": f'Snowflake Token="{st.session_state.conn.rest.token}"',
            "Content-Type": "application/json"
        },
        stream=True,
        verify=False
    )

    request_id = resp.json()["request_id"]
    logger.info(f"request_id: {request_id}")

    if resp.status_code < 400:
        return resp.json()
    else:
        raise Exception(
            f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
        )

チャットページの構築

上記の send_prompt() を呼び出して、チャットページを以下のように構築しています。

def process_message(prompt: str) -> None:
    """
    メッセージの加工
    """
    st.session_state.prompt = prompt

    message_user = {
        "role": "user",
        "content": [{"type": "text", "text": prompt}],
        "created_at": get_current_timestamp()
    }
    st.session_state.messages.append(message_user)

    with st.chat_message("user"):
        st.write(prompt)

    with st.chat_message("assistant"):
        with st.spinner("考え中......"):
            response = send_prompt(prompt=prompt)
            request_id = response["request_id"]
            content = response["message"]["content"]
            display_content(request_id=request_id, content=content)

    message_assistant = {
        "role": "assistant",
        "content": content,
        "request_id": request_id,
        "created_at": get_current_timestamp()
    }
    st.session_state.messages.append(message_assistant)
    return

def build_chat_page():
    """
    チャットページの構築
    """

    # セッション変数の初期化
    if "messages" not in st.session_state:
        st.session_state.messages = []


    # 既存メッセージの表示
    for idx, message in enumerate(st.session_state.messages):
        with st.chat_message(message["role"]):
            display_content(
                content=message["content"],
                request_id=message.get("request_id"),
                message_index=idx
            )

    # ユーザー入力の受付
    if user_input := st.chat_input(
        "質問を入力してください",
        disabled=(selected_semantic_view is None)
    ):
        # st.session_stateの初期化
        if selected_semantic_view:
            st.session_state.semantic_view = selected_semantic_view.lower()
            st.session_state.current_request = None
            st.session_state.current_response = None

        process_message(prompt=user_input)

    return

サイドバーの設定項目

サイドバーで設定可能な ①~③ の項目の実装を説明します。
chat2

① Request Count - リクエスト数のカウント

ユーザーが一日に送信した質問回数を集計します。Cortex Analyst のコストを適切に管理するために Google OAuth 認証を実装し、ユーザーごとに一日あたりの質問上限を設けています。

def get_today_requests_count(user_name: str) -> int:
    """
    ログイン中のユーザーの今日のリクエスト数のカウント
    """
    current_date = datetime.now(ZoneInfo("Asia/Tokyo")).strftime("%Y-%m-%d")
    query = f"""
        SELECT COUNT(*) AS count
        FROM <データベース名>.<スキーマ名>.<テーブル名>
        WHERE
            user_name = '{user_name}'
            AND to_date(created_at) = '{current_date}'
        ;
    """
    result = execute_query(query)
    return result[0][0]

def display_sidebar_requests_count(chat_requests_count_limit: int = CHAT_REQUEST_COUNT_LIMIT_DEFAULT):
    """
    質問回数の表示
    """
    st.session_state.requests_count = get_today_requests_count(st.user.email)
    st.sidebar.markdown(f"Request Count : {st.session_state.requests_count} / {chat_requests_count_limit}")
    return

② Semantic Views - セマンティックビューの選択

Cortex Analyst が参照する Semantic View をユーザーが指定するためのタブです。

def get_available_semantic_views() -> list[str]:
    """
    参照可能なセマンティックビューの取得
    """
    query = """SHOW SEMANTIC VIEWS IN SCHEMA <データベース名>.<スキーマ名>"""
    result = execute_query(query=query)
    return [row[1].lower() for row in result]

with st.sidebar:
    """
    セマンティックビューの選択
    """
    selected_semantic_view = st.selectbox(
        "Semantic View",
        get_available_semantic_views(),
        index=None,
        placeholder="選択してください"
    )

③ Context Count - 直前までの質問を参考にした回答の生成

ユーザーの質問に指示語(例:「それ」「前の条件」など)が含まれている場合、直前までの質問内容を参照して SQL を生成する設定です。参照する過去の質問数は、ユーザーが任意に指定できます。
以下の実装により、直前までの質問を含めて Cortex Analyst API にリクエストを送れるようにしました。

def get_chat_history():
    """
    これまでの会話の取得
    """
    # ユーザーがこれまでに送信したメッセージのインデックスを抽出
    user_question_list_num = [i for i, d in enumerate(st.session_state.messages) if d["role"] == "user"]
    # ユーザーが投げた質問の総数をカウント
    question_count = len(user_question_list_num)

    # 参照する履歴の数が質問数より少ない場合、直近の質問から context_count 件分だけを切り出す
    if st.session_state.context_count < question_count:
        return st.session_state.messages[user_question_list_num[question_count - st.session_state.context_count - 1] : user_question_list_num[question_count - 1]]
    # 参照する履歴の数が質問数以上の場合は、最初の質問から直前の質問までをすべて返す
    else:
        return st.session_state.messages[0 : user_question_list_num[question_count - 1]]

def build_prompt_text(prompt: str) -> str:
    """
    会話履歴 + 接続指示文 + 今回の質問 をまとめる
    """  
    instruction_statement =  "下記の質問に回答してください。回答を作成するにあたって前述した情報を参考にしてください。"
    return "\n".join([
        get_chat_history(),
        instruction_statement,
        prompt,
    ])

def send_prompt(prompt: str) -> dict[str, any]:
    """
    APIの呼び出しとレスポンスの取得
    """
    # ...略...

    # 会話履歴 + 接続文 + 今回の質問 をまとめる
    request_body["messages"][0]["content"][0]["text"] = build_prompt_text(prompt)

    # ...略...

実装(補助機能)

Feedback - フィードバック

生成された SQL・結果や Semantic View に対してのフィードバックをユーザーから共有してもらうためのフォームです。Snowsight 上の Cortex Analyst UI にも Feedback はありますが、Good/Bad しか送信できずコメントなどを得られないため、より詳細なニーズを把握することを目的に設置しています。

チャット生成後、チャットページ上の Feedback ボタンからフィードバックページへ遷移すると、写真のように、ユーザー名・Request ID・Semantic View・質問内容が自動で入力されます。また、チャット開発チームにフィードバックが届いた際には、Slack に通知が飛ぶよう実装しました。

実装

def send_feedback_to_slack(id: str, request_id: str, user_name: str, prompt: str, content: str, semantic_view: str, created_at: str):
    """
    Slackにフィードバックを共有
    """
    slack_client = get_slack_client()
    SLACK_CHANNEL_ID = "slackのチャンネルID"
    env = os.environ["TARGET_ENV"]

    header_text = "フィードバックが届きました!"
    header_text += f"【{env}】" if env != "prod" else ""

    try:
        response = slack_client.chat_postMessage(
            channel=SLACK_CHANNEL_ID,
            blocks=[
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": header_text
                    }
                }
            ],
            attachments=[
                {
                    "color": "#26a69a",
                    "blocks": [
                        {"type": "section", "text": {"type": "mrkdwn", "text": f"*ID*\n{id}"}},
                        {"type": "divider"},
                        {"type": "section", "fields": [
                            {"type": "mrkdwn", "text": f"*ユーザー名*\n{user_name}"},
                            {"type": "mrkdwn", "text": f"*作成日時*\n{created_at}"},
                            {"type": "mrkdwn", "text": f"*Request ID*\n{request_id}"},
                            {"type": "mrkdwn", "text": f"*Semantic View*\n`{semantic_view}`"},
                        ]},
                        {"type": "divider"},
                        {"type": "section", "text": {"type": "mrkdwn", "text": f"*フィードバック内容*\n{content}"}},
                        {"type": "divider"},
                        {"type": "section", "text": {"type": "mrkdwn", "text": f"*質問内容*\n{prompt}"}},
                    ]
                }
            ]
        )
        logger.info(f"Message sent: {response['ts']}")
    except SlackApiError as e:
        logger.error(f"Error posting message: {e.response['error']}")
        raise e

def build_feedback_page():
    """
    フィードバック用のフォーム
    """
    st.header("フィードバック送信フォーム")

    st.session_state.conn, session = get_session()

    with st.form("feedback_form", clear_on_submit=True):
        id = str(uuid.uuid4())

        # user_name
        user_name_input = st.text_input("ユーザー名", value=st.user.email, disabled=True)

        request_id, semantic_view, prompt = None, None, None

        # request_id
        if "request_id" in st.session_state:
            request_id = st.session_state.request_id
            if request_id:
                semantic_view = st.session_state.requests[request_id]["semantic_view"]
                prompt = st.session_state.requests[request_id]["prompt"]

        # Request ID
        request_id_input = st.text_input("Request ID", value=request_id, disabled=True)
        # Semantic View
        semantic_view_input = st.text_input("Semantic View", value=semantic_view, disabled=True)
        # 質問内容
        prompt_input = st.text_area("質問内容", value=prompt, disabled=True)
        # Feedback の内容
        content = st.text_area("フィードバック【必須】", placeholder="フィードバックを入力してください")
        # 送信日時
        created_at = datetime.now(ZoneInfo("Asia/Tokyo")).strftime("%Y-%m-%d %H:%M:%S")

        submit_button = st.form_submit_button("送信")

        if submit_button:
            feedback_df_pd = pd.DataFrame(
                [
                    [id, request_id_input, user_name_input, content, semantic_view_input, created_at]
                ],
                columns=["id", "request_id", "user_name", "content", "semantic_view", "created_at"],
            )
            if not content:
                st.error("フィードバックを入力してください")
                st.stop()

            try:
                session.write_pandas(feedback_df_pd, "<データベース名>.<スキーマ名>.<テーブル名>", quote_identifiers=False)
                send_feedback_to_slack(id, request_id_input, user_name_input, prompt_input, content, semantic_view, created_at)
                logger.info(f"id: {id}, request_id: {request_id_input}, user_name: {user_name_input}, prompt_input: {prompt_input}, content: {content}, semantic_view: {semantic_view}, created_at: {created_at}")
                st.success("送信されました!フィードバックありがとうございます。")

            except Exception as e:
                logger.error(f"処理に失敗しました: {e}")

feedback
slack

History - 履歴ページ

各ユーザーがこれまでに投げかけた質問と、生成された SQL を確認できます。
従来はチャットページを閉じると内容がリフレッシュされ、過去の会話履歴を参照できないという課題がありました。そのため再度同じ質問を投げる必要があり、無駄なコストが発生してしまうといった問題が生じていました。本ページにより、コスト面での懸念を解消できると考えています。

history1
history2

Definition - Semantic Viewの確認

Semantic View がどのように定義されているのか (=DDL) を確認できます。

本ページは生成された SQL やクエリ結果の解釈に役立てることができます。例えば、ユーザーが質問を投げて意図した回答を得られなかった際、Semantic View の定義を確認することでユーザー自身で原因を突き止めやすくなります。また、これにより、 Semantic View を開発・管理するチームに対して、フィードバックがより行いやすくなることを期待しています。

実装

def build_definition_page():
    st.header("Semantic View 定義")

    st.session_state.ddl = None
    st.session_state.selected_semantic_view = None

    with st.form("definition_form", clear_on_submit=True):
        selected_semantic_view = st.selectbox(
            "Semantic View",
            get_available_semantic_views(),
            index=None,
            placeholder="選択してください",
        )
        submit_button = st.form_submit_button("確認")

        if submit_button:
            if selected_semantic_view:
                st.session_state.selected_semantic_view = selected_semantic_view
                query = f"""
                SELECT GET_DDL(
                    'SEMANTIC_VIEW',
                    '<データベース名>.<スキーマ名>.{selected_semantic_view}'
                )
                """
                result = execute_query(query=query)
                ddl = result[0][0]
                st.session_state.ddl = ddl
            else:
                st.warning("Semantic View を選択してください")

    if st.session_state.ddl:
        st.markdown(f"Selected Semantic View : `{st.session_state.selected_semantic_view}`")
        st.markdown(f"```sql\n{st.session_state.ddl}\n```", unsafe_allow_html=True)

feedback

さいごに

Cortex Analyst を用いたチャットアプリケーションを Streamlit で構築した方法について書いてみました。

社内のユーザーにアプリを信頼して利用してもらうためには、Cortex Analyst の生成する回答精度を高めることが重要です。そのため、Semantic View の改善や質問プロンプトの研究などを今後実施していく予定です。
また、今回は Cortex Analyst をメインに扱いましたが、Cortex Search への拡張や Snowflake Intelligence の検証も取り組んでいきたいと考えています。

最後まで読んで頂き、ありがとうございました。

参考

脚注
  1. Cortex Analystとは自然言語での問い合わせに対して、SQLクエリを生成してデータを抽出する Text to SQL であり、Snowflake の LLM 機能の一つです。 ↩︎ ↩︎

  2. Streamlit は Python 製のインタラクティブな Web アプリケーションを簡単に開発できるフレームワークです。特に、データサイエンスや機械学習、BI・ダッシュボードといった領域で広く利用されています
    https://docs.snowflake.com/ja/user-guide/snowflake-cortex/cortex-analyst
    https://streamlit.io/ ↩︎

Discussion