🍊

dbt snapshot のタイムスタンプ戦略を試してみる

2024/09/19に公開

https://docs.getdbt.com/docs/build/snapshots

はじめに

  • サービスは、全てログ保有すると動作が遅くなったり色々な弊害があります。とはいえ、ログがないと時系列での分析ができなかったり色々したいことができません。dbt snapshotはそういった課題を解決するデータのログを取る機能です。
  • 理解を深める為基本的な挙動を確認します。

動作検証

動作環境

  • dbt core
Running with dbt=1.8.3
Registered adapter: bigquery=1.8.0

確認内容

  • 今回はタイムスタンプ戦略のみ確認します
  • 記事に記載の順番でdelete処理update処理insert処理の流れでテストデータを更新し挙動確認する

スナップショットの「戦略」は、dbtがどのようにして行の変更を検出するかを定義します。dbtには、デフォルトで2つの戦略が組み込まれており、これらは「timestamp」と「check」です。

タイムスタンプ戦略(推奨)

タイムスタンプ戦略は、updated_at フィールドを使用して行が変更されたかどうかを判断します。もし、行の設定された updated_at カラムが前回のスナップショット実行時よりも新しい場合、dbtは古いレコードを無効にして、新しいレコードを記録します。タイムスタンプが変更されていない場合、dbtは何もアクションを取らないままです。

タイムスタンプ戦略を使用するためには、以下の設定が必要です。

設定項目 説明
updated_at ソース行が最後に更新された時刻を示すカラム updated_at

チェック戦略

チェック戦略は、信頼できる updated_at カラムがないテーブルに便利です。この戦略は、指定されたカラムのリストを、現在の値と履歴の値で比較することで動作します。これらのカラムのいずれかが変更された場合、dbtは古いレコードを無効にし、新しいレコードを記録します。カラムの値が同一であれば、dbtはアクションを取りません。

チェック戦略を使用するためには、以下の設定が必要です。

設定項目 説明
check_cols 変更を確認するカラムのリスト、またはすべてのカラムを確認するために all を指定 ["name", "email"]

対象のデータ作成

対象のデータ作成

下記のクエリを実行しテストデータを作成する

CREATE OR REPLACE TABLE `project_id.work.test_001` (
  id INT64,
  name STRING,
  age INT64,
  updated_at TIMESTAMP
) AS (
  SELECT id, name, age, updated_at
  FROM (
    SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
    SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') UNION ALL
    SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') UNION ALL
    SELECT 4, '山田優子', 28, TIMESTAMP('2024-08-10 08:15:00') UNION ALL
    SELECT 5, '伊藤健太', 32, TIMESTAMP('2024-09-11 09:00:00')
  )
);

id name age updated_at
1 田中太郎 30 2024-01-01 00:00:00.000000 UTC
2 佐藤花子 25 2024-01-01 10:00:00.000000 UTC
3 鈴木一郎 35 2024-07-15 14:30:00.000000 UTC
4 山田優子 28 2024-08-10 08:15:00.000000 UTC
5 伊藤健太 32 2024-09-11 09:00:00.000000 UTC

dbt の各種設定

dbt_project.yml
...

snapshots: # dbt snapshot機能の設定セクション
  dbt_project:
    +persist_docs: # yamlファイルに記載したテーブルやカラムのdescriptionをデータベースに追加する設定
      relation: true # テーブルやビューのdescriptionを追加する
      columns: true # 各カラムのdescriptionを追加する
    +target_database: project_id # スナップショットの対象となるデータベース(プロジェクトID)
    +unique_key: id # スナップショットでのレコードの一意キー(id列)
    +strategy: timestamp # スナップショットの戦略。timestampは、更新時刻に基づいて変更を追跡する戦略
    +updated_at: updated_at # レコードの更新時刻を示すカラム(updated_at列)
    +invalidate_hard_deletes: True # データが削除された際に、論理削除ではなくスナップショットのレコードを無効にする設定

  work_snapshot: # 個別スナップショットの設定
    +target_schema: work_snapshot # スナップショットデータを格納するスキーマ名(work_snapshotスキーマ)
    +tags: # タグの設定。プロジェクトのタグ付けに利用
      - "work" # このスナップショットに「work」タグを付与
sources.yml
version: 2

sources:
  - name: work       # データソースの名前
    schema: work    # スキーマ名
    tables:
      - name: test_001      # テーブル名
        columns:
          - name: id
            description: "ユニークな識別子"
          - name: name
            description: "個人の名前"
          - name: age
            description: "個人の年齢"
          - name: updated_at
            description: "レコードが最後に更新された日時"
daily_data_snapshot.sql
{% snapshot daily_data_snapshot %}

select * 
from {{ source('work', 'test_001') }}

{% endsnapshot %}
`updated_at`がない場合

dbt_project.ymlの一括設定で+updated_at: updated_atにしているが、updated_atがない場合もあるのでconfigで個別に設定する(configの設定が優先)

daily_data_snapshot.sql
{% snapshot daily_data_snapshot %}
{{
    config(
      updated_at='deleted_at'
    )
}}
select * 
from {{ source('work', 'test_001') }}

{% endsnapshot %}

初回のdbt snapshot実行

dbt snapshot --select daily_data_snapshot

生成されるテーブル

id name age updated_at dbt_scd_id dbt_updated_at dbt_valid_from dbt_valid_to
1 田中太郎 30 2024-01-01 00:00:00.000000 UTC 1add38f10d92ddac67bd3a452e5a5b4b 2024-01-01 00:00:00.000000 UTC 2024-01-01 00:00:00.000000 UTC
2 佐藤花子 25 2024-01-01 10:00:00.000000 UTC 9bd1a995c13b03ec6c0838b39488ce2d 2024-01-01 10:00:00.000000 UTC 2024-01-01 10:00:00.000000 UTC
3 鈴木一郎 35 2024-07-15 14:30:00.000000 UTC ce744b718be82cda7bd6b524435d63a8 2024-07-15 14:30:00.000000 UTC 2024-07-15 14:30:00.000000 UTC
4 山田優子 28 2024-08-10 08:15:00.000000 UTC c3cb0978dd4110a756fea115f4853849 2024-08-10 08:15:00.000000 UTC 2024-08-10 08:15:00.000000 UTC
5 伊藤健太 32 2024-09-11 09:00:00.000000 UTC 6b8809c8f4a577349644c7c2959948d1 2024-09-11 09:00:00.000000 UTC 2024-09-11 09:00:00.000000 UTC

付与される Snapshot メタフィールド

スナップショットテーブルは、元のデータセットのクローンとして作成され、いくつかの追加のメタフィールドが含まれます。

  • dbt_scd_id
    • スナップショットされた各レコードに対して生成されるユニークキー
    • dbtが内部的に使用します
  • dbt_updated_at
    • このスナップショット行が挿入されたときのソースレコードの updated_at タイムスタンプ
    • dbtが内部的に使用します
  • dbt_valid_from
    • このスナップショット行が最初に挿入されたタイムスタンプ
    • レコードの異なる「バージョン」を順序付けるために使用されます
  • dbt_valid_to
    • この行が無効になったタイムスタンプ
    • 最新のスナップショットレコードでは、dbt_valid_to がNULLに設定されます

delete 処理確認

宛先テーブルに存在するレコードがソーステーブルでdeleteされていた場合

対象のデータ更新

下記のクエリを実行しテストデータを更新する

CREATE OR REPLACE TABLE `project_id.work.test_001` (
  id INT64,
  name STRING,
  age INT64,
  updated_at TIMESTAMP
) AS (
  SELECT id, name, age, updated_at
  FROM (
    SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
    SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') UNION ALL
    SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') UNION ALL
    SELECT 5, '伊藤健太', 32, TIMESTAMP('2024-09-11 09:00:00')
  )
);

id = 4のレコードが削除されていた想定

id name age updated_at
1 田中太郎 30 2024-01-01 00:00:00.000000 UTC
2 佐藤花子 25 2024-01-01 10:00:00.000000 UTC
3 鈴木一郎 35 2024-07-15 14:30:00.000000 UTC
5 伊藤健太 32 2024-09-11 09:00:00.000000 UTC

delete 処理後のdbt snapshot実行

dbt snapshot --select daily_data_snapshot

生成されるテーブル

id name age updated_at dbt_scd_id dbt_updated_at dbt_valid_from dbt_valid_to
1 田中太郎 30 2024-01-01 00:00:00.000000 UTC 1add38f10d92ddac67bd3a452e5a5b4b 2024-01-01 00:00:00.000000 UTC 2024-01-01 00:00:00.000000 UTC
2 佐藤花子 25 2024-01-01 10:00:00.000000 UTC 9bd1a995c13b03ec6c0838b39488ce2d 2024-01-01 10:00:00.000000 UTC 2024-01-01 10:00:00.000000 UTC
3 鈴木一郎 35 2024-07-15 14:30:00.000000 UTC ce744b718be82cda7bd6b524435d63a8 2024-07-15 14:30:00.000000 UTC 2024-07-15 14:30:00.000000 UTC
4 山田優子 28 2024-08-10 08:15:00.000000 UTC c3cb0978dd4110a756fea115f4853849 2024-08-10 08:15:00.000000 UTC 2024-08-10 08:15:00.000000 UTC 2024-09-18 15:50:36.354824 UTC
5 伊藤健太 32 2024-09-11 09:00:00.000000 UTC 6b8809c8f4a577349644c7c2959948d1 2024-09-11 09:00:00.000000 UTC 2024-09-11 09:00:00.000000 UTC
  • 確認できた挙動
    • 削除されたレコードのdbt_valid_todbt snapshot --select daily_data_snapshotの実行タイミングの現在時刻(UTC)がに挿入される
    • もう一度dbt snapshot --select daily_data_snapshotしてもdbt_valid_toは更新されなかったので、最初の実行タイミングの現在時刻(UTC)が固定されることがわかった

update 処理確認

宛先テーブルと比較してソーステーブルのレコードがupdateされていた場合

対象のデータ更新

下記のクエリを実行しテストデータを更新する

CREATE OR REPLACE TABLE `project_id.work.test_001` (
  id INT64,
  name STRING,
  age INT64,
  updated_at TIMESTAMP
) AS (
  SELECT id, name, age, updated_at
  FROM (
    SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
    SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') UNION ALL
    SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') UNION ALL
    SELECT 5, '伊藤健太', 30, TIMESTAMP('2024-09-11 09:30:00')
  )
);

id = 5のレコードが更新されていた想定

id name age updated_at
1 田中太郎 30 2024-01-01 00:00:00.000000 UTC
2 佐藤花子 25 2024-01-01 10:00:00.000000 UTC
3 鈴木一郎 35 2024-07-15 14:30:00.000000 UTC
5 伊藤健太 30 2024-09-11 09:30:00.000000 UTC

update 処理後のdbt snapshot実行

dbt snapshot --select daily_data_snapshot

生成されるテーブル

id name age updated_at dbt_scd_id dbt_updated_at dbt_valid_from dbt_valid_to
1 田中太郎 30 2024-01-01 00:00:00.000000 UTC 1add38f10d92ddac67bd3a452e5a5b4b 2024-01-01 00:00:00.000000 UTC 2024-01-01 00:00:00.000000 UTC
2 佐藤花子 25 2024-01-01 10:00:00.000000 UTC 9bd1a995c13b03ec6c0838b39488ce2d 2024-01-01 10:00:00.000000 UTC 2024-01-01 10:00:00.000000 UTC
3 鈴木一郎 35 2024-07-15 14:30:00.000000 UTC ce744b718be82cda7bd6b524435d63a8 2024-07-15 14:30:00.000000 UTC 2024-07-15 14:30:00.000000 UTC
4 山田優子 28 2024-08-10 08:15:00.000000 UTC c3cb0978dd4110a756fea115f4853849 2024-08-10 08:15:00.000000 UTC 2024-08-10 08:15:00.000000 UTC 2024-09-18 15:50:36.354824 UTC
5 伊藤健太 32 2024-09-11 09:00:00.000000 UTC 6b8809c8f4a577349644c7c2959948d1 2024-09-11 09:00:00.000000 UTC 2024-09-11 09:00:00.000000 UTC 2024-09-11 09:30:00.000000 UTC
5 伊藤健太 30 2024-09-11 09:30:00.000000 UTC 757713eedb36bc5c5e62474910f1b377 2024-09-11 09:30:00.000000 UTC 2024-09-11 09:30:00.000000 UTC
  • 確認できた挙動
    • 更新したレコードは追加され、dbt_valid_toに更新前のupdated_atが挿入される
    • 元データをupdated_atを変化なしで項目変更して、dbt snapshot --select daily_data_snapshotしたがupdated_atが変化していない為、項目の変化は反映されなかった
      • あくまで項目が変化したらupdated_atが更新される前提に立っていることが確認できた。その前提がない場合、Check戦略を使うしかなさそう

insert 処理確認

宛先テーブルに無いレコードがソーステーブル側に新規で作成されていた場合

対象のデータ更新

下記のクエリを実行しテストデータを更新する

CREATE OR REPLACE TABLE `project_id.work.test_001` (
  id INT64,
  name STRING,
  age INT64,
  updated_at TIMESTAMP
) AS (
  SELECT id, name, age, updated_at
  FROM (
    SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
    SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') UNION ALL
    SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') UNION ALL
    SELECT 5, '伊藤健太', 30, TIMESTAMP('2024-09-11 09:30:00') UNION ALL
    SELECT 6, '山田太郎', 65, TIMESTAMP('2024-09-11 09:30:00') 
  )
);

id = 6のレコードが追加されていた想定

id name age updated_at
1 田中太郎 30 2024-01-01 00:00:00.000000 UTC
2 佐藤花子 25 2024-01-01 10:00:00.000000 UTC
3 鈴木一郎 35 2024-07-15 14:30:00.000000 UTC
5 伊藤健太 30 2024-09-11 09:30:00.000000 UTC
6 山田太郎 65 2024-09-11 09:30:00.000000 UTC

insert 処理後のdbt snapshot実行

dbt snapshot --select daily_data_snapshot

生成されるテーブル

id name age updated_at dbt_scd_id dbt_updated_at dbt_valid_from dbt_valid_to
1 田中太郎 30 2024-01-01 00:00:00.000000 UTC 1add38f10d92ddac67bd3a452e5a5b4b 2024-01-01 00:00:00.000000 UTC 2024-01-01 00:00:00.000000 UTC
2 佐藤花子 25 2024-01-01 10:00:00.000000 UTC 9bd1a995c13b03ec6c0838b39488ce2d 2024-01-01 10:00:00.000000 UTC 2024-01-01 10:00:00.000000 UTC
3 鈴木一郎 35 2024-07-15 14:30:00.000000 UTC ce744b718be82cda7bd6b524435d63a8 2024-07-15 14:30:00.000000 UTC 2024-07-15 14:30:00.000000 UTC
4 山田優子 28 2024-08-10 08:15:00.000000 UTC c3cb0978dd4110a756fea115f4853849 2024-08-10 08:15:00.000000 UTC 2024-08-10 08:15:00.000000 UTC 2024-09-18 15:50:36.354824 UTC
5 伊藤健太 32 2024-09-11 09:00:00.000000 UTC 6b8809c8f4a577349644c7c2959948d1 2024-09-11 09:00:00.000000 UTC 2024-09-11 09:00:00.000000 UTC 2024-09-11 09:30:00.000000 UTC
5 伊藤健太 30 2024-09-11 09:30:00.000000 UTC 757713eedb36bc5c5e62474910f1b377 2024-09-11 09:30:00.000000 UTC 2024-09-11 09:30:00.000000 UTC
6 山田太郎 65 2024-09-11 09:30:00.000000 UTC 249ef6d890f3fade17808c2463a7e7de 2024-09-11 09:30:00.000000 UTC 2024-09-11 09:30:00.000000 UTC
  • 確認できた挙動
    • 追加されたレコードが追加された。

追記メモ

スナップショットテーブルのデータ加工

created_atしかなく、物理削除されるテーブルをスナップショットした後のデータ加工

  • dbt_valid_toをdeleted_atに採用
  • idごとに最新のレコード採用
  • UTCをJSTに変換
SELECT
  * EXCEPT (created_at,dbt_updated_at,dbt_valid_from,dbt_valid_to,dbt_scd_id)
  ,CAST(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S',created_at,'Asia/Tokyo') AS DATETIME) AS created_at
  ,CAST(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S',dbt_valid_to,'Asia/Tokyo') AS DATETIME) AS deleted_at
FROM スナップショットテーブル
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY dbt_updated_at DESC) = 1

おわりに

何でもかんでもスナップショットとってたらコストがかかるので、見極めが必要な気がする。
サービスのログ機能をデータ分析基盤に全振りするのもありかも、、、
dbt snapshotSlowly Changing Dimension Type 2だが他のタイプについても理解を深めたいと思った。

参考

https://docs.getdbt.com/docs/build/snapshots
https://tech.timee.co.jp/entry/2024/06/11/151948#insert処理宛先テーブルに無いレコードがソーステーブル側に新規で作成されていた場合
https://data.gunosy.io/entry/dbt_snapshot_and_scd
https://qiita.com/Ayumu-y/items/af48189c8168c7030707
https://jimatomo.hatenablog.com/entry/dbt/snapshot
https://dev.classmethod.jp/articles/dbt-try-snapshot/

Discussion