📶

[便利]dbtでStreamlit in Snowflakeのコードを管理しよう!

に公開

皆さん。Streamlit in Snowflakeのコード管理大変じゃないですか?

私の作成したStreamlit Materializationを使うとdbtでStreamlitアプリをコード管理する事ができます。めっちゃ便利なので使ってみてください。

これを使うメリットとしては

  • Streamlit アプリケーションがリネージュに表示されるので影響範囲が楽
  • dbt buildでStreamlit in Snowflakeアプリもデプロイ+コード管理出来る
  • {{ref('')}}という参照構文がStreamlitの中で使えるので、開発環境の切り替えなども楽。
  • dbt run -s +streamlit_model で上流データモデルに加えて、Streamlitも再デプロイ出来る。

という事があげられます。

以下のように,dbtのモデルとしてStreamlitアプリケーションを作成できます。

{{
  config(
    materialized = 'streamlit',
    query_warehouse = 'COMPUTE_WH',
    title = 'Form Application',
    packages = ['pandas', 'plotly']
  )
}}
//Pythonコード
...


Streamlitがリネージュに表示されている様子。

実際の構築手順

Step 1: カスタムマテリアライゼーションの配置

macros/streamlit.sqlとして以下のコードを保存してください。

{% materialization streamlit, adapter='snowflake' %}

  {%- set main_file_name = config.get('main_file', default='app.py') -%}
  {%- set query_warehouse = config.require('query_warehouse') -%}
  {%- set packages = config.get('packages', default=[]) -%}
  {%- set simple_stage_name = config.get('stage', default=this.identifier ~ '_stage') -%}
  {%- set title = config.get('title', default=this.identifier) -%}
  {%- set comment = config.get('comment') -%}
  {%- set model_content = sql -%}

  {%- set streamlit_name = this.identifier -%}
  {%- set stage_name = this.database ~ '.' ~ this.schema ~ '.' ~ simple_stage_name -%}

  {%- set required_packages = ['snowflake-snowpark-python', 'streamlit'] -%}
  {%- set all_packages = (required_packages + packages) | unique | list -%}
  {%- set env_yml_content -%}
dependencies:
{%- for pkg in all_packages %}
  - {{ pkg }}
{%- endfor %}
  {%- endset -%}

  {% set create_stage_sql %}
    CREATE STAGE IF NOT EXISTS {{ stage_name }}
  {% endset %}

  {% set sp_name = this.database ~ '.' ~ this.schema ~ '.' ~ this.identifier ~ '_writer_sp' %}
  {% set create_sp_sql %}
CREATE OR REPLACE TEMPORARY PROCEDURE {{ sp_name }}(FILE_PATH STRING, CONTENT STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'write_content_to_stage'
EXECUTE AS CALLER
AS
$$
import io

def write_content_to_stage(session, file_path: str, content: str) -> str:
    stage_name = "{{ stage_name }}"
    
    content_bytes = content.encode('utf-8')
    input_stream = io.BytesIO(content_bytes)
    
    try:
        session.file.put_stream(
            input_stream,
            f"@{stage_name}/{file_path}",
            auto_compress=False,
            overwrite=True
        )
        return f"Successfully wrote content to @{stage_name}/{file_path}"
    except Exception as e:
        return f"Error during put_stream for {file_path}: {str(e)}"
$$
  {% endset %}

  {% set create_streamlit_sql %}
CREATE OR REPLACE STREAMLIT {{ streamlit_name }}
  FROM @{{ stage_name }}
  MAIN_FILE = '{{ main_file_name }}'
  QUERY_WAREHOUSE = {{ query_warehouse }}
  {% if title %}
  TITLE = '{{ title }}'
  {% endif %}
  {% if comment %}
  COMMENT = '{{ comment }}'
  {% endif %}
  {% endset %}

  {% do run_query(create_stage_sql) %}
  {% do run_query(create_sp_sql) %}
  
  {% call statement('write_env_file', fetch_result=False, auto_begin=False) %}
    CALL {{ sp_name }}('environment.yml', $${{ env_yml_content }}$$)
  {% endcall %}

  {% call statement('write_main_file', fetch_result=False, auto_begin=False) %}
    CALL {{ sp_name }}('{{ main_file_name }}', $${{ model_content }}$$)
  {% endcall %}

  {% call statement('main') %}
    {{ create_streamlit_sql }}
  {% endcall %}

  {{ return({'relations': []}) }}

{% endmaterialization %}

Step2: Streamlit Materializationを使ったデータモデルの作成

models/streamlit.sqlを作成する。
df = session.table("{{ref('sample_data')}}")
でsample_dataというdbtのデータモデルからデータを取得できます(!)

{{
  config(
    materialized = 'streamlit',
    query_warehouse = 'COMPUTE_WH',
    title = 'Form Application',
    packages = ['pandas', 'plotly']
  )
}}

import streamlit as st
import pandas as pd
import plotly.express as px
from snowflake.snowpark.context import get_active_session

st.set_page_config(layout="wide")
session = get_active_session()
df = session.table("{{ref('sample_data')}}").to_pandas()
....

Step3: ビルドとデプロイ

dbt build -s +streamlitを実行する

これで、Snowsightの「Streamlit」タブに「Form Application」という名前のアプリケーションが表示され、クリックするだけで実行できます。

便利すぎ!

Snowflake Data Heroes

Discussion