Cortex Analyst と Streamlit を用いた Text2SQL アプリ作成
記事の概要
こんにちは。シンプルフォーム株式会社でインターンをしています、長井です。
この記事では、Cortex Analyst [1] を用いたチャットアプリケーションを Streamlit [2] で構築した事例を紹介します。具体的には本アプリケーションで実装した以下の機能を説明します。
- チャット機能
- チャット
- リクエスト数のカウント
- Semantic View の選択
- 直前までの質問を参考にした回答の生成
- 補助機能
- フィードバックフォーム
- チャット履歴ページ
- Semantic View の定義確認ページ
背景・課題
当社では社内向けのデータ分析基盤として Snowflake 基盤を提供しています。Redash を主な BI ツールとして非エンジニアを含む社内メンバーにより SQL を用いた分析が行われてきました。
データ分析基盤の利用は以前より普及していたものの、SQL の文法や当社固有のデータ構造に関する知識が分析時のハードルになっていました。また、今後、扱うデータの種類や新規参画メンバーが増えていくことも見込まれており、このような問題がより顕著になっていくことが想定されました。
一部のメンバーは ChatGPT のようなツールを SQL クエリ開発の補助として利用していましたが、当社のデータ構造に関する知識を持たないため、活用には限界がありました。
アーキテクチャ
本アプリケーションのチャット部分は、Snowflake が提供する Cortex Analyst の API、アプリケーション部分は Streamlit を用いて構築しました。
インフラアーキテクチャに関してはこちらの記事をご覧ください。
実装(チャット機能)
まず、チャット機能の実装について説明していきます。
一般的なチャットアプリケーション同様、画面メイン下部の「質問を入力してください」と書かれた入力欄から質問を入力します。また、Cortex Analyst が生成した SQL やその実行結果を同じページに表示します。
主要な実装に関して、以下の記事などを参考にさせていただきました。
Cortex Analyst API の呼び出し
ユーザーによるテキスト入力として受け取った質問を元に Cortex Analyst API に対するリクエストボディを構成し、POST リクエストとして送信します。[1:1]
def send_prompt(prompt: str) -> dict[str, any]:
"""
APIの呼び出しとレスポンスの取得
"""
request_body = {
"messages": [{
"role": "user",
"content": [{"type": "text", "text": prompt}]
}],
"semantic_view": f"<データベース名>.<スキーマ名>.{st.session_state.semantic_view}"
}
st.session_state.current_request = request_body
# ※ build_prompt_text の詳細は後述
request_body["messages"][0]["content"][0]["text"] = build_prompt_text(prompt)
# Cortex Analyst API にリクエスト送信
resp = requests.post(
url=f"https://{st.session_state.conn.host}/api/v2/cortex/analyst/message",
json=request_body,
headers={
"Authorization": f'Snowflake Token="{st.session_state.conn.rest.token}"',
"Content-Type": "application/json"
},
stream=True,
verify=False
)
request_id = resp.json()["request_id"]
logger.info(f"request_id: {request_id}")
if resp.status_code < 400:
return resp.json()
else:
raise Exception(
f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
)
チャットページの構築
上記の send_prompt()
を呼び出して、チャットページを以下のように構築しています。
def process_message(prompt: str) -> None:
"""
メッセージの加工
"""
st.session_state.prompt = prompt
message_user = {
"role": "user",
"content": [{"type": "text", "text": prompt}],
"created_at": get_current_timestamp()
}
st.session_state.messages.append(message_user)
with st.chat_message("user"):
st.write(prompt)
with st.chat_message("assistant"):
with st.spinner("考え中......"):
response = send_prompt(prompt=prompt)
request_id = response["request_id"]
content = response["message"]["content"]
display_content(request_id=request_id, content=content)
message_assistant = {
"role": "assistant",
"content": content,
"request_id": request_id,
"created_at": get_current_timestamp()
}
st.session_state.messages.append(message_assistant)
return
def build_chat_page():
"""
チャットページの構築
"""
# セッション変数の初期化
if "messages" not in st.session_state:
st.session_state.messages = []
# 既存メッセージの表示
for idx, message in enumerate(st.session_state.messages):
with st.chat_message(message["role"]):
display_content(
content=message["content"],
request_id=message.get("request_id"),
message_index=idx
)
# ユーザー入力の受付
if user_input := st.chat_input(
"質問を入力してください",
disabled=(selected_semantic_view is None)
):
# st.session_stateの初期化
if selected_semantic_view:
st.session_state.semantic_view = selected_semantic_view.lower()
st.session_state.current_request = None
st.session_state.current_response = None
process_message(prompt=user_input)
return
サイドバーの設定項目
サイドバーで設定可能な ①~③ の項目の実装を説明します。
① Request Count - リクエスト数のカウント
ユーザーが一日に送信した質問回数を集計します。Cortex Analyst のコストを適切に管理するために Google OAuth 認証を実装し、ユーザーごとに一日あたりの質問上限を設けています。
def get_today_requests_count(user_name: str) -> int:
"""
ログイン中のユーザーの今日のリクエスト数のカウント
"""
current_date = datetime.now(ZoneInfo("Asia/Tokyo")).strftime("%Y-%m-%d")
query = f"""
SELECT COUNT(*) AS count
FROM <データベース名>.<スキーマ名>.<テーブル名>
WHERE
user_name = '{user_name}'
AND to_date(created_at) = '{current_date}'
;
"""
result = execute_query(query)
return result[0][0]
def display_sidebar_requests_count(chat_requests_count_limit: int = CHAT_REQUEST_COUNT_LIMIT_DEFAULT):
"""
質問回数の表示
"""
st.session_state.requests_count = get_today_requests_count(st.user.email)
st.sidebar.markdown(f"Request Count : {st.session_state.requests_count} / {chat_requests_count_limit}")
return
② Semantic Views - セマンティックビューの選択
Cortex Analyst が参照する Semantic View をユーザーが指定するためのタブです。
def get_available_semantic_views() -> list[str]:
"""
参照可能なセマンティックビューの取得
"""
query = """SHOW SEMANTIC VIEWS IN SCHEMA <データベース名>.<スキーマ名>"""
result = execute_query(query=query)
return [row[1].lower() for row in result]
with st.sidebar:
"""
セマンティックビューの選択
"""
selected_semantic_view = st.selectbox(
"Semantic View",
get_available_semantic_views(),
index=None,
placeholder="選択してください"
)
③ Context Count - 直前までの質問を参考にした回答の生成
ユーザーの質問に指示語(例:「それ」「前の条件」など)が含まれている場合、直前までの質問内容を参照して SQL を生成する設定です。参照する過去の質問数は、ユーザーが任意に指定できます。
以下の実装により、直前までの質問を含めて Cortex Analyst API にリクエストを送れるようにしました。
def get_chat_history():
"""
これまでの会話の取得
"""
# ユーザーがこれまでに送信したメッセージのインデックスを抽出
user_question_list_num = [i for i, d in enumerate(st.session_state.messages) if d["role"] == "user"]
# ユーザーが投げた質問の総数をカウント
question_count = len(user_question_list_num)
# 参照する履歴の数が質問数より少ない場合、直近の質問から context_count 件分だけを切り出す
if st.session_state.context_count < question_count:
return st.session_state.messages[user_question_list_num[question_count - st.session_state.context_count - 1] : user_question_list_num[question_count - 1]]
# 参照する履歴の数が質問数以上の場合は、最初の質問から直前の質問までをすべて返す
else:
return st.session_state.messages[0 : user_question_list_num[question_count - 1]]
def build_prompt_text(prompt: str) -> str:
"""
会話履歴 + 接続指示文 + 今回の質問 をまとめる
"""
instruction_statement = "下記の質問に回答してください。回答を作成するにあたって前述した情報を参考にしてください。"
return "\n".join([
get_chat_history(),
instruction_statement,
prompt,
])
def send_prompt(prompt: str) -> dict[str, any]:
"""
APIの呼び出しとレスポンスの取得
"""
# ...略...
# 会話履歴 + 接続文 + 今回の質問 をまとめる
request_body["messages"][0]["content"][0]["text"] = build_prompt_text(prompt)
# ...略...
実装(補助機能)
Feedback - フィードバック
生成された SQL・結果や Semantic View に対してのフィードバックをユーザーから共有してもらうためのフォームです。Snowsight 上の Cortex Analyst UI にも Feedback はありますが、Good/Bad しか送信できずコメントなどを得られないため、より詳細なニーズを把握することを目的に設置しています。
チャット生成後、チャットページ上の Feedback ボタンからフィードバックページへ遷移すると、写真のように、ユーザー名・Request ID・Semantic View・質問内容が自動で入力されます。また、チャット開発チームにフィードバックが届いた際には、Slack に通知が飛ぶよう実装しました。
実装
def send_feedback_to_slack(id: str, request_id: str, user_name: str, prompt: str, content: str, semantic_view: str, created_at: str):
"""
Slackにフィードバックを共有
"""
slack_client = get_slack_client()
SLACK_CHANNEL_ID = "slackのチャンネルID"
env = os.environ["TARGET_ENV"]
header_text = "フィードバックが届きました!"
header_text += f"【{env}】" if env != "prod" else ""
try:
response = slack_client.chat_postMessage(
channel=SLACK_CHANNEL_ID,
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": header_text
}
}
],
attachments=[
{
"color": "#26a69a",
"blocks": [
{"type": "section", "text": {"type": "mrkdwn", "text": f"*ID*\n{id}"}},
{"type": "divider"},
{"type": "section", "fields": [
{"type": "mrkdwn", "text": f"*ユーザー名*\n{user_name}"},
{"type": "mrkdwn", "text": f"*作成日時*\n{created_at}"},
{"type": "mrkdwn", "text": f"*Request ID*\n{request_id}"},
{"type": "mrkdwn", "text": f"*Semantic View*\n`{semantic_view}`"},
]},
{"type": "divider"},
{"type": "section", "text": {"type": "mrkdwn", "text": f"*フィードバック内容*\n{content}"}},
{"type": "divider"},
{"type": "section", "text": {"type": "mrkdwn", "text": f"*質問内容*\n{prompt}"}},
]
}
]
)
logger.info(f"Message sent: {response['ts']}")
except SlackApiError as e:
logger.error(f"Error posting message: {e.response['error']}")
raise e
def build_feedback_page():
"""
フィードバック用のフォーム
"""
st.header("フィードバック送信フォーム")
st.session_state.conn, session = get_session()
with st.form("feedback_form", clear_on_submit=True):
id = str(uuid.uuid4())
# user_name
user_name_input = st.text_input("ユーザー名", value=st.user.email, disabled=True)
request_id, semantic_view, prompt = None, None, None
# request_id
if "request_id" in st.session_state:
request_id = st.session_state.request_id
if request_id:
semantic_view = st.session_state.requests[request_id]["semantic_view"]
prompt = st.session_state.requests[request_id]["prompt"]
# Request ID
request_id_input = st.text_input("Request ID", value=request_id, disabled=True)
# Semantic View
semantic_view_input = st.text_input("Semantic View", value=semantic_view, disabled=True)
# 質問内容
prompt_input = st.text_area("質問内容", value=prompt, disabled=True)
# Feedback の内容
content = st.text_area("フィードバック【必須】", placeholder="フィードバックを入力してください")
# 送信日時
created_at = datetime.now(ZoneInfo("Asia/Tokyo")).strftime("%Y-%m-%d %H:%M:%S")
submit_button = st.form_submit_button("送信")
if submit_button:
feedback_df_pd = pd.DataFrame(
[
[id, request_id_input, user_name_input, content, semantic_view_input, created_at]
],
columns=["id", "request_id", "user_name", "content", "semantic_view", "created_at"],
)
if not content:
st.error("フィードバックを入力してください")
st.stop()
try:
session.write_pandas(feedback_df_pd, "<データベース名>.<スキーマ名>.<テーブル名>", quote_identifiers=False)
send_feedback_to_slack(id, request_id_input, user_name_input, prompt_input, content, semantic_view, created_at)
logger.info(f"id: {id}, request_id: {request_id_input}, user_name: {user_name_input}, prompt_input: {prompt_input}, content: {content}, semantic_view: {semantic_view}, created_at: {created_at}")
st.success("送信されました!フィードバックありがとうございます。")
except Exception as e:
logger.error(f"処理に失敗しました: {e}")
History - 履歴ページ
各ユーザーがこれまでに投げかけた質問と、生成された SQL を確認できます。
従来はチャットページを閉じると内容がリフレッシュされ、過去の会話履歴を参照できないという課題がありました。そのため再度同じ質問を投げる必要があり、無駄なコストが発生してしまうといった問題が生じていました。本ページにより、コスト面での懸念を解消できると考えています。
Definition - Semantic Viewの確認
Semantic View がどのように定義されているのか (=DDL) を確認できます。
本ページは生成された SQL やクエリ結果の解釈に役立てることができます。例えば、ユーザーが質問を投げて意図した回答を得られなかった際、Semantic View の定義を確認することでユーザー自身で原因を突き止めやすくなります。また、これにより、 Semantic View を開発・管理するチームに対して、フィードバックがより行いやすくなることを期待しています。
実装
def build_definition_page():
st.header("Semantic View 定義")
st.session_state.ddl = None
st.session_state.selected_semantic_view = None
with st.form("definition_form", clear_on_submit=True):
selected_semantic_view = st.selectbox(
"Semantic View",
get_available_semantic_views(),
index=None,
placeholder="選択してください",
)
submit_button = st.form_submit_button("確認")
if submit_button:
if selected_semantic_view:
st.session_state.selected_semantic_view = selected_semantic_view
query = f"""
SELECT GET_DDL(
'SEMANTIC_VIEW',
'<データベース名>.<スキーマ名>.{selected_semantic_view}'
)
"""
result = execute_query(query=query)
ddl = result[0][0]
st.session_state.ddl = ddl
else:
st.warning("Semantic View を選択してください")
if st.session_state.ddl:
st.markdown(f"Selected Semantic View : `{st.session_state.selected_semantic_view}`")
st.markdown(f"```sql\n{st.session_state.ddl}\n```", unsafe_allow_html=True)
さいごに
Cortex Analyst を用いたチャットアプリケーションを Streamlit で構築した方法について書いてみました。
社内のユーザーにアプリを信頼して利用してもらうためには、Cortex Analyst の生成する回答精度を高めることが重要です。そのため、Semantic View の改善や質問プロンプトの研究などを今後実施していく予定です。
また、今回は Cortex Analyst をメインに扱いましたが、Cortex Search への拡張や Snowflake Intelligence の検証も取り組んでいきたいと考えています。
最後まで読んで頂き、ありがとうございました。
参考
- https://zenn.dev/tree_and_tree/articles/e1c9dc57cdde28 - Zenn
- https://zenn.dev/jay_explolar/articles/df9bf248ab40a9 - Zenn
- https://zenn.dev/tree_and_tree/articles/e1c9dc57cdde28 - Zenn
- https://nextpublishing.jp/book/18323.html - Streamlit入門 本
- https://zenn.dev/jay_explolar/articles/df9bf248ab40a9 - Zenn
-
Cortex Analystとは自然言語での問い合わせに対して、SQLクエリを生成してデータを抽出する Text to SQL であり、Snowflake の LLM 機能の一つです。 ↩︎ ↩︎
-
Streamlit は Python 製のインタラクティブな Web アプリケーションを簡単に開発できるフレームワークです。特に、データサイエンスや機械学習、BI・ダッシュボードといった領域で広く利用されています
https://docs.snowflake.com/ja/user-guide/snowflake-cortex/cortex-analyst
https://streamlit.io/ ↩︎
Discussion