🍊

データの鮮度を dbt freshness でチェックする

2024/09/12に公開

https://docs.getdbt.com/reference/resource-properties/freshness

はじめに

先人が、データに対するテストは、初手 dbt freshness すべきと仰っていたので試します

ありがたいお言葉(参照)

  • 以下、御託
    • データの鮮度は、データ分析において非常に重要な要素です。特にリアルタイムデータや、最新の状態で分析を行う必要がある場合、データが新しいかどうかを確認することは不可欠です。dbt では、データソースの鮮度を自動的にチェックできる「freshness」設定があり、これを活用することでデータの信頼性を維持できます。

動作検証

動作環境

  • dbt core
Running with dbt=1.8.3
Registered adapter: bigquery=1.8.0

鮮度テスト内容

  • 対象テーブルのupdated_at最新日付で鮮度テストを行う
    • 実行日が2024/09/11の場合
updated_at 鮮度テスト
今日(2024/09/11) PASS
昨日(2024/09/10) WARNING
2日前(2024/09/09) WARNING
3日前(2024/09/08) WARNING
4日前(2024/09/07) ERROR
5日前(2024/09/06) ERROR
6日前(2024/09/05) ERROR

鮮度テスト対象のデータ作成

サンプルデータ
CREATE OR REPLACE TABLE `project_id.work_dbt.test_001` (
  id INT64,
  name STRING,
  age INT64,
  city STRING,
  updated_at TIMESTAMP
) AS (
  SELECT *
  FROM (
    SELECT 1 AS id, '田中太郎' AS name, 30 AS age, '東京' AS city, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
    SELECT 2, '佐藤花子', 25, '大阪', TIMESTAMP('2024-01-01 10:00:00') UNION ALL
    SELECT 3, '鈴木一郎', 35, '名古屋', TIMESTAMP('2024-07-15 14:30:00') UNION ALL
    SELECT 4, '山田優子', 28, '福岡', TIMESTAMP('2024-08-10 08:15:00') UNION ALL
    SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-11 09:00:00')
  )
);

updated_atの日付を修正して、再実行することによりサンプルデータを作り直す。日付は実行日によって変わるので適宜修正お願いします。

-- 実行日:2024/09/11
-- PASS
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-11 09:00:00')
-- WARN
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-08 09:00:00')
-- ERROR STALE 
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-07 09:00:00')

sources.yml 作成

dbt の「freshness」設定では、データがどのくらいの期間新しいとみなすかを設定できます。

  • warn_after: 指定した期間内にデータが更新されていない場合に警告が発生する
  • error_after: 指定した期間内にデータが更新されていない場合にエラーが発生する
  • loaded_at_field: データがいつ最後に更新されたかを示すタイムスタンプフィールドを指定します。これにより、dbtはこのフィールドを基にデータの鮮度をチェックします。(timestampかつUTCである必要があるため必要に応じてCASTしましょう)
sources.yml
version: 2

sources:
  - name: work_dbt       # データソースの名前
    schema: work_dbt     # スキーマ名
    tables:
      - name: test_001      # テーブル名
        freshness:
          warn_after: {count: 1, period: day}
          error_after: {count: 4, period: day}
        loaded_at_field: updated_at
        columns:
          - name: id
            description: "ユニークな識別子"
          - name: name
            description: "個人の名前"
          - name: age
            description: "個人の年齢"
          - name: city
            description: "個人が居住する都市"
          - name: updated_at
            description: "レコードが最後に更新された日時"

補足情報

dbt では、データソース全体に対して鮮度チェックの設定を行うだけでなく、テーブルごとに個別の設定を適用することも可能です。例えば、特定のテーブルに対して厳格なルールを設けたり、特定のテーブルだけ鮮度チェックを無効にすることができます。

models/<filename>.yml

version: 2

sources:
  - name: <source_name>
    freshness:
      warn_after:
        count: <positive_integer>
        period: minute | hour | day
      error_after:
        count: <positive_integer>
        period: minute | hour | day
      filter: <boolean_sql_expression>
    loaded_at_field: <column_name_or_expression>

    tables:
      - name: <table_name>
        freshness:
          warn_after:
            count: <positive_integer>
            period: minute | hour | day
          error_after:
            count: <positive_integer>
            period: minute | hour | day
          filter: <boolean_sql_expression>
        loaded_at_field: <column_name_or_expression>
        ...
sources:
  - name: my_source
    tables:
      - name: orders
        freshness:
          warn_after:
            count: 6
            period: hour
          error_after:
            count: 12
            period: hour
          filter: datediff('day', _etl_loaded_at, current_timestamp) < 2

この例では、orders テーブルに対しては、6時間後に警告、12時間後にエラーを発生させる設定に加え、filter オプションで2日以内のデータに対してのみ鮮度チェックを行うよう制限しています。

鮮度テスト実行

  • test_001を指定して実行(今回はこれ)
    • 事前にサンプルデータupdated_atの日付を修正
dbt source freshness --select source:work_dbt.test_001
  • work_dbtの以下のテーブル全て実行
dbt source freshness --select source:work_dbt
  • dbt freshness を全て実行
dbt source freshness

実行結果

PASS
$ dbt source freshness --select source:work_dbt.test_001
13:08:51  
13:08:51  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:08:52  1 of 1 PASS freshness of work_dbt.test_001 ..................................... [PASS in 1.21s]
13:08:52  
13:08:52  Finished running 1 source in 0 hours 0 minutes and 8.90 seconds (8.90s).
13:08:53  Done.
WARN
$ dbt source freshness --select source:work_dbt.test_001
13:15:55  
13:15:55  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:15:56  1 of 1 WARN freshness of work_dbt.test_001 ..................................... [WARN in 1.14s]
13:15:56  
13:15:56  Finished running 1 source in 0 hours 0 minutes and 8.89 seconds (8.89s).
13:15:56  Done.
ERROR STALE
$ dbt source freshness --select source:work_dbt.test_001
13:17:05  
13:17:05  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:17:07  1 of 1 ERROR STALE freshness of work_dbt.test_001 .............................. [ERROR STALE in 1.43s]
13:17:07  
13:17:07  Finished running 1 source in 0 hours 0 minutes and 8.55 seconds (8.55s).
13:17:07  
13:17:07  Done.

https://www.datafold.com/blog/dbt-source-freshness

活用方法

  • 本番で使ってないので、半分妄想ですが

1.dbt source freshnessのエラーダウンで後続のdbt runを止める

後続のモデル(dbt_project/models/work_dbt/work_dbt__test_002.sql)
select *
from {{ source('work_dbt', 'test_001') }}
実行コマンド
dbt source freshness --select source:work_dbt.test_001 && dbt run --select work_dbt__test_002
WARNなので後続タスクが実行できる
$ dbt source freshness --select source:work_dbt.test_001 && dbt run --select work_dbt__test_002
13:21:14  
13:21:14  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:21:15  1 of 1 WARN freshness of work_dbt.test_001 ..................................... [WARN in 1.09s]
13:21:15  
13:21:15  Finished running 1 source in 0 hours 0 minutes and 8.65 seconds (8.65s).
13:21:15  Done.
13:21:25  
13:21:25  1 of 1 START sql view model work_dbt.test_002 .................................. [RUN]
13:21:26  1 of 1 OK created sql view model work_dbt.test_002 ............................. [CREATE VIEW (0 processed) in 0.79s]
13:21:26  
13:21:26  Finished running 1 view model in 0 hours 0 minutes and 8.13 seconds (8.13s).
13:21:27  
13:21:27  Completed successfully
13:21:27  
13:21:27  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
ERROR STALEなので後続タスクが実行されない
$ dbt source freshness --select source:work_dbt.test_001 && dbt run --select work_dbt__test_002
13:24:35  
13:24:35  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:24:36  1 of 1 ERROR STALE freshness of work_dbt.test_001 .............................. [ERROR STALE in 1.06s]
13:24:36  
13:24:36  Finished running 1 source in 0 hours 0 minutes and 7.41 seconds (7.41s).
13:24:36  
13:24:36  Done.

2.定期実行のdbt run前にdbt source freshnessを一括でチェック

  • sourceの鮮度チェック
dbt source freshness

3.定期実行のdbt run後にdbt freshnessを一括でチェック

  • modelの鮮度チェック(データの変換やマートの鮮度確認)
dbt freshness

おわりに

データの鮮度チェックは、データの品質を維持し、信頼できる分析を行う上で非常に重要です。dbt を活用して、鮮度チェックの設定を適切に行うことで、データが常に最新かつ正確であることを保証できます。

参考

https://zenn.dev/gak_t12/articles/7fd436228263cc
https://docs.getdbt.com/reference/resource-properties/freshness
https://www.datafold.com/blog/dbt-source-freshness

Discussion