DuckDB Update & Blog reading #8:Ducklakeのタイムトラベル機能・スキーマエボリューションを試す
まえがき
DuckDBチームから新しくDuckLakeというフォーマットが出ました。
このフォーマットを使用することでローカルでicebergのようなタイムトラベル機能やスキーマエボリューションを実現できます...!(まさかクラウド抜きでこういった機能を使えるとは...)
早速使ってみました!
使用言語
Python
公式サイト
youtube
タイムトラベル機能とは
その名の通りざっくり言うと一度データを変更した後に、昔のデータを読める機能です。(タイムトラベルするように)
これを実現するために例えばicebergは複数のjsonファイルなどを使用しなければいけなかったのですが、ducklakeではducklakeフォーマット(とはいってもデータベース形式)のファイルを一つ作成しこの中にその機能を使うために19個のテーブルを作って対応しています。
複数のjsonなどを使用してそういった機能を実現するより、データベースファイルに持たせてテーブルとして管理してよりシンプルにする...と言うことだと思います。
インストール
まずDuckDB v1.3.0をインストールした状態で拡張機能としてducklakeをインストールする必要があります。
import duckdb
con = duckdb.connect(database=":memory:")
con.sql("INSTALL ducklake;")
Ducklake用のデータベース作成
import duckdb
con = duckdb.connect(database=":memory:")
con.sql("INSTALL ducklake;")
con.sql("ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake;")
con.sql("USE my_ducklake;")
このコードを使用すると以下のようにmy_ducklake.ducklakeというデータベースファイルが実行ファイルと同じディレクトリ内に作成されます。
この中にメタデータなどのテーブルが入ることになります。
Ducklakeを使用する
普通のデータベースと同じようにスキーマやテーブルの作成、データの挿入、データの更新、データの削除、テーブルスキーマの変更ができます。
他のデータレイクやレイクハウス形式と同様に、DuckLake 形式ではインデックス、主キー、外部キー、UNIQUE 制約や CHECK 制約はサポートされていない...なるほど
上記の例で作成した DuckLake を使いつつここではオランダの鉄道駅データセットを使用します。
import duckdb
con = duckdb.connect(database=":memory:")
# con.sql("INSTALL ducklake;")
con.sql("ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake;")
con.sql("USE my_ducklake;")
con.sql("CREATE TABLE nl_train_stations AS FROM 'https://blobs.duckdb.org/nl_stations.csv';")
これでweb上のcsvからテーブルが作られます。またこの時点でmy_ducklake.ducklake.filesというフォルダができ、内部にparquetファイルができました。
中身を見てみましょう。
import duckdb
con = duckdb.connect(database=":memory:")
# con.sql("INSTALL ducklake;")
# con.sql("ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake;")
# con.sql("USE my_ducklake;")
# con.sql("CREATE TABLE nl_train_stations AS FROM 'https://blobs.duckdb.org/nl_stations.csv';")
con.sql("FROM glob('my_ducklake.ducklake.files/*')")
print(con.sql("FROM 'my_ducklake.ducklake.files/*.parquet' LIMIT 10;"))
┌───────┬─────────┬─────────┬───┬──────────────────────┬─────────────────┬─────────────────┐
│ id │ code │ uic │ … │ type │ geo_lat │ geo_lng │
│ int64 │ varchar │ int64 │ │ varchar │ double │ double │
├───────┼─────────┼─────────┼───┼──────────────────────┼─────────────────┼─────────────────┤
│ 266 │ HT │ 8400319 │ … │ knooppuntIntercity… │ 51.69048 │ 5.29362 │
│ 269 │ HTO │ 8400320 │ … │ stoptreinstation │ 51.700553894043 │ 5.3183331489563 │
│ 227 │ HDE │ 8400388 │ … │ stoptreinstation │ 52.4091682 │ 5.893611 │
│ 8 │ AHBF │ 8015345 │ … │ knooppuntIntercity… │ 50.7678 │ 6.091499 │
│ 818 │ AW │ 8015199 │ … │ stoptreinstation │ 50.78036 │ 6.070715 │
│ 51 │ ATN │ 8400045 │ … │ stoptreinstation │ 51.921326524551 │ 6.5786272287369 │
│ 5 │ AC │ 8400047 │ … │ stoptreinstation │ 52.2785 │ 4.977 │
│ 550 │ EAHS │ 8021123 │ … │ stoptreinstation │ 52.079796120944 │ 7.0163583755493 │
│ 12 │ AIME │ 8774176 │ … │ intercitystation │ 45.55438 │ 6.64869 │
│ 819 │ ACDG │ 8727149 │ … │ knooppuntIntercity… │ 49.004048 │ 2.571133 │
├───────┴─────────┴─────────┴───┴──────────────────────┴─────────────────┴─────────────────┤
│ 10 rows 11 columns (6 shown) │
└──────────────────────────────────────────────────────────────────────────────────────────┘
この時点では単純にweb上のcsvが実データとしてparquetで読み込まれているだけのようです。
次はこのデータに変更を加えてみましょう。
Ducklakeのデータを上書きする。
import duckdb
con = duckdb.connect(database=":memory:")
con.sql("INSTALL ducklake;")
con.sql("ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake;")
con.sql("USE my_ducklake;")
con.sql("CREATE TABLE nl_train_stations AS FROM 'https://blobs.duckdb.org/nl_stations.csv';")
# con.sql("FROM glob('my_ducklake.ducklake.files/*')")
# print(con.sql("FROM 'my_ducklake.ducklake.files/*.parquet' LIMIT 10;"))
# con.sql("FROM glob('my_ducklake.ducklake.files/*')")
print(con.sql("SELECT name_long FROM nl_train_stations WHERE code = 'ASB';"))
con.sql("""
UPDATE nl_train_stations
SET name_long = 'Johan Cruijff ArenA'
WHERE code = 'ASB';
""")
print(con.sql("SELECT name_long FROM nl_train_stations WHERE code = 'ASB';"))
UPDATE する前とした後のデータをprintしています。
結果は以下でした。
┌─────────────────────────┐
│ name_long │
│ varchar │
├─────────────────────────┤
│ Amsterdam Bijlmer ArenA │
└─────────────────────────┘
┌─────────────────────┐
│ name_long │
│ varchar │
├─────────────────────┤
│ Johan Cruijff ArenA │
└─────────────────────┘
この変更時に実はファイルがいくつか作成されています。
この3つのファイルは元のデータファイル、削除された行を示すファイル、挿入された行をそれぞれ表しています。
ここで置き換わり削除された行のみ表示したい場合は
import duckdb
con = duckdb.connect(database=":memory:")
print(con.sql("FROM 'my_ducklake.ducklake.files/ducklake-*-delete.parquet';"))
で以下のように削除された行のみ表示されました!
┌──────────────────────────────────────────────────────────────────────────────────┬───────┐
│ file_path │ pos │
│ varchar │ int64 │
├──────────────────────────────────────────────────────────────────────────────────┼───────┤
│ my_ducklake.ducklake.files/ducklake-01972b9f-f367-7d81-ab84-9f3f7e07c983.parquet │ 29 │
└──────────────────────────────────────────────────────────────────────────────────┴───────┘
こういった感じでファイルに変更があったときに複数のparquetファイルにその情報を保存する...という仕組みのようです。
実行しているのはいつものSQL文のような感じなのですが実際に作成されているのがparquetファイルなのがちょっと新鮮なかんじです。
スナップショットの確認
作成されたスナップショット(変更内容が記述されたparquetファイル)は以下のコードで確認できます。
import duckdb
con = duckdb.connect(database=":memory:")
con.sql("ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake;")
con.sql("USE my_ducklake;")
print(con.sql("FROM my_ducklake.snapshots();"))
┌─────────────┬──────────────────────┬────────────────┬─────────────────────────────────────────────┐
│ snapshot_id │ snapshot_time │ schema_version │ changes │
│ int64 │ timestamp with tim… │ int64 │ map(varchar, varchar[]) │
├─────────────┼──────────────────────┼────────────────┼─────────────────────────────────────────────┤
│ 0 │ 2025-06-01 22:13:1… │ 0 │ {schemas_created=[main]} │
│ 1 │ 2025-06-01 22:13:1… │ 1 │ {tables_created=[main.nl_train_stations],… │
│ 2 │ 2025-06-01 22:13:1… │ 1 │ {tables_inserted_into=[1], tables_deleted… │
└─────────────┴──────────────────────┴────────────────┴─────────────────────────────────────────────┘
スナップショットが作成された時間や変更内がまとめて表示できました!
また変更前と変更後のそれぞれのバージョンの中身も以下で確認できます。
import duckdb
con = duckdb.connect(database=":memory:")
con.sql("ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake;")
con.sql("USE my_ducklake;")
print(con.sql("""
SELECT name_long
FROM nl_train_stations AT (VERSION => 1)
WHERE code = 'ASB';
"""))
print(con.sql("""
SELECT name_long
FROM nl_train_stations AT (VERSION => 2)
WHERE code = 'ASB';
"""))
┌─────────────────────────┐
│ name_long │
│ varchar │
├─────────────────────────┤
│ Amsterdam Bijlmer ArenA │
└─────────────────────────┘
┌─────────────────────┐
│ name_long │
│ varchar │
├─────────────────────┤
│ Johan Cruijff ArenA │
└─────────────────────┘
バージョンごとのデータを確認できました!
またテーブルの状態を時間で指定したいこともあると思います。
今の状態を確認したいときは、
print(con.sql("""
SELECT *
FROM nl_train_stations AT (TIMESTAMP => now());
"""))
例えば1週間前のテーブルの状態を取得したいときは
print(con.sql("""
SELECT *
FROM nl_train_stations AT (TIMESTAMP => now() - INTERVAL '1 week');
"""))
で表示されます。
またバージョン1と2の間の操作一覧を見たいときは、
import duckdb
con = duckdb.connect(database=":memory:")
con.sql("ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake;")
con.sql("USE my_ducklake;")
print(con.sql("FROM my_ducklake.table_changes('nl_train_stations',1,2);"))
値やchange_typeで行った操作(insertなど)も確認できます!
┌─────────────┬───────┬──────────────────┬───┬─────────────────┬─────────────────┐
│ snapshot_id │ rowid │ change_type │ … │ geo_lat │ geo_lng │
│ int64 │ int64 │ varchar │ │ double │ double │
├─────────────┼───────┼──────────────────┼───┼─────────────────┼─────────────────┤
│ 2 │ 29 │ update_postimage │ … │ 52.3122215271 │ 4.9469442367554 │
│ 1 │ 0 │ insert │ … │ 51.69048 │ 5.29362 │
│ 1 │ 1 │ insert │ … │ 51.700553894043 │ 5.3183331489563 │
│ 1 │ 2 │ insert │ … │ 52.4091682 │ 5.893611 │
│ 1 │ 3 │ insert │ … │ 50.7678 │ 6.091499 │
│ 1 │ 4 │ insert │ … │ 50.78036 │ 6.070715 │
│ 1 │ 5 │ insert │ … │ 51.921326524551 │ 6.5786272287369 │
│ 1 │ 6 │ insert │ … │ 52.2785 │ 4.977 │
│ 1 │ 7 │ insert │ … │ 52.079796120944 │ 7.0163583755493 │
│ 1 │ 8 │ insert │ … │ 45.55438 │ 6.64869 │
│ · │ · │ · │ · │ · │ · │
│ · │ · │ · │ · │ · │ · │
│ · │ · │ · │ · │ · │ · │
│ 1 │ 569 │ insert │ … │ 52.046390533447 │ 4.4927778244019 │
│ 1 │ 570 │ insert │ … │ 53.159612280006 │ 6.8679141998291 │
│ 1 │ 571 │ insert │ … │ 53.2486292 │ 6.4062361 │
│ 1 │ 572 │ insert │ … │ 47.37819 │ 8.5392 │
│ 1 │ 573 │ insert │ … │ 52.145278930664 │ 6.1941666603088 │
│ 1 │ 574 │ insert │ … │ 51.814998626709 │ 4.6416668891907 │
│ 1 │ 575 │ insert │ … │ 51.214167 │ 4.33 │
│ 1 │ 576 │ insert │ … │ 52.504722595215 │ 6.0919442176819 │
│ 1 │ 577 │ insert │ … │ 52.52764 │ 6.05146 │
│ 2 │ 29 │ update_preimage │ … │ 52.3122215271 │ 4.9469442367554 │
├─────────────┴───────┴──────────────────┴───┴─────────────────┴─────────────────┤
│ 580 rows (20 shown) 14 columns (5 shown) │
└────────────────────────────────────────────────────────────────────────────────┘
こういった感じでローカルだけでタイムトラベル機能が使用できます。
スキーマエボリューション機能の確認
スキーマエボリューションとは実ファイルを変更せずにデータ構造を変更したりできます。(列の追加や削除など)
この機能のメリットは実データを読まないで良いと言うことです。(数十GBのファイルを複数読むなどの場合時間がかかるので単純に遅くなります。)
あと列追加前のデータにもアクセスできたりもします。(列名が変更されたり消えたりしてBIツールが読めない...とかもありそうですがそういった場合にも対応できます。)
というわけで以下のコードで列を一つ追加します。
import duckdb
con = duckdb.connect(database=":memory:")
con.sql("ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake;")
con.sql("USE my_ducklake;")
con.sql("ALTER TABLE nl_train_stations ADD COLUMN new_column INTEGER;")
そのあと
import duckdb
con = duckdb.connect(database=":memory:")
con.sql("ATTACH 'ducklake:my_ducklake.ducklake' AS my_ducklake;")
con.sql("USE my_ducklake;")
print(con.sql("FROM my_ducklake.snapshots();"))
で確認すると
┌─────────────┬──────────────────────┬────────────────┬─────────────────────────────────────────────┐
│ snapshot_id │ snapshot_time │ schema_version │ changes │
│ int64 │ timestamp with tim… │ int64 │ map(varchar, varchar[]) │
├─────────────┼──────────────────────┼────────────────┼─────────────────────────────────────────────┤
│ 0 │ 2025-06-01 22:13:1… │ 0 │ {schemas_created=[main]} │
│ 1 │ 2025-06-01 22:13:1… │ 1 │ {tables_created=[main.nl_train_stations],… │
│ 2 │ 2025-06-01 22:13:1… │ 1 │ {tables_inserted_into=[1], tables_deleted… │
│ 3 │ 2025-06-01 23:03:0… │ 2 │ {tables_altered=[1]} │
└─────────────┴──────────────────────┴────────────────┴─────────────────────────────────────────────┘
のように列の追加が23:03の時間にされ、新しいスキーマバージョンとして表示されています。
しかしparquetファイルの実ファイルの変更日は以下のように22:13のままです。
つまりDucklakeの内部のメタデータのみ変更し実際の実データが入っているparquetファイル自体には変更を加えていません。
簡単にスキーマエボリューションが確認できました...!
まとめ
parquetファイルをスナップショットとすることでシンプルにローカルでこういった機能を使えるというのが魅力だと思いました。(クラウドが使えなくても...という意味で個人的に助かりそうだと思いました...)
またスキーマの変化も実データに変化を加えないことで計算の高速化などが見込めそうだなと思います。
バッチデータやローカル環境しか使えないような現場で重宝しそうです!
Discussion