DuckDB 50本ノック - 実際に使用した例から抜き出したtips集
まえがき
DuckDBをpythonで実際に使用してきて、これ100個くらいTipsを抜き出せるんじゃないかなと過去の自分の使用例を抜き出してみました。(実際は54個でした。そのうち100にしたい。)単純なデータベース利用というよりは圧倒的にデータ分析、ETL処理、可視化よりになるとは思いますが参考になればと思い(あと結構うろ覚えなので)記事を残しておきます。
基本操作編 (1-20)
1. インメモリデータベースへの接続
DuckDBはプログラム稼働中のみの一時的にデータベース接続を最初に作ることが多くこのコードを大体最初に書くことが多いです。このconを使ってcon.sql("")のようにし、""の内部でSQLを書くことでSQL文を使用することができます。
import duckdb
con = duckdb.connect(database=':memory:')
2. ファイルベースのデータベースへの接続
先ほどとは対照的に最初にデータベースに接続する例です。これはsqliteとかと同じ使用例でCRUD処理とかを書きたい時に使用できると思います。
con = duckdb.connect(database='my_database.db')
3. CSVファイルを直接クエリ
DuckDBではローカルにあるcsvをこのような感じで一文で読むことができます。非常に楽で助かります。
con.sql("SELECT * FROM 'data.csv'")
4. CSVファイルのテーブル変換
CSVを読み込みつつ更にテーブル作成まで行えてしまうのが以下のSQL文です。これもよく使います。
con.sql(f"CREATE OR REPLACE TABLE table_data AS SELECT * FROM read_csv_auto('{file_path}')")
5. Parquetファイルの読み込み
parquetファイルというcsvを列方向に高速検索できるようにし更に容量を減らすようなファイル形式があるのですがそういったparquetファイルも一文で読み込むことができます。
con.sql("SELECT * FROM 'data.parquet'")
6. JSONファイルの読み込み
jsonファイルも同様に読むことができます。
con.sql("SELECT * FROM read_json('data.json')")
7. Excelファイルの読み込み (xlsx)
xlsxファイル形式のExcelファイルも読めます!マクロ付きのファイル(xlsm)はどうなんでしょうねどこかで確認しておきたいですね。
con.sql("SELECT * FROM read_xlsx('data.xlsx', sheet = 'Sheet1')")
8. テーブル一覧の取得
データベースに接続した際に作成されたテーブル一覧が表示されます。
tables = con.sql("SHOW TABLES")
print(tables)
9. テーブルのスキーマ確認
テーブルのスキーマ構造を確認できます。(列名や列の型など)
schema = con.sql("DESCRIBE table_name")
print(schema)
10. 列数が異なるCSVの読み込み
RFC規格に沿っていないガタガタの(列数が行ごとに違うなど)csvを読むときにpandasなどでは読めないが、以下のコードでnullで埋めて読めるようになります。
con.sql("SELECT * FROM read_csv('irregular.csv', null_padding=true)")
11. 区切り文字の指定
一応csvを読むときに区切り文字を指定して読むことができます。
con.sql("SELECT * FROM read_csv('data.tsv', delim='\t')")
12. 文字エンコーディングの指定
文字エンコーディングも指定してcsvを読み込むことができます。
con.sql("SELECT * FROM read_csv('data.csv', encoding='SHIFT_JIS')")
13. 大きなCSVファイルのサンプリング読み込み
大きな行数を持つcsvを読み込みたいときにとりあえず上位1000行だけ読むようなことをするときに使います。LIMITはSQLとしてはおそらくお馴染みだとは思います。
con.sql("SELECT * FROM read_csv('large.csv') LIMIT 1000")
14. 複数CSVファイルの一括読み込み&Union(縦結合)してどのファイルから来たか示す列を追加する。
複数ファイルを同じ列名を持つ項目を縦に繋げつつ、どのファイルから来たか示す列を追加します。
con.sql("SELECT * FROM read_csv('data_*.csv', union_by_name = true,filename = true)")
15. トランザクションの使用
トランザクションの開始→データ挿入→コミットという処理を行うことができます。
トランザクション中は他の操作を受けない&失敗したらロールバックするのでデータの整合性が保てます。
con.execute("BEGIN TRANSACTION")
con.execute("INSERT INTO test_table VALUES (1, 'data')")
con.execute("COMMIT")
16. df()メソッドでDataFrame変換
sql文をdataframeへ変換することができます。pythonで使用する場合は使用頻度が高いです。
# DuckDBクエリ結果を直接pandas DataFrameに変換
df = con.sql("SELECT * FROM test_table").df()
17. 接続の適切なクローズ(try-finally使用)
try-finallyを使用して接続にエラーがあったときにエラーハンドリングして終了させられます。
try:
con = duckdb.connect()
# 処理
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
con.close() # 必ず接続を閉じる
18. 特定列のみ選択での高速化
単一の列を選択してクエリすることで高速化するようになっています。
df = con.sql(f"SELECT column1 FROM '{parquet_file}'")
19. UNION ALLでの複数列統計の結合
一応列指定してUNION ALLすることができます。
query = f"""
SELECT '列1' AS column_name, COUNT(列1), AVG(列1), MIN(列1), MAX(列1), STDDEV(列1)
FROM '{parquet_file}'
UNION ALL
SELECT '列2', COUNT(列2), AVG(列2), MIN(列2), MAX(列2), STDDEV(列2)
FROM '{parquet_file}'
"""
20. HTTPSエンドポイントからの直接読み込み
httpsエンドポイント上にあるcsvファイルを直接読み込んでいます。
df = con.sql("SELECT * FROM 'https://example.com/data.csv'")
データ変換・前処理編 (21-40)
21. CSVからParquetへの変換
これもよく使います。SQLというよりはcsvファイルをparquetファイルに変換して保存するコードになります。RDMSとは...となりますよね。
con.sql(f"COPY (SELECT * FROM read_csv_auto('{csv_file}')) TO '{parquet_file}' (FORMAT 'parquet')")
22. Parquetファイルの圧縮オプション付き保存
parquetファイルの圧縮形式もいろいろあってsnappyという形式で圧縮するように指定しています。zstdという圧縮率の高い形式も指定することができます。
con.sql("COPY test_table TO 'output.parquet' (FORMAT 'parquet', COMPRESSION 'snappy')")
23. 日付型への変換
いわゆる型変換(キャスト)も行うことができます。時系列データを扱うときに使用しました。
con.sql("SELECT CAST(date_column AS DATE) FROM test_table")
24. タイムスタンプの抽出
これも時系列データを扱うときに使用し、date型の列から年、月、日、時という要素を抜き出してそれぞれの列に格納しています。
con.sql("""
SELECT
EXTRACT(YEAR FROM date) AS year,
EXTRACT(MONTH FROM date) AS month,
EXTRACT(DAY FROM date) AS day,
EXTRACT(HOUR FROM date) AS hour
FROM test_table
""")
25. NULL値のパディング
列内のnull値を0で埋められます。
con.sql("SELECT COALESCE(col, 0) AS col FROM test_table")
26. 文字列の結合
ある列とある列を区切り文字指定して結合できます。改行文字とかも使用して結構使った記憶があります。
con.sql("SELECT column1 || ' ' || column2 AS combined FROM test_table")
27. 正規表現での文字列マッチング
これはある列の数字のみ含まれる行だけ抜きたいときに使いました。
con.sql("SELECT * FROM test_table WHERE col ~ '^[0-9]+$'")
28. 文字列の分割
,を区切り文字として列分割できるコードです。
con.sql("SELECT SPLIT(col, ',') AS array_column FROM test_table")
29. 配列の展開
配列としてデータがテーブルに格納されている場合、複数行に展開します。
con.sql("SELECT UNNEST(array_column) AS value FROM test_table")
30. UNNEST & SPLIT
この二つを組み合わせることで以下のようなデータ処理ができます。
これにもお世話になりました。(区切り文字ごとに分解して、他の列ごとにまとめる。)
con.sql("""
INSERT INTO products VALUES
(1, 'スマートフォン', '電化製品,通信機器,モバイル'),
(2, 'ノートPC', '電化製品,コンピュータ'),
(3, 'イヤホン', '電化製品,オーディオ,アクセサリー')
""")
# SPLIT → UNNEST で展開
result = con.sql("""
SELECT
product_id,
product_name,
UNNEST(STRING_SPLIT(categories, ',')) AS category
FROM products
""")
┌────────────┬─────────────────┬──────────────┐
│ product_id │ product_name │ category │
├────────────┼─────────────────┼──────────────┤
│ 1 │ スマートフォン │ 電化製品 │
│ 1 │ スマートフォン │ 通信機器 │
│ 1 │ スマートフォン │ モバイル │
│ 2 │ ノートPC │ 電化製品 │
│ 2 │ ノートPC │ コンピュータ │
│ 3 │ イヤホン │ 電化製品 │
│ 3 │ イヤホン │ オーディオ │
│ 3 │ イヤホン │ アクセサリー │
└────────────┴─────────────────┴──────────────┘
31. ピボットテーブルの作成
date別にカテゴリをピボットした表にしています。
con.sql("""
PIVOT test_table
ON category
USING SUM(value)
GROUP BY date
""")
32. ウィンドウ関数での順位付け
カテゴリごとに値の大きい順で順位を付けられます。
con.sql("""
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY value DESC) AS rank
FROM test_table
""")
33. 移動平均の計算
これで1週間の移動平均を求めたりしていました。(時系列データ)
確かkaggleコンペでの工場の電力消費量の変化のデータだったと思うのですが、1日ごとに変化が激しいデータだったので有効だったように思います。
con.sql("""
SELECT
*,
AVG(value) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS ma7
FROM test_table
""")
34. ラグ値の取得
時系列データのkaggleコンペでLAG特徴量を作成するときに使いました。意外と一行でできたりします。
con.sql("SELECT *, LAG(value, 1) OVER (ORDER BY date) AS prev_value FROM test_table")
35. 累積和の計算
これは日付順に値を累積して合計していく列を作っています。
con.sql("SELECT *, SUM(value) OVER (ORDER BY date) AS cumsum FROM test_table")
36. パーセンタイルの計算
これは四分位数を計算しています。
q1(第1四分位数): 下位25%の位置の値
median(中央値): 50%の位置の値
q3(第3四分位数): 下位75%の位置の値
con.sql("""
SELECT
PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY value) AS q1,
PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY value) AS median,
PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY value) AS q3
FROM test_table
""")
37. 時系列データのリサンプリング
これは時間ごとにグループを作り、時間内平均を列として追加しています。
分刻みのデータでは変化が大きいことがあるので、時間ごとに集計しています。
con.sql("""
SELECT
DATE_TRUNC('hour', timestamp) AS hour,
AVG(value) AS avg_value
FROM test_table
GROUP BY hour
""")
38. カテゴリ変数のエンコーディング
カテゴリをアルファベット順でランキングし、連続した番号を割り当て(0, 1, 2...)
しています。DuckDBのscikit-learnの例で前処理でカテゴリごとにラベリングするときに使った...気がする。
con.sql("""
SELECT
*,
DENSE_RANK() OVER (ORDER BY category) - 1 AS category_encoded
FROM test_table
""")
39. 欠損値の前方補完
これはnull値があった場合時系列順に並べ、直前の行の値を使用して埋めるというコードです。
時系列データなどの連続値なら割と有効な手段かもしれません。
con.sql("""
SELECT
*,
COALESCE(value, LAG(value IGNORE NULLS) OVER (ORDER BY date)) AS filled_value
FROM test_table
""")
40. 変化率の計算
前の行と比べて何%変化したかの変化率を出しています。
con.sql("""
SELECT
*,
(value - LAG(value) OVER (ORDER BY date)) / LAG(value) OVER (ORDER BY date) * 100 AS pct_change
FROM test_table
""")
統計・分析編 (41-60)
41. 基本統計量の一括取得
実はSUMMARIZEで統計量の一括表示ができます。統計量を見るときに大体が個別のクエリを書かないといけないのですがこれで楽にみれます。
con.sql("SUMMARIZE SELECT * FROM test_table")
42. カラムごとの統計情報
逆に個別に統計量を得ようとすると以下のようなSQL文になります。必要なものがわかっているならこれで個別に書いても良いと思います。(速度的にも必要な統計量のみ指定する方が⚪︎)
con.sql("""
SELECT
column_name,
COUNT(*) AS count,
AVG(value) AS mean,
STDDEV(value) AS std,
MIN(value) AS min,
MAX(value) AS max
FROM test_table
GROUP BY column_name
""")
43. 相関係数の計算
ピアソン相関係数を計算できます。(両方とも数値の必要あり)
con.sql("SELECT CORR(column1, column2) AS correlation FROM test_table")
44. ヒストグラムデータの作成
0から100までの値を10ごとにビンを作って列作成しています。あと順番を入れ替え頻度計算してヒストグラムを作成できます。(長い)
con.sql("""
SELECT
CASE
WHEN value >= 0 AND value < 10 THEN 1
WHEN value >= 10 AND value < 20 THEN 2
WHEN value >= 20 AND value < 30 THEN 3
WHEN value >= 30 AND value < 40 THEN 4
WHEN value >= 40 AND value < 50 THEN 5
WHEN value >= 50 AND value < 60 THEN 6
WHEN value >= 60 AND value < 70 THEN 7
WHEN value >= 70 AND value < 80 THEN 8
WHEN value >= 80 AND value < 90 THEN 9
WHEN value >= 90 AND value <= 100 THEN 10
END AS bucket,
COUNT(*) AS frequency
FROM stats_test
GROUP BY bucket
ORDER BY bucket
""")
45. 外れ値の検出(IQR法)
四分位値を使用して外れ値を検出しています。
con.sql("""
WITH stats AS (
SELECT
PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY value) AS q1,
PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY value) AS q3
FROM test_table
)
SELECT *
FROM test_table, stats
WHERE value < q1 - 1.5 * (q3 - q1)
OR value > q3 + 1.5 * (q3 - q1)
""")
46. 時系列の季節性分解
これは確か月別のトレンドをその値から全体の平均を引くことで算出していたコードです。時系列データで使用しました。
con.sql("""
SELECT
*,
value - AVG(value) OVER (
ORDER BY date
ROWS BETWEEN 12 PRECEDING AND 12 FOLLOWING
) AS seasonal_component
FROM test_table
""")
47. 分位点の計算
下位10%の境界値 と 下位90%の境界値を計算できます。これを元に閾値を定性線で引くなども可能です。
con.sql("""
SELECT
PERCENTILE_DISC(0.1) WITHIN GROUP (ORDER BY value) AS p10,
PERCENTILE_DISC(0.9) WITHIN GROUP (ORDER BY value) AS p90
FROM test_table
""")
48. モード(最頻値)の計算
出現回数を出し、出現回数順に並べ替えしています。その後LIMIT 1 で最頻値のみ出しています。
con.sql("""
SELECT value, COUNT(*) AS frequency
FROM test_table
GROUP BY value
ORDER BY frequency DESC
LIMIT 1
""")
AWS・クラウド連携編 (49-60)
49. AWS S3からのデータ読み込み(Lambda環境)
AWS LambdaでS3のデータを取得する際に使用しました。tmpフォルダを作成する必要があるので冒頭で作成しています。
con.execute("SET home_directory='/tmp'")
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")
df = con.sql("SELECT * FROM read_csv('s3://bucket/file.csv')").df()
50. S3への認証情報設定
S3へ認証情報を最初に設定しておく必要があるのでこれで指定しています。
con.sql("""
CREATE SECRET (
TYPE s3,
KEY_ID 'your-access-key',
SECRET 'your-secret-key',
REGION 'ap-northeast-1'
)
""")
51. S3 Parquetファイルの直接クエリ
ローカルのparquetファイルを読むときのようにS3上のparquetファイルも読むことができます。
df = con.sql("SELECT * FROM read_parquet('s3://bucket/data/*.parquet')")
52. S3 Tables (Iceberg) への接続
S3 tables への接続もこれで行いました。
import json
import duckdb
def lambda_handler(event, context):
# 1
con = duckdb.connect(database=':memory:')
con.execute("SET home_directory='/tmp';")
# 2
con.sql("FORCE INSTALL aws FROM core_nightly")
con.sql("FORCE INSTALL httpfs FROM core_nightly")
con.sql("FORCE INSTALL iceberg FROM core_nightly")
con.sql("LOAD aws")
con.sql("LOAD httpfs")
con.sql("LOAD iceberg")
# 3
con.sql("""
CREATE SECRET (
TYPE s3,
KEY_ID 'XXXXXXXXXX',
SECRET 'XXXXXXXXXX',
REGION 'ap-northeast-1'
);
""")
# 4
con.sql("""
ATTACH 'XXXXXXXXXX'
AS s3_tables_db (
TYPE iceberg,
ENDPOINT_TYPE s3_tables
);
""")
# 5
print(con.sql("SHOW ALL TABLES"))
53. DuckDB UIの起動
ターミナルでduckdb UI が立ち上がり、ノートブック形式でSQLを書けるようになります。
duckdb -ui
54. Streamlitとの連携(キャッシュ付き)
streamlitとキャッシュ付きで接続したりしました。
@st.cache_resource
def get_duckdb_connection():
return duckdb.connect()
conn = get_duckdb_connection()
まとめ
まだ途中ですがそのうち100本ノックにしたいと思います。
Discussion