『DuckDB in Action』を読んでモダンダックスタックに入門した
DuckDBにどういったことができ、いつ使うことに適しているのかわからないので本で学ぶことにした。
DuckDBを使ったDWHを展開しているMotherDuckが本を無料公開している。
Manningでは一部無料公開している。UIが気持ちいい。
サンプルデータを含んだgithubリポジトリは以下。
1. An Introduction to DuckDB の メモ
DuckDBは、マシン上で実行される組み込み分析データベース。組み込みデータベースは、アプリケーションやノートブックなどの別のプロセス内で実行され、ネットワーク経由でアクセスしない。
データをネットワーク経由でコピーせずにデータが存在する場所で処理を実行することで、コスト削減ができる。
たとえば、S3にあるAWSアクセスログファイルを処理する場合のコスト比較だと、AWS Athena SQLクエリを実行する場合はスキャンされるデータ量に基づくが、DuckDBをEC2にデプロイした場合は、EC料金のみ支払う。
Athenaの場合
- クエリコスト:
- 1TB × $5/TB = $5(1回のクエリ)
- 月20回実行する場合:$5 × 20 = $100/月
合計:約$100/月
DuckDB + EC2の場合
- EC2コスト:
- t3.xlarge(4vCPU, 16GB RAM)を使用した場合
- $0.1664/時 × 24時間 × 30日 = 約$120/月
- S3アクセスコスト:
- GETリクエスト:1000リクエスト × $0.0004/1000 = $0.0004
- データ転送(同一リージョン):無料
- データ取得(初回):1TB × $0.0007/GB = $0.7
合計:約$121/月
例:月100回クエリを実行する場合
- Athena:$5 × 100 = $500/月
- DuckDB:約$121/月(変化なし)
DuckDBはDataFrameからコピーすることなくデータに直接アクセスできるため、データを効率的に処理することができる。従来のDataFrame変換が不要となりパファーマンスが向上する。
# 従来の方法
import pandas as pd
# 1. S3からParquetを読み込み → DataFrameに変換
df = pd.read_parquet('s3://bucket/data.parquet')
# 2. データ処理(メモリ上で新しいDataFrameが作成される)
filtered_df = df[df['value'] > 100]
result_df = filtered_df.groupby('category').agg({'value': 'sum'})
# 3. 結果をCSVに保存(また変換が発生)
result_df.to_csv('result.csv')
import duckdb
# ネイティブSQLで直接処理(DataFrame変換なし)
duckdb.sql("""
SELECT
category,
SUM(value) as total_value
FROM read_parquet('s3://bucket/data.parquet')
WHERE value > 100
GROUP BY category
-- 直接CSVに出力
COPY TO 'result.csv'
""")
2. Getting Started with DuckDB の メモ
拡張機能
DuckDBにはデータベースのコア機能にはない拡張機能がある。
describe select * from duckdb_extensions();
┌───────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │ column_type │ null │ key │ default │ extra │
│ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │
├───────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ extension_name │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ loaded │ BOOLEAN │ YES │ NULL │ NULL │ NULL │
│ installed │ BOOLEAN │ YES │ NULL │ NULL │ NULL │
│ install_path │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ description │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ aliases │ VARCHAR[] │ YES │ NULL │ NULL │ NULL │
│ extension_version │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ install_mode │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ installed_from │ VARCHAR │ YES │ NULL │ NULL │ NULL │
└───────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘
デフォルトでは、DuckDBはインターネット上にあるファイルをクエリすることはできないが、httpfs拡張機能を介して利用できる。
INSTALL httpfs;
LOAD httpfs;
CSVファイルの解析
DuckDBは手動でダウンロードしてインポートするプロセスを実行することなく、データを直接処理できる。
レコード数をカウント
SELECT count(*)
FROM 'https://github.com/bnokoro/Data-Science/raw/master/'
'countries%20of%20the%20world.csv';
リモートファイルのフォーマットを指定する
SELECT count(*)
FROM read_csv_auto("https://bit.ly/3KoiZR0");
3. Executing SQL queries のメモ
JOIN句とは、2つのテーブルを結合させて新しいテーブルを作る操作。
集合操作: ベン図で表現
- UNION
- INTERSECT
- EXCEPT
結合操作: 直積で表現
- CROSS JOIN: すべての可能な組み合わせ。結果の行数は、テーブルAの行数×テーブルBの行数。
- INNER JOIN: CROSS JOINの結果から、等しいキーを持つ一致するペアのみを結合する内部結合
結合操作は、2つのテーブルを直積を取得してからフィルター操作の2つの操作で成り立っている。
-- INNER JOINは以下の2つの操作で構成される
SELECT *
FROM A INNER JOIN B
ON A.key = B.key;
-- 上記は以下と同じ
SELECT *
FROM (A CROSS JOIN B) -- 1. まず直積を取る
WHERE A.key = B.key; -- 2. 条件で絞り込む
直積集合の例: トランプのカード。
{A, K, Q, J, 10, 9, 8, 7, 6, 5, 4, 3, 2} CROSS JOIN {♠, ♥, ♦, ♣}の結果として、52枚のトランプのカードができる。
4. Advanced aggregation and analysis of data の メモ
githubのサンプルリポジトリのデータに対してクエリを実行する。
cd duckdb-in-action/examples/ch03
duckdb
> import database 'ch03_db';
非相関サブクエリ(uncorrelated subquery)
非相関サブクエリの例: 生成される総電力の平均を取得する
SELECT avg(sum_per_system) -- これが外部クエリ
FROM ( -- ここからサブクエリ。外部クエリの列を参照せずに独立して実行される。
SELECT sum(kWh) AS sum_per_system
FROM v_power_per_day
GROUP BY system_id
);
┌─────────────────────┐
│ avg(sum_per_system) │
│ double │
├─────────────────────┤
│ 133908.08666666667 │
└─────────────────────┘
非相関サブクエリは、外部クエリとは独立して実行されるサブクエリのこと。外部クエリの列を参照せず、単独で評価され、その結果が外部クエリで使用される。
外部クエリとは、サブクエリを含んでいる「より外側の」クエリのことを指す。
非相関スカラサブクエリ(uncorrelated, scalar subqueries)
非相関スカラサブクエリの例: どのシステムで最も大量の電力が消費されたか
SELECT read_on, power
FROM readings
WHERE power = (SELECT max(power) FROM readings);
┌─────────────────────┬───────────────┐
│ read_on │ power │
│ timestamp │ decimal(10,3) │
├─────────────────────┼───────────────┤
│ 2019-05-08 12:15:00 │ 133900.000 │
│ 2019-05-23 10:00:00 │ 133900.000 │
│ 2019-05-23 11:30:00 │ 133900.000 │
│ 2019-05-28 11:45:00 │ 133900.000 │
│ 2020-04-02 11:30:00 │ 133900.000 │
└─────────────────────┴───────────────┘
非相関スカラサブクエリとは、
- スカラー(単一値)を返す: 結果として単一の値(1行1列)のみを返す
- 非相関: 外部クエリの値を参照せず、独立して実行できる
全体の最大電力生成の値のみが表示されるため、システム毎の最大電力と時間の取得ができない。
サブクエリの行毎に異なる値を取得する必要がある場合は、相関サブクエリを利用する。
相関スカラサブクエリ(correlated, scalar subqueries)
相関スカラサブクエリの例: システム毎の最大電力と時間の取得
SELECT system_id, read_on, power -- これが外部クエリ
FROM readings r1
WHERE power = ( -- ここからサブクエリ。
SELECT max(power)
FROM readings r2
WHERE r2.system_id = r1.system_id -- サブクエリ内で外部クエリの値を参照するため、外部クエリの行毎に異なる結果を返却可能
)
ORDER BY ALL;
┌───────────┬─────────────────────┬───────────────┐
│ system_id │ read_on │ power │
│ int32 │ timestamp │ decimal(10,3) │
├───────────┼─────────────────────┼───────────────┤
│ 10 │ 2019-02-23 12:45:00 │ 1109.293 │
│ 34 │ 2019-05-08 12:15:00 │ 133900.000 │
│ 34 │ 2019-05-23 10:00:00 │ 133900.000 │
│ 34 │ 2019-05-23 11:30:00 │ 133900.000 │
│ 34 │ 2019-05-28 11:45:00 │ 133900.000 │
│ 34 │ 2020-04-02 11:30:00 │ 133900.000 │
│ 1200 │ 2020-04-16 12:15:00 │ 47873.333 │
└───────────┴─────────────────────┴───────────────┘
相関サブクエリは、外部クエリのカラムを参照するサブクエリ。サブクエリが外部クエリの各行に依存して実行されるため、外部クエリの行ごとに異なる結果を返す。
サブクエリは結合で置き換え可能。
SELECT r1.system_id, read_on, power
FROM readings r1
JOIN (
SELECT r2.system_id, max(power) AS value
FROM readings r2
GROUP BY ALL
) AS max_power ON (
max_power.system_id = r1.system_id AND
max_power.value = r1.power
)
ORDER BY ALL;
他のRDBMSでは相関サブクエリは遅くなるためサブクエリは結合で置き換えるが、DuckDBでは相関解除オプティマイザを使用しているため、この書き換え操作は不要。
GROUPING SETS, ROLLUP, CUBE
GROUPING SETS
GROUPING SETS は、複数の異なる GROUP BY 句の結果を一つの結果セットにまとめるためのSQLの機能。
- (列1, 列2): 列1 と 列2 の両方の組み合わせでグループ化する。
- 列1: 列1 だけでグループ化。この場合、列2 に対応する値は NULL になる。
- (): グルーピングキーを指定しないため、全体の集計を行う。この場合、列1 と 列2 の両方に対応する値は NULL になる。
GROUPING SETSによるグループ化の例:
SELECT year(read_on) AS year,
system_id,
count(*),
round(sum(power) / 4 / 1000, 2) AS kWh
FROM readings
GROUP BY GROUPING SETS ((year, system_id), year, ())
ORDER BY year NULLS FIRST, system_id NULLS FIRST;
┌───────┬───────────┬──────────────┬───────────┐
│ year │ system_id │ count_star() │ kWh │
│ int64 │ int32 │ int64 │ double │
├───────┼───────────┼──────────────┼───────────┤
│ NULL │ NULL │ 151879 │ 401723.22 │ --
│ 2019 │ NULL │ 103621 │ 269303.39 │
│ 2019 │ 10 │ 33544 │ 1549.34 │
│ 2019 │ 34 │ 35040 │ 205741.9 │
│ 2019 │ 1200 │ 35037 │ 62012.15 │
│ 2020 │ NULL │ 48258 │ 132419.83 │
│ 2020 │ 10 │ 14206 │ 677.14 │
│ 2020 │ 34 │ 17017 │ 101033.35 │
│ 2020 │ 1200 │ 17035 │ 30709.34 │
└───────┴───────────┴──────────────┴───────────┘
- 年とシステムIDの組み合わせごとのカウントと kWh
- 年ごとのカウントと kWh (この場合、system_id は NULL)
- 全体のカウントと kWh (この場合、year と system_id は NULL)
ROLLUP
ROLLUPは階層的な集計を行う。ROLLUP(A, B, C)は(A,B,C), (A,B), (A), ()の4つのグループ化を生成する。
ROLLUPによるグループ化の例:
ROLLUP 句に指定されている列は year と system_id の 2 つ(N=2)。
生成されるグルーピングセットは 2 + 1 = 3 になる:
- (year, system_id): 年とシステムID の組み合わせごとの集計
- (year): 年ごとの集計(system_id は NULL)
- (): 全体の総計(year と system_id は NULL)
SELECT year(read_on) AS year,
system_id,
count(*),
round(sum(power) / 4 / 1000, 2) AS kWh
FROM readings
GROUP BY ROLLUP (year, system_id) -- GROUP BY GROUPING SETS ((year, system_id), year, ())と同じ結果
ORDER BY year NULLS FIRST, system_id NULLS FIRST;
CUBE
すべての可能な組み合わせの集計を行う。
CUBE(A, B, C)は(A,B,C), (A,B), (A,C), (B,C), (A), (B), (C), ()の8つのグループ化を生成する。
CUBEによるグループ化の例:
CUBE 句に指定されている列は year と system_id の 2 つ(N=2)。
生成されるグルーピングセットは 2^2 = 4 になる:
- (year, system_id): 年とシステムID の両方 を指定した組み合わせごとの集計
- (year): 年のみ を指定した集計(system_id は NULL)
- (system_id): システムID のみ を指定した集計(year は NULL)
- (): どの列も指定しない 空のグルーピングセットで、全体の総計 を表します(year と system_id は NULL)
SELECT year(read_on) AS year,
system_id,
count(*),
round(sum(power) / 4 / 1000, 2) AS kWh
FROM readings
GROUP BY CUBE (year, system_id)
ORDER BY year NULLS FIRST, system_id NULLS FIRST;
┌───────┬───────────┬──────────────┬───────────┐
│ year │ system_id │ count_star() │ kWh │
│ int64 │ int32 │ int64 │ double │
├───────┼───────────┼──────────────┼───────────┤
│ NULL │ NULL │ 151879 │ 401723.22 │
│ NULL │ 10 │ 47750 │ 2226.48 │
│ NULL │ 34 │ 52057 │ 306775.25 │
│ NULL │ 1200 │ 52072 │ 92721.48 │
│ 2019 │ NULL │ 103621 │ 269303.39 │
│ 2019 │ 10 │ 33544 │ 1549.34 │
│ 2019 │ 34 │ 35040 │ 205741.9 │
│ 2019 │ 1200 │ 35037 │ 62012.15 │
│ 2020 │ NULL │ 48258 │ 132419.83 │
│ 2020 │ 10 │ 14206 │ 677.14 │
│ 2020 │ 34 │ 17017 │ 101033.35 │
│ 2020 │ 1200 │ 17035 │ 30709.34 │
├───────┴───────────┴──────────────┴───────────┤
│ 12 rows 4 columns │
└──────────────────────────────────────────────┘
Window関数
通常のSQLクエリでは現在の行のみ参照し他の行のデータにアクセスすることはできない。また、max
などの集計関数では集計する場合は集計項目以外の情報が失われる。
Window関数は集計しながらも元の行の詳細情報を維持できる。
windowはOVER句によって定義される。
select row_number() over() from readings;
最も多く生成される電力量の上位3つを取得するSQLの例
SELECT * FROM readings ORDER BY power DESC LIMIT 3;
┌───────────┬─────────────────────┬───────────────┐
│ system_id │ read_on │ power │
│ int32 │ timestamp │ decimal(10,3) │
├───────────┼─────────────────────┼───────────────┤
│ 34 │ 2019-05-08 12:15:00 │ 133900.000 │
│ 34 │ 2019-05-23 10:00:00 │ 133900.000 │
│ 34 │ 2019-05-23 11:30:00 │ 133900.000 │
└───────────┴─────────────────────┴───────────────┘
window関数を利用した最も多く生成される電力量の上位3つを取得するSQLの例
window関数でランキング算出後に、with句と組み合わせて上位3件を取得
WITH ranked_readings AS (
SELECT *,
dense_rank()
OVER (ORDER BY power DESC) AS rnk
FROM readings
)
SELECT *
FROM ranked_readings
WHERE rnk <= 3;
┌───────────┬─────────────────────┬───────────────┬───────┐
│ system_id │ read_on │ power │ rnk │
│ int32 │ timestamp │ decimal(10,3) │ int64 │
├───────────┼─────────────────────┼───────────────┼───────┤
│ 34 │ 2019-05-08 12:15:00 │ 133900.000 │ 1 │
│ 34 │ 2019-05-23 10:00:00 │ 133900.000 │ 1 │
│ 34 │ 2019-05-23 11:30:00 │ 133900.000 │ 1 │
│ 34 │ 2019-05-28 11:45:00 │ 133900.000 │ 1 │
│ 34 │ 2020-04-02 11:30:00 │ 133900.000 │ 1 │
│ 34 │ 2019-05-09 10:30:00 │ 133700.000 │ 2 │
│ 34 │ 2019-05-10 12:15:00 │ 133700.000 │ 2 │
│ 34 │ 2019-03-21 13:00:00 │ 133600.000 │ 3 │
│ 34 │ 2019-04-02 10:30:00 │ 133600.000 │ 3 │
└───────────┴─────────────────────┴───────────────┴───────┘
QUALIFY句を使っても取得可能。
SELECT *,
dense_rank()
OVER (ORDER BY power DESC) AS rnk
FROM readings
QUALIFY rnk <=3
システム毎に最も多く生成される電力量の上位3つを取得するSQLの例
PARTITION BYでsystem_idを指定することでウィンドウをシステム単位でパーティションを作成する。
WITH ranked_readings AS (
SELECT *,
dense_rank()
OVER (PARTITION BY system_id ORDER BY power DESC) AS rnk
FROM readings
)
SELECT *
FROM ranked_readings
WHERE rnk <= 3;
┌───────────┬─────────────────────┬───────────────┬───────┐
│ system_id │ read_on │ power │ rnk │
│ int32 │ timestamp │ decimal(10,3) │ int64 │
├───────────┼─────────────────────┼───────────────┼───────┤
│ 10 │ 2019-02-23 12:45:00 │ 1109.293 │ 1 │
│ 10 │ 2019-03-01 12:15:00 │ 1087.900 │ 2 │
│ 10 │ 2019-02-17 13:00:00 │ 1080.847 │ 3 │
│ 34 │ 2019-05-08 12:15:00 │ 133900.000 │ 1 │
│ 34 │ 2019-05-23 10:00:00 │ 133900.000 │ 1 │
│ 34 │ 2019-05-23 11:30:00 │ 133900.000 │ 1 │
│ 34 │ 2019-05-28 11:45:00 │ 133900.000 │ 1 │
│ 34 │ 2020-04-02 11:30:00 │ 133900.000 │ 1 │
│ 34 │ 2019-05-09 10:30:00 │ 133700.000 │ 2 │
│ 34 │ 2019-05-10 12:15:00 │ 133700.000 │ 2 │
│ 34 │ 2019-03-21 13:00:00 │ 133600.000 │ 3 │
│ 34 │ 2019-04-02 10:30:00 │ 133600.000 │ 3 │
│ 1200 │ 2020-04-16 12:15:00 │ 47873.333 │ 1 │
│ 1200 │ 2020-04-02 12:30:00 │ 47866.667 │ 2 │
│ 1200 │ 2020-04-16 13:15:00 │ 47866.667 │ 2 │
│ 1200 │ 2020-04-16 12:30:00 │ 47860.000 │ 3 │
│ 1200 │ 2020-04-16 12:45:00 │ 47860.000 │ 3 │
├───────────┴─────────────────────┴───────────────┴───────┤
│ 17 rows 4 columns │
└─────────────────────────────────────────────────────────┘
エネルギー生成の7日間移動平均を取得するSQLの例
7日間移動平均を取得するには、7日間のフレーミングを定義する必要がある。
フレーミング(framing)は、対象となる集合を現在の行を起点に相対的に定義すること。
SELECT system_id,
day,
kwh,
avg(kwh) OVER (
PARTITION BY system_id
ORDER BY day ASC
RANGE BETWEEN INTERVAL 3 Days PRECEDING
AND INTERVAL 3 Days FOLLOWING
) AS "kWh 7-day moving average"
FROM v_power_per_day
ORDER BY system_id, day;
5. Exploring data without persistence の メモ
DuckDBは他のデータベースにアタッチしてクエリすることができる。
Kaggle European Soccer Database からデータセットをダウンロードする。データは300BMのSQLiteファイル。
INSTALL sqlite;
LOAD sqlite;
ATTACH 'database.sqlite' AS fifa (TYPE sqlite);
use fifa;
CREATE OR REPLACE VIEW Country AS
FROM sqlite_scan('database.sqlite', 'Country');
from Country limit 5;
┌───────┬─────────┐
│ id │ name │
│ int64 │ varchar │
├───────┼─────────┤
│ 1 │ Belgium │
│ 1729 │ England │
│ 4769 │ France │
│ 7809 │ Germany │
│ 10257 │ Italy │
└───────┴─────────┘
Postgresqlなどへのアタッチも可能。
6. Integrating with the Python ecosystem の メモ
DuckDB Python APIを利用して、DuckDBはPythonのエコシステムと統合できる。
7. DuckDB in the cloud with MotherDuck の メモ
MotherDuckはサーバーレスデータ分析プラットフォーム
エコシステム:モダンダックスタックより引用。
8. Building data pipelines with DuckDB の メモ
dlt
dltはデータパイプラインにおけるデータ取り込み(DataInjest)を目的としたツール。
様々なデータソースからDuckDBなどのデータストアにデータ取り込みができる。
インストール
dltおよびduckdbをインストールする
python3 -m venv venv
source venv/bin/activate
pip install dlt
pip install duckdb
パイプラインの構築
Chess.comをソースとして、duckdbを宛先としたパイプラインを構築する。
dlt init chess duckdb
Chess.comのplayer_profilesをduckdbに取り込むpythonスクリプトをスクラッチで作成する。
# chess_pipeline.py
import dlt
from chess import source
pipeline = dlt.pipeline(
pipeline_name="chess_pipeline",
destination="duckdb",
dataset_name="main"
)
data = source(
players=[
"magnuscarlsen", "vincentkeymer",
"dommarajugukesh", "rpragchess"
],
start_month="2022/11",
end_month="2022/11",
)
players_profiles = data.with_resources("players_profiles")
info = pipeline.run(players_profiles)
print(info)
パイプラインの実行
パイプラインを実行する。
python chess_pipeline.py
duckdbファイルに書き込まれた情報を確認する
duckdb chess_pipeline.duckdb
データが取得できることを確認できた。
D show tables;
┌─────────────────────┐
│ name │
│ varchar │
├─────────────────────┤
│ _dlt_loads │
│ _dlt_pipeline_state │
│ _dlt_version │
│ players_profiles │
└─────────────────────┘
D FROM players_profiles LIMIT 1;
┌──────────────────────┬──────────────────────┬───┬────────────────────┬────────────────┐
│ last_online │ joined │ … │ _dlt_load_id │ _dlt_id │
│ timestamp with tim… │ timestamp with tim… │ │ varchar │ varchar │
├──────────────────────┼──────────────────────┼───┼────────────────────┼────────────────┤
│ 2025-03-24 00:42:0… │ 2018-02-10 19:45:5… │ … │ 1742802433.8449512 │ ISOWTSrxyo721g │
├──────────────────────┴──────────────────────┴───┴────────────────────┴────────────────┤
│ 1 rows 18 columns (4 shown) │
dbt
dbtはデータパイプラインにおける変換を目的としたツール
SQLベースのデータクレンジングなどを行う。
dbt-duckdbを利用することで、dbtのパイプラインにduckdbを組み込むことができる。
インストール
dbt-duckdbのインストールとdbtプロジェクトの作成
python3 -m venv venv
source venv/bin/activate
pip install dbt-duckdb
dbt init dbt_transformations
パイプラインの構築
GitHubに保存されているCSVファイルを取得し、クリーンアップとデータ変換を適用し、クリーニングされたデータをParquetファイルとして出力するパイプラインを構築する
- source.ymlを参照して、dbtはGithubからCSVデータを取得する
- 取得したデータをDuckDBにわたす(.dbt/profiles.ymlの設定を参照する)
- DuckDBがSQL変換を行う
- Parquetに出力する
# dbt_transformations/models/atp/source.yml
version: 2
sources:
- name: github
meta:
external_location: "https://raw.githubusercontent.com/JeffSackmann/tennis_atp/master/atp_matches_2023.csv"
tables:
- name: matches_file
# dbt_transformations/models/atp/matches.sql
{{ config(
materialized='external',
location='output/matches.parquet',
format='parquet'
) }} -- <.>
WITH noWinLoss AS (
SELECT COLUMNS(col ->
NOT regexp_matches(col, 'w_.*') AND -- <.>
NOT regexp_matches(col, 'l_.*') -- <.>
)
FROM {{ source('github', 'matches_file') }} -- <.>
)
SELECT * REPLACE (
cast(strptime(cast(tourney_date as varchar), '%Y%m%d') AS date) as tourney_date -- <.>
)
FROM noWinLoss
サンプルリポジトリのコードだと失敗するため下記のように修正する必要がある。
SELECT * REPLACE (
- cast(strptime(tourney_date, '%Y%m%d') AS date) as tourney_date
+ cast(strptime(cast(tourney_date as varchar), '%Y%m%d') AS date) as tourney_date -- <.>
)
パイプラインの実行
パイプラインの実行と結果を確認する
mkdir output
dbt run
duckdb -s "SELECT count(*) FROM 'output/matches.parquet'";
┌──────────────┐
│ count_star() │
│ int64 │
├──────────────┤
│ 2986 │
└──────────────┘
テスト
schema.ymlを用いたデータテストを行う。これはめちゃめちゃ便利。
# dbt_transformations/models/atp/schema.yml
version: 2
models:
- name: matches
description: "ATP tennis matches schema"
columns:
- name: tourney_id
description: "The ID of the tournament."
tests:
- not_null
- name: winner_id
description: "The ID of the winning player."
tests:
- not_null
- name: loser_id
description: "The ID of the losing player."
tests:
- not_null
- name: surface
description: "The surface of the court."
tests:
- not_null
- accepted_values:
values: ['Grass', 'Hard', 'Clay']
Dagster
Dagsterはデータパイプラインのオーケストレーションを行うためのツール。
DuckDB との統合により、データを DuckDB にロードし、Dagster アセット内で Python と pandas を使用して変換することができる。
インストール
python@3.13だとインストール時に下記のエラーが発生する。python@3.12を利用して環境を作る
error: subprocess-exited-with-error
× Preparing metadata (pyproject.toml) did not run successfully.
│ exit code: 1
╰─> [6 lines of output]
Cargo, the Rust package manager, is not installed or is not on PATH.
This package requires Rust and Cargo to compile extensions. Install it through
the system's package manager or via https://rustup.rs/
Checking for Rust toolchain....
[end of output]
brew install python@3.12
python3.12 -m venv venv
source venv/bin/activate
venv ❯ python --version
Python 3.12.9
dagsterのインスール
pip install dagster dagster-duckdb
pip install dagster-webserver
pip install pandas
パイプラインを構築する
テニスに関するCSVソースを取得して、duckdbに出力するパイプラインを構築する。
-
assets.py
にデータアセットの定義を記載する。
# atp/assets.py
from dagster_duckdb import DuckDBResource
from dagster import asset
import pandas as pd
@asset
def atp_matches_dataset(duckdb: DuckDBResource) -> None:
base = "https://raw.githubusercontent.com/JeffSackmann/tennis_atp/master"
csv_files = [ # <.>
f"{base}/atp_matches_{year}.csv"
for year in range(1968,2024)
]
with duckdb.get_connection() as conn: # <.>
conn.execute("""
CREATE TABLE IF NOT EXISTS matches AS
SELECT * REPLACE(
cast(strptime(cast(tourney_date as varchar), '%Y%m%d') AS date) as tourney_date
)
FROM read_csv_auto($1, types={
'winner_seed': 'VARCHAR',
'loser_seed': 'VARCHAR'
})
""", [csv_files])
from dagster_duckdb import DuckDBResource
@asset
def atp_players_dataset(duckdb: DuckDBResource) -> None:
base = "https://raw.githubusercontent.com/JeffSackmann/tennis_atp/master"
csv_file = f"{base}/atp_players.csv"
with duckdb.get_connection() as conn:
conn.execute("""
CREATE OR REPLACE TABLE players AS
SELECT * REPLACE(
CASE
WHEN dob IS NULL THEN NULL -- <.>
WHEN SUBSTRING(CAST(dob AS VARCHAR), 5, 4) = '0000' THEN -- <.>
CAST(strptime(
CONCAT(SUBSTRING(CAST(dob AS VARCHAR), 1, 4), '0101'),
'%Y%m%d'
) AS date)
ELSE
CAST(strptime(dob, '%Y%m%d') AS date) -- <.>
END AS dob
)
FROM read_csv_auto($1, types = {
'dob': 'STRING'
});
""", [csv_file])
@asset(deps=[atp_players_dataset])
def atp_players_name_dataset(duckdb: DuckDBResource) -> None:
with duckdb.get_connection() as conn:
conn.execute("""
ALTER TABLE players ADD COLUMN name_full VARCHAR;
UPDATE players
SET name_full = name_first || ' ' || name_last
""", [])
@asset
def atp_rounds_dataset(duckdb: DuckDBResource) -> None:
rounds_df = pd.DataFrame({
"name": [
"R128", "R64", "R32", "R16", "ER","RR",
"QF", "SF", "BR", "F"
],
"order": [
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
]
})
with duckdb.get_connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS rounds AS
SELECT * FROM rounds_df
""")
@asset
def atp_levels_dataset(duckdb: DuckDBResource) -> None:
levels_df = pd.DataFrame({
'short_name': [
"G", "M", "A", "C", "S", "F"
],
'name': [
"Grand Slam", "Tour Finals", "Masters 1000s",
"Other Tour Level", "Challengers", "ITFs"
],
'rank': [
5, 4, 3, 2, 1, 0
]
})
with duckdb.get_connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS levels AS
SELECT * FROM levels_df
""")
サンプルリポジトリのコードだと失敗するため下記のように修正する必要がある。
SELECT * REPLACE (
- cast(strptime(tourney_date, '%Y%m%d') AS date) as tourney_date
+ cast(strptime(cast(tourney_date as varchar), '%Y%m%d') AS date) as tourney_date -- <.>
)
-
__init__.py
にオーケストレーションに関する定義を記載する。atp.duckdbへファイル出力するようにしている。
from dagster_duckdb import DuckDBResource
from dagster import (
AssetSelection,
ScheduleDefinition,
Definitions,
define_asset_job,
load_assets_from_modules,
)
from . import assets
atp_job = define_asset_job("atp_job", selection=AssetSelection.all()) # (1)
atp_schedule = ScheduleDefinition(
job=atp_job,
cron_schedule="0 * * * *", # (2)
)
all_assets = load_assets_from_modules([assets])
defs = Definitions( # (3)
assets=all_assets,
jobs=[atp_job],
resources={"duckdb": DuckDBResource(
database="atp.duckdb", # (4)
)},
schedules=[atp_schedule],
)
パイプラインの実行
UIを起動して実行する。
dagster dev -m atp
もしくは、CLIで実行する
dagster job execute -m atp --job atp_job
取り込み結果を確認する
duckdb atp.duckdb 'SELECT count(*) FROM matches'
┌──────────────┐
│ count_star() │
│ int64 │
├──────────────┤
│ 191920 │
└──────────────┘
9. Building and deploying data apps の メモ
Streamlit
Streamlitはpythonのみを使用してインタラクティブなUIを作成するためのツール。
dagsterを使って作成したatp.duckdb
をデータベースとして利用する。
データベースはStreamlitアプリケーションと同じプロセスに組み込まれて実行されるため、リモート接続は不要。
また、データの可視化はPlotlyを使って行う。
インタラクティブに検索条件を指定して、例えばフェデラーと錦織圭の対戦成績を表示することができる。
Superset
SupersetはノーコードのBIツール。
本書で指定していたSupersetのバージョンが3系で古かったため、QuickStartを参照に4系での環境構築を行う。
git checkout tags/4.1.1
docker compose -f docker-compose-image-tag.yml up
quickstartで起動した状態だと、duckdbがデータベースとして選択できないので、データベースドライバーのインストールを参照する。
2025/3/24時点ではDuckDBに関するドキュメント記載はないが、対応はされている模様。
duckdbのドライバーをコンテナ内にインストールする。
docker exec -it 4bc64206c898 bash
pip install duckdb duckdb-engine
また、作成されたファイルを./docker
にコピーしてSupersetから参照できるようにしておく。
cd superset
cp atp.duckdb docker
起動後に下記で接続可能。
データベースは接続後はDatasetおよびDashboardをカスタマイズできる。
10. Performance considerations for large datasets のメモ
「ビッグデータ」の定義の1つは「1 台のマシンに収まらないもの」だが、マシンの進化により24TBのRAMがAWSで利用可能になり、以前はビッグデータとして扱われていた多くのデータセットが単一のマシンで処理できるようになってきている。
DuckDBは「morsel approach」によりマルチコアの並列処理を効率化している。
11. Conclusion のメモ
本書で深く掘り下げられなかったDuckDBの内部についてはYoutubeで紹介されている
ラップアップ
SQLについてはGROUPINGSETS、ウィンドウ関数など知らなかった機能を改めて学んだ。
モダンダックスタックを中心に、DuckDBをdlt/dbt/dagster/streamlit/supersetsと統合した例がそれぞれ有り充実した本。
データエンジニアのスキルアップでDuckDBが紹介されている。DuckDBは分散エンジンより単一ノードでの実行のため速度が速いことが利点として上げられてもいる。
Happy Quacking!
Discussion