dbt-coreにduckdbで入門してみる。
まえがき
いままでデータの前処理をpythonやduckdbで行ってきたが、データの品質などが求められる場面が多くそれに対応するには何を使えば良いか...
という時にdbtの名前をよく聞くのでデータパイプラインをdbt-coreとduckdbで作ってみた。
参考にしたサイトとデータ一覧
Quickstart for dbt Core from a manual install
チュートリアルをそのままなぞったわけではないのですが基本的な流れは以下の公式ドキュメントで行いました。
dagster & dbt
データソース
kaggleの以下のコンペのデータセット
「Walmart Recruiting - Store Sales Forecasting」
dbt-coreとdbt-duckdbをインストールする。
今回はdbtをpython使用でローカルで使用するのでvenvで仮想環境を作成したのち以下のインストールを行いました。dbt-cloudの方がいろいろな機能があり便利そうなのですが...ひとまずローカルで試したかったのとduckdbとの連携をしてみたかったので今回はこの選択にしました。
pip install dbt-core dbt-duckdb
dbt プロジェクトの初期化
ローカルでdbtの初期化を行う場合以下のコードを走らせます。
dbt init walmart
すると以下のような表示が出ます。これはdbt用のデータベースに何を使用するかという選択肢です。dbt-duckdb以外をpip installしていると他の選択肢が出るんですが今回はduckdbのみしかインストールしていないので1を押してエンターします。
Which database would you like to use?
[1] duckdb
Enter a number: 1
profiles.ymlの編集
データベースをどこに置くかの設定ファイルがあります。
これはちょっと分かりづらいところ(ホームディレクトリの隠しファイル)にあり移動するのが面倒なのでvscodeのターミナルで以下のコードを使用して直接編集します。
code ~/.dbt/profiles.yml
するとprofiles.ymlは以下のようになっていると思います。
walmart:
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 1
prod:
type: duckdb
path: prod.duckdb
threads: 4
target: dev
デフォルトではdev(開発環境)を使用するようになっているのでとりあえずdevの方のpathを実際に作成するdbファイルを置くフォルダのpathに設定します。
walmart:
outputs:
dev:
type: duckdb
path: /Users/XXXXX/walmart/database
threads: 1
prod:
type: duckdb
path: prod.duckdb
threads: 4
target: dev
次は読みμ体複数のcsvからdbを作成してみます。
dbt seeds に読み込みたいcsvを置く。
セオリーに従うとdbt seed?コマンドを使用することになると思うのですが,duckdbで直接ファイルパス指定できるので今回は以下のようにしてみました。
まずseedsフォルダ内に読み込みたいcsvファイルを配置します。(S3からファイルを読むような場合だとsqlで直接指定になりそうですね。)
dbtのプロジェクト内のseedsフォルダ内に以下のようにデータソースを置きます。
.
├── features.csv
├── sampleSubmission.csv
├── stores.csv
├── test.csv
└── train.csv
次にmodelsフォルダ配下に以下のsource.ymlファイルを置きます。これでsql上でcsvを指定するためのエイリアスを設定できるみたいです。
version: 2
sources:
- name: raw_data
tables:
- name: features
meta:
external_location: "read_csv_auto('/Users/XXXXX/walmart/seeds/features.csv')"
これで別名を設定できたので次はsql文を書いてみたいと思います。
dbt run する
modelsフォルダにもともと入っているmy_first_dbt_model.sqlに以下のsql文を記載します。
-- models/staging/stg_customers.sql
{{ config(materialized='table') }}
SELECT * FROM {{ source('raw_data', 'features') }}
{{ config(materialized='table') }}で囲まれた部分はJinja構文だそうです。
Pythonの中にかける特殊な構文のようなものみたいですね。
またmaterialized='table' で実際のテーブルとして保存するようにしてます。
view(ビュー)、incremental(増分更新)、ephemeral(一時的)などなど他にも指定できるようですね。
あとは先ほど指定した別名をSELECT文で指定しています。
raw_dataの中のfeaturesで指定していたcsvからデータを読み込んでいます。
この状態で以下のコードをターミナルに入力すると...
dbt run
以下のように処理が完了します。(デフォルトではsecond_dbt_model.sqlも使用されるようになっているのでその関連のwarningも出ますが今回は放置します。)
05:03:42 Finished running 1 table model in 0 hours 0 minutes and 0.41 seconds (0.41s).
05:03:42
05:03:42 Completed successfully
合ってるかわからないがdimension modelを作ってみる。
とりあえず動作確認が終わったので次は簡易的なディメンションモデルをつくってみます。
modelsのフォルダの配下にdimensionsというフォルダを作成し3つのsqlを作成します。
-- models/dimensions/dim_date.sql
{{ config(materialized='table') }}
SELECT DISTINCT
Date as date_id,
Date as full_date,
year(Date) as year,
month(Date) as month,
day(Date) as day,
date_part('week', Date) as week_number,
IsHoliday as is_holiday
FROM {{ source('raw_data', 'features') }}
まずfeaturesの中から一意の日付を引っ張って来れるようにDISTINCTで重複を排除して、
SELECTでdateと年、月、日、週などの情報と祝日フラグを設定してます。
-- models/dimensions/dim_department.sql
{{ config(materialized='table') }}
SELECT DISTINCT
Dept as department_id
FROM {{ source('raw_data', 'train') }}
trainから部署idを重複なしで引っ張ってきています。(他に情報がない...)
-- models/dimensions/dim_store.sql
{{ config(materialized='table') }}
SELECT
s.Store as store_id,
s.Type as store_type,
s.Size as store_size
FROM {{ source('raw_data', 'stores') }} s
storeの情報を取得するsqlも作ります。
合ってるか不明だがfact tableをつくる。
models配下にfactというフォルダを作成してsqlを作ります。
{{ config(materialized='table') }}
SELECT
ROW_NUMBER() OVER () as sales_id,
t.Store as store_id,
t.Dept as department_id,
t.Date as date_id,
t.Weekly_Sales as weekly_sales,
f.Temperature as temperature,
f.Fuel_Price as fuel_price,
f.MarkDown1 as markdown1,
f.MarkDown2 as markdown2,
f.MarkDown3 as markdown3,
f.MarkDown4 as markdown4,
f.MarkDown5 as markdown5,
f.CPI as cpi,
f.Unemployment as unemployment,
f.IsHoliday as is_holiday
FROM {{ source('raw_data', 'train') }} t
JOIN {{ source('raw_data', 'features') }} f
ON t.Store = f.Store AND t.Date = f.Date
dbt_project.ymlにmodelsを追加する。
models:
walmart:
# Config indicated by + and applies to all files under models/example/
example:
+materialized: view
walmart_star_schema:
dimensions:
+materialized: table
facts:
+materialized: table
dbt run
出来上がったリネージを確認する。
dbtには視覚的に確認する方法があります。
dbt docs generate
の後
dbt docs serve
でローカルホストでUIが立ち上がります。
リネージ機能も確認できました。
とりあえずfactと二つのdimensionは繋がってますね。
(同じデータソースから持ってきているのでstoreは繋がらなかったんだと思います。)
各データベース確認
databaseフォルダに作られたデータベースの中身を簡単に確認したいので以下のpythonファイルを作成し結果を確認しました。
とりあえずcsvからデータを取得してデータベースに格納するところまではできました。
import duckdb
con = duckdb.connect(database="test.db")
print(con.sql("SHOW TABLES"))
print(con.sql("SELECT * FROM dim_date"))
print(con.sql("SELECT * FROM dim_department"))
print(con.sql("SELECT * FROM dim_store"))
print(con.sql("SELECT * FROM fact_sales"))
結果
┌────────────────────┐
│ name │
│ varchar │
├────────────────────┤
│ dim_date │
│ dim_department │
│ fact_sales │
│ my_first_dbt_model │
└────────────────────┘
┌────────────┬────────────┬───────┬───────┬───────┬─────────────┬────────────┐
│ date_id │ full_date │ year │ month │ day │ week_number │ is_holiday │
│ date │ date │ int64 │ int64 │ int64 │ int64 │ boolean │
├────────────┼────────────┼───────┼───────┼───────┼─────────────┼────────────┤
│ 2010-04-02 │ 2010-04-02 │ 2010 │ 4 │ 2 │ 13 │ false │
│ 2010-06-25 │ 2010-06-25 │ 2010 │ 6 │ 25 │ 25 │ false │
│ 2010-08-06 │ 2010-08-06 │ 2010 │ 8 │ 6 │ 31 │ false │
│ 2010-08-13 │ 2010-08-13 │ 2010 │ 8 │ 13 │ 32 │ false │
│ 2010-10-08 │ 2010-10-08 │ 2010 │ 10 │ 8 │ 40 │ false │
│ 2010-10-29 │ 2010-10-29 │ 2010 │ 10 │ 29 │ 43 │ false │
│ 2010-12-10 │ 2010-12-10 │ 2010 │ 12 │ 10 │ 49 │ false │
│ 2011-02-04 │ 2011-02-04 │ 2011 │ 2 │ 4 │ 5 │ false │
│ 2011-04-01 │ 2011-04-01 │ 2011 │ 4 │ 1 │ 13 │ false │
│ 2011-07-01 │ 2011-07-01 │ 2011 │ 7 │ 1 │ 26 │ false │
│ · │ · │ · │ · │ · │ · │ · │
│ · │ · │ · │ · │ · │ · │ · │
│ · │ · │ · │ · │ · │ · │ · │
│ 2012-04-06 │ 2012-04-06 │ 2012 │ 4 │ 6 │ 14 │ false │
│ 2012-05-18 │ 2012-05-18 │ 2012 │ 5 │ 18 │ 20 │ false │
│ 2012-05-25 │ 2012-05-25 │ 2012 │ 5 │ 25 │ 21 │ false │
│ 2012-06-01 │ 2012-06-01 │ 2012 │ 6 │ 1 │ 22 │ false │
│ 2012-06-29 │ 2012-06-29 │ 2012 │ 6 │ 29 │ 26 │ false │
│ 2012-07-27 │ 2012-07-27 │ 2012 │ 7 │ 27 │ 30 │ false │
│ 2012-09-14 │ 2012-09-14 │ 2012 │ 9 │ 14 │ 37 │ false │
│ 2012-10-12 │ 2012-10-12 │ 2012 │ 10 │ 12 │ 41 │ false │
│ 2013-05-17 │ 2013-05-17 │ 2013 │ 5 │ 17 │ 20 │ false │
│ 2013-07-19 │ 2013-07-19 │ 2013 │ 7 │ 19 │ 29 │ false │
├────────────┴────────────┴───────┴───────┴───────┴─────────────┴────────────┤
│ 182 rows (20 shown) 7 columns │
└────────────────────────────────────────────────────────────────────────────┘
┌───────────────┐
│ department_id │
│ int64 │
├───────────────┤
│ 51 │
│ 56 │
│ 72 │
│ 4 │
│ 23 │
│ 24 │
│ 33 │
│ 40 │
│ 43 │
│ 45 │
│ · │
│ · │
│ · │
│ 71 │
│ 93 │
│ 2 │
│ 10 │
│ 18 │
│ 28 │
│ 29 │
│ 34 │
│ 47 │
│ 39 │
├───────────────┤
│ 81 rows │
│ (20 shown) │
└───────────────┘
┌──────────┬────────────┬────────────┐
│ store_id │ store_type │ store_size │
│ int64 │ varchar │ int64 │
├──────────┼────────────┼────────────┤
│ 1 │ A │ 151315 │
│ 2 │ A │ 202307 │
│ 3 │ B │ 37392 │
│ 4 │ A │ 205863 │
│ 5 │ B │ 34875 │
│ 6 │ A │ 202505 │
│ 7 │ B │ 70713 │
│ 8 │ A │ 155078 │
│ 9 │ B │ 125833 │
│ 10 │ B │ 126512 │
│ · │ · │ · │
│ · │ · │ · │
│ · │ · │ · │
│ 36 │ A │ 39910 │
│ 37 │ C │ 39910 │
│ 38 │ C │ 39690 │
│ 39 │ A │ 184109 │
│ 40 │ A │ 155083 │
│ 41 │ A │ 196321 │
│ 42 │ C │ 39690 │
│ 43 │ C │ 41062 │
│ 44 │ C │ 39910 │
│ 45 │ B │ 118221 │
├──────────┴────────────┴────────────┤
│ 45 rows (20 shown) 3 columns │
└────────────────────────────────────┘
┌──────────┬──────────┬───────────────┬───┬───────────┬─────────────┬──────────────┬────────────┐
│ sales_id │ store_id │ department_id │ … │ markdown5 │ cpi │ unemployment │ is_holiday │
│ int64 │ int64 │ int64 │ │ varchar │ varchar │ varchar │ boolean │
├──────────┼──────────┼───────────────┼───┼───────────┼─────────────┼──────────────┼────────────┤
│ 1 │ 1 │ 1 │ … │ NA │ 211.0963582 │ 8.106 │ false │
│ 2 │ 1 │ 1 │ … │ NA │ 211.2421698 │ 8.106 │ true │
│ 3 │ 1 │ 1 │ … │ NA │ 211.2891429 │ 8.106 │ false │
│ 4 │ 1 │ 1 │ … │ NA │ 211.3196429 │ 8.106 │ false │
│ 5 │ 1 │ 1 │ … │ NA │ 211.3501429 │ 8.106 │ false │
│ 6 │ 1 │ 1 │ … │ NA │ 211.3806429 │ 8.106 │ false │
│ 7 │ 1 │ 1 │ … │ NA │ 211.215635 │ 8.106 │ false │
│ 8 │ 1 │ 1 │ … │ NA │ 211.0180424 │ 8.106 │ false │
│ 9 │ 1 │ 1 │ … │ NA │ 210.8204499 │ 7.808 │ false │
│ 10 │ 1 │ 1 │ … │ NA │ 210.6228574 │ 7.808 │ false │
│ · │ · │ · │ · │ · │ · │ · │ · │
│ · │ · │ · │ · │ · │ · │ · │ · │
│ · │ · │ · │ · │ · │ · │ · │ · │
│ 9991 │ 1 │ 97 │ … │ NA │ 215.2736553 │ 7.682 │ false │
│ 9992 │ 1 │ 97 │ … │ NA │ 215.0435229 │ 7.682 │ false │
│ 9993 │ 1 │ 97 │ … │ NA │ 214.9980596 │ 7.682 │ false │
│ 9994 │ 1 │ 97 │ … │ NA │ 215.0910982 │ 7.682 │ false │
│ 9995 │ 1 │ 97 │ … │ NA │ 215.1841368 │ 7.962 │ false │
│ 9996 │ 1 │ 97 │ … │ NA │ 215.2771754 │ 7.962 │ false │
│ 9997 │ 1 │ 97 │ … │ NA │ 215.3611087 │ 7.962 │ false │
│ 9998 │ 1 │ 97 │ … │ NA │ 215.4222784 │ 7.962 │ false │
│ 9999 │ 1 │ 97 │ … │ NA │ 215.4834482 │ 7.962 │ false │
│ 10000 │ 1 │ 97 │ … │ NA │ 215.544618 │ 7.962 │ false │
├──────────┴──────────┴───────────────┴───┴───────────┴─────────────┴──────────────┴────────────┤
│ ? rows (>9999 rows, 20 shown) 15 columns (7 shown) │
└───────────────────────────────────────────────────────────────────────────────────────────────┘
今後はdagsterでの自動化やsnowflakeとの連携を試してみたいと思います👍
Discussion