🍊
データの鮮度を dbt freshness でチェックする
はじめに
先人が、データに対するテストは、初手 dbt freshness すべきと仰っていたので試します
ありがたいお言葉(参照)
- 以下、御託
- データの鮮度は、データ分析において非常に重要な要素です。特にリアルタイムデータや、最新の状態で分析を行う必要がある場合、データが新しいかどうかを確認することは不可欠です。dbt では、データソースの鮮度を自動的にチェックできる「freshness」設定があり、これを活用することでデータの信頼性を維持できます。
動作検証
動作環境
- dbt core
Running with dbt=1.8.3
Registered adapter: bigquery=1.8.0
鮮度テスト内容
- 対象テーブルの
updated_at
の最新日付で鮮度テストを行う- 実行日が2024/09/11の場合
updated_at |
鮮度テスト |
---|---|
今日(2024/09/11) | PASS |
昨日(2024/09/10) | WARNING |
2日前(2024/09/09) | WARNING |
3日前(2024/09/08) | WARNING |
4日前(2024/09/07) | ERROR |
5日前(2024/09/06) | ERROR |
6日前(2024/09/05) | ERROR |
鮮度テスト対象のデータ作成
サンプルデータ
CREATE OR REPLACE TABLE `project_id.work_dbt.test_001` (
id INT64,
name STRING,
age INT64,
city STRING,
updated_at TIMESTAMP
) AS (
SELECT *
FROM (
SELECT 1 AS id, '田中太郎' AS name, 30 AS age, '東京' AS city, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
SELECT 2, '佐藤花子', 25, '大阪', TIMESTAMP('2024-01-01 10:00:00') UNION ALL
SELECT 3, '鈴木一郎', 35, '名古屋', TIMESTAMP('2024-07-15 14:30:00') UNION ALL
SELECT 4, '山田優子', 28, '福岡', TIMESTAMP('2024-08-10 08:15:00') UNION ALL
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-11 09:00:00')
)
);
updated_at
の日付を修正して、再実行することによりサンプルデータを作り直す。日付は実行日によって変わるので適宜修正お願いします。
-- 実行日:2024/09/11
-- PASS
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-11 09:00:00')
-- WARN
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-08 09:00:00')
-- ERROR STALE
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-07 09:00:00')
sources.yml 作成
dbt
の「freshness」設定では、データがどのくらいの期間新しいとみなすかを設定できます。
-
warn_after
: 指定した期間内にデータが更新されていない場合に警告が発生する -
error_after
: 指定した期間内にデータが更新されていない場合にエラーが発生する -
loaded_at_field
: データがいつ最後に更新されたかを示すタイムスタンプフィールドを指定します。これにより、dbt
はこのフィールドを基にデータの鮮度をチェックします。(timestampかつUTCである必要があるため必要に応じてCASTしましょう)
sources.yml
version: 2
sources:
- name: work_dbt # データソースの名前
schema: work_dbt # スキーマ名
tables:
- name: test_001 # テーブル名
freshness:
warn_after: {count: 1, period: day}
error_after: {count: 4, period: day}
loaded_at_field: updated_at
columns:
- name: id
description: "ユニークな識別子"
- name: name
description: "個人の名前"
- name: age
description: "個人の年齢"
- name: city
description: "個人が居住する都市"
- name: updated_at
description: "レコードが最後に更新された日時"
補足情報
dbt
では、データソース全体に対して鮮度チェックの設定を行うだけでなく、テーブルごとに個別の設定を適用することも可能です。例えば、特定のテーブルに対して厳格なルールを設けたり、特定のテーブルだけ鮮度チェックを無効にすることができます。
models/<filename>.yml
version: 2
sources:
- name: <source_name>
freshness:
warn_after:
count: <positive_integer>
period: minute | hour | day
error_after:
count: <positive_integer>
period: minute | hour | day
filter: <boolean_sql_expression>
loaded_at_field: <column_name_or_expression>
tables:
- name: <table_name>
freshness:
warn_after:
count: <positive_integer>
period: minute | hour | day
error_after:
count: <positive_integer>
period: minute | hour | day
filter: <boolean_sql_expression>
loaded_at_field: <column_name_or_expression>
...
sources:
- name: my_source
tables:
- name: orders
freshness:
warn_after:
count: 6
period: hour
error_after:
count: 12
period: hour
filter: datediff('day', _etl_loaded_at, current_timestamp) < 2
この例では、orders
テーブルに対しては、6時間後に警告、12時間後にエラーを発生させる設定に加え、filter
オプションで2日以内のデータに対してのみ鮮度チェックを行うよう制限しています。
鮮度テスト実行
-
test_001
を指定して実行(今回はこれ)- 事前にサンプルデータ
updated_at
の日付を修正
- 事前にサンプルデータ
dbt source freshness --select source:work_dbt.test_001
-
work_dbt
の以下のテーブル全て実行
dbt source freshness --select source:work_dbt
- dbt freshness を全て実行
dbt source freshness
実行結果
PASS
$ dbt source freshness --select source:work_dbt.test_001
13:08:51
13:08:51 1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:08:52 1 of 1 PASS freshness of work_dbt.test_001 ..................................... [PASS in 1.21s]
13:08:52
13:08:52 Finished running 1 source in 0 hours 0 minutes and 8.90 seconds (8.90s).
13:08:53 Done.
WARN
$ dbt source freshness --select source:work_dbt.test_001
13:15:55
13:15:55 1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:15:56 1 of 1 WARN freshness of work_dbt.test_001 ..................................... [WARN in 1.14s]
13:15:56
13:15:56 Finished running 1 source in 0 hours 0 minutes and 8.89 seconds (8.89s).
13:15:56 Done.
ERROR STALE
$ dbt source freshness --select source:work_dbt.test_001
13:17:05
13:17:05 1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:17:07 1 of 1 ERROR STALE freshness of work_dbt.test_001 .............................. [ERROR STALE in 1.43s]
13:17:07
13:17:07 Finished running 1 source in 0 hours 0 minutes and 8.55 seconds (8.55s).
13:17:07
13:17:07 Done.
活用方法
- 本番で使ってないので、半分妄想ですが
dbt source freshness
のエラーダウンで後続のdbt run
を止める
1.後続のモデル(dbt_project/models/work_dbt/work_dbt__test_002.sql)
select *
from {{ source('work_dbt', 'test_001') }}
実行コマンド
dbt source freshness --select source:work_dbt.test_001 && dbt run --select work_dbt__test_002
WARNなので後続タスクが実行できる
$ dbt source freshness --select source:work_dbt.test_001 && dbt run --select work_dbt__test_002
13:21:14
13:21:14 1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:21:15 1 of 1 WARN freshness of work_dbt.test_001 ..................................... [WARN in 1.09s]
13:21:15
13:21:15 Finished running 1 source in 0 hours 0 minutes and 8.65 seconds (8.65s).
13:21:15 Done.
13:21:25
13:21:25 1 of 1 START sql view model work_dbt.test_002 .................................. [RUN]
13:21:26 1 of 1 OK created sql view model work_dbt.test_002 ............................. [CREATE VIEW (0 processed) in 0.79s]
13:21:26
13:21:26 Finished running 1 view model in 0 hours 0 minutes and 8.13 seconds (8.13s).
13:21:27
13:21:27 Completed successfully
13:21:27
13:21:27 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
ERROR STALEなので後続タスクが実行されない
$ dbt source freshness --select source:work_dbt.test_001 && dbt run --select work_dbt__test_002
13:24:35
13:24:35 1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:24:36 1 of 1 ERROR STALE freshness of work_dbt.test_001 .............................. [ERROR STALE in 1.06s]
13:24:36
13:24:36 Finished running 1 source in 0 hours 0 minutes and 7.41 seconds (7.41s).
13:24:36
13:24:36 Done.
dbt run
前にdbt source freshness
を一括でチェック
2.定期実行の- sourceの鮮度チェック
dbt source freshness
dbt run
後にdbt freshness
を一括でチェック
3.定期実行の- modelの鮮度チェック(データの変換やマートの鮮度確認)
dbt freshness
おわりに
データの鮮度チェックは、データの品質を維持し、信頼できる分析を行う上で非常に重要です。dbt
を活用して、鮮度チェックの設定を適切に行うことで、データが常に最新かつ正確であることを保証できます。
参考
Discussion