Quickstart for dbt Cloud and Snowflakeのコピペ部分まとめ + オリジナルコンテンツ
本記事は、「dbt Cloud と Snowflake のクイックスタート(2024年1月時点)」のハンズオン用に、ソースコードをコピペする箇所を1ページにまとめたものです。
第3回Snowflake中部ユーザー会での利用を想定しています。
SnowSight(SnowflaeのWebUI)でやることと、dbt Cloud IDE(dbtのWebUI)でやることがあるので【xxx】で明記してあります。
Step3 【SnowSight】
Step2で作成したワークシートに以下をコピペ。
// Step3-1
// 仮想ウェアハウス、2つのデータベース (データソース用のraw、データマート用のanalytics)、
// 2つのスキーマ(顧客と注文データ用のjaffle_shop、支払いデータ用のstripe) を作成
create warehouse transforming;
create database raw;
create database analytics;
create schema raw.jaffle_shop;
create schema raw.stripe;
// Step3-2
// customerテーブルを作成
create table raw.jaffle_shop.customers
( id integer,
first_name varchar,
last_name varchar
);
// データをcustomerテーブルにロード
copy into raw.jaffle_shop.customers (id, first_name, last_name)
from 's3://dbt-tutorial-public/jaffle_shop_customers.csv'
file_format = (
type = 'CSV'
field_delimiter = ','
skip_header = 1
);
// ordersテーブルを作成
create table raw.jaffle_shop.orders
( id integer,
user_id integer,
order_date date,
status varchar,
_etl_loaded_at timestamp default current_timestamp
);
// ordersテーブルにロード
copy into raw.jaffle_shop.orders (id, user_id, order_date, status)
from 's3://dbt-tutorial-public/jaffle_shop_orders.csv'
file_format = (
type = 'CSV'
field_delimiter = ','
skip_header = 1
);
// paymentテーブルを作成
create table raw.stripe.payment
( id integer,
orderid integer,
paymentmethod varchar,
status varchar,
amount integer,
created date,
_batched_at timestamp default current_timestamp
);
// データをpaymentテーブルにロード
copy into raw.stripe.payment (id, orderid, paymentmethod, status, amount, created)
from 's3://dbt-tutorial-public/stripe_payments.csv'
file_format = (
type = 'CSV'
field_delimiter = ','
skip_header = 1
);
// Step3-3
// データがロードされていることを確認
select * from raw.jaffle_shop.customers;
select * from raw.jaffle_shop.orders;
select * from raw.stripe.payment;
Step6-4 【dbt Cloud IDE】
models/に作成したファイル(ファイル名任意)に以下をコピペ。
select * from raw.jaffle_shop.customers
Step6 【SnowSight / SQL Worksheet】
Snowsightのワークシートに以下をコピペ。(パートナーコネクト用のロールpc_dbt_roleに必要な権限を付与するSQL)
grant all on database raw to role pc_dbt_role;
grant all on database analytics to role pc_dbt_role;
grant all on schema raw.jaffle_shop to role pc_dbt_role;
grant all on schema raw.stripe to role pc_dbt_role;
grant all on all tables in database raw to role pc_dbt_role;
grant all on future tables in database raw to role pc_dbt_role;
Step7 【dbt cloud IDE】
作成したファイルmodels/customers.sqlに以下をコピぺ。
with customers as (
select
id as customer_id,
first_name,
last_name
from raw.jaffle_shop.customers
),
orders as (
select
id as order_id,
user_id as customer_id,
order_date,
status
from raw.jaffle_shop.orders
),
customer_orders as (
select
customer_id,
min(order_date) as first_order_date,
max(order_date) as most_recent_order_date,
count(order_id) as number_of_orders
from orders
group by 1
),
final as (
select
customers.customer_id,
customers.first_name,
customers.last_name,
customer_orders.first_order_date,
customer_orders.most_recent_order_date,
coalesce(customer_orders.number_of_orders, 0) as number_of_orders
from customers
left join customer_orders using (customer_id)
)
select * from final
Step8-1 【dbt cloud IDE】
すでにあるdbt_project.ymlの一部を修正。
- name:の部分
〔修正前〕
name: 'my_new_project'
〔修正後〕
name: 'jaffle_shop'
- models:の部分
〔修正前〕
models:
my_new_project:
# Applies to all files under models/example/
example:
+materialized: table
〔修正後〕
models:
jaffle_shop:
+materialized: table
example:
+materialized: view
Step8-3 【dbt cloud IDE】
Step7で作成したmodels/customers.sqlの先頭部分に記述を追加。
- 先頭部分
〔修正前〕
with customers as (
select
id as customer_id
...
)
〔修正後〕(先頭の5行を追加)
{{
config(
materialized='view'
)
}}
with customers as (
select
id as customer_id
...
)
Step9 【dbt cloud IDE】
Step8-1で修正したdbt_project.ymlを、以下のように修正。
- models:の部分
〔修正前〕
models:
jaffle_shop:
+materialized: table
example:
+materialized: view
〔修正後〕(example:からの2行を削除)
models:
jaffle_shop:
+materialized: table
Step10-1 【dbt cloud IDE】
作成したファイルmodels/stg_customers.sqlに以下をコピペ。
select
id as customer_id,
first_name,
last_name
from raw.jaffle_shop.customers
Step10-2 【dbt cloud IDE】
作成したファイルmodels/stg_orders.sqlに以下をコピペ。
select
id as order_id,
user_id as customer_id,
order_date,
status
from raw.jaffle_shop.orders
Step10-3 【dbt cloud IDE】
Step7で作成、Step8-3で修正したmodels/customers.sqlを、丸ごと以下の内容に書き換え。(from句を”stg_XXX”を使った記述に修正)
with customers as (
select * from {{ ref('stg_customers') }}
),
orders as (
select * from {{ ref('stg_orders') }}
),
customer_orders as (
select
customer_id,
min(order_date) as first_order_date,
max(order_date) as most_recent_order_date,
count(order_id) as number_of_orders
from orders
group by 1
),
final as (
select
customers.customer_id,
customers.first_name,
customers.last_name,
customer_orders.first_order_date,
customer_orders.most_recent_order_date,
coalesce(customer_orders.number_of_orders, 0) as number_of_orders
from customers
left join customer_orders using (customer_id)
)
select * from final
Step11-2 【dbt cloud IDE】
作成したmodels/sources.ymlに以下をコピペ。
version: 2
sources:
- name: jaffle_shop
description: This is a replica of the Postgres database used by our app
database: raw
schema: jaffle_shop
tables:
- name: customers
description: One record per customer.
- name: orders
description: One record per order. Includes cancelled and deleted orders.
Step11-3 【dbt cloud IDE】
Step10-1で作成したmodels/stg_customers.sqlを、丸ごと以下の内容に書き換え。(from句をStep11-2で定義したsourceを使った記述になった)
select
id as customer_id,
first_name,
last_name
from {{ source('jaffle_shop', 'customers') }}
Step11-4 【dbt cloud IDE】
Step10-2で作成したmodels/stg_orders.sqlを、丸ごと以下の内容に書き換え。(from句をStep11-2で定義したsourceを使った記述になった)
select
id as order_id,
user_id as customer_id,
order_date,
status
from {{ source('jaffle_shop', 'orders') }}
Step12 【dbt cloud IDE】
作成したファイルmodels/schema.ymlに以下をコピペ。
version: 2
models:
- name: customers
columns:
- name: customer_id
tests:
- unique
- not_null
- name: stg_customers
columns:
- name: customer_id
tests:
- unique
- not_null
- name: stg_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: status
tests:
- accepted_values:
values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']
- name: customer_id
tests:
- not_null
- relationships:
to: ref('stg_customers')
field: customer_id
Step13 【dbt cloud IDE】
Step12で作成したファイルmodels/schema.ymlを、丸ごと以下の内容に書き換え。(description:が追記された)
version: 2
models:
- name: customers
description: One record per customer
columns:
- name: customer_id
description: Primary key
tests:
- unique
- not_null
- name: first_order_date
description: NULL when a customer has not yet placed an order.
- name: stg_customers
description: This model cleans up customer data
columns:
- name: customer_id
description: Primary key
tests:
- unique
- not_null
- name: stg_orders
description: This model cleans up order data
columns:
- name: order_id
description: Primary key
tests:
- unique
- not_null
- name: status
tests:
- accepted_values:
values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']
- name: customer_id
tests:
- not_null
- relationships:
to: ref('stg_customers')
field: customer_id
Step16 【SnowSight / Streamlit App】
SnowSightで作成したStreamlit Appに、以下をコピペ。
# Import python packages
import streamlit as st
from snowflake.snowpark.context import get_active_session
import streamlit as st
import pandas as pd
import numpy as np
from datetime import timedelta
st.header("🎈 顧客分析 Streamlit アプリケーション 🎈")
st.write("---")
#------------------------------------------
# データ取得
#------------------------------------------
session = get_active_session()
df = session.table('customers').to_pandas()
#------------------------------------------
# KPI 表示
#------------------------------------------
# メトリック表示
st.subheader("✅️ 主要メトリクス")
col1, col2, col3 = st.columns(3)
with col1:
st.metric(
label="📦️ オーダー数",
value=len(df),
)
with col2:
st.metric(
label="🙎 顧客数",
value=df['NUMBER_OF_ORDERS'].sum(),
)
with col3:
st.metric(
label="🔢 平均オーダー数",
value=round(len(df) / df['NUMBER_OF_ORDERS'].sum(), 2),
)
st.write("---")
#------------------------------------------
# NUMBER_OF_ORDERS の分布(Bin 単位の顧客数)
#------------------------------------------
st.subheader("🕵️ 顧客のオーダー数分布")
# Bin サイズをユーザー指定可能に
bin_size = st.number_input("👇️ グルーピングするサイズを指定", min_value=1, value=1)
# Bin を計算して、各 Bin に含まれる顧客数を集計
max_orders = df["NUMBER_OF_ORDERS"].max()
bins = np.arange(0, max_orders + bin_size, bin_size)
# pd.cut を使って Bin 分類
df["注文数グループ"] = pd.cut(df["NUMBER_OF_ORDERS"], bins=bins)
bin_counts = df.groupby("注文数グループ")["CUSTOMER_ID"].count().reset_index(name="顧客数")
# bar_chart で可視化(x 軸を文字列化しておくと見やすい場合も)
bin_counts["注文数グループ"] = bin_counts["注文数グループ"].astype(str)
st.bar_chart(data=bin_counts, x="注文数グループ", y="顧客数")
st.write("---")
#------------------------------------------
# Top N 顧客一覧
#------------------------------------------
st.subheader("👑 Top N 顧客の詳細")
# ユーザーに N を入力させる
N = st.number_input("👇️ 何件表示しますか?", min_value=1, value=10)
# 昇順・降順を選択可能に
sort_order = st.selectbox("👇️ 並び順", ["降順 (多い順)", "昇順 (少ない順)"])
ascending = (sort_order == "昇順 (少ない順)")
# 並び替え
df_sorted = df.sort_values("NUMBER_OF_ORDERS", ascending=ascending).head(N)
st.write(f"NUMBER_OF_ORDERS が{'少ない' if ascending else '多い'}順に Top {N} 件を表示")
st.dataframe(df_sorted[[
"CUSTOMER_ID",
"FIRST_NAME",
"LAST_NAME",
"NUMBER_OF_ORDERS",
"FIRST_ORDER_DATE",
"MOST_RECENT_ORDER_DATE"
]])
st.write("---")
#------------------------------------------
# おまけ
#------------------------------------------
button = st.button("dbt 🤝 Snowflake")
if button:
st.snow()
以上です。
Discussion