❄️

Quickstart for dbt Cloud and Snowflakeのコピペ部分まとめ + オリジナルコンテンツ

2025/01/29に公開

本記事は、「dbt Cloud と Snowflake のクイックスタート(2024年1月時点)」のハンズオン用に、ソースコードをコピペする箇所を1ページにまとめたものです。
第3回Snowflake中部ユーザー会での利用を想定しています。

SnowSight(SnowflaeのWebUI)でやることと、dbt Cloud IDE(dbtのWebUI)でやることがあるので【xxx】で明記してあります。

Step3 【SnowSight】

Step2で作成したワークシートに以下をコピペ。

// Step3-1
// 仮想ウェアハウス、2つのデータベース (データソース用のraw、データマート用のanalytics)、
// 2つのスキーマ(顧客と注文データ用のjaffle_shop、支払いデータ用のstripe) を作成
create warehouse transforming;
create database raw;
create database analytics;
create schema raw.jaffle_shop;
create schema raw.stripe;

// Step3-2
// customerテーブルを作成
create table raw.jaffle_shop.customers 
( id integer,
  first_name varchar,
  last_name varchar
);

// データをcustomerテーブルにロード
copy into raw.jaffle_shop.customers (id, first_name, last_name)
from 's3://dbt-tutorial-public/jaffle_shop_customers.csv'
file_format = (
    type = 'CSV'
    field_delimiter = ','
    skip_header = 1
    ); 

// ordersテーブルを作成
create table raw.jaffle_shop.orders
( id integer,
  user_id integer,
  order_date date,
  status varchar,
  _etl_loaded_at timestamp default current_timestamp
);

// ordersテーブルにロード
copy into raw.jaffle_shop.orders (id, user_id, order_date, status)
from 's3://dbt-tutorial-public/jaffle_shop_orders.csv'
file_format = (
    type = 'CSV'
    field_delimiter = ','
    skip_header = 1
    );

// paymentテーブルを作成
create table raw.stripe.payment 
( id integer,
  orderid integer,
  paymentmethod varchar,
  status varchar,
  amount integer,
  created date,
  _batched_at timestamp default current_timestamp
);

// データをpaymentテーブルにロード
copy into raw.stripe.payment (id, orderid, paymentmethod, status, amount, created)
from 's3://dbt-tutorial-public/stripe_payments.csv'
file_format = (
    type = 'CSV'
    field_delimiter = ','
    skip_header = 1
    );

// Step3-3
// データがロードされていることを確認
select * from raw.jaffle_shop.customers;
select * from raw.jaffle_shop.orders;
select * from raw.stripe.payment;   

Step6-4 【dbt Cloud IDE】

models/に作成したファイル(ファイル名任意)に以下をコピペ。

select * from raw.jaffle_shop.customers

Step6 【SnowSight / SQL Worksheet】

Snowsightのワークシートに以下をコピペ。(パートナーコネクト用のロールpc_dbt_roleに必要な権限を付与するSQL)

grant all on database raw to role pc_dbt_role;
grant all on database analytics to role pc_dbt_role;

grant all on schema raw.jaffle_shop to role pc_dbt_role;
grant all on schema raw.stripe to role pc_dbt_role;

grant all on all tables in database raw to role pc_dbt_role;
grant all on future tables in database raw to role pc_dbt_role;

Step7 【dbt cloud IDE】

作成したファイルmodels/customers.sqlに以下をコピぺ。

with customers as (

    select
        id as customer_id,
        first_name,
        last_name

    from raw.jaffle_shop.customers

),

orders as (

    select
        id as order_id,
        user_id as customer_id,
        order_date,
        status

    from raw.jaffle_shop.orders

),

customer_orders as (

    select
        customer_id,

        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders

    from orders

    group by 1

),

final as (

    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders

    from customers

    left join customer_orders using (customer_id)

)

select * from final

Step8-1 【dbt cloud IDE】

すでにあるdbt_project.ymlの一部を修正。

  • name:の部分

〔修正前〕

name: 'my_new_project'

〔修正後〕

name: 'jaffle_shop'
  • models:の部分

〔修正前〕

models:
  my_new_project:
    # Applies to all files under models/example/
    example:
      +materialized: table

〔修正後〕

models:
  jaffle_shop:
    +materialized: table
    example:
      +materialized: view

Step8-3 【dbt cloud IDE】

Step7で作成したmodels/customers.sqlの先頭部分に記述を追加。

  • 先頭部分

〔修正前〕

with customers as (

    select
        id as customer_id
        ...

)

〔修正後〕(先頭の5行を追加)

{{
  config(
    materialized='view'
  )
}}

with customers as (

    select
        id as customer_id
        ...

)

Step9 【dbt cloud IDE】

Step8-1で修正したdbt_project.ymlを、以下のように修正。

  • models:の部分

〔修正前〕

models:
  jaffle_shop:
    +materialized: table
    example:
      +materialized: view

〔修正後〕(example:からの2行を削除)

models:
  jaffle_shop:
    +materialized: table

Step10-1 【dbt cloud IDE】

作成したファイルmodels/stg_customers.sqlに以下をコピペ。

select
    id as customer_id,
    first_name,
    last_name

from raw.jaffle_shop.customers

Step10-2 【dbt cloud IDE】

作成したファイルmodels/stg_orders.sqlに以下をコピペ。

select
    id as order_id,
    user_id as customer_id,
    order_date,
    status

from raw.jaffle_shop.orders

Step10-3 【dbt cloud IDE】

Step7で作成、Step8-3で修正したmodels/customers.sqlを、丸ごと以下の内容に書き換え。(from句を”stg_XXX”を使った記述に修正)

with customers as (

    select * from {{ ref('stg_customers') }}

),

orders as (

    select * from {{ ref('stg_orders') }}

),

customer_orders as (

    select
        customer_id,

        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders

    from orders

    group by 1

),

final as (

    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders

    from customers

    left join customer_orders using (customer_id)

)

select * from final

Step11-2 【dbt cloud IDE】

作成したmodels/sources.ymlに以下をコピペ。

version: 2

sources:
    - name: jaffle_shop
      description: This is a replica of the Postgres database used by our app
      database: raw
      schema: jaffle_shop
      tables:
          - name: customers
            description: One record per customer.
          - name: orders
            description: One record per order. Includes cancelled and deleted orders.

Step11-3 【dbt cloud IDE】

Step10-1で作成したmodels/stg_customers.sqlを、丸ごと以下の内容に書き換え。(from句をStep11-2で定義したsourceを使った記述になった)

select
    id as customer_id,
    first_name,
    last_name

from {{ source('jaffle_shop', 'customers') }}

Step11-4 【dbt cloud IDE】

Step10-2で作成したmodels/stg_orders.sqlを、丸ごと以下の内容に書き換え。(from句をStep11-2で定義したsourceを使った記述になった)

select
    id as order_id,
    user_id as customer_id,
    order_date,
    status

from {{ source('jaffle_shop', 'orders') }}

Step12 【dbt cloud IDE】

作成したファイルmodels/schema.ymlに以下をコピペ。

version: 2

models:
  - name: customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

  - name: stg_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id

Step13 【dbt cloud IDE】

Step12で作成したファイルmodels/schema.ymlを、丸ごと以下の内容に書き換え。(description:が追記された)

version: 2

models:
  - name: customers
    description: One record per customer
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: first_order_date
        description: NULL when a customer has not yet placed an order.

  - name: stg_customers
    description: This model cleans up customer data
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null

  - name: stg_orders
    description: This model cleans up order data
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id

Step16 【SnowSight / Streamlit App】

SnowSightで作成したStreamlit Appに、以下をコピペ。

# Import python packages
import streamlit as st
from snowflake.snowpark.context import get_active_session
import streamlit as st
import pandas as pd
import numpy as np
from datetime import timedelta

st.header("🎈 顧客分析 Streamlit アプリケーション 🎈")

st.write("---")

#------------------------------------------
# データ取得
#------------------------------------------
session = get_active_session()
df = session.table('customers').to_pandas()

#------------------------------------------
# KPI 表示
#------------------------------------------
# メトリック表示
st.subheader("✅️ 主要メトリクス")

col1, col2, col3 = st.columns(3)
with col1:
    st.metric(
        label="📦️ オーダー数",
        value=len(df),
    )
with col2:
    st.metric(
        label="🙎 顧客数",
        value=df['NUMBER_OF_ORDERS'].sum(),
    )
with col3:
    st.metric(
        label="🔢 平均オーダー数",
        value=round(len(df) / df['NUMBER_OF_ORDERS'].sum(), 2),
    )

st.write("---")

#------------------------------------------
# NUMBER_OF_ORDERS の分布(Bin 単位の顧客数)
#------------------------------------------

st.subheader("🕵️ 顧客のオーダー数分布")

# Bin サイズをユーザー指定可能に
bin_size = st.number_input("👇️ グルーピングするサイズを指定", min_value=1, value=1)

# Bin を計算して、各 Bin に含まれる顧客数を集計
max_orders = df["NUMBER_OF_ORDERS"].max()
bins = np.arange(0, max_orders + bin_size, bin_size)

# pd.cut を使って Bin 分類
df["注文数グループ"] = pd.cut(df["NUMBER_OF_ORDERS"], bins=bins)
bin_counts = df.groupby("注文数グループ")["CUSTOMER_ID"].count().reset_index(name="顧客数")

# bar_chart で可視化(x 軸を文字列化しておくと見やすい場合も)
bin_counts["注文数グループ"] = bin_counts["注文数グループ"].astype(str)
st.bar_chart(data=bin_counts, x="注文数グループ", y="顧客数")

st.write("---")

#------------------------------------------
# Top N 顧客一覧
#------------------------------------------

st.subheader("👑 Top N 顧客の詳細")

# ユーザーに N を入力させる
N = st.number_input("👇️ 何件表示しますか?", min_value=1, value=10)

# 昇順・降順を選択可能に
sort_order = st.selectbox("👇️ 並び順", ["降順 (多い順)", "昇順 (少ない順)"])
ascending = (sort_order == "昇順 (少ない順)")

# 並び替え
df_sorted = df.sort_values("NUMBER_OF_ORDERS", ascending=ascending).head(N)

st.write(f"NUMBER_OF_ORDERS が{'少ない' if ascending else '多い'}順に Top {N} 件を表示")
st.dataframe(df_sorted[[
    "CUSTOMER_ID",
    "FIRST_NAME",
    "LAST_NAME",
    "NUMBER_OF_ORDERS",
    "FIRST_ORDER_DATE",
    "MOST_RECENT_ORDER_DATE"
]])

st.write("---")

#------------------------------------------
# おまけ
#------------------------------------------
button = st.button("dbt 🤝 Snowflake")
if button:
    st.snow()

以上です。

Discussion