dbt snapshot のタイムスタンプ戦略を試してみる
はじめに
- サービスは、全てログ保有すると動作が遅くなったり色々な弊害があります。とはいえ、ログがないと時系列での分析ができなかったり色々したいことができません。
dbt snapshot
はそういった課題を解決するデータのログを取る機能です。 - 理解を深める為基本的な挙動を確認します。
動作検証
動作環境
- dbt core
Running with dbt=1.8.3
Registered adapter: bigquery=1.8.0
確認内容
- 今回はタイムスタンプ戦略のみ確認します
- 記事に記載の順番で
delete処理
→update処理
→insert処理
の流れでテストデータを更新し挙動確認する
スナップショットの「戦略」は、dbtがどのようにして行の変更を検出するかを定義します。dbtには、デフォルトで2つの戦略が組み込まれており、これらは「timestamp」と「check」です。
タイムスタンプ戦略(推奨)
タイムスタンプ戦略は、updated_at
フィールドを使用して行が変更されたかどうかを判断します。もし、行の設定された updated_at
カラムが前回のスナップショット実行時よりも新しい場合、dbtは古いレコードを無効にして、新しいレコードを記録します。タイムスタンプが変更されていない場合、dbtは何もアクションを取らないままです。
タイムスタンプ戦略を使用するためには、以下の設定が必要です。
設定項目 | 説明 | 例 |
---|---|---|
updated_at | ソース行が最後に更新された時刻を示すカラム | updated_at |
チェック戦略
チェック戦略は、信頼できる updated_at
カラムがないテーブルに便利です。この戦略は、指定されたカラムのリストを、現在の値と履歴の値で比較することで動作します。これらのカラムのいずれかが変更された場合、dbtは古いレコードを無効にし、新しいレコードを記録します。カラムの値が同一であれば、dbtはアクションを取りません。
チェック戦略を使用するためには、以下の設定が必要です。
設定項目 | 説明 | 例 |
---|---|---|
check_cols | 変更を確認するカラムのリスト、またはすべてのカラムを確認するために all を指定 |
["name", "email"] |
対象のデータ作成
対象のデータ作成
下記のクエリを実行しテストデータを作成する
CREATE OR REPLACE TABLE `project_id.work.test_001` (
id INT64,
name STRING,
age INT64,
updated_at TIMESTAMP
) AS (
SELECT id, name, age, updated_at
FROM (
SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') UNION ALL
SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') UNION ALL
SELECT 4, '山田優子', 28, TIMESTAMP('2024-08-10 08:15:00') UNION ALL
SELECT 5, '伊藤健太', 32, TIMESTAMP('2024-09-11 09:00:00')
)
);
id | name | age | updated_at |
---|---|---|---|
1 | 田中太郎 | 30 | 2024-01-01 00:00:00.000000 UTC |
2 | 佐藤花子 | 25 | 2024-01-01 10:00:00.000000 UTC |
3 | 鈴木一郎 | 35 | 2024-07-15 14:30:00.000000 UTC |
4 | 山田優子 | 28 | 2024-08-10 08:15:00.000000 UTC |
5 | 伊藤健太 | 32 | 2024-09-11 09:00:00.000000 UTC |
dbt の各種設定
...
snapshots: # dbt snapshot機能の設定セクション
dbt_project:
+persist_docs: # yamlファイルに記載したテーブルやカラムのdescriptionをデータベースに追加する設定
relation: true # テーブルやビューのdescriptionを追加する
columns: true # 各カラムのdescriptionを追加する
+target_database: project_id # スナップショットの対象となるデータベース(プロジェクトID)
+unique_key: id # スナップショットでのレコードの一意キー(id列)
+strategy: timestamp # スナップショットの戦略。timestampは、更新時刻に基づいて変更を追跡する戦略
+updated_at: updated_at # レコードの更新時刻を示すカラム(updated_at列)
+invalidate_hard_deletes: True # データが削除された際に、論理削除ではなくスナップショットのレコードを無効にする設定
work_snapshot: # 個別スナップショットの設定
+target_schema: work_snapshot # スナップショットデータを格納するスキーマ名(work_snapshotスキーマ)
+tags: # タグの設定。プロジェクトのタグ付けに利用
- "work" # このスナップショットに「work」タグを付与
version: 2
sources:
- name: work # データソースの名前
schema: work # スキーマ名
tables:
- name: test_001 # テーブル名
columns:
- name: id
description: "ユニークな識別子"
- name: name
description: "個人の名前"
- name: age
description: "個人の年齢"
- name: updated_at
description: "レコードが最後に更新された日時"
{% snapshot daily_data_snapshot %}
select *
from {{ source('work', 'test_001') }}
{% endsnapshot %}
`updated_at`がない場合
dbt_project.yml
の一括設定で+updated_at: updated_at
にしているが、updated_at
がない場合もあるのでconfig
で個別に設定する(config
の設定が優先)
{% snapshot daily_data_snapshot %}
{{
config(
updated_at='deleted_at'
)
}}
select *
from {{ source('work', 'test_001') }}
{% endsnapshot %}
dbt snapshot
実行
初回のdbt snapshot --select daily_data_snapshot
生成されるテーブル
id | name | age | updated_at | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
---|---|---|---|---|---|---|---|
1 | 田中太郎 | 30 | 2024-01-01 00:00:00.000000 UTC | 1add38f10d92ddac67bd3a452e5a5b4b | 2024-01-01 00:00:00.000000 UTC | 2024-01-01 00:00:00.000000 UTC | |
2 | 佐藤花子 | 25 | 2024-01-01 10:00:00.000000 UTC | 9bd1a995c13b03ec6c0838b39488ce2d | 2024-01-01 10:00:00.000000 UTC | 2024-01-01 10:00:00.000000 UTC | |
3 | 鈴木一郎 | 35 | 2024-07-15 14:30:00.000000 UTC | ce744b718be82cda7bd6b524435d63a8 | 2024-07-15 14:30:00.000000 UTC | 2024-07-15 14:30:00.000000 UTC | |
4 | 山田優子 | 28 | 2024-08-10 08:15:00.000000 UTC | c3cb0978dd4110a756fea115f4853849 | 2024-08-10 08:15:00.000000 UTC | 2024-08-10 08:15:00.000000 UTC | |
5 | 伊藤健太 | 32 | 2024-09-11 09:00:00.000000 UTC | 6b8809c8f4a577349644c7c2959948d1 | 2024-09-11 09:00:00.000000 UTC | 2024-09-11 09:00:00.000000 UTC |
付与される Snapshot メタフィールド
スナップショットテーブルは、元のデータセットのクローンとして作成され、いくつかの追加のメタフィールドが含まれます。
-
dbt_scd_id
- スナップショットされた各レコードに対して生成されるユニークキー
- dbtが内部的に使用します
-
dbt_updated_at
- このスナップショット行が挿入されたときのソースレコードの
updated_at
タイムスタンプ - dbtが内部的に使用します
- このスナップショット行が挿入されたときのソースレコードの
-
dbt_valid_from
- このスナップショット行が最初に挿入されたタイムスタンプ
- レコードの異なる「バージョン」を順序付けるために使用されます
-
dbt_valid_to
- この行が無効になったタイムスタンプ
- 最新のスナップショットレコードでは、
dbt_valid_to
がNULLに設定されます
delete 処理確認
宛先テーブルに存在するレコードがソーステーブルでdeleteされていた場合
対象のデータ更新
下記のクエリを実行しテストデータを更新する
CREATE OR REPLACE TABLE `project_id.work.test_001` (
id INT64,
name STRING,
age INT64,
updated_at TIMESTAMP
) AS (
SELECT id, name, age, updated_at
FROM (
SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') UNION ALL
SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') UNION ALL
SELECT 5, '伊藤健太', 32, TIMESTAMP('2024-09-11 09:00:00')
)
);
id = 4
のレコードが削除されていた想定
id | name | age | updated_at |
---|---|---|---|
1 | 田中太郎 | 30 | 2024-01-01 00:00:00.000000 UTC |
2 | 佐藤花子 | 25 | 2024-01-01 10:00:00.000000 UTC |
3 | 鈴木一郎 | 35 | 2024-07-15 14:30:00.000000 UTC |
5 | 伊藤健太 | 32 | 2024-09-11 09:00:00.000000 UTC |
dbt snapshot
実行
delete 処理後のdbt snapshot --select daily_data_snapshot
生成されるテーブル
id | name | age | updated_at | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
---|---|---|---|---|---|---|---|
1 | 田中太郎 | 30 | 2024-01-01 00:00:00.000000 UTC | 1add38f10d92ddac67bd3a452e5a5b4b | 2024-01-01 00:00:00.000000 UTC | 2024-01-01 00:00:00.000000 UTC | |
2 | 佐藤花子 | 25 | 2024-01-01 10:00:00.000000 UTC | 9bd1a995c13b03ec6c0838b39488ce2d | 2024-01-01 10:00:00.000000 UTC | 2024-01-01 10:00:00.000000 UTC | |
3 | 鈴木一郎 | 35 | 2024-07-15 14:30:00.000000 UTC | ce744b718be82cda7bd6b524435d63a8 | 2024-07-15 14:30:00.000000 UTC | 2024-07-15 14:30:00.000000 UTC | |
4 | 山田優子 | 28 | 2024-08-10 08:15:00.000000 UTC | c3cb0978dd4110a756fea115f4853849 | 2024-08-10 08:15:00.000000 UTC | 2024-08-10 08:15:00.000000 UTC | 2024-09-18 15:50:36.354824 UTC |
5 | 伊藤健太 | 32 | 2024-09-11 09:00:00.000000 UTC | 6b8809c8f4a577349644c7c2959948d1 | 2024-09-11 09:00:00.000000 UTC | 2024-09-11 09:00:00.000000 UTC |
- 確認できた挙動
- 削除されたレコードの
dbt_valid_to
にdbt snapshot --select daily_data_snapshot
の実行タイミングの現在時刻(UTC)がに挿入される - もう一度
dbt snapshot --select daily_data_snapshot
してもdbt_valid_to
は更新されなかったので、最初の実行タイミングの現在時刻(UTC)が固定されることがわかった
- 削除されたレコードの
update 処理確認
宛先テーブルと比較してソーステーブルのレコードがupdateされていた場合
対象のデータ更新
下記のクエリを実行しテストデータを更新する
CREATE OR REPLACE TABLE `project_id.work.test_001` (
id INT64,
name STRING,
age INT64,
updated_at TIMESTAMP
) AS (
SELECT id, name, age, updated_at
FROM (
SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') UNION ALL
SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') UNION ALL
SELECT 5, '伊藤健太', 30, TIMESTAMP('2024-09-11 09:30:00')
)
);
id = 5
のレコードが更新されていた想定
id | name | age | updated_at |
---|---|---|---|
1 | 田中太郎 | 30 | 2024-01-01 00:00:00.000000 UTC |
2 | 佐藤花子 | 25 | 2024-01-01 10:00:00.000000 UTC |
3 | 鈴木一郎 | 35 | 2024-07-15 14:30:00.000000 UTC |
5 | 伊藤健太 | 30 | 2024-09-11 09:30:00.000000 UTC |
dbt snapshot
実行
update 処理後のdbt snapshot --select daily_data_snapshot
生成されるテーブル
id | name | age | updated_at | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
---|---|---|---|---|---|---|---|
1 | 田中太郎 | 30 | 2024-01-01 00:00:00.000000 UTC | 1add38f10d92ddac67bd3a452e5a5b4b | 2024-01-01 00:00:00.000000 UTC | 2024-01-01 00:00:00.000000 UTC | |
2 | 佐藤花子 | 25 | 2024-01-01 10:00:00.000000 UTC | 9bd1a995c13b03ec6c0838b39488ce2d | 2024-01-01 10:00:00.000000 UTC | 2024-01-01 10:00:00.000000 UTC | |
3 | 鈴木一郎 | 35 | 2024-07-15 14:30:00.000000 UTC | ce744b718be82cda7bd6b524435d63a8 | 2024-07-15 14:30:00.000000 UTC | 2024-07-15 14:30:00.000000 UTC | |
4 | 山田優子 | 28 | 2024-08-10 08:15:00.000000 UTC | c3cb0978dd4110a756fea115f4853849 | 2024-08-10 08:15:00.000000 UTC | 2024-08-10 08:15:00.000000 UTC | 2024-09-18 15:50:36.354824 UTC |
5 | 伊藤健太 | 32 | 2024-09-11 09:00:00.000000 UTC | 6b8809c8f4a577349644c7c2959948d1 | 2024-09-11 09:00:00.000000 UTC | 2024-09-11 09:00:00.000000 UTC | 2024-09-11 09:30:00.000000 UTC |
5 | 伊藤健太 | 30 | 2024-09-11 09:30:00.000000 UTC | 757713eedb36bc5c5e62474910f1b377 | 2024-09-11 09:30:00.000000 UTC | 2024-09-11 09:30:00.000000 UTC |
- 確認できた挙動
- 更新したレコードは追加され、
dbt_valid_to
に更新前のupdated_at
が挿入される - 元データを
updated_at
を変化なしで項目変更して、dbt snapshot --select daily_data_snapshot
したがupdated_at
が変化していない為、項目の変化は反映されなかった- あくまで項目が変化したら
updated_at
が更新される前提に立っていることが確認できた。その前提がない場合、Check戦略を使うしかなさそう
- あくまで項目が変化したら
- 更新したレコードは追加され、
insert 処理確認
宛先テーブルに無いレコードがソーステーブル側に新規で作成されていた場合
対象のデータ更新
下記のクエリを実行しテストデータを更新する
CREATE OR REPLACE TABLE `project_id.work.test_001` (
id INT64,
name STRING,
age INT64,
updated_at TIMESTAMP
) AS (
SELECT id, name, age, updated_at
FROM (
SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') UNION ALL
SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') UNION ALL
SELECT 5, '伊藤健太', 30, TIMESTAMP('2024-09-11 09:30:00') UNION ALL
SELECT 6, '山田太郎', 65, TIMESTAMP('2024-09-11 09:30:00')
)
);
id = 6
のレコードが追加されていた想定
id | name | age | updated_at |
---|---|---|---|
1 | 田中太郎 | 30 | 2024-01-01 00:00:00.000000 UTC |
2 | 佐藤花子 | 25 | 2024-01-01 10:00:00.000000 UTC |
3 | 鈴木一郎 | 35 | 2024-07-15 14:30:00.000000 UTC |
5 | 伊藤健太 | 30 | 2024-09-11 09:30:00.000000 UTC |
6 | 山田太郎 | 65 | 2024-09-11 09:30:00.000000 UTC |
dbt snapshot
実行
insert 処理後のdbt snapshot --select daily_data_snapshot
生成されるテーブル
id | name | age | updated_at | dbt_scd_id | dbt_updated_at | dbt_valid_from | dbt_valid_to |
---|---|---|---|---|---|---|---|
1 | 田中太郎 | 30 | 2024-01-01 00:00:00.000000 UTC | 1add38f10d92ddac67bd3a452e5a5b4b | 2024-01-01 00:00:00.000000 UTC | 2024-01-01 00:00:00.000000 UTC | |
2 | 佐藤花子 | 25 | 2024-01-01 10:00:00.000000 UTC | 9bd1a995c13b03ec6c0838b39488ce2d | 2024-01-01 10:00:00.000000 UTC | 2024-01-01 10:00:00.000000 UTC | |
3 | 鈴木一郎 | 35 | 2024-07-15 14:30:00.000000 UTC | ce744b718be82cda7bd6b524435d63a8 | 2024-07-15 14:30:00.000000 UTC | 2024-07-15 14:30:00.000000 UTC | |
4 | 山田優子 | 28 | 2024-08-10 08:15:00.000000 UTC | c3cb0978dd4110a756fea115f4853849 | 2024-08-10 08:15:00.000000 UTC | 2024-08-10 08:15:00.000000 UTC | 2024-09-18 15:50:36.354824 UTC |
5 | 伊藤健太 | 32 | 2024-09-11 09:00:00.000000 UTC | 6b8809c8f4a577349644c7c2959948d1 | 2024-09-11 09:00:00.000000 UTC | 2024-09-11 09:00:00.000000 UTC | 2024-09-11 09:30:00.000000 UTC |
5 | 伊藤健太 | 30 | 2024-09-11 09:30:00.000000 UTC | 757713eedb36bc5c5e62474910f1b377 | 2024-09-11 09:30:00.000000 UTC | 2024-09-11 09:30:00.000000 UTC | |
6 | 山田太郎 | 65 | 2024-09-11 09:30:00.000000 UTC | 249ef6d890f3fade17808c2463a7e7de | 2024-09-11 09:30:00.000000 UTC | 2024-09-11 09:30:00.000000 UTC |
- 確認できた挙動
- 追加されたレコードが追加された。
追記メモ
スナップショットテーブルのデータ加工
created_at
しかなく、物理削除されるテーブルをスナップショットした後のデータ加工
- dbt_valid_toをdeleted_atに採用
- idごとに最新のレコード採用
- UTCをJSTに変換
SELECT
* EXCEPT (created_at,dbt_updated_at,dbt_valid_from,dbt_valid_to,dbt_scd_id)
,CAST(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S',created_at,'Asia/Tokyo') AS DATETIME) AS created_at
,CAST(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S',dbt_valid_to,'Asia/Tokyo') AS DATETIME) AS deleted_at
FROM スナップショットテーブル
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY dbt_updated_at DESC) = 1
おわりに
何でもかんでもスナップショットとってたらコストがかかるので、見極めが必要な気がする。
サービスのログ機能をデータ分析基盤に全振りするのもありかも、、、
dbt snapshot
は Slowly Changing Dimension Type 2
だが他のタイプについても理解を深めたいと思った。
参考
Discussion