Open2

DuckDB Update & Blog reading #5:dbt core × DuckDB

nk_worknk_work

検証不可となったので書きかけの記事をここに保存する。

nk_worknk_work

まえがき

実は以前dbt coreとDuckDBを使用して自分で記事を書いていました。
https://zenn.dev/amana/articles/715a17da0b7a7b

今回公式からそのdbt coreに関するブログが出たので不足分を試してみました。
前回自分では作り損ねたスタースキーマ、スノーフレーク、リバースETLについての解説...!があったのでBlogに倣って作ってみました🙌

公式ブログ

https://duckdb.org/2025/04/04/dbt-duckdb.html

①使用データについて

以前自分でやった時は自前のcsvをローカルに用意するしていたのですが、公式ブログではデータセットとしてcloudflare上のdbファイル、github上にホストされてるGeoJSON,コードと一緒に保存されてるGeoJSONファイルを使用しています。

使用するオープンデータセット

  • 鉄道サービスデータ
    データ内容: 列車が運行しているかどうかに関する情報
    提供元: 「Rijden de Treinen(列車は走っているか?)」アプリケーションのチーム

  • オランダの地図情報データ
    データ内容: オランダの地理情報
    提供元: Cartomap

とのことです。ありがとうございます。🙏

②ディメンションモデル

  • dim_nl_provinces(州ディメンションテーブル):

  • dim_nl_municipalities(自治体ディメンションテーブル)

  • dim_nl_train_stations(鉄道駅ディメンションテーブル)

  • fact_services(サービスファクトテーブル:列車の運行)

  • なんでこんなものを作る必要があるか。
    とこんなふうに言葉で言われても理解できない。
    整理して使用できるように!データマート

③各種ymlの設定

一応ymlファイルを作成する前にdbt initしてます🖐️
プロジェクト名はdutch_railway_networkにしときました。

そしてvscodeのターミナルで以下のコードを実行します。

code ~/.dbt/profiles.yml

profiles.ymlは公式の通りに以下のように設定しました。

dutch_railway_network:

  outputs:
    dev:
      type: duckdb
      path: data/dutch_railway_network.duckdb
      extensions:
        - spatial
        - httpfs
      threads: 5
      attach:
        - path: 'https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb'
          type: duckdb
          alias: external_db
  target: dev

データベースファイルはdataフォルダを作ってその中に格納
拡張機能として以下の機能を追加しています。

  • extensions:DuckDBで有効にする拡張機能を指定
  • spatial:地理空間データを処理するための拡張機能
  • httpfs:HTTPで外部のデータにアクセスするために使用する拡張機能

また以下のような設定もできるようです👀

  • threads: 5:クエリ実行時に並列処理するスレッド数を5にしている。

  • attach:外部のURLにあるduckdbにアクセスするためのpathをここで指定できます。
    path: 'https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb':アタッチするデータベースのURL

  • alias: external_db:この外部データベースに付ける別名。これで参照できます。

こんな設定ここでできるの?という感想でした。
拡張機能とか外部URLもymlファイルにまとめられるのか...👀

次はmodels内のsource.ymlを作成します。
sources.yml```
version: 2
sources:

  • name: geojson_external
    tables:
  • name: external_db
    database: external_db
    schema: main
    tables:
    • name: stations
    • name: services

tablesセクションで扱うデータにテーブル名などを設定しています。
nl_provincesは外部URL上にあるgeojsonファイルの内容...オランダの州の情報ですね。
external_locationは、このデータがあるURLを指定しています

nl_municipalitiesはローカルのseedsフォルダ内に入ってるgeojsonにテーブル名をつけてますね。こちらはオランダの自治体の情報でした。

external_db:先ほどのprofiles.ymlでアタッチした外部DuckDBデータベースを参照
database: external_db:接続先のデータベース名(profiles.ymlで定義したalias)
schema: main:使用するスキーマ名
と言った感じです。
また使用テーブルは以下のようになってます。
stations:おそらく駅の情報を含むテーブル
services:おそらく列車サービスの情報を含むテーブル

このファイルにより、例えばdbtのモデルから次のような参照方法でこれらのデータにアクセスできます:

{{ source('geojson_external', 'nl_provinces') }}
{{ source('external_db', 'services') }}

データソースの場所をモデルに直接ハードコードせずに、動的に扱えると言うことみたいですね。

④LOADデータ

DuckDBがターゲットにデータ保存するときに幾つかのオプションを選べるようです。

1.table

  • データを保存するときに毎回新しいテーブルを作り直します。

  • 既存のテーブルは削除され、最新データのテーブルに置き換えられます。

2.incremental

  • データを追加するだけや、古いデータを削除して新しいデータを挿入する方法を選べます。

  • ただし、元のテーブルの構造自体はそのままです(既存のテーブルがある場合に利用)。

3.snapshot

  • 過去のデータも残しつつ、新しいデータを追記していきます。

  • データの「いつ有効だったか」を記録するテーブル(スローリー・チェンジング・ディメンションタイプ2)を作ります。

4.view

  • 実行ごとにビューを更新します。

  • 実際のデータは保存されず、データを参照するviewが更新される。

じゃあそもそもテーブルのデータを更新するためには何をしようするのか...?
ここでは
dim_nl_provincesテーブルのデータを更新するために、空間関数st_readを使用しています。

この関数はsources.ymlで定義されたnl_provinces GeoJSONファイルを自動的に(!)読み込んで解析します。こういう仕組みでデータのロードの自動化をやってるんですね...👀
modelsフォルダの中にtest.sqlというファイルを作成して以下の内容を記述しました。

dim_nl_provinces.sql```
{{ config(materialized='table') }}

SELECT
{{ dbt_utils.generate_surrogate_key(['id']) }} AS province_sk,
id AS province_id,
statnaam AS province_name,
geom AS province_geometry,
{{ common_columns() }}
FROM st_read({{ source("geojson_external", "nl_provinces") }}) AS src;

この内容について解説します。
まずクエリ結果はテーブルとして保存されています。
そしてそのテーブルの中身はSELECTで記述されてますね。

- {{ dbt_utils.generate_surrogate_key(['id']) }} AS province_sk
いきなり複雑そうな感じですがid列を基に一意のサロゲートキー(province_sk)を生成...とのこと
まずdbt-utilsのヘルパー関数としてgenerate_surrogate_key というものがありこれで生成してるんですね👀
でサロゲートキーとは主キーとして使われる一意なIDのことらしいです。dbtだとこういう方法で主キーの作成の設定を行えるみたいです。(テーブル内のidを元に作成)
そしてそれをprovince_id列としてエイリアス設定しています。

- statnaam AS province_name,geom AS province_geometry
州の名前を含む列 statnaam を province_name,地理情報を含む列 geom を、province_geometry
としています。provinceは州という意味でnl_provincesから州の情報を引っ張ってきているのでわかりやすくこう言った名前にしているのだと思います。

- {{ common_columns() }}
この記述で複数テーブルの共通列を追加できるみたいです。便利ですね...


同様にdim_nl_municipalities と fact_services のデータもmaterializeします...と書いてるんですがdim_nl_municipalities.sqlの**記述がない...**
ので適当に作りました。
dim_nl_municipalities.sql```
{{ config(materialized='table') }}

SELECT
    {{ dbt_utils.generate_surrogate_key(['id']) }} AS municipality_sk,
    id AS municipality_id,
    gemeentenaam AS municipality_name,
    geom AS municipality_geometry,
    p.province_sk AS province_sk,
    {{ common_columns() }}
FROM st_read({{ source("geojson_external", "nl_municipalities") }}) AS src
LEFT JOIN {{ ref("dim_nl_provinces") }} AS p
      ON st_contains(
          p.province_geometry,
          st_centroid(src.geom)
      );

fact_services.sql```
{{ config(materialized='table') }}

SELECT
{{ dbt_utils.generate_surrogate_key(['srv.id']) }} AS service_sk,
srv.id AS service_id,
srv.service_date,
srv.service_number,
srv.service_type,
srv.service_company,
srv.service_planned_arrival,
srv.service_actual_arrival,
srv.service_arrival_delay,
srv.service_arrival_cancelled,
tr_st.station_sk,
{{ common_columns() }}
FROM {{ source("external_db", "services") }} AS srv
INNER JOIN {{ ref("dim_nl_train_stations") }} AS tr_st
ON srv.station_code = tr_st.station_code
WHERE srv.service_date >= '2024-01-01'
AND srv.service_date < '2025-01-01';


あとはdim_nl_train_stations.sqlは以下の通り?
dim_nl_train_stations.sql```
{{ config(materialized='table') }}

SELECT
    {{ dbt_utils.generate_surrogate_key(['tr_st.code']) }} AS station_sk, -- 駅の一意の識別子(サロゲートキー)
    tr_st.id AS station_id, -- 駅のID
    tr_st.code AS station_code, -- 駅のコード
    tr_st.name_long AS station_name, -- 駅の正式名称
    tr_st.type AS station_type, -- 駅のタイプ(例: 中央駅など)
    st_point(tr_st.geo_lng, tr_st.geo_lat) AS station_geo_location, -- 駅の緯度経度をジオメトリ形式で格納
    coalesce(dim_mun.municipality_sk, 'unknown') AS municipality_sk, -- 関連付けられた自治体の識別子。ない場合は "unknown"
    {{ common_columns() }} -- 共通列を追加
FROM {{ source("external_db", "stations") }} AS tr_st -- 外部ソース(stations)を参照
LEFT JOIN {{ ref ("dim_nl_municipalities") }} AS dim_mun -- dim_nl_municipalities テーブルと結合
       ON st_contains(
           dim_mun.municipality_geometry, -- 自治体のジオメトリ
           st_point(tr_st.geo_lng, tr_st.geo_lat) -- 駅の緯度経度をジオメトリ形式で指定
       )
WHERE tr_st.country = 'NL'; -- 駅の国がオランダの場合に限定

  • coalesce(dim_mun.municipality_sk, 'unknown') AS municipality_sk
    ここが気になったので調べてみたら、dim_mun.municipality_sk 列の値をチェックして、NULLなら 'unknown' というデフォルト値を代わりに返してます。

⑤外部データ

いよいよ作成したデータマートをエクスポートします!
データをファイルにエクスポートする機能は、dbt-duckdbアダプターのexternalマテリアライゼーションというらしいです。
これによりCSV、JSON、Parquetなどのファイル形式にデータをエクスポートして、指定した場所(ローカルまたは外部)に保存することができます。いろいろな場所、ファイル形式で保存できるという特性はまさにDuckDBならではですね...!

ただ差分更新などではなく、ロードタイプはfull refresh(完全更新)なので、既存のファイルは上書きされます。
modelsにoutput.sqlというファイルを作成して以下の内容を記述しました。

{{
    config(
        materialized='external',
        location="data/exports/nl_train_services_aggregate",
        options={
            "partition_by": "service_year, service_month",
            "overwrite": True
        }
    )
}}

SELECT
    year(service_date) AS service_year,
    month(service_date) AS service_month,
    service_type,
    service_company,
    tr_st.station_sk,
    tr_st.station_name,
    m.municipality_sk,
    m.municipality_name,
    p.province_sk,
    p.province_name,
    count(*) AS number_of_rides
FROM {{ ref ("fact_services") }} AS srv
INNER JOIN {{ ref("dim_nl_train_stations") }} AS tr_st
        ON srv.station_sk = tr_st.station_sk
INNER JOIN {{ ref("dim_nl_municipalities") }} AS m
        ON tr_st.municipality_sk = m.municipality_sk
INNER JOIN {{ ref("dim_nl_provinces") }} AS p
        ON m.province_sk = p.province_sk
WHERE service_year = {{ var('execution_year') }}
GROUP BY ALL

とりあえず冒頭のconfigで以下のようなことをやっています。
-materialized='external':
externalとすることで外部保存を選べるみたいです。

-location="data/exports/nl_train_services_aggregate":
これは単純にデータの保存先のパスを指定しています。

-option:
"partition_by": "service_year, service_month"
エクスポートするファイルを指定したカラムでパーティション分割できるみたいです。年、月別にファイル分けしていますね👀
"overwrite": True
あとは既存ファイルの上書きを許可しています。

これはDuckDBからexternalマテリアライゼーションを使用してエクスポートされたParquetファイルの「Hiveパーティショニング」データを論理的なカテゴリ(この場合は年と月)で分割する方法です

⑥ postgresql

鉄道サービスのデータマートにデータを処理した後、日次の集計データを駅レベルで生成することができます。このデータはスター・スキーマ・モデルに基づいて整理され、データにディメンションキー(次元キー)が含まれる形になります。

コピーconfig(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key="service_date, service_type, service_company, station_sk"
)

materialized='incremental': これは増分更新方式を使用することを指定しています。

incremental_strategy='delete+insert': 更新方法として「削除+挿入」戦略を使用します。既存の一致するレコードを削除し、新しいレコードを挿入します。
unique_key: どのフィールドの組み合わせがレコードを一意に識別するかを指定します。この場合、サービス日付、サービスタイプ、サービス会社、駅の代理キーの組み合わせです。

{{
    config(
        materialized='incremental',
        incremental_strategy='delete+insert',
        unique_key="""
            service_date,
            service_type,
            service_company,
            station_sk
        """
    )
}}

SELECT
    service_date,
    service_type,
    service_company,
    srv.station_sk,
    mn.municipality_sk,
    province_sk,
    count(*) AS number_of_rides,
    {{ common_columns() }}
FROM {{ ref ("fact_services") }} AS srv
INNER JOIN {{ ref("rep_dim_nl_train_stations") }} AS tr_st
        ON srv.station_sk = tr_st.station_sk
INNER JOIN {{ ref("rep_dim_nl_municipalities") }} AS mn
        ON tr_st.municipality_sk = mn.municipality_sk
WHERE NOT service_arrival_cancelled

  {% if is_incremental() %}
    AND srv.invocation_id = (
        SELECT invocation_id
        FROM {{ ref("fact_services") }}
        ORDER BY last_updated_dt DESC
        LIMIT 1
    )
  {% endif %}
GROUP BY ALL

まずconfigで増分更新を選択し、方式は削除して再挿入(delete+insert)としています。

{% if is_incremental() %}
    AND srv.invocation_id = (
        SELECT invocation_id
        FROM {{ ref("fact_services") }}
        ORDER BY last_updated_dt DESC
        LIMIT 1
    )
  {% endif %}

⑦Postgresqlの導入

ここで以下を参考にPostgresqlの導入を行います。🙏(ポスグレ入れてなかった...)
https://zenn.dev/teru_whisky/books/667d3542f61d74

CLIで初期設定

pg_isready

/tmp:5432 - accepting connections

ユーザー名、パスワードは自分で設定

psql postgres -c "CREATE DATABASE mydb;"


CREATE DATABASE
psql -c "CREATE USER ユーザー名 WITH PASSWORD 'ユーザーパスワード';"
CREATE ROLE
psql -d postgres -c "GRANT ALL PRIVILEGES ON DATABASE データベース名 TO ユーザー名;"

GRANT

これでデータベースの設定が終わりました🙌

⑧profiles dbt_profile のyml設定について

postgres拡張機能:

DuckDBがPostgreSQLと通信するために必要な拡張機能を有効にします
これはextensionsセクションに追加します

attachセクションでのPostgreSQL接続文字列:

PostgreSQLデータベースへの接続情報を指定します
これにはホスト名、ポート、ユーザー名、パスワード、データベース名などが含まれます

以下のコードで実際にprofiles.ymlの修正をします。

code ~/.dbt/profiles.yml

具体的なprofiles.ymlの例は以下のようになります:

ducktest:
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
      threads: 1

    prod:
      type: duckdb
      path: prod.duckdb
      threads: 4

  target: dev

titanic:
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
      threads: 1

    prod:
      type: duckdb
      path: prod.duckdb
      threads: 4

  target: dev

walmart:
  outputs:
    dev:
      type: duckdb
      path: /Users/nk/Desktop/renshu/dbt_tutrial/walmart/database/test.db
      threads: 1

    prod:
      type: duckdb
      path: prod.duckdb
      threads: 4

  target: dev

railway:
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
      threads: 1

    prod:
      type: duckdb
      path: prod.duckdb
      threads: 4

  target: dev

dutch_railway_network:
  outputs:
    dev:
      type: duckdb
      path: data/dutch_railway_network.duckdb
      extensions:
        - spatial
        - httpfs
        - postgres
      threads: 5
      attach:
        - path: "https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb"
          type: duckdb
          alias: external_db
        - path: "postgresql://postgres:{{ env_var('DBT_DUCKDB_PG_PWD') }}@localhost:5466/postgres"
          type: postgres
          alias: postgres_db
  target: dev

DBT_DUCKDB_PG_PWDは先ほど設定したデータベースへのパスワードです。

modelsフォルダ内に以下の内容を記述します。

models:
  dutch_railway_network:
    transformation:
      schema: main
      +docs:
        node_color: 'silver'
    reverse_etl:
      database: postgres_db
      schema: public
      +docs:
        node_color: '#d5b85a'

transformationフォルダ(データ変換用)

保存場所:DuckDBのmainスキーマ
見た目:ドキュメント上では銀色で表示

  1. reverse_etlフォルダ(PostgreSQLへのデータ転送用)

保存場所:PostgreSQLのpublicスキーマ
データベース:postgres_db(profiles.ymlで設定した接続先)
見た目:ドキュメント上では金色で表示

みたいです。

dbt runをするとどうもデータベースが必要なので作りました。

duckdb data/dutch_railway_network.duckdb
v1.2.1 8e52ec4395
Enter ".help" for usage hints.
D "CREATE TABLE dummy (id INTEGER);"
pg_isready -h localhost -p 5466

でpostgresqlが起動していることを確認。
起動していない場合は以下

brew services start postgresql@15
dbt run --model +reverse_etl
02:16:28  Running with dbt=1.9.4
02:16:28  Registered adapter: duckdb=1.9.2
02:16:28  [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
There are 1 unused configuration paths:
- models.dutch_railway_network.transformation
02:16:28  Found 7 models, 4 data tests, 4 sources, 426 macros
02:16:28  
02:16:28  Concurrency: 5 threads (target='dev')
02:16:28  
02:19:30  
02:19:30  Finished running  in 0 hours 3 minutes and 1.48 seconds (181.48s).
02:19:30  Encountered an error:
Runtime Error
  IO Error: Cannot open database "https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb" in read-only mode: database does not exist

あれデータベースがない...

duckdb
v1.2.1 8e52ec4395
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.
D ATTACH 'https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb' AS external_db;
IO Error:
Cannot open database "https://blobs.duckdb.org/nl-railway/train_stations_and_services.duckdb" in read-only mode: database does not exist

おそらく公式の方がブログのために一時的に作成したデータベースなのかな?
これ以上の検証ができないので一旦この記事はスクラップに入れてお蔵入りにします。