🐳

Snowflakeアプリ開発環境をDockerで構築【Docker+Snowpark+LangChain+Streamlit】

2023/08/12に公開

概要

この記事は、ローカルの環境を汚さずにSnowflake上のデータや大規模言語モデル(LLM)を活用したアプリ開発を行えるよう、Snowpark+LangChain+StreamlitといったPythonのライブラリを活用できる環境を、Dockerで構築することを目的に書いています。
私のGitHub上でもコードなど公開していますので、良ければ参照やgit cloneしてください。
あくまでも目的は開発環境構築ですが、後半では動作確認のためのこれらのライブラリを用いた単純なアプリについても紹介します。
アプリの外観
アプリの外観

前提条件

構築する上での条件は下記の通りです。

  • Dockerをインストール済みである。
  • Snowparkを使う場合:Snowflakeのアカウントを持っている。
  • LangChainを使う場合:OpenAIのAPIキーを払い出し済みである。

構成

ディレクトリ構成は下記の通りです。

├── snowflake-apps
|   └── src
|       └── streamlit_app.py
├── compose.yml
├── Dockerfile
└── environment.yml

Snowparkをインストールする方法として、Snowflakeの公式文書からcondaを用いた方法が紹介されているので、それに合わせる形でDockerのベースイメージをminiconda3にしています。また、インストールするライブラリは管理しやすいようにenvironment.ymlに記しています。よって、コンテナ上で下記のコマンドを発行させることでconda環境を整えるようにします。

$ conda env create -f ./environment.yml

では、compose.yml, Dockerfile, environment.ymlファイルの中身を載せます。

compose.ymlの中身
compose.yml
services:
    streamlit:
        build: .
        ports:
        - "9191:8501"
        volumes:
        - ./snowflake-apps/src:/app
        environment:
        - OPENAI_API_KEY=$OPENAI_API_KEY
        - SNOWFLAKE_ACCOUNT=$SNOWFLAKE_ACCOUNT
        - SNOWFLAKE_USERNAME=$SNOWFLAKE_USERNAME
        - SNOWFLAKE_PASSWORD=$SNOWFLAKE_PASSWORD
Dockerfileの中身
Dockerfile
FROM continuumio/miniconda3:23.5.2-0

COPY ./environment.yml .

RUN apt-get update \
    && conda env create -f ./environment.yml \
    && rm -rf /var/lib/apt/lists/*

ENV PATH /opt/conda/envs/snowflake-env/bin:$PATH

RUN groupadd --gid 1000 streamlit \
    && useradd --uid 1000 --gid streamlit --shell /bin/bash --create-home streamlit \
    && mkdir /app \
    && chown -R streamlit:streamlit /app

EXPOSE 8501

WORKDIR /app

USER streamlit

COPY --chown=streamlit:streamlit ./snowflake-apps/src .

CMD ["streamlit", "run", "streamlit_app.py"]
environment.ymlの中身
environment.yml
name: snowflake-env
channels:
    - https://repo.anaconda.com/pkgs/snowflake
dependencies:
    - python=3.9
    - numpy
    - pandas
    - snowflake-snowpark-python
    - pip
    - pip:
        - langchain
        - openai
        - plotly
        - streamlit

上のファイルの中身について、3点ほど補足します。

  • compose.ymlでは、Streamlitはデフォルトの場合に8501番ポートで通信することに注意し、ホストの9191番ポートにフォワードしています。
  • Dockerfileでは、セキュリティの観点からrootユーザー以外で実行する形にしています。また必要なファイルに対してはchownし、それらのパーミッションも実行するユーザーに合わせています。

以上で、Dockerで構築する準備は整いました。


本節のここから先は、Streamlit,Snowpark,LangChainといったライブラリを使ったプログラムの動作確認ができるよう、snowflake-apps/src/app.pyの中身について埋めていきます。
試しに国別のCOVID-19の感染者数の変化を表示しChatGPTとやり取りできる、簡単なアプリを作成してみましょう。

まずは、COVID-19のデータを取得しましょう。
Snowflakeにログインし、ACCOUNTADMINロールか、CREATE DATABASEおよびIMPORT SHARE権限を持ったロールに切り替え、MarketplaceからCOVID-19 Epidemiological Dataを探し出し、"Get"ボタンを押下しましょう。
Get COVID-19 Epidemiological Data

無事取得できればCOVID19_EPIDEMIOLOGICAL_DATAというデータベースを参照できます。本アプリではこの中のPUBLIC.ECDC_GLOBALテーブルを活用し、COUNTRY_REGION, DATE, CASESカラムに注目することで、時系列に対する国別の感染者数を抽出します。

では、snowflake-apps/src/streamlit_app.pyの中身についても載せます。

streamlit_app.pyの中身
streamlit_app.py
from langchain.chat_models import ChatOpenAI
from langchain.schema import SystemMessage, HumanMessage, AIMessage
import os
import pandas as pd
import plotly.express as px
from snowflake.snowpark.session import Session
import streamlit as st


def init_page():
    st.set_page_config(
        page_title="Snowflake App",
        page_icon=":snowflake:"
    )
    st.header("Snowflakeアプリ開発環境をDockerで構築")
    st.sidebar.title("LLM Option")


def select_model():
    model = st.sidebar.radio("Choose a model:", ("GPT-3.5", "GPT-4"))
    if model == "GPT-3.5":
        model_name = "gpt-3.5-turbo"
    else:
        model_name = "gpt-4"

    temperature = st.sidebar.slider("Temperature:",
                                    min_value=0.0,
                                    max_value=2.0,
                                    value=0.0,
                                    step=0.1
                                    )

    return ChatOpenAI(temperature=temperature, model_name=model_name)


def init_messages():
    clear_button = st.sidebar.button("チャットをクリア", key="clear")
    if clear_button or "messages" not in st.session_state:
        st.session_state.messages = [
            SystemMessage(content="You are a helpful assistant.")
        ]
        st.session_state.costs = []


def create_session_object():
    connection_parameters = {
        "account": os.environ['SNOWFLAKE_ACCOUNT'],
        "user": os.environ['SNOWFLAKE_USERNAME'],
        "password": os.environ['SNOWFLAKE_PASSWORD'],
        "role": "ACCOUNTADMIN",
        "warehouse": "COMPUTE_WH",
        "database": "COVID19_EPIDEMIOLOGICAL_DATA",
        "schema": "PUBLIC"
    }
    session = Session.builder.configs(connection_parameters).create()
    return session


def get_df_date_vs_cases(session):
    df_all_country = session.sql(
        """
        SELECT COUNTRY_REGION FROM PUBLIC.ECDC_GLOBAL GROUP BY COUNTRY_REGION;
        """
    ).to_pandas()

    df_selected_country = st.multiselect(
        '調査したい国名を選択してください。',
        df_all_country,
        ['United States', 'India', 'France']
    )

    # vertically concat DATE, CASES, and COUNTRY_REGION by country
    df_date_vs_cases = pd.DataFrame()
    for i in range(len(df_selected_country)):
        df_date_vs_cases_in_a_country = session.sql(
            f"""
            SELECT DATE, CASES, COUNTRY_REGION FROM PUBLIC.ECDC_GLOBAL
            WHERE COUNTRY_REGION = '{df_selected_country[i]}';
            """
        ).to_pandas()

        df_date_vs_cases = pd.concat([df_date_vs_cases,
                                      df_date_vs_cases_in_a_country
                                      ])

    return df_date_vs_cases


def draw_graph(df_selected_cases):
    # exception handling when none of the countries are selected
    try:
        fig = px.line(df_selected_cases,
                      x='DATE',
                      y='CASES',
                      color='COUNTRY_REGION')

        fig.update_layout(xaxis_title='date',
                          legend_title='country',
                          yaxis_title='number of cases'
                          )
        st.write(fig)

    except ValueError:
        st.write("⛔国名を1つ以上、選択してください")


def converse_with_ai(llm):
    container = st.container()
    with container:
        with st.form(key='your_form', clear_on_submit=True):
            user_input = st.text_area(label='質問はありませんか?',
                                      key='input',
                                      height=100
                                      )

            # leave nothing against line breaks
            content = user_input.replace('\n', '')
            submit_button = st.form_submit_button(label='送信')

    if submit_button and user_input:
        st.session_state.messages.append(HumanMessage(content=content))
        with st.spinner("Waiting ..."):
            response = llm(st.session_state.messages)
        st.session_state.messages.append(AIMessage(content=response.content))

    # present chat history
    messages = st.session_state.get('messages', [])
    for message in messages:
        if isinstance(message, AIMessage):
            with st.chat_message('assistant'):
                st.markdown(message.content)
        elif isinstance(message, HumanMessage):
            with st.chat_message('user'):
                st.markdown(message.content)
        else:
            st.write(f"System message: {message.content}")


def main():
    init_page()
    llm = select_model()
    init_messages()

    st.subheader("COVID-19 国別感染者数の表示📉")
    session = create_session_object()
    df_cases = get_df_date_vs_cases(session)
    draw_graph(df_cases)

    st.subheader("AIサポート🤖")
    converse_with_ai(llm)


if __name__ == '__main__':
    main()

このファイル内のSnowpark周りとLangChain周りについて簡単に補足しておきます。

  • Snowpark周りについて:まずcreate_session_objectメソッドで、構築の際に渡すSNOWFLAKE_ACCOUNTなどの環境変数を取得しSnowflakeアカウント・認証情報やロール等を指定することで、セッションを作成します。そして、get_df_date_vs_casesメソッドで、指定したアカウント上の上記のテーブルに対して、GUIから選択された国名で絞り込んだselect文を発行して必要なデータを取得します。最後にdraw_graphメソッドで、plotlyというインタラクティブなグラフ作成に長けたライブラリを活用して、取得したデータをもとに描画するようにします。
  • LangChain周りについて:まずselect_modelメソッドでLLMをカスタマイズします。次にconverse_with_aiメソッドで、"送信"ボタンの押下をトリガーとしてOpenAIのAPIを叩いて、入力情報とそのレスポンスを取得し交互に表示させるようにします。またそれらの表示が長くなった際は、"チャットをクリア"ボタンからinit_messagesメソッドを呼び出すことで、チャット周りの表示を初期に上書きできます。

構築

snowflake-apps/src/streamlit_app.pyは白紙でも、それ以外のファイルを埋めたら、構築に取り掛かりましょう。
まずは環境変数を設定しましょう。OpenAIのAPIキーとSnowflakeのアカウント名・ベーシック認証情報をもとに、下記のコマンドを発行してください。

$ export OPENAI_API_KEY=sk-ABCDE...   # LangChainを使わない場合、不要
$ export SNOWFLAKE_ACCOUNT=ex12345.ap-northeast-1.aws   # Snowparkを使わない場合、不要
$ export SNOWFLAKE_USERNAME=XXX           # Snowparkを使わない場合、不要
$ export SNOWFLAKE_PASSWORD=P@ssword!     # Snowparkを使わない場合、不要

環境変数の設定が完了したら、compose.ymlと同じ階層で下記のコマンドを発行してください。

$ docker compose build
$ docker compose up -d

コンテナが起動したら、ブラウザからhttp://localhost:9191にアクセスできるか確認しましょう。また、紹介した内容でsnowflake-apps/src/streamlit_app.pyを埋めていれば、アプリの動作確認をしてみましょう。
アプリの動作確認

最後に

引き続きStreamlit,Snowpark,LangChain周りについて学習しまして、Snowflakeネイティブアプリの開発にも挑戦してみたいと思います。今後のSnowflakeの発展が益々楽しみな今日この頃です。

Discussion