DuckDB Update & Blog reading #5:dbt core × DuckDB

検証不可となったので書きかけの記事をここに保存する。

まえがき
実は以前dbt coreとDuckDBを使用して自分で記事を書いていました。
今回公式からそのdbt coreに関するブログが出たので不足分を試してみました。
前回自分では作り損ねたスタースキーマ、スノーフレーク、リバースETLについての解説...!があったのでBlogに倣って作ってみました🙌
公式ブログ
①使用データについて
以前自分でやった時は自前のcsvをローカルに用意するしていたのですが、公式ブログではデータセットとしてcloudflare上のdbファイル、github上にホストされてるGeoJSON,コードと一緒に保存されてるGeoJSONファイルを使用しています。
使用するオープンデータセット
-
鉄道サービスデータ
データ内容: 列車が運行しているかどうかに関する情報
提供元: 「Rijden de Treinen(列車は走っているか?)」アプリケーションのチーム -
オランダの地図情報データ
データ内容: オランダの地理情報
提供元: Cartomap
とのことです。ありがとうございます。🙏
②ディメンションモデル
-
dim_nl_provinces(州ディメンションテーブル):
-
dim_nl_municipalities(自治体ディメンションテーブル)
-
dim_nl_train_stations(鉄道駅ディメンションテーブル)
-
fact_services(サービスファクトテーブル:列車の運行)
-
なんでこんなものを作る必要があるか。
とこんなふうに言葉で言われても理解できない。
整理して使用できるように!データマート
③各種ymlの設定
一応ymlファイルを作成する前にdbt initしてます🖐️
プロジェクト名はdutch_railway_networkにしときました。
そしてvscodeのターミナルで以下のコードを実行します。
code ~/.dbt/profiles.yml
profiles.ymlは公式の通りに以下のように設定しました。
dutch_railway_network:
outputs:
dev:
type: duckdb
path: data/dutch_railway_network.duckdb
extensions:
- spatial
- httpfs
threads: 5
attach:
- path: 'https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb'
type: duckdb
alias: external_db
target: dev
データベースファイルはdataフォルダを作ってその中に格納
拡張機能として以下の機能を追加しています。
- extensions:DuckDBで有効にする拡張機能を指定
- spatial:地理空間データを処理するための拡張機能
- httpfs:HTTPで外部のデータにアクセスするために使用する拡張機能
また以下のような設定もできるようです👀
-
threads: 5:クエリ実行時に並列処理するスレッド数を5にしている。
-
attach:外部のURLにあるduckdbにアクセスするためのpathをここで指定できます。
path: 'https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb':アタッチするデータベースのURL -
alias: external_db:この外部データベースに付ける別名。これで参照できます。
こんな設定ここでできるの?という感想でした。
拡張機能とか外部URLもymlファイルにまとめられるのか...👀
次はmodels内のsource.ymlを作成します。
sources.yml```
version: 2
sources:
- name: geojson_external
tables:- name: nl_provinces
config:
external_location: "https://cartomap.github.io/nl/wgs84/provincie_2025.geojson" - name: nl_municipalities
config:
external_location: "seeds/gemeente_2025.geojson"
- name: nl_provinces
- name: external_db
database: external_db
schema: main
tables:- name: stations
- name: services
tablesセクションで扱うデータにテーブル名などを設定しています。
nl_provincesは外部URL上にあるgeojsonファイルの内容...オランダの州の情報ですね。
external_locationは、このデータがあるURLを指定しています
nl_municipalitiesはローカルのseedsフォルダ内に入ってるgeojsonにテーブル名をつけてますね。こちらはオランダの自治体の情報でした。
external_db:先ほどのprofiles.ymlでアタッチした外部DuckDBデータベースを参照
database: external_db:接続先のデータベース名(profiles.ymlで定義したalias)
schema: main:使用するスキーマ名
と言った感じです。
また使用テーブルは以下のようになってます。
stations:おそらく駅の情報を含むテーブル
services:おそらく列車サービスの情報を含むテーブル
このファイルにより、例えばdbtのモデルから次のような参照方法でこれらのデータにアクセスできます:
{{ source('geojson_external', 'nl_provinces') }}
{{ source('external_db', 'services') }}
データソースの場所をモデルに直接ハードコードせずに、動的に扱えると言うことみたいですね。
④LOADデータ
DuckDBがターゲットにデータ保存するときに幾つかのオプションを選べるようです。
1.table
-
データを保存するときに毎回新しいテーブルを作り直します。
-
既存のテーブルは削除され、最新データのテーブルに置き換えられます。
2.incremental
-
データを追加するだけや、古いデータを削除して新しいデータを挿入する方法を選べます。
-
ただし、元のテーブルの構造自体はそのままです(既存のテーブルがある場合に利用)。
3.snapshot
-
過去のデータも残しつつ、新しいデータを追記していきます。
-
データの「いつ有効だったか」を記録するテーブル(スローリー・チェンジング・ディメンションタイプ2)を作ります。
4.view
-
実行ごとにビューを更新します。
-
実際のデータは保存されず、データを参照するviewが更新される。
じゃあそもそもテーブルのデータを更新するためには何をしようするのか...?
ここでは
dim_nl_provincesテーブルのデータを更新するために、空間関数st_readを使用しています。
この関数はsources.ymlで定義されたnl_provinces GeoJSONファイルを自動的に(!)読み込んで解析します。こういう仕組みでデータのロードの自動化をやってるんですね...👀
modelsフォルダの中にtest.sqlというファイルを作成して以下の内容を記述しました。
dim_nl_provinces.sql```
{{ config(materialized='table') }}
SELECT
{{ dbt_utils.generate_surrogate_key(['id']) }} AS province_sk,
id AS province_id,
statnaam AS province_name,
geom AS province_geometry,
{{ common_columns() }}
FROM st_read({{ source("geojson_external", "nl_provinces") }}) AS src;
この内容について解説します。
まずクエリ結果はテーブルとして保存されています。
そしてそのテーブルの中身はSELECTで記述されてますね。
- {{ dbt_utils.generate_surrogate_key(['id']) }} AS province_sk
いきなり複雑そうな感じですがid列を基に一意のサロゲートキー(province_sk)を生成...とのこと
まずdbt-utilsのヘルパー関数としてgenerate_surrogate_key というものがありこれで生成してるんですね👀
でサロゲートキーとは主キーとして使われる一意なIDのことらしいです。dbtだとこういう方法で主キーの作成の設定を行えるみたいです。(テーブル内のidを元に作成)
そしてそれをprovince_id列としてエイリアス設定しています。
- statnaam AS province_name,geom AS province_geometry
州の名前を含む列 statnaam を province_name,地理情報を含む列 geom を、province_geometry
としています。provinceは州という意味でnl_provincesから州の情報を引っ張ってきているのでわかりやすくこう言った名前にしているのだと思います。
- {{ common_columns() }}
この記述で複数テーブルの共通列を追加できるみたいです。便利ですね...
同様にdim_nl_municipalities と fact_services のデータもmaterializeします...と書いてるんですがdim_nl_municipalities.sqlの**記述がない...**
ので適当に作りました。
dim_nl_municipalities.sql```
{{ config(materialized='table') }}
SELECT
{{ dbt_utils.generate_surrogate_key(['id']) }} AS municipality_sk,
id AS municipality_id,
gemeentenaam AS municipality_name,
geom AS municipality_geometry,
p.province_sk AS province_sk,
{{ common_columns() }}
FROM st_read({{ source("geojson_external", "nl_municipalities") }}) AS src
LEFT JOIN {{ ref("dim_nl_provinces") }} AS p
ON st_contains(
p.province_geometry,
st_centroid(src.geom)
);
fact_services.sql```
{{ config(materialized='table') }}
SELECT
{{ dbt_utils.generate_surrogate_key(['srv.id']) }} AS service_sk,
srv.id AS service_id,
srv.service_date,
srv.service_number,
srv.service_type,
srv.service_company,
srv.service_planned_arrival,
srv.service_actual_arrival,
srv.service_arrival_delay,
srv.service_arrival_cancelled,
tr_st.station_sk,
{{ common_columns() }}
FROM {{ source("external_db", "services") }} AS srv
INNER JOIN {{ ref("dim_nl_train_stations") }} AS tr_st
ON srv.station_code = tr_st.station_code
WHERE srv.service_date >= '2024-01-01'
AND srv.service_date < '2025-01-01';
あとはdim_nl_train_stations.sqlは以下の通り?
dim_nl_train_stations.sql```
{{ config(materialized='table') }}
SELECT
{{ dbt_utils.generate_surrogate_key(['tr_st.code']) }} AS station_sk, -- 駅の一意の識別子(サロゲートキー)
tr_st.id AS station_id, -- 駅のID
tr_st.code AS station_code, -- 駅のコード
tr_st.name_long AS station_name, -- 駅の正式名称
tr_st.type AS station_type, -- 駅のタイプ(例: 中央駅など)
st_point(tr_st.geo_lng, tr_st.geo_lat) AS station_geo_location, -- 駅の緯度経度をジオメトリ形式で格納
coalesce(dim_mun.municipality_sk, 'unknown') AS municipality_sk, -- 関連付けられた自治体の識別子。ない場合は "unknown"
{{ common_columns() }} -- 共通列を追加
FROM {{ source("external_db", "stations") }} AS tr_st -- 外部ソース(stations)を参照
LEFT JOIN {{ ref ("dim_nl_municipalities") }} AS dim_mun -- dim_nl_municipalities テーブルと結合
ON st_contains(
dim_mun.municipality_geometry, -- 自治体のジオメトリ
st_point(tr_st.geo_lng, tr_st.geo_lat) -- 駅の緯度経度をジオメトリ形式で指定
)
WHERE tr_st.country = 'NL'; -- 駅の国がオランダの場合に限定
- coalesce(dim_mun.municipality_sk, 'unknown') AS municipality_sk
ここが気になったので調べてみたら、dim_mun.municipality_sk 列の値をチェックして、NULLなら 'unknown' というデフォルト値を代わりに返してます。
⑤外部データ
いよいよ作成したデータマートをエクスポートします!
データをファイルにエクスポートする機能は、dbt-duckdbアダプターのexternalマテリアライゼーションというらしいです。
これによりCSV、JSON、Parquetなどのファイル形式にデータをエクスポートして、指定した場所(ローカルまたは外部)に保存することができます。いろいろな場所、ファイル形式で保存できるという特性はまさにDuckDBならではですね...!
ただ差分更新などではなく、ロードタイプはfull refresh(完全更新)なので、既存のファイルは上書きされます。
modelsにoutput.sqlというファイルを作成して以下の内容を記述しました。
{{
config(
materialized='external',
location="data/exports/nl_train_services_aggregate",
options={
"partition_by": "service_year, service_month",
"overwrite": True
}
)
}}
SELECT
year(service_date) AS service_year,
month(service_date) AS service_month,
service_type,
service_company,
tr_st.station_sk,
tr_st.station_name,
m.municipality_sk,
m.municipality_name,
p.province_sk,
p.province_name,
count(*) AS number_of_rides
FROM {{ ref ("fact_services") }} AS srv
INNER JOIN {{ ref("dim_nl_train_stations") }} AS tr_st
ON srv.station_sk = tr_st.station_sk
INNER JOIN {{ ref("dim_nl_municipalities") }} AS m
ON tr_st.municipality_sk = m.municipality_sk
INNER JOIN {{ ref("dim_nl_provinces") }} AS p
ON m.province_sk = p.province_sk
WHERE service_year = {{ var('execution_year') }}
GROUP BY ALL
とりあえず冒頭のconfigで以下のようなことをやっています。
-materialized='external':
externalとすることで外部保存を選べるみたいです。
-location="data/exports/nl_train_services_aggregate":
これは単純にデータの保存先のパスを指定しています。
-option:
"partition_by": "service_year, service_month"
エクスポートするファイルを指定したカラムでパーティション分割できるみたいです。年、月別にファイル分けしていますね👀
"overwrite": True
あとは既存ファイルの上書きを許可しています。
これはDuckDBからexternalマテリアライゼーションを使用してエクスポートされたParquetファイルの「Hiveパーティショニング」データを論理的なカテゴリ(この場合は年と月)で分割する方法です
⑥ postgresql
鉄道サービスのデータマートにデータを処理した後、日次の集計データを駅レベルで生成することができます。このデータはスター・スキーマ・モデルに基づいて整理され、データにディメンションキー(次元キー)が含まれる形になります。
コピーconfig(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key="service_date, service_type, service_company, station_sk"
)
materialized='incremental': これは増分更新方式を使用することを指定しています。
incremental_strategy='delete+insert': 更新方法として「削除+挿入」戦略を使用します。既存の一致するレコードを削除し、新しいレコードを挿入します。
unique_key: どのフィールドの組み合わせがレコードを一意に識別するかを指定します。この場合、サービス日付、サービスタイプ、サービス会社、駅の代理キーの組み合わせです。
{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key="""
service_date,
service_type,
service_company,
station_sk
"""
)
}}
SELECT
service_date,
service_type,
service_company,
srv.station_sk,
mn.municipality_sk,
province_sk,
count(*) AS number_of_rides,
{{ common_columns() }}
FROM {{ ref ("fact_services") }} AS srv
INNER JOIN {{ ref("rep_dim_nl_train_stations") }} AS tr_st
ON srv.station_sk = tr_st.station_sk
INNER JOIN {{ ref("rep_dim_nl_municipalities") }} AS mn
ON tr_st.municipality_sk = mn.municipality_sk
WHERE NOT service_arrival_cancelled
{% if is_incremental() %}
AND srv.invocation_id = (
SELECT invocation_id
FROM {{ ref("fact_services") }}
ORDER BY last_updated_dt DESC
LIMIT 1
)
{% endif %}
GROUP BY ALL
まずconfigで増分更新を選択し、方式は削除して再挿入(delete+insert)としています。
{% if is_incremental() %}
AND srv.invocation_id = (
SELECT invocation_id
FROM {{ ref("fact_services") }}
ORDER BY last_updated_dt DESC
LIMIT 1
)
{% endif %}
⑦Postgresqlの導入
ここで以下を参考にPostgresqlの導入を行います。🙏(ポスグレ入れてなかった...)
CLIで初期設定
pg_isready
/tmp:5432 - accepting connections
ユーザー名、パスワードは自分で設定
psql postgres -c "CREATE DATABASE mydb;"
CREATE DATABASE
psql -c "CREATE USER ユーザー名 WITH PASSWORD 'ユーザーパスワード';"
CREATE ROLE
psql -d postgres -c "GRANT ALL PRIVILEGES ON DATABASE データベース名 TO ユーザー名;"
GRANT
これでデータベースの設定が終わりました🙌
⑧profiles dbt_profile のyml設定について
postgres拡張機能:
DuckDBがPostgreSQLと通信するために必要な拡張機能を有効にします
これはextensionsセクションに追加します
attachセクションでのPostgreSQL接続文字列:
PostgreSQLデータベースへの接続情報を指定します
これにはホスト名、ポート、ユーザー名、パスワード、データベース名などが含まれます
以下のコードで実際にprofiles.ymlの修正をします。
code ~/.dbt/profiles.yml
具体的なprofiles.ymlの例は以下のようになります:
ducktest:
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 1
prod:
type: duckdb
path: prod.duckdb
threads: 4
target: dev
titanic:
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 1
prod:
type: duckdb
path: prod.duckdb
threads: 4
target: dev
walmart:
outputs:
dev:
type: duckdb
path: /Users/nk/Desktop/renshu/dbt_tutrial/walmart/database/test.db
threads: 1
prod:
type: duckdb
path: prod.duckdb
threads: 4
target: dev
railway:
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 1
prod:
type: duckdb
path: prod.duckdb
threads: 4
target: dev
dutch_railway_network:
outputs:
dev:
type: duckdb
path: data/dutch_railway_network.duckdb
extensions:
- spatial
- httpfs
- postgres
threads: 5
attach:
- path: "https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb"
type: duckdb
alias: external_db
- path: "postgresql://postgres:{{ env_var('DBT_DUCKDB_PG_PWD') }}@localhost:5466/postgres"
type: postgres
alias: postgres_db
target: dev
DBT_DUCKDB_PG_PWDは先ほど設定したデータベースへのパスワードです。
modelsフォルダ内に以下の内容を記述します。
models:
dutch_railway_network:
transformation:
schema: main
+docs:
node_color: 'silver'
reverse_etl:
database: postgres_db
schema: public
+docs:
node_color: '#d5b85a'
transformationフォルダ(データ変換用)
保存場所:DuckDBのmainスキーマ
見た目:ドキュメント上では銀色で表示
- reverse_etlフォルダ(PostgreSQLへのデータ転送用)
保存場所:PostgreSQLのpublicスキーマ
データベース:postgres_db(profiles.ymlで設定した接続先)
見た目:ドキュメント上では金色で表示
みたいです。
dbt runをするとどうもデータベースが必要なので作りました。
duckdb data/dutch_railway_network.duckdb
v1.2.1 8e52ec4395
Enter ".help" for usage hints.
D "CREATE TABLE dummy (id INTEGER);"
pg_isready -h localhost -p 5466
でpostgresqlが起動していることを確認。
起動していない場合は以下
brew services start postgresql@15
dbt run --model +reverse_etl
02:16:28 Running with dbt=1.9.4
02:16:28 Registered adapter: duckdb=1.9.2
02:16:28 [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
There are 1 unused configuration paths:
- models.dutch_railway_network.transformation
02:16:28 Found 7 models, 4 data tests, 4 sources, 426 macros
02:16:28
02:16:28 Concurrency: 5 threads (target='dev')
02:16:28
02:19:30
02:19:30 Finished running in 0 hours 3 minutes and 1.48 seconds (181.48s).
02:19:30 Encountered an error:
Runtime Error
IO Error: Cannot open database "https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb" in read-only mode: database does not exist
あれデータベースがない...
duckdb
v1.2.1 8e52ec4395
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.
D ATTACH 'https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb' AS external_db;
IO Error:
Cannot open database "https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb" in read-only mode: database does not exist
おそらく公式の方がブログのために一時的に作成したデータベースなのかな?
これ以上の検証ができないので一旦この記事はスクラップに入れてお蔵入りにします。