🤖

チュートリアルお試し記録 ~Snowpark ML編~

2025/02/13に公開

はじめに

Snowflake の AI 周りを色々学習中でして、今回は以下のチュートリアルの実施記録とそれによって理解できたことを簡単に記しておきます。内容としては主に Snowpark ML を Snowflake Notebooks 上で実行していったものになります。
https://quickstarts.snowflake.com/guide/getting_started_with_dataengineering_ml_using_snowpark_python/index.html?index=..%2F..index#0

1. Snowpark/Snowpark ML とは?

Snowpark は、Snowflake 内で Python や他のプログラミング言語を実行するためのライブラリとコード実行環境のセットです。これを利用することで、データパイプラインや機械学習モデル、アプリケーション、さらにはその他のデータ処理タスクを構築することができます。
Snowflake ML は、管理されたデータの上に構築されたエンドツーエンドの機械学習機能を統合したものです。Snowflake ML は、完全にカスタマイズされたワークフローと、すぐに使えるワークフローの両方に対応しています。準備された ML を使用する場合は ML 関数を活用して開発時間を短縮したり、SQL を使って組織全体で ML を普及させたりすることができます。
一方、カスタム ML を必要とするデータサイエンティストや ML エンジニアはスケーラブルな機能やモデルを簡単かつ安全に開発・運用することが可能です。
Snowflake ML は Snowpark ML ライブラリの Python API を使用することも Snowflake Notebooks から直接利用することもできます。

2. Snowflake Notebooks とは?

Snowflake Notebooks は、Python、SQL、および Markdown 用のインタラクティブなセルベースのプログラミング環境を提供する統合開発インターフェースです。 Snowflake Notebooks では、Snowflake データを活用して探索的データ分析の実行、機械学習モデルの開発、その他のデータサイエンスの実行を行うことができます。 データエンジニアリングワークフローをすべて同じインターフェース内で実現します。

3. 実施した内容

チュートリアルでは以下を実施しました。

  • Snowpark DataFrames と API を使用してデータを分析したデータ エンジニアリング タスクの実行
  • データパイプラインを自動化するための Snowflake タスクの作成(オプション)
  • Snowflake の Snowpark ML を使用した ML モデルのトレーニング
  • Snowpark ML モデルレジストリから ML モデルへの登録
  • ユーザー入力に基づいて推論を行う ML モデルを使用する Streamlit アプリケーションの作成

データの概要

データ分析とデータ準備のタスクを実施し、検索、動画、ソーシャルメディア、メールなどの複数のチャネルにおける変動する広告予算の将来の ROI(投資収益率)を予測するための線形回帰モデルをトレーニングします。これには、Snowpark for Python、Snowpark ML、Streamlit を使用します。セッションの終わりには、異なる広告予算の ROI を視覚化するインタラクティブなウェブアプリケーションが展開されます。

データの準備

新しい SQL ワークシートで、次の SQL コマンドを実行して、ウェアハウス、データベース、およびスキーマを作成します。

USE ROLE ACCOUNTADMIN;
CREATE WAREHOUSE IF NOT EXISTS DASH_S WAREHOUSE_SIZE=SMALL;
CREATE DATABASE IF NOT EXISTS DASH_DB;
CREATE SCHEMA IF NOT EXISTS DASH_SCHEMA;
USE DASH_DB.DASH_SCHEMA;
USE WAREHOUSE DASH_S;

同じ SQL ワークシートで、次の SQL コマンドを実行して、パブリックにアクセス可能な S3 バケットでホストされているデータから CAMPAIGN_SPEND テーブルを作成します。

CREATE or REPLACE file format csvformat
  skip_header = 1
  type = 'CSV';
CREATE or REPLACE stage campaign_data_stage
  file_format = csvformat
  url = 's3://sfquickstarts/ad-spend-roi-snowpark-python-scikit-learn-streamlit/campaign_spend/';
CREATE or REPLACE TABLE CAMPAIGN_SPEND (
  CAMPAIGN VARCHAR(60),
  CHANNEL VARCHAR(60),
  DATE DATE,
  TOTAL_CLICKS NUMBER(38,0),
  TOTAL_COST NUMBER(38,0),
  ADS_SERVED NUMBER(38,0)
);
COPY into CAMPAIGN_SPEND
  from @campaign_data_stage;

同じ SQL ワークシートで、次の SQL コマンドを実行して、パブリックにアクセス可能な S3 バケットでホストされているデータから MONTHLY_REVENUE テーブルを作成します。

CREATE or REPLACE stage monthly_revenue_data_stage
  file_format = csvformat
  url = 's3://sfquickstarts/ad-spend-roi-snowpark-python-scikit-learn-streamlit/monthly_revenue/';
CREATE or REPLACE TABLE MONTHLY_REVENUE (
  YEAR NUMBER(38,0),
  MONTH NUMBER(38,0),
  REVENUE FLOAT
);
COPY into MONTHLY_REVENUE
  from @monthly_revenue_data_stage;
USE WAREHOUSE DASH_S;

同じ SQL ワークシートで、次の SQL コマンドを実行して、過去 6 か月の予算配分と ROI を保持する BUDGET_ALLOCATIONS_AND_ROI テーブルを作成します。

CREATE or REPLACE TABLE BUDGET_ALLOCATIONS_AND_ROI (
  MONTH varchar(30),
  SEARCHENGINE integer,
  SOCIALMEDIA integer,
  VIDEO integer,
  EMAIL integer,
  ROI float
)
COMMENT = '{"origin":"sf_sit-is", "name":"aiml_notebooks_ad_spend_roi", "version":{"major":1, "minor":0}, "attributes":{"is_quickstart":1, "source":"streamlit"}}';
INSERT INTO BUDGET_ALLOCATIONS_AND_ROI (MONTH, SEARCHENGINE, SOCIALMEDIA, VIDEO, EMAIL, ROI)
VALUES
('January',35,50,35,85,8.22),
('February',75,50,35,85,13.90),
('March',15,50,35,15,7.34),
('April',25,80,40,90,13.23),
('May',95,95,10,95,6.246),
('June',35,50,35,85,8.22);

データ エンジニアリング タスクの実行

データの準備が整ったので、Snowpark ML を使った分析を行っていきます。ここからは Snowflake Notebooks 上で作業を行います。GitHub リポジトリに公開されている以下の ipynb ファイルを読み込み、作成した DASH_DB と DASH_SCHEMA を選択し利用できるようにします。
https://github.com/Snowflake-Labs/sfguide-getting-started-dataengineering-ml-snowpark-python/blob/main/Snowpark_For_Python_DE.ipynb
このノートブックでは、Snowpark for Python を使用した Snowflake におけるデータエンジニアリングに焦点を当て、以下を実施します。

  • Snowflake のテーブルから Snowpark のデータフレームにデータをロードします。
  • Snowpark のデータフレームに対して探索的データ分析を実施します。
  • Snowpark のデータフレームを使用して、複数のテーブルからデータをピボットおよび結合します。
  • Snowflake タスクを使用してデータ準備を自動化する方法を示します。
    Notebook のセルを順次実行していきます。まずは必要なライブラリのインポートです。
from snowflake.snowpark.session import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import month,year,col,sum
from snowflake.snowpark.version import VERSION
from snowflake.core import Root
from snowflake.core.task import Task, StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.core import CreateMode
# Misc
from datetime import timedelta
import json
import logging
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)
session = get_active_session()
# Add a query tag to the session. This helps with monitoring and troubleshooting.
session.query_tag = {"origin":"sf_sit-is",
                     "name":"aiml_notebooks_ad_spend_roi",
                     "version":{"major":1, "minor":0},
                     "attributes":{"is_quickstart":1, "source":"notebook"}}

まず、キャンペーンの支出データを読み込みましょう。このテーブルには、検索エンジン、ソーシャルメディア、メール、ビデオなどのデジタル広告チャネル全体で、日ごとの支出を示すように集計された広告クリックデータが含まれています。

snow_df_spend = session.table('campaign_spend')
snow_df_spend.queries

データフレームの中身を確認すると、以下のようになります。(先頭数行のみ)

"CAMPAIGN" "CHANNEL" "DATE" "TOTAL_CLICKS" "TOTAL_COST" "ADS_SERVED"
winter_sports video 2012-06-03 213 1762 426
sports_across_cultures video 2012-06-02 87 678 157
building_community search_engine 2012-06-03 66 471 134
world_series social_media 2017-12-28 72 591 149

データを変換して、チャネルごとの年/月ごとの総コストを表示できるようにしましょう。これには、group_by()および agg()の Snowpark データフレーム関数を使用します。

# Stats per Month per Channel
snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
    with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')
snow_df_spend_per_channel.show(10)

結果は以下のようになります。(先頭数行のみ)

YEAR MONTH CHANNEL TOTAL_COST
2012 5 search_engine 516431
2012 5 video 516729
2012 5 email 517208

キャンペーンの支出データをさらに変換して、各行が年/月ごとのすべてのチャネルにおける総コストを表すようにしましょう。これには、pivot()および sum()の Snowpark データフレーム関数を使用します。この変換により、収益テーブルと結合できるようになり、モデルのトレーニングのために入力特徴とターゲット変数を 1 つのテーブルにまとめることができます。

snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
snow_df_spend_per_month = snow_df_spend_per_month.select(
    col("YEAR"),
    col("MONTH"),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)
snow_df_spend_per_month.show()

結果は以下のようになります。(先頭数行のみ)

YEAR MONTH SEARCH_ENGINE SOCIAL_MEDIA VIDEO EMAIL
2012 5 516431 517618 516729 517208
2012 6 506497 504679 501098 501947
2012 7 522780 521395 522762 518405

データフレームの結果を SPEND_PER_MONTH テーブルに格納します。

snow_df_spend_per_month.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

ここまででデータエンジニアリング作業が完了しました。上記は手動で逐次実行する手順でしたが、ノートブックには Snowflake Task を使った自動化のコードも記されています。個々のセルの解説は割愛しますが、手順の中で作成される SPEND_AND_REVENUE_PER_MONTH というテーブルのデータを後続の ML の作業で呼び出すため、一連の流れとして実施しておいた方がよいです。

Snowpark ML を使用した ML モデルのトレーニング

ここからは ML モデルのトレーニングを行います。新しいノートブックを使用しますので、先ほどと同様に GitHub リポジトリから該当のファイルをインポートします。また、パッケージとして snowflake-ml-python を事前にインストールしておきます。
このノートブックでは、Snowpark for Python を使用した Snowflake における機械学習に焦点を当て、以下を実施します。

  • Snowflake のテーブルから特徴量とターゲットを Snowpark データフレームにロードします。
  • モデルのトレーニングのために特徴量を準備します。
  • Snowflake 内の Snowpark ML を使用して機械学習モデルをトレーニングし、そのモデルを Snowflake のステージにアップロードします。
  • 機械学習モデルを登録し、Snowpark ML モデルレジストリから推論に使用します。
    Notebook のセルを順次実行していきます。まずは必要なライブラリのインポートです。
# Snowpark for Python
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.version import VERSION
# Snowpark ML
from snowflake.ml.modeling.compose import ColumnTransformer
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import PolynomialFeatures, StandardScaler
from snowflake.ml.modeling.linear_model import LinearRegression
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.registry import Registry
from snowflake.ml.version import VERSION as ml_version
# Misc
#import pandas as pd
import json
import logging
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)
session = get_active_session()
# Add a query tag to the session. This helps with monitoring and troubleshooting.
session.query_tag = {"origin":"sf_sit-is",
                     "name":"aiml_notebooks_ad_spend_roi",
                     "version":{"major":1, "minor":0},
                     "attributes":{"is_quickstart":1, "source":"notebook"}}

以下を実行しモデルのトレーニングのために特徴量とターゲットを保存します。

  • 欠損値を含む行を削除します。
  • モデリングに必要のない列を除外します。
  • 特徴量を MARKETING_BUDGETS_FEATURES テーブルに保存します。
# Load data
snow_df_spend_and_revenue_per_month = session.table('spend_and_revenue_per_month')
# Delete rows with missing values
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()
# Exclude columns we don't need for modeling
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])
# Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')
snow_df_spend_and_revenue_per_month.show()

結果は以下のようになります。(先頭数行のみ)

SEARCH_ENGINE SOCIAL_MEDIA VIDEO EMAIL REVENUE
516431 517618 516729 517208 3264300.11
506497 504679 501098 501947 3208482.33
522780 521395 522762 518405 3311966.98

モデルのトレーニングを実施します。

CROSS_VALIDATION_FOLDS = 10
POLYNOMIAL_FEATURES_DEGREE = 2
# Create train and test Snowpark DataDrames
train_df, test_df = session.table("MARKETING_BUDGETS_FEATURES").random_split(weights=[0.8, 0.2], seed=0)
# Preprocess the Numeric columns
# We apply PolynomialFeatures and StandardScaler preprocessing steps to the numeric columns
# NOTE: High degrees can cause overfitting.
numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = POLYNOMIAL_FEATURES_DEGREE)),('scaler', StandardScaler())])
# Combine the preprocessed step together using the Column Transformer module
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features)])
# The next step is the integrate the features we just preprocessed with our Machine Learning algorithm to enable us to build a model
pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', LinearRegression())])
parameteres = {}
# Use GridSearch to find the best fitting model based on number_of_folds folds
model = GridSearchCV(
    estimator=pipeline,
    param_grid=parameteres,
    cv=CROSS_VALIDATION_FOLDS,
    label_cols=["REVENUE"],
    output_cols=["PREDICTED_REVENUE"],
    verbose=2
)
# Fit and Score
model.fit(train_df)
train_r2_score = model.score(train_df)
test_r2_score = model.score(test_df)
# R2 score on train and test datasets
print(f"R2 score on Train : {train_r2_score}")
print(f"R2 score on Test  : {test_r2_score}")

データの分割、前処理、モデルの訓練、評価が順に行われ R² スコアが出力されます。

  • R2 score on Train : 0.995107670224394
  • R2 score on Test : 0.9069317995589423
    作成したモデルを PREDICT_ROI という名前でレジストリに格納します。
registry = Registry(session)
MODEL_NAME = "PREDICT_ROI"
mv = registry.log_model(model,
                        model_name=MODEL_NAME,
                        version_name="v1",
                        metrics={"R2_train": train_r2_score, "R2_test":test_r2_score},
                        comment='Model pipeline to predict revenue',
                        options={"relax_version": False}
                    )

以下のように正常に格納されていることがわかります。

registry.show_models()

モデルがログに記録されたら、新しいデータに対して推論を行うために使用できます。
まず、サンプルデータを使って Snowpark データフレームを作成し、その後、ログに記録されたモデルを呼び出して新しい予測を取得します。

est_df = session.create_dataframe([[250000,250000,200000,450000],[500000,500000,500000,500000],[8500,9500,2000,500]],
                                    schema=['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL'])
mv.run(test_df, function_name='predict').show()

以下のような推論結果が返されます。

SEARCH_ENGINE SOCIAL_MEDIA VIDEO EMAIL PREDICTED_REVENUE
500000 500000 500000 500000 3180176.6792253554
250000 250000 200000 450000 -12735711.132665249
8500 9500 2000 500 -107982.70425568242

Streamlit アプリケーションの構築

作成したモデルを呼び出すための Streamlit アプリケーションを作成します。Snowsight 上で新規作成した Streamlit アプリケーションのコードを以下リポジトリの内容に置き換えます。
https://github.com/Snowflake-Labs/sfguide-getting-started-dataengineering-ml-snowpark-python/blob/main/Snowpark_Streamlit_Revenue_Prediction_SiS.py

# Snowpark for Python API reference: https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/index.html
# Snowpark for Python Developer Guide: https://docs.snowflake.com/en/developer-guide/snowpark/python/index.html
import calendar
import altair as alt
import streamlit as st
import pandas as pd
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
# Function to load last six months' budget allocations and ROI
def load():
  data = session.table("BUDGET_ALLOCATIONS_AND_ROI").unpivot("Budget", "Channel", ["SearchEngine", "SocialMedia", "Video", "Email"]).filter(col("MONTH") != "July")
  alloc, rois, last_alloc = data.drop("ROI"), data.drop(["CHANNEL", "BUDGET"]).distinct(), data.filter(col("MONTH") == "June")
  return data.to_pandas(), alloc.to_pandas(), rois.to_pandas(), last_alloc.to_pandas()
def predict(budgets):
  pred = session.sql(f"SELECT ABS(PREDICT_ROI!predict({budgets[0]*1000},{budgets[1]*1000},{budgets[2]*1000},{budgets[3]*1000})['PREDICTED_REVENUE']::int) as PREDICTED_ROI").to_pandas()
  pred = pred["PREDICTED_ROI"].values[0] / 100000
  change = round(((pred / rois["ROI"].iloc[-1]) - 1) * 100, 1)
  return pred, change
def chart(chart_data):
  base = alt.Chart(chart_data).encode(alt.X("MONTH", sort=list(calendar.month_name), title=None))
  bars = base.mark_bar().encode(y=alt.Y("BUDGET", title="Budget", scale=alt.Scale(domain=[0, 300])), color=alt.Color("CHANNEL", legend=alt.Legend(orient="top", title=" ")), opacity=alt.condition(alt.datum.MONTH=="July", alt.value(1), alt.value(0.3)))
  lines = base.mark_line(size=3).encode(y=alt.Y("ROI", title="Revenue", scale=alt.Scale(domain=[0, 25])), color=alt.value("#808495"))
  points = base.mark_point(strokeWidth=3).encode(y=alt.Y("ROI"), stroke=alt.value("#808495"), fill=alt.value("white"), size=alt.condition(alt.datum.MONTH=="July", alt.value(300), alt.value(70)))
  chart = alt.layer(bars, lines + points).resolve_scale(y="independent").configure_view(strokeWidth=0).configure_axisY(domain=False).configure_axis(labelColor="#808495", tickColor="#e6eaf1", gridColor="#e6eaf1", domainColor="#e6eaf1", titleFontWeight=600, titlePadding=10, labelPadding=5, labelFontSize=14).configure_range(category=["#FFE08E", "#03C0F2", "#FFAAAB", "#995EFF"])
  st.altair_chart(chart, use_container_width=True)
# Streamlit config
st.header("SkiGear Co Ad Spend Optimizer")
st.subheader("Advertising budgets")
# Call functions to get Snowflake session and load data
session = snowpark.session._get_active_session()
channels = ["Search engine", "Email", "Social media", "Video"]
channels_upper = [channel.replace(" ", "").upper() for channel in channels]
data, alloc, rois, last_alloc = load()
last_alloc = last_alloc.replace(channels_upper, channels)
# Display advertising budget sliders and set their default values
col1, _, col2 = st.columns([4, 1, 4])
budgets = []
for alloc, col in zip(last_alloc.itertuples(), [col1, col1, col2, col2]):
  budgets.append(col.slider(alloc.CHANNEL, 0, 100, alloc.BUDGET, 5))
# Function to call "predict_roi" UDF that uses the pre-trained model for inference
# Note: Both the model training and UDF registration is done in Snowpark_For_Python.ipynb
pred, change = predict(budgets)
st.metric("", f"Predicted revenue ${pred:.2f} million", f"{change:.1f} % vs last month")
july = pd.DataFrame({"MONTH": ["July"]*4, "CHANNEL": channels_upper, "BUDGET": budgets, "ROI": [pred]*4})
chart(pd.concat([data, july]).reset_index(drop=True).replace(channels_upper, channels))
# Setup the ability to save user-entered allocations and predicted value back to Snowflake
if st.button("❄️ Save to Snowflake"):
  with st.spinner("Making snowflakes..."):
    df = pd.DataFrame({"MONTH": ["July"], "SEARCHENGINE": [budgets[0]], "SOCIALMEDIA": [budgets[1]], "VIDEO": [budgets[2]], "EMAIL": [budgets[3]], "ROI": [pred]})
    session.write_pandas(df, "BUDGET_ALLOCATIONS_AND_ROI")
    st.success("✅ Successfully wrote budgets & prediction to your Snowflake account!")
    st.snow()

コードを置き換えると以下のような画面が表示されます。

広告予算スライダーを調整すると、それに応じてグラフの割当予測 ROI が変動します。また、❄️ Save to Snowflake ボタンを押下することで BUDGET_ALLOCATIONS_AND_ROI テーブルに更新された ROI が格納されます。
以下、SQL でデータを確認してみると July のデータが登録されていることがわかります。

select * from budget_allocations_and_roi;
MONTH SEARCHENGINE SOCIALMEDIA VIDEO EMAIL ROI
January 35 50 35 85 8.22
February 75 50 35 85 13.9
March 15 50 35 15 7.34
April 25 80 40 90 13.23
May 95 95 10 95 6.246
June 35 50 35 85 8.22
July 70 50 35 60 10.80754

4. まとめ

このチュートリアルを通じて、Snowpark for Python を使用して、検索、ビデオ、ソーシャル メディア、メールなどの複数のチャネルにわたる変動広告支出予算の将来の ROI (投資収益率) を予測する線形回帰モデルをトレーニングしました。その後、そのモデルを使用して、ユーザー入力に基づいて新しい予算配分の予測を生成する Streamlit アプリケーションを作成しました。
大まかな流れはなんとなく理解できましたが、コードの細かい中身は少し特徴がありそうなためそのあたりの理解も今後深めていきたいと思います。引き続き、チュートリアルの実施記録も記していきます。

電通総研 データテックブログ

Discussion