dbt python modelで遊ぶ
はじめに
dbt python modelがリリースされたので、SQLでは実装が難しそうなmodelを考えて実装してみます。
DWHはSnowflake, 実装はpandasおよびSnowpark for pythonを用いています。
フィボナッチ数列
SQLはforループを回すような処理は苦手なので、フィボナッチ数列を実装してみます。
import pandas as pd
def model(dbt, session):
dbt.config(materialized = "table")
df = pd.DataFrame()
for i in range(0, 10):
if i < 2:
df.loc[i, 'num'] = 1
else:
df.loc[i, 'num'] = df.loc[i - 1, 'num'] + df.loc[i - 2, 'num']
return df
これを実行すると、以下のようなテーブルが生成されます。
一応この程度のテーブルであれば再帰とかで頑張ればSQLでも実装は可能だと思います。
顧客ステータスの積み上げmodel
先ほどと同じように、自分自身を参照して、積み上げをしていくような処理を考えてみます。
以下のような顧客の状態遷移を考えます。
- アクティブ層は一定期間購入がないと休眠見込み層に移る
- 休眠見込み層は一定期間購入がないと休眠層に移る
- 休眠層は購入があると休眠復帰層に移る
顧客分析においてはよくあるセグメント化だと思いますが、自分自身の状態を参照する必要があるのでSQL一発で出すのは難しく、select ... from {{ this }}
などのSQLを書いてincrementalやsnapshotで積み上げる方式をとることが多いかと思います(=再集計が大変)。
python modelを用いればDataFrameに対してloopで処理ができるため、以下のようにしてtable modelとして割と簡単に実装が可能です。
簡単な集計処理はSnowparkで行なって、顧客ステータスの処理はforループで一つ前の状態を引数にして更新するような形にしています。
購買データはjaffle_shopのraw_ordersとraw_customers, 集計用calendarはseedで適当に作成しました。
month
2018-01-01
2018-02-01
2018-03-01
2018-04-01
実装例
import pandas as pd
from snowflake.snowpark.functions import date_trunc, col
def calc_status(order_count, prev_status):
status = None
if prev_status is None:
if order_count > 0:
status = "アクティブ"
else:
status = "休眠"
elif prev_status == "アクティブ" or prev_status == "休眠復帰":
if order_count > 0:
status = "アクティブ"
else:
status = "休眠見込み"
elif prev_status == "休眠見込み":
if order_count > 0:
status = "アクティブ"
else:
status = "休眠"
elif prev_status == "休眠":
if order_count > 0:
status = "休眠復帰"
else:
status = "休眠"
return status
def model(dbt, session):
dbt.config(materialized = "table")
months = dbt.ref("calendar").select("month")
customers = dbt.ref("raw_customers").select("id")
# 月ごとの受注数
customer_order = dbt.ref("raw_orders") \
.select("user_id", date_trunc("MONTH", "order_date").alias("month")) \
.group_by("month", "user_id").count()
# 月と顧客のcross join
monthly_customer = months.cross_join(customers) \
.select([
"month",
col("id").alias("user_id")
])
# 月ごとの受注数(0件含む)
monthly_order = monthly_customer.join(customer_order, ["month", "user_id"], "left") \
.fillna(0) \
.select([
"user_id",
months["month"].alias("month"),
col("count").alias("order_count")
]) \
.sort("user_id", "month") # ループが楽になるようにソートしておく
# 各行をループしてステータスを付与
return_rows = []
prev = {
"USER_ID" : None,
"USER_STATUS" : None
}
for row in monthly_order.to_local_iterator():
current = row.as_dict()
prev_status = (prev["USER_STATUS"] if prev["USER_ID"] == current["USER_ID"] else None)
current_status = calc_status(current["ORDER_COUNT"], prev_status)
return_rows.append([
current["USER_ID"],
current["MONTH"],
current["ORDER_COUNT"],
current_status
])
prev = {
"USER_ID": current["USER_ID"],
"USER_STATUS": current_status
}
df = pd.DataFrame(return_rows)
df.columns = ['USER_ID', 'MONTH', 'ORDER_COUNT', 'USER_STATUS']
return df
このテーブルをクエリすると以下のようになります。
おわりに
python modelを使って遊んでみました。SnowparkはpysparkとAPIがほぼ同じなので、使用感はAWS EMRやDataprocとほぼ同じになるのかなと思います。
SQLで処理に詰まったらdbt pythonにして社内のsparkつよつよマンを召喚しようと思います。
Discussion