Streamlit in Snowflakeの設計で意識すると良さそうなポイント

2024/10/22に公開

Streamlit in Snowflake (SiS) でアプリケーションを開発する際に、設計で気をつけて良かった点、やらなくても良さそうな点についてまとめました。テンプレートが必要な方は、記事の最後をご覧ください。

やった方がよいこと

ローカル環境のセットアップ

SiSでは、アプリの接続がアクティブな間はウェアハウスが起動した状態に保たれます。開発作業中にセッションを繋いだままにすると、その分だけ課金が発生し続けることになります。開発中はローカル上でStreamlitアプリを動作させ、不要なウェアハウスの課金を避けられるようにしましょう。

たとえば以下のように、クエリ実行用の共通関数を定義し、環境によってセッションの取得方法を切り替えることで、ローカルからであってもSiSと同じロジックでデータ取得することができます。
こちらの記事を参考にさせていただきました。全体をクロージャ化し、ローカル環境でもSnowparkを利用するようにして、環境差分を減らしています。)

utils.py
def create_execute_query():
    """create_execute_query
    クエリ実行用関数 execute_query を作成するクロージャ。

    ### execute_query
    execute_queryは一つのクエリを受け取り、実行します。

    Examples:
        >>> execute_query = create_execute_query()
        >>> execute_query("select * from table where id = ?", params=(1,))
    """
    try:
        # streamlit in snowflakeの内部ならば、snowparkセッションを使ってクエリ実行
        session = get_active_session()
    except SnowparkSessionException as e:
        if not os.path.isfile('.streamlit/secrets.toml'):
            raise e
        secrets = st.secrets["snowflake"]
        connection_parameters = {
            "user": secrets["user"],
            "private_key": base64.b64decode(secrets["private_key"]),
            "account": secrets["account"],
            "warehouse": secrets["warehouse"],
            "database": secrets["database"],
            "schema": secrets["schema"]
        }
        session = Session.builder.configs(connection_parameters).create()
        # streamlit in snowflakeの外部(開発中のローカル環境など)ならば、python-connectorを使ってクエリ実行
    finally:
        def execute_query(query, params=()):
            return session.sql(query, params=params).to_pandas()
        return execute_query

environment.ymlとsnowflake.ymlの作成

SiSのアプリ設定は、environment.ymlとsnowflake.ymlファイルにより定義できます。アプリケーションの環境定義をバージョン管理できます。

environment.ymlの例
dependencies:
- streamlit=1.35.0
- pandas
snowflake.ymlの例
definition_version: 1
streamlit:
  name: my_streamlit_app
  stage: my_streamlit_app_stage
  query_warehouse: my_wh
  main_file: main.py
  env_file: environment.yml
  pages_dir: pages/
  additional_source_files:
  - utils.py

クエリ結果のキャッシュ

SiSでも、Streamlitのキャッシュ機能を活用してクエリ結果を再利用することができます。クエリ結果をキャッシュすることで、無駄なリソース消費を防ぎ、アプリのパフォーマンスを向上させられます。

以下、クエリの実行を行うための関数をアプリケーション中で用意して、その関数の結果をキャッシュする例です。

services.py
import streamlit as st

from utils import create_execute_query

@st.cache_data # この関数の結果をキャッシュする
def get_data():
    """
    Snowflakeからのデータ取得
    """
    execute_query = create_execute_query()
    query = "select * from sample_table"
    return execute_query(query)

main.py
import streamlit as st

from services import get_data

# serviceの関数を使用してデータ取得
df = get_data()
st.dataframe(df)

SnowCLIからデプロイ

ローカル環境で開発したコードは、SnowCLIからデプロイができます。

https://docs.snowflake.com/en/developer-guide/snowflake-cli/command-reference/streamlit-commands/deploy

Githubを使用してコード管理をしている場合、公式ドキュメントのGithub Actionを使ったデプロイフローも参考になります。

https://docs.snowflake.com/en/developer-guide/streamlit/create-streamlit-snowflake-cli

場合によってはやった方がよいこと

小規模なアプリの場合はここまでやる必要はないと思ったので、あえて「やった方がよいこと」とは分けています。

placeholderによるレイアウトの作成

Streamlitのレイアウトは、withを使った表記法が推奨されています。

import streamlit as st

col1, col2, col3 = st.columns(3)

with col1:
    st.header("A cat")
    st.image("https://static.streamlit.io/examples/cat.jpg")

with col2:
    st.header("A dog")
    st.image("https://static.streamlit.io/examples/dog.jpg")

with col3:
    st.header("An owl")
    st.image("https://static.streamlit.io/examples/owl.jpg")

ですが、withでレイアウトを構成する場合、表示するデータの取得・作成ロジックと、レイアウトを決定するためのロジックが混ざってしまいます。

st.emptyを用いて、プレースホルダー要素を使ったレイアウトだけを事前定義することで、データのロジックとレイアウトをある程度分離できます。

import streamlit as st

# レイアウトを定義
col1, col2, col3 = st.columns(3)

col1.header("A cat")
col1_image_placeholder = col1.empty()
col2.header("A dog")
col2_image_placeholder = col2.empty()
col2.header("An owl")
col2_image_placeholder = col3.empty()

# データの取得方法を定義
col1_image_placeholder.image("https://static.streamlit.io/examples/cat.jpg")
col2_image_placeholder.image("https://static.streamlit.io/examples/dog.jpg")
col3_image_placeholder.image("https://static.streamlit.io/examples/owl.jpg")

with表記による実装がStreamlit公式の推奨です。基本的にはwithで書いた方が書き味は良いと思います。ただ、状況によってはwith下のネストがかなり深くなってしまう場合があります(たとえば、stateによる条件分岐が重なる場合です)。こういった時はemptyの活用は有効かもしれません。

container = st.container()
with container:
    col1, col2 = st.columns(2)
    with col1:
        with st.expander():
            if "key" in st.session_state:
                # このように、どこまでもネストが深くなりやすい

また、データ取得が長時間かかる場合では、先にページ要素のレンダリングだけを済ませることで、UXの改善も図ることができると思います。

モデルクラスの作成

プレースホルダーを使ってレイアウトとロジックを分離したとしても、まだロジックが大きすぎる場合があります。「データ取得のロジック」と「表示する形にデータを編集するロジック」のそれぞれが大きいケースです。

よくあるケースがDataframeの加工です。表示したいデータの形に合わせて何度もクエリを投げるのは非効率なため、一度だけ投げたクエリの結果をpandasで整形して、要素の表示やグラフ描画を行いたいシーンは多いと思います。

raw_df = execute_query(query) # データの取得

# 表示したいデータの絞り込み
view_df = raw_df[raw_df["表示対象"] == "yes"]

# 条件によって表示するデータを出しわけ
if len(view_df) > 0:
    st.metric("データ件数", "0")
    st.metric("最大", "0")
    st.metric("最小", "0")
    st.metric("平均", "0")
elif len(view_df) > 1:
    st.metric("データ件数", len(view_df))
    st.metric("最大", view_df["VALUE"].max())
    st.metric("最小", view_df["VALUE"].min())
    st.metric("平均", view_df["VALUE"].mean())

# グラフにするためにデータフレームを加工
pivot_df = view_df.pivot(index="DATE", columns="KIND")["VALUE"]
st.bar_chart(pivot_df)

このような場合、クエリの結果をモデルクラスに格納し、データ操作をモデルに集約することで、画面の実装をシンプルにできます。

@dataclass
class Model:
    """Model
    データ加工ロジックを集約するクラス
    """"
    raw_df: pd.DataFrame

    def filter_data(self) -> pd.DataFrame:
        return self.raw_df[self.raw_df["表示対象"] == "yes"]

    def get_metrics(self, view_df: pd.DataFrame) -> dict:
        if len(view_df) == 0:
            return {"件数": 0, "最大": 0, "最小": 0, "平均": 0}
        else:
            return {
                "件数": len(view_df),
                "最大": view_df["VALUE"].max(),
                "最小": view_df["VALUE"].min(),
                "平均": view_df["VALUE"].mean()
            }

    def get_pivot_data(self, view_df: pd.DataFrame) -> pd.DataFrame:
        return view_df.pivot(index="DATE", columns="KIND")["VALUE"]
raw_df = execute_query(query)

# モデルに生データを渡して加工
model = Model(raw_df)
view_df = model.filter_data()
metrics = model.get_metrics(view_df)
pivot_df = model.get_pivot_data(view_df)

# メトリクス表示
st.metric("データ件数", metrics["件数"])
st.metric("最大", metrics["最大"])
st.metric("最小", metrics["最小"])
st.metric("平均", metrics["平均"])

# グラフ表示
st.bar_chart(pivot_df)

なお、StreamlitでdataclassやEnumを使用する際、公式ドキュメントでも解説されているように、セッション中で複数回クラスが定義されることで、予期しない動作を起こすことがあります。多くの場合、モデルをメインのpyファイルとは別のファイルにすることで回避できると思いますので、モデルクラスを定義するためのpyファイルは別で用意しましょう。

https://docs.streamlit.io/develop/concepts/design/custom-classes

やらなくてもよさそうなこと

ここからは、Streamlitで実装するときにはやらなくてよいと個人的に感じたプラクティスを紹介します。

過剰なMVC/MVVMパターンの適用

前述の「やった方が良い場合がありそうなこと」で紹介したパターンはどれも、MVCやMVVMで語られてきた「ビューとモデルの分離」を目指した設計パターンです。しかし、Streamlitでは、MVCやMVVMを意識しすぎた実装には意味が薄いと感じています。

前提として、デザインパターンの目的は「責務の分解」や「再利用性の向上」の実現です。これら設計パターンを採用するモチベーションは「複数の画面で行われている似たようなデータ操作を一箇所にまとめたい」とか「データ操作のロジックには手を入れずに画面デザインだけを変えられるように、ロジックと画面の定義を別の場所にしたい」といった背景があるはずです。

しかし、Streamlitはそもそもがシンプルなデータ分析や可視化・簡単な機械学習モデルのデプロイなどを簡単に行うことを目的としたフレームワークであり、アドホックな利用がメインのユースケースです。ワークフローのような複雑な業務ロジックを表現したり、ロジックをかえずにUIをコロコロと調整するシーンは考えにくいです。

具体、過剰な設計例を挙げます。

セッションの集約

SPAでWebページの構築をしたことがある方なら、ReduxやVuex、Piniaなどのような状態管理の機構を使用したことがあると思います。そして、Streamlitのsession_stateを見て、これを一つのモジュールにしたいと考えるはずです。

import streamlit as st

class SessionStateManager:
    def __init__(self):
        # 初期値がない場合に初期化
        if 'state' not in st.session_state:
            st.session_state.state = {
                'count': 0,
                'user_name': '',
                'is_logged_in': False
            }

    def get_state(self):
        """状態を取得"""
        return st.session_state.state

    def set_state(self, key, value):
        """状態の特定のキーを更新"""
        st.session_state.state[key] = value

    def increment_count(self):
        """カウントを増加"""
        st.session_state.state['count'] += 1

    def reset_state(self):
        """全体の状態をリセット"""
        st.session_state.state = {
            'count': 0,
            'user_name': '',
            'is_logged_in': False
        }
    
    def set_user(self, user_name):
        """ユーザー名をセット"""
        st.session_state.state['user_name'] = user_name
        st.session_state.state['is_logged_in'] = True

    def logout(self):
        """ログアウト処理"""
        st.session_state.state['user_name'] = ''
        st.session_state.state['is_logged_in'] = False

stateの管理や、ミューテーションによるstateへのアクセスを統一でき、いい設計に見えます。しかし、個人的には過剰だと思います。

状態管理ライブラリが目指しているのは「アプリケーション全体での状態管理」です。どこかの画面で更新されたステートが、別の画面の振る舞いに影響するようなシチュエーションです。どこで誰がどうやってステートを更新するのかがカオスにならないように、状態管理ライブラリが導入されます。

Streamlitの場合、画面を跨いでステートを利用したいケースは稀だと思います。もしそのような実装を試みたい場合は、画面の分割単位を工夫する等、リファクタリングのタイミングかもしれません。

Atomic design

要素を細かくコンポーネントに分割するAtomic designのようなパターンも、Streamlitにはマッチしないと思います。複数の画面で似たようなコンポーネントを使うケースはあり得ますが、上で挙げたモデルクラスのようなものを用意して、データ処理のロジックを共通化する程度で用は足りそうです。

プログラムは分割することによって複雑さが増します。Atoms、Molecules、Organismsといった単位でフォルダを切ったりして、過剰にアプリを複雑にする必要はないはずです。

自動テスト

Streamlitは、pytestで画面の振る舞いをテストするネイティブのテストフレームワークを持ちます。
https://docs.streamlit.io/develop/concepts/app-testing

(これは私が使いこなせていないだけかもしれませんが、)正直自動テストを組んでまで、Streamlitのアプリを検証する意味は薄いと思っています。

データアプリとしてStreamlitを利用する場合、その振る舞いは参照するデータの仕様によって大きく左右されます。一生懸命自動テストのためにテストパターンやダミーデータを作り込んでテストを書いても、本番データの仕様が変われば、テストまで書き直さなければならなくなります。アドホックな利用を前提にしたStreamlitでは、自動テストの開発にかかった工数の分ほどのリターンを得られるとは考えにくいです。

とはいえ、本番データを使った検証自体がすごく大変で、ローカルにダミーデータを作って検証をした方がメリットがあるような現場であれば、pytestを組むメリットは出てくると思います。

テンプレ構成

.
├── environment.yml
├── pages
├── snowflake.yml
└── src
    ├── main.py
    └── utils.py
utils.py
import os
import base64
from time import sleep

import pandas as pd
import streamlit as st
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.exceptions import SnowparkSessionException


def create_execute_query():
    """create_execute_query
    クエリ実行用関数 execute_query を作成するクロージャ。

    ### execute_query
    execute_queryは一つのクエリを受け取り、実行します。

    Examples:
        >>> execute_query = create_execute_query()
        >>> execute_query("select * from table where id = ?", params=(1,))
    """
    try:
        # streamlit in snowflakeの内部ならば、snowparkセッションを使ってクエリ実行
        session = get_active_session()
    except SnowparkSessionException as e:
        if not os.path.isfile('.streamlit/secrets.toml'):
            raise e
        secrets = st.secrets["snowflake"]
        connection_parameters = {
            "user": secrets["user"],
            "private_key": base64.b64decode(secrets["private_key"]),
            "account": secrets["account"],
            "warehouse": secrets["warehouse"],
            "database": secrets["database"],
            "schema": secrets["schema"]
        }
        session = Session.builder.configs(connection_parameters).create()
        # streamlit in snowflakeの外部(開発中のローカル環境など)ならば、python-connectorを使ってクエリ実行
    finally:
        def execute_query(query, params=()):
            return session.sql(query, params=params).to_pandas()
        return execute_query

def create_execute_queries():
    """create_execute_queries
    クエリ実行用関数 execute_queries を作成するクロージャ。

    ### execute_queries
    execute_queriesは複数のクエリのリストを受け取り、非同期に実行します。

    Args:
        queries: クエリのリスト。リストの要素は、それぞれクエリ本文とパラメータ辞書を保持したタプルです。

    Returns:
        list: 各クエリの処理結果が格納されたpd.DataFrame。並び順は、投入されたクエリリストの順序に一致。

    Raises:
        リストに含まれるクエリのうち一つでもエラーが起きたら、そのエラーをRaiseします。
    
    Examples:
        >>> execute_queries = create_execute_queries()
        >>> results = execute_queries(
            [
                ("select page_id, name from pages where page_id = ?", (1,)),
                ("select current_datetime()", ())
            ])
        >>> results[0]
            | PAGE_ID | NAME |
            | :------ | :--- |
            | 1       | hoge |
        >>> results[1]
            | CURRENT_DATETIME() |
            | :----------------- |
            | 2024-08-27 00:00:00|
    """
    try:
        # streamlit in snowflakeの内部ならば、snowparkセッションを使ってクエリ実行
        session = get_active_session()    
    except SnowparkSessionException as e:
        # streamlit in snowflakeの外部(開発中のローカル環境など)ならば、python-connectorを使ってクエリ実行
        if not os.path.isfile('.streamlit/secrets.toml'):
            raise e
        secrets = st.secrets["snowflake"]
        connection_parameters = {
            "user": secrets["user"],
            "private_key": base64.b64decode(secrets["private_key"]),
            "account": secrets["account"],
            "warehouse": secrets["warehouse"],
            "database": secrets["database"],
            "schema": secrets["schema"]
        }
        session = Session.builder.configs(connection_parameters).create()
    finally:
        def execute_queries(queries):
            query_ids = []
            for query in queries:
                # すべて非同期に起動する。
                query_id = session.sql(query[0], params=query[1]).collect_nowait().query_id
                query_ids.append(query_id)
            
            # すべてのクエリの完了を待機する
            while not all([session.create_async_job(query_id).is_done() for query_id in query_ids]):
                sleep(0.05)

            # クエリ結果を取り出す。
            results = [session.create_async_job(query_id).result(result_type="pandas") for query_id in query_ids]
            return results
        return execute_queries
environment.yml
dependencies:
- streamlit=1.35.0
- pandas
snowflake.yml
definition_version: 1
streamlit:
  name: my_streamlit_app
  stage: my_streamlit_app_stage
  query_warehouse: my_wh
  main_file: src/main.py
  env_file: environment.yml
  pages_dir: pages/
  additional_source_files:
  - src/utils.py
main.py
import streamlit as st

st.write("Hello!")

Discussion