📚

dbtを触ってみる

2024/10/15に公開

こんにちは!kirigayaです
すっかり周りは秋です!勉強の秋...
今回はぱぱっと環境作ってdbt触ってみようと思います。

環境は本来ならクラウドのDWHを使うのが良いと思いますがローカルにDuckDBを作ってやっていこうと思います

poetryの仮想環境を作成しdbtとdukdbをいれます
あとpandasとduckdb...

poetry add duckdb
poetry add pandas
poetry add dbt-core
poetry add dbt-duckdb

dbtのドキュメント
dbt-duckdbのリポジトリ

データを作成

import pandas as pd

raw_customers = pd.DataFrame(data={
    "customer_id": pd.Series([1,2,3,4,5]),
    "customer_name": pd.Series(["Alice Smith"]),
    "email": pd.Series(["alice@example.com","bob_johnson@sample","carol.white@invalid","dave@black.com","eve.green@email"]),
    "created_at": pd.Series(["2023-01-15 08:45","2023-02-03 12:30","2023-02-20 14:20","2023-03-01 09:55","2023-03-15 10:10"])
})

raw_orders = pd.DataFrame(data={
    "order_id": pd.Series([101,102,103,104,105]),
    "customer_id": pd.Series([1,2,1,3,4]),
    "product_name": pd.Series(["Laptop","Phone","Tablet","Monitor","Mouse"]),
    "quantity": pd.Series([1,2,1,1,3]),
    "price": pd.Series([1000,600,300,200,100]),
    "order_date": pd.Series(["2023-03-10 11:15","2023-03-12 13:00","2023-03-13 16:45","2023-03-14 17:20","2023-03-20 09:10"])
})

raw_customers.to_csv("raw_customers.csv", index=False)
raw_orders.to_csv("raw_orders.csv", index=False)

DBを作成

import sys
import duckdb
import pandas as pd

def create_persistent_table_from_pandas(db_path, table_name, dataframe=None, file_path=None) -> None:
    """
    PandasのデータフレームまたはCSVファイルから、指定されたDuckDBデータベースにテーブルを作成します。

    Args:
    :param db_path: DuckDBデータベースファイルのパス
    :param table_name: 作成するテーブルの名前
    :param dataframe: Pandasのデータフレーム(オプション)
    :param file_path: CSVファイルのパス(オプション、指定された場合はこれが優先)
    """
    
    if file_path:
        dataframe = pd.read_csv(file_path)

    if dataframe is None:
        raise ValueError("データフレームまたはCSVファイルのパスが必要です。")

    # データベースに接続
    conn = duckdb.connect(db_path)

    try:
        # データフレームからテーブルを作成
        conn.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM dataframe")
        print(f"テーブル '{table_name}' を作成しました。")

        # テーブルの内容を確認(オプション)
        result = conn.execute(f"SELECT * FROM {table_name} LIMIT 1").fetchall()
        print("テーブルの最初の1行:")
        print(result)

    except Exception as e:
        print(f"エラーが発生しました: {e}")
    
    finally:
        # 接続を閉じる
        conn.close()

if __name__ == "__main__":
    # コマンドライン引数からファイル名とテーブル名を取得
    if len(sys.argv) != 3:
        print("Usage: python script_name.py <csv_filename> <table_name>")
        sys.exit(1)

    csv_filename = sys.argv[1]
    table_name = sys.argv[2]

    # データフレームから永続テーブルを作成
    db_path = "service.duckdb"
    create_persistent_table_from_pandas(db_path, table_name, file_path=csv_filename)

pyファイルの引数にrawデータとテーブル名を渡して作成します
python3 build_db.py raw_orders.csv orders
python3 build_db.py raw_customers.csv customer

DBにクエリしてみる

すべてのテーブル情報を取得します

import duckdb

def print_table_schemas_and_first_rows(db_path):
    """
    DuckDBデータベースの全テーブルとスキーマを取得し、
    各テーブルの最初の5行目を標準出力します。
    
    Args:
    :param db_path: DuckDBデータベースファイルのパス
    """
    
    # データベースに接続
    conn = duckdb.connect(db_path)
    
    try:
        # スキーマ一覧を取得
        schemas = conn.execute("SELECT DISTINCT schema_name FROM information_schema.schemata").fetchall()
        print("スキーマ一覧:")
        for schema in schemas:
            print(f"- {schema[0]}")
        
        # テーブル一覧を取得
        tables = conn.execute("SELECT table_schema, table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE'").fetchall()
        print("\nテーブル一覧:")

        for schema, table in tables:
            print(f"\nスキーマ: {schema}, テーブル: {table}")

            # テーブルの最初の1行を取得して表示
            first_row = conn.execute(f"SELECT * FROM {schema}.{table} LIMIT 1").fetchall()
            if first_row:
                print("最初の1行目:", first_row)
            else:
                print("テーブルは空です。")
    
    except Exception as e:
        print(f"エラーが発生しました: {e}")
    
    finally:
        # 接続を閉じる
        conn.close()

if __name__ == "__main__":
    # 使用するDuckDBのパスを指定
    db_path = "service.duckdb"
    
    # テーブル一覧と各テーブルの1行目を表示
    print_table_schemas_and_first_rows(db_path)

dbtの設定

dbt init customer_product_analysis
DBが既にあると以下のように番号で選択できます

Which database would you like to use?
[1] duckdb

Enter a number: 1

.dbt/profiles.ymlの中身を変更しDBの接続設定を行います
.dbtはユーザー直下に作成されます...全部のプロジェクトの設定情報を管理している
vim ~/.dbt/profiles.yml
pathを変更します

customer_product_analysis:
  outputs:
    dev:
      type: duckdb
      path: .duckdbまでのpathを追加
      threads: 1

    prod:
      type: duckdb
      path: 
      threads: 4

  target: dev
  • dev:
    開発中のモデルやテストを実行するための接続設定を定義
  • prod:
    本番環境での接続設定を定義

modelの作成

ディレクトリ構成はこんな感じ...
今回は全てテーブル化します...ベスプラは今度勉強します...
{{ config(materialized='table') }}
models/exampleは削除しておきましょう

customer_product_analysis/
│
├── dbt_project.yml            # dbtプロジェクトの設定ファイル
├── models/                     # モデルを格納するディレクトリ
│   ├── stg/                   # ステージング層のモデルを格納
│   │   ├── stg_customers.sql  # クレンジングされた顧客データ
│   │   └── stg_orders.sql     # クレンジングされた注文データ
│   └── marts/                 # マート層のモデルを格納
│       └── mart_sales_summary.sql # 購買履歴のサマリー
├── seeds/                     # サンプルデータや初期データを格納
└── analyses/                  # 分析用のスクリプトを格納

customer_product_analysis/modelsにstgフォルダを作成します
クレンジング処理を書いたsqlファイルを作成

stg_customers.sql
名前が抜けているので追加
メールアドレスがおかしいので修正

{{ config(materialized='table') }}

with raw_data as (
    select * from main.customer
)

select
    customer_id,
    -- customer_idに応じてcustomer_nameを補完
    case 
        when customer_id = 2 then 'Bob Johnson'
        when customer_id = 3 then 'Carol White'
        when customer_id = 4 then 'Dave Black'
        when customer_id = 5 then 'Eve Green'
        else customer_name  -- その他の場合は元の名前を使用
    end as customer_name,
    
    -- メールアドレスのクレンジング
    case 
        when email like '%@%' then 
            case 
                when email not like '%.com' then email || '.com'  -- .comがない場合は付ける
                else email  -- 既に.comがある場合はそのまま
            end
        else email  -- 不正なメールアドレスでもそのまま
    end as email,
    
    created_at
from raw_data

stg_orders.sql
新しくレコードを追加

{{ config(materialized='table') }}

with raw_data as (
    select * from main.orders
    union all
    select
        105 as order_id,       -- order_idに105を設定
        5 as customer_id,      -- customer_idに5を設定
        'Laptop' as product_name, -- product_nameにLaptopを設定
        3 as quantity,         -- quantityに3を設定
        1000 as price,         -- priceに1000を設定
        '2023-03-14 18:00' as order_date  -- order_dateに2023-03-14 18:00を設定
)

select
    order_id,
    customer_id,
    product_name,
    quantity,
    price,
    order_date
from raw_data
where quantity > 0  -- 量が0より大きい注文のみを選択

同じ手順でmartsフォルダを作成しmart_sales_summary.sqlを作成します

{{ config(materialized='table') }}

with customer_orders as (
    select
        c.customer_id,
        c.customer_name,
        sum(o.price * o.quantity) as total_spent  -- 合計購入金額を計算
    from main.stg_customers as c
    left join main.stg_orders as o
    on c.customer_id = o.customer_id  -- customer_idでJOIN
    group by c.customer_id, c.customer_name  -- customerごとに集計
)

select
    customer_id,
    customer_name,
    total_spent
from customer_orders
order by customer_id asc  -- customer_idで降順に並び替え

※必ずプロジェクトのディレクトリに移動してコマンドを実行する
cd customer_product_analysis

dbt runで一括実行できますが以下のコマンドで少しずつ確認しながら実行することもできます

  • 設定をチェックする
    dbt debug
  • 特定のモデルが正しく設定されているかを確認
    dbt run --models stg_customers
  • SQLモデルをコンパイルして、生成されるSQLクエリを確認
    dbt compile

成功したっぽい...

16:33:55  1 of 3 START sql table model main.mart_sales_summary ........................... [RUN]
16:33:55  1 of 3 OK created sql table model main.mart_sales_summary ...................... [OK in 0.14s]
16:33:55  2 of 3 START sql table model main.stg_customers ................................ [RUN]
16:33:55  2 of 3 OK created sql table model main.stg_customers ........................... [OK in 0.05s]
16:33:55  3 of 3 START sql table model main.stg_orders ................................... [RUN]
16:33:55  3 of 3 OK created sql table model main.stg_orders .............................. [OK in 0.06s]
16:33:55  
16:33:55  Finished running 3 table models in 0 hours 0 minutes and 0.55 seconds (0.55s).
16:33:55  
16:33:55  Completed successfully

dbtとしてはここで終了ですがこのまま終了してしまうと何のために加工したのかわからないので可視化までいきましょう!

streamlitを使ってデータを確認

poetry add streamlit
app.py

import streamlit as st
import duckdb

# データベース接続の設定

conn = duckdb.connect(database='service.duckdb')

# 顧客と注文のJOINデータを取得
query_join = """
SELECT
    c.customer_id,
    c.customer_name,
    o.order_id,
    o.product_name,
    o.quantity,
    o.price,
    o.order_date
FROM main.customer AS c
JOIN main.orders AS o ON c.customer_id = o.customer_id
"""
joined_data = conn.execute(query_join).fetchdf()

# マート層のデータを取得
query_mart = "SELECT * FROM main.mart_sales_summary"
mart_data = conn.execute(query_mart).fetchdf()

# JOINデータを表示
st.subheader("Joined Customer and Order Data")
st.dataframe(joined_data)

# マート層データを表示
st.subheader("Mart Layer Customer and Order Data")
st.dataframe(mart_data)

# 接続を閉じる
conn.close()

dbtを動かすことができました。良さを感じるにはマクロ機能などもっと深堀する必要がありそうです...
お疲れ様でした...

岩田組

Discussion