dbtを触ってみる
こんにちは!kirigayaです
すっかり周りは秋です!勉強の秋...
今回はぱぱっと環境作ってdbt触ってみようと思います。
環境は本来ならクラウドのDWHを使うのが良いと思いますがローカルにDuckDBを作ってやっていこうと思います
poetryの仮想環境を作成しdbtとdukdbをいれます
あとpandasとduckdb...
poetry add duckdb
poetry add pandas
poetry add dbt-core
poetry add dbt-duckdb
データを作成
import pandas as pd
raw_customers = pd.DataFrame(data={
"customer_id": pd.Series([1,2,3,4,5]),
"customer_name": pd.Series(["Alice Smith"]),
"email": pd.Series(["alice@example.com","bob_johnson@sample","carol.white@invalid","dave@black.com","eve.green@email"]),
"created_at": pd.Series(["2023-01-15 08:45","2023-02-03 12:30","2023-02-20 14:20","2023-03-01 09:55","2023-03-15 10:10"])
})
raw_orders = pd.DataFrame(data={
"order_id": pd.Series([101,102,103,104,105]),
"customer_id": pd.Series([1,2,1,3,4]),
"product_name": pd.Series(["Laptop","Phone","Tablet","Monitor","Mouse"]),
"quantity": pd.Series([1,2,1,1,3]),
"price": pd.Series([1000,600,300,200,100]),
"order_date": pd.Series(["2023-03-10 11:15","2023-03-12 13:00","2023-03-13 16:45","2023-03-14 17:20","2023-03-20 09:10"])
})
raw_customers.to_csv("raw_customers.csv", index=False)
raw_orders.to_csv("raw_orders.csv", index=False)
DBを作成
import sys
import duckdb
import pandas as pd
def create_persistent_table_from_pandas(db_path, table_name, dataframe=None, file_path=None) -> None:
"""
PandasのデータフレームまたはCSVファイルから、指定されたDuckDBデータベースにテーブルを作成します。
Args:
:param db_path: DuckDBデータベースファイルのパス
:param table_name: 作成するテーブルの名前
:param dataframe: Pandasのデータフレーム(オプション)
:param file_path: CSVファイルのパス(オプション、指定された場合はこれが優先)
"""
if file_path:
dataframe = pd.read_csv(file_path)
if dataframe is None:
raise ValueError("データフレームまたはCSVファイルのパスが必要です。")
# データベースに接続
conn = duckdb.connect(db_path)
try:
# データフレームからテーブルを作成
conn.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM dataframe")
print(f"テーブル '{table_name}' を作成しました。")
# テーブルの内容を確認(オプション)
result = conn.execute(f"SELECT * FROM {table_name} LIMIT 1").fetchall()
print("テーブルの最初の1行:")
print(result)
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
# 接続を閉じる
conn.close()
if __name__ == "__main__":
# コマンドライン引数からファイル名とテーブル名を取得
if len(sys.argv) != 3:
print("Usage: python script_name.py <csv_filename> <table_name>")
sys.exit(1)
csv_filename = sys.argv[1]
table_name = sys.argv[2]
# データフレームから永続テーブルを作成
db_path = "service.duckdb"
create_persistent_table_from_pandas(db_path, table_name, file_path=csv_filename)
pyファイルの引数にrawデータとテーブル名を渡して作成します
python3 build_db.py raw_orders.csv orders
python3 build_db.py raw_customers.csv customer
DBにクエリしてみる
すべてのテーブル情報を取得します
import duckdb
def print_table_schemas_and_first_rows(db_path):
"""
DuckDBデータベースの全テーブルとスキーマを取得し、
各テーブルの最初の5行目を標準出力します。
Args:
:param db_path: DuckDBデータベースファイルのパス
"""
# データベースに接続
conn = duckdb.connect(db_path)
try:
# スキーマ一覧を取得
schemas = conn.execute("SELECT DISTINCT schema_name FROM information_schema.schemata").fetchall()
print("スキーマ一覧:")
for schema in schemas:
print(f"- {schema[0]}")
# テーブル一覧を取得
tables = conn.execute("SELECT table_schema, table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE'").fetchall()
print("\nテーブル一覧:")
for schema, table in tables:
print(f"\nスキーマ: {schema}, テーブル: {table}")
# テーブルの最初の1行を取得して表示
first_row = conn.execute(f"SELECT * FROM {schema}.{table} LIMIT 1").fetchall()
if first_row:
print("最初の1行目:", first_row)
else:
print("テーブルは空です。")
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
# 接続を閉じる
conn.close()
if __name__ == "__main__":
# 使用するDuckDBのパスを指定
db_path = "service.duckdb"
# テーブル一覧と各テーブルの1行目を表示
print_table_schemas_and_first_rows(db_path)
dbtの設定
dbt init customer_product_analysis
DBが既にあると以下のように番号で選択できます
Which database would you like to use?
[1] duckdb
Enter a number: 1
.dbt/profiles.yml
の中身を変更しDBの接続設定を行います
.dbtはユーザー直下に作成されます...全部のプロジェクトの設定情報を管理している
vim ~/.dbt/profiles.yml
pathを変更します
customer_product_analysis:
outputs:
dev:
type: duckdb
path: .duckdbまでのpathを追加
threads: 1
prod:
type: duckdb
path:
threads: 4
target: dev
- dev:
開発中のモデルやテストを実行するための接続設定を定義 - prod:
本番環境での接続設定を定義
modelの作成
ディレクトリ構成はこんな感じ...
今回は全てテーブル化します...ベスプラは今度勉強します...
{{ config(materialized='table') }}
※models/example
は削除しておきましょう
customer_product_analysis/
│
├── dbt_project.yml # dbtプロジェクトの設定ファイル
├── models/ # モデルを格納するディレクトリ
│ ├── stg/ # ステージング層のモデルを格納
│ │ ├── stg_customers.sql # クレンジングされた顧客データ
│ │ └── stg_orders.sql # クレンジングされた注文データ
│ └── marts/ # マート層のモデルを格納
│ └── mart_sales_summary.sql # 購買履歴のサマリー
├── seeds/ # サンプルデータや初期データを格納
└── analyses/ # 分析用のスクリプトを格納
customer_product_analysis/models
にstgフォルダを作成します
クレンジング処理を書いたsqlファイルを作成
stg_customers.sql
名前が抜けているので追加
メールアドレスがおかしいので修正
{{ config(materialized='table') }}
with raw_data as (
select * from main.customer
)
select
customer_id,
-- customer_idに応じてcustomer_nameを補完
case
when customer_id = 2 then 'Bob Johnson'
when customer_id = 3 then 'Carol White'
when customer_id = 4 then 'Dave Black'
when customer_id = 5 then 'Eve Green'
else customer_name -- その他の場合は元の名前を使用
end as customer_name,
-- メールアドレスのクレンジング
case
when email like '%@%' then
case
when email not like '%.com' then email || '.com' -- .comがない場合は付ける
else email -- 既に.comがある場合はそのまま
end
else email -- 不正なメールアドレスでもそのまま
end as email,
created_at
from raw_data
stg_orders.sql
新しくレコードを追加
{{ config(materialized='table') }}
with raw_data as (
select * from main.orders
union all
select
105 as order_id, -- order_idに105を設定
5 as customer_id, -- customer_idに5を設定
'Laptop' as product_name, -- product_nameにLaptopを設定
3 as quantity, -- quantityに3を設定
1000 as price, -- priceに1000を設定
'2023-03-14 18:00' as order_date -- order_dateに2023-03-14 18:00を設定
)
select
order_id,
customer_id,
product_name,
quantity,
price,
order_date
from raw_data
where quantity > 0 -- 量が0より大きい注文のみを選択
同じ手順でmarts
フォルダを作成しmart_sales_summary.sql
を作成します
{{ config(materialized='table') }}
with customer_orders as (
select
c.customer_id,
c.customer_name,
sum(o.price * o.quantity) as total_spent -- 合計購入金額を計算
from main.stg_customers as c
left join main.stg_orders as o
on c.customer_id = o.customer_id -- customer_idでJOIN
group by c.customer_id, c.customer_name -- customerごとに集計
)
select
customer_id,
customer_name,
total_spent
from customer_orders
order by customer_id asc -- customer_idで降順に並び替え
※必ずプロジェクトのディレクトリに移動してコマンドを実行する
cd customer_product_analysis
dbt run
で一括実行できますが以下のコマンドで少しずつ確認しながら実行することもできます
- 設定をチェックする
dbt debug
- 特定のモデルが正しく設定されているかを確認
dbt run --models stg_customers
- SQLモデルをコンパイルして、生成されるSQLクエリを確認
dbt compile
成功したっぽい...
16:33:55 1 of 3 START sql table model main.mart_sales_summary ........................... [RUN]
16:33:55 1 of 3 OK created sql table model main.mart_sales_summary ...................... [OK in 0.14s]
16:33:55 2 of 3 START sql table model main.stg_customers ................................ [RUN]
16:33:55 2 of 3 OK created sql table model main.stg_customers ........................... [OK in 0.05s]
16:33:55 3 of 3 START sql table model main.stg_orders ................................... [RUN]
16:33:55 3 of 3 OK created sql table model main.stg_orders .............................. [OK in 0.06s]
16:33:55
16:33:55 Finished running 3 table models in 0 hours 0 minutes and 0.55 seconds (0.55s).
16:33:55
16:33:55 Completed successfully
dbtとしてはここで終了ですがこのまま終了してしまうと何のために加工したのかわからないので可視化までいきましょう!
streamlitを使ってデータを確認
poetry add streamlit
app.py
import streamlit as st
import duckdb
# データベース接続の設定
conn = duckdb.connect(database='service.duckdb')
# 顧客と注文のJOINデータを取得
query_join = """
SELECT
c.customer_id,
c.customer_name,
o.order_id,
o.product_name,
o.quantity,
o.price,
o.order_date
FROM main.customer AS c
JOIN main.orders AS o ON c.customer_id = o.customer_id
"""
joined_data = conn.execute(query_join).fetchdf()
# マート層のデータを取得
query_mart = "SELECT * FROM main.mart_sales_summary"
mart_data = conn.execute(query_mart).fetchdf()
# JOINデータを表示
st.subheader("Joined Customer and Order Data")
st.dataframe(joined_data)
# マート層データを表示
st.subheader("Mart Layer Customer and Order Data")
st.dataframe(mart_data)
# 接続を閉じる
conn.close()
dbtを動かすことができました。良さを感じるにはマクロ機能などもっと深堀する必要がありそうです...
お疲れ様でした...
Discussion