❄️

Streamlit in Snowflake (SiS) で利用ユーザー毎にパーソナライズする

2024/08/31に公開

はじめに

2024年7月末に Streamlit in Snowflake で Current_User と行アクセスポリシーが利用できるようになりました。

このアップデートの嬉しいところは、アプリケーションに接続した Snowflake ユーザーを識別し、ユーザー毎に処理を変えることが簡単かつセキュアに行うことができるようになったことです。

アプリケーションへのログインの仕組みやユーザー管理テーブルなどを独自に用意しなくても、例えば以下のように1つのアプリケーションをユーザー毎にパーソナライズすることが可能です。

  • ユーザー毎にアプリケーションの表示を変える
  • ユーザー毎にパーソナライズされた分析ダッシュボードを用意する
  • 行アクセスポリシーを用いてユーザー毎に異なるクエリ出力結果を取得する (Enterprise Edition 以上)

今回はユーザー個人の情報を表示する簡単な ToDo リストを作ってみたいと思います。

機能概要

実現したいこと

  • 1つのアプリをシェアして個人の ToDo リストを管理できる
  • 行アクセスポリシーを用いて他の人の ToDo が表示されないようにする

完成イメージ


ユーザー TKANNO の画面


ユーザー TARO の画面

前提条件

  • Snowflake アカウント
    • 行アクセスポリシーを利用する場合は Enterprise Edition のアカウントが必要です

注意事項

  • Streamlit in Snowflake は所有者権限で実行されるため、Current_Role は Streamlit in Snowflake のアプリケーションロールと同一になります。(そのため、パーソナライズには利用できません)

手順

ToDo リストを保存するテーブルを作成する

ワークシートから以下のコマンドを実行します。

-- ToDoリストテーブルを作成する
CREATE TABLE IF NOT EXISTS todo_list (
    id INT AUTOINCREMENT,
    task VARCHAR(255),
    status VARCHAR(20),
    due_date DATE,
    completed_date DATE,
    owner VARCHAR(50)
);

行アクセスポリシーを作成する

todo_list テーブルの owner と Streamlit in Snowflake のアプリケーションに接続した current_user が一致する行を返すポリシーです。

ワークシートから以下のコマンドを実行します。

-- 行アクセスポリシーを作成する
CREATE ROW ACCESS POLICY IF NOT EXISTS todo_row_access_policy
    AS (owner VARCHAR) RETURNS BOOLEAN ->
        owner = CURRENT_USER();

行アクセスポリシーを適用する

ワークシートから以下のコマンドを実行します。

-- 行アクセスポリシーを適用する
ALTER TABLE todo_list ADD ROW ACCESS POLICY todo_row_access_policy ON (owner);

これでワークシートでの作業は終了です。

Streamlit in Snowflake のアプリを実行

新規で Streamlit in Snowflake のアプリを作成し、以下コードをコピー&ペーストで貼り付けて完了です。

14行目が現在アプリに接続しているユーザーを文字列として取得している箇所です。

import streamlit as st
from snowflake.snowpark.context import get_active_session
import pandas as pd

# レイアウトの設定
st.set_page_config(
    layout="wide"
)

# Snowflakeセッションの取得
session = get_active_session()

# カレントユーザーの取得
current_user = session.sql("SELECT CURRENT_USER()").collect()[0][0]

# ToDoリストの取得
def get_todo_list():
    return session.table("todo_list").to_pandas()

# タスクの追加または更新
def upsert_task(task_id, task, status, due_date, completed_date):
    due_date_sql = f"'{due_date}'" if due_date else "NULL"
    completed_date_sql = f"'{completed_date}'" if completed_date else "NULL"
    
    if task_id:
        session.sql(f"""
        UPDATE todo_list
        SET task = '{task}', status = '{status}', due_date = {due_date_sql}, completed_date = {completed_date_sql}
        WHERE id = {task_id}
        """).collect()
    else:
        session.sql(f"""
        INSERT INTO todo_list (task, status, owner, due_date, completed_date)
        VALUES ('{task}', '{status}', '{current_user}', {due_date_sql}, {completed_date_sql})
        """).collect()

# タスクの削除
def delete_task(task_id):
    session.sql(f"DELETE FROM todo_list WHERE id = {task_id}").collect()

# メイン関数
def main():
    st.title(f"{current_user} の個人ダッシュボード")

    # タスク一覧
    st.subheader(f"{current_user} の ToDo リスト")
    todo_df = get_todo_list()

    # ヘッダーの表示
    col1, col2, col3, col4, col5 = st.columns([3, 2, 2, 2, 2])
    col1.write("タスク")
    col2.write("状態")
    col3.write("期限")
    col4.write("完了日")
    col5.write("削除")

    # タスク一覧の表示
    for _, row in todo_df.iterrows():
        col1, col2, col3, col4, col5 = st.columns([3, 2, 2, 2, 2])
        
        with col1:
            task = st.text_input("task", value=row['TASK'], key=f"task_{row['ID']}", label_visibility="collapsed")
        
        with col2:
            status = st.selectbox("status", ["Pending", "In Progress", "Completed"], index=["Pending", "In Progress", "Completed"].index(row['STATUS']), key=f"status_{row['ID']}", label_visibility="collapsed")
        
        with col3:
            due_date = st.date_input("due_date", value=pd.to_datetime(row['DUE_DATE']).date() if pd.notna(row['DUE_DATE']) else None, key=f"due_date_{row['ID']}", label_visibility="collapsed")
        
        with col4:
            completed_date = st.date_input("comp_date", value=pd.to_datetime(row['COMPLETED_DATE']).date() if pd.notna(row['COMPLETED_DATE']) else None, key=f"completed_date_{row['ID']}", label_visibility="collapsed")

        with col5:
            if st.button("削除", key=f"delete_{row['ID']}"):
                delete_task(row['ID'])
                st.experimental_rerun()

        # 値が変更されたらすぐにデータベースを更新
        if task != row['TASK'] or status != row['STATUS'] or due_date != row['DUE_DATE'] or completed_date != row['COMPLETED_DATE']:
            upsert_task(row['ID'], task, status, due_date, completed_date)
            st.experimental_rerun()

    # 新しいタスクの追加
    st.subheader("新しいタスクの追加")
    new_task = st.text_input("新しいタスク")
    new_status = st.selectbox("状態", ["Pending", "In Progress", "Completed"])
    new_due_date = st.date_input("期限")
    # new_completed_date = st.date_input("完了日")
    if st.button("追加"):
        upsert_task(None, new_task, new_status, new_due_date, None)
        st.success("新しいタスクを追加しました")
        st.experimental_rerun()

# メイン処理
if __name__ == "__main__":
    main()

最後に

いかがだったでしょうか? Current_User と行アクセスポリシーを組み合わせることで、シンプルな手順でユーザー毎にパーソナライズしたセキュアなアプリケーションが作れるため、アイディア次第で更に使い勝手の良いアプリケーションを用意できるようになりました。

発展的なアイディアとしては、Streamlit in Snowflake 経由でのテーブルへの書き込み時に署名として Current_User の情報を付与したり、パーソナライズした情報を Cortex LLM にコンテキストとして渡してパーソナルアシスタントとして活用するなどが考えられます。

是非皆様も Current_User を使って面白いことにチャレンジしてみてください!

宣伝

SNOWFLAKE WORLD TOUR TOKYO のオンデマンド配信中!

Snowflake の最新情報を知ることができる大規模イベント『SNOWFLAKE WORLD TOUR TOKYO』が2024/9/11-12@ANAインターコンチネンタル東京で開催されました。
現在オンデマンド配信中ですので数々の最新のデータ活用事例をご覧ください。
また私が登壇させていただいた『今から知ろう!Snowflakeの基本概要』では、Snowflakeのコアの部分を30分で押さえられますので、Snowflake をイチから知りたい方、最新の Snowflake の特徴を知りたい方は是非ご視聴いただければ嬉しいですmm

https://www.snowflake.com/events/snowflake-world-tour-tokyo/

X で Snowflake の What's new の配信してます

X で Snowflake の What's new の更新情報を配信しておりますので、是非お気軽にフォローしていただければ嬉しいです。

日本語版

Snowflake の What's New Bot (日本語版)
https://x.com/snow_new_jp

English Version

Snowflake What's New Bot (English Version)
https://x.com/snow_new_en

変更履歴

(20240831) 新規投稿
(20240926) 宣伝欄の修正

Discussion