📑

dbt python modelで遊ぶ

2022/12/06に公開

はじめに

dbt python modelがリリースされたので、SQLでは実装が難しそうなmodelを考えて実装してみます。
DWHはSnowflake, 実装はpandasおよびSnowpark for pythonを用いています。

フィボナッチ数列

SQLはforループを回すような処理は苦手なので、フィボナッチ数列を実装してみます。

import pandas as pd

def model(dbt, session):
    dbt.config(materialized = "table")
    df = pd.DataFrame()

    for i in range(0, 10):
        if i < 2:
            df.loc[i, 'num'] = 1    
        else:
            df.loc[i, 'num'] =  df.loc[i - 1, 'num'] + df.loc[i - 2, 'num']
    return df

これを実行すると、以下のようなテーブルが生成されます。

一応この程度のテーブルであれば再帰とかで頑張ればSQLでも実装は可能だと思います。

顧客ステータスの積み上げmodel

先ほどと同じように、自分自身を参照して、積み上げをしていくような処理を考えてみます。
以下のような顧客の状態遷移を考えます。

  • アクティブ層は一定期間購入がないと休眠見込み層に移る
  • 休眠見込み層は一定期間購入がないと休眠層に移る
  • 休眠層は購入があると休眠復帰層に移る

顧客分析においてはよくあるセグメント化だと思いますが、自分自身の状態を参照する必要があるのでSQL一発で出すのは難しく、select ... from {{ this }}などのSQLを書いてincrementalやsnapshotで積み上げる方式をとることが多いかと思います(=再集計が大変)。

python modelを用いればDataFrameに対してloopで処理ができるため、以下のようにしてtable modelとして割と簡単に実装が可能です。
簡単な集計処理はSnowparkで行なって、顧客ステータスの処理はforループで一つ前の状態を引数にして更新するような形にしています。
購買データはjaffle_shopのraw_ordersとraw_customers, 集計用calendarはseedで適当に作成しました。

calendar.csv
month
2018-01-01
2018-02-01
2018-03-01
2018-04-01

実装例

user_status.py
import pandas as pd
from snowflake.snowpark.functions import date_trunc, col

def calc_status(order_count, prev_status):
    status = None
    if prev_status is None:
        if order_count > 0:
            status = "アクティブ"
        else:
            status = "休眠"
    elif prev_status == "アクティブ" or prev_status == "休眠復帰":
        if order_count > 0:
            status = "アクティブ"
        else:
            status = "休眠見込み"
    elif prev_status == "休眠見込み":
        if order_count > 0:
            status = "アクティブ"
        else:
            status = "休眠"
    elif prev_status == "休眠":
        if order_count > 0:
            status = "休眠復帰"
        else:
            status = "休眠"
    return status

def model(dbt, session):
    dbt.config(materialized = "table")

    months = dbt.ref("calendar").select("month")
    customers = dbt.ref("raw_customers").select("id")


    # 月ごとの受注数
    customer_order = dbt.ref("raw_orders") \
        .select("user_id", date_trunc("MONTH", "order_date").alias("month")) \
        .group_by("month", "user_id").count()
    
    # 月と顧客のcross join
    monthly_customer = months.cross_join(customers) \
        .select([
            "month",
            col("id").alias("user_id")
        ])

    # 月ごとの受注数(0件含む)
    monthly_order = monthly_customer.join(customer_order, ["month", "user_id"], "left") \
        .fillna(0) \
        .select([
            "user_id",
            months["month"].alias("month"),
            col("count").alias("order_count")
        ]) \
        .sort("user_id", "month") # ループが楽になるようにソートしておく

    
    # 各行をループしてステータスを付与
    return_rows = []

    prev = {
        "USER_ID" : None,
        "USER_STATUS" : None
    }
    
    for row in monthly_order.to_local_iterator():
        current = row.as_dict()
        prev_status = (prev["USER_STATUS"] if prev["USER_ID"] == current["USER_ID"] else None)
        current_status = calc_status(current["ORDER_COUNT"], prev_status)
        return_rows.append([
            current["USER_ID"],
            current["MONTH"],
            current["ORDER_COUNT"],
            current_status
        ])

        prev = {
            "USER_ID": current["USER_ID"],
            "USER_STATUS": current_status
        }

    df = pd.DataFrame(return_rows)
    df.columns = ['USER_ID', 'MONTH', 'ORDER_COUNT', 'USER_STATUS']

    return df

このテーブルをクエリすると以下のようになります。

おわりに

python modelを使って遊んでみました。SnowparkはpysparkとAPIがほぼ同じなので、使用感はAWS EMRやDataprocとほぼ同じになるのかなと思います。
SQLで処理に詰まったらdbt pythonにして社内のsparkつよつよマンを召喚しようと思います。

Discussion