Open5

【dbt】dbt freshness

YuichiYuichi

version: 2

sources:
  - name: <source_name>
    freshness:
      warn_after:
        [count](#count): <positive_integer>
        [period](#period): minute | hour | day
      error_after:
        [count](#count): <positive_integer>
        [period](#period): minute | hour | day
      [filter](#filter): <boolean_sql_expression>
    [loaded_at_field](#loaded_at_field): <column_name_or_expression>

    tables:
      - name: <table_name>
        freshness:
          warn_after:
            [count](#count): <positive_integer>
            [period](#period): minute | hour | day
          error_after:
            [count](#count): <positive_integer>
            [period](#period): minute | hour | day
          [filter](#filter): <boolean_sql_expression>
        [loaded_at_field](#loaded_at_field): <column_name_or_expression>
        ...
YuichiYuichi
CREATE OR REPLACE TABLE `project_id.work_dbt.test_001` (
  id INT64,
  name STRING,
  age INT64,
  city STRING,
  updated_at TIMESTAMP
) AS (
  SELECT *
  FROM (
    SELECT 1 AS id, '田中太郎' AS name, 30 AS age, '東京' AS city, TIMESTAMP('2024-01-01 00:00:00') AS updated_at UNION ALL
    SELECT 2, '佐藤花子', 25, '大阪', TIMESTAMP('2024-01-01 10:00:00') UNION ALL
    SELECT 3, '鈴木一郎', 35, '名古屋', TIMESTAMP('2024-07-15 14:30:00') UNION ALL
    SELECT 4, '山田優子', 28, '福岡', TIMESTAMP('2024-08-10 08:15:00') UNION ALL
    SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-11 09:00:00')
  )
);
-- 実行日:2024/09/11
-- PASS
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-11 09:00:00')
-- WARN
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-08 09:00:00')
-- ERROR STALE 
SELECT 5, '伊藤健太', 32, '札幌', TIMESTAMP('2024-09-07 09:00:00')
sources.yml
version: 2

sources:
  - name: work_dbt       # データソースの名前
    schema: work_dbt     # スキーマ名
    tables:
      - name: test_001      # テーブル名
        freshness:
          warn_after: {count: 1, period: day}
          error_after: {count: 4, period: day}
        loaded_at_field: updated_at
        columns:
          - name: id
            description: "ユニークな識別子"
          - name: name
            description: "個人の名前"
          - name: age
            description: "個人の年齢"
          - name: city
            description: "個人が居住する都市"
          - name: updated_at
            description: "レコードが最後に更新された日時"

$ dbt source freshness --select source:work_dbt.test_001
13:08:51  
13:08:51  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:08:52  1 of 1 PASS freshness of work_dbt.test_001 ..................................... [PASS in 1.21s]
13:08:52  
13:08:52  Finished running 1 source in 0 hours 0 minutes and 8.90 seconds (8.90s).
13:08:53  Done.
$ dbt source freshness --select source:work_dbt.test_001
13:15:55  
13:15:55  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:15:56  1 of 1 WARN freshness of work_dbt.test_001 ..................................... [WARN in 1.14s]
13:15:56  
13:15:56  Finished running 1 source in 0 hours 0 minutes and 8.89 seconds (8.89s).
13:15:56  Done.
$ dbt source freshness --select source:work_dbt.test_001
13:17:05  
13:17:05  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:17:07  1 of 1 ERROR STALE freshness of work_dbt.test_001 .............................. [ERROR STALE in 1.43s]
13:17:07  
13:17:07  Finished running 1 source in 0 hours 0 minutes and 8.55 seconds (8.55s).
13:17:07  
13:17:07  Done.
YuichiYuichi
  • 最初に注意すべきことは、dbt buildコマンドにはソースの鮮度チェックが含まれていないことです。これはデータ モデルとそれに対応するテストを実行するための一般的なコマンドですが、ソースに対して鮮度テストは実行されません。
  • dbt source freshness コマンドを使用する代わりにこの方法を実行すると、鮮度テストが失敗してもジョブは実行されます。 ジョブ自体の中で dbt source freshness コマンドを使用すると、テストが失敗すると、順次モデルの実行がトリガーされなくなります。
  • フレッシュネス テストを実行すると、その結果はターゲット ディレクトリのsources.jsonというファイルに書き込まれます。このディレクトリには、dbt がすべてのコンパイル済みモデルを保存します。これにより、フレッシュネス テストの履歴の概要と、どのテストが最も頻繁に失敗するかを把握できます。

https://www.datafold.com/blog/dbt-source-freshness

YuichiYuichi

ジョブ自体の中で dbt source freshness コマンドを使用すると、テストが失敗すると、順次モデルの実行がトリガーされなくなります。

の確認

work_dbt__test_002.sql
select *
from {{ source('work_dbt', 'test_001') }}
$ dbt source freshness --select source:work_dbt.test_001 && dbt run --select work_dbt__test_002
13:21:14  
13:21:14  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:21:15  1 of 1 WARN freshness of work_dbt.test_001 ..................................... [WARN in 1.09s]
13:21:15  
13:21:15  Finished running 1 source in 0 hours 0 minutes and 8.65 seconds (8.65s).
13:21:15  Done.
13:21:25  
13:21:25  1 of 1 START sql view model work_dbt.test_002 .................................. [RUN]
13:21:26  1 of 1 OK created sql view model work_dbt.test_002 ............................. [CREATE VIEW (0 processed) in 0.79s]
13:21:26  
13:21:26  Finished running 1 view model in 0 hours 0 minutes and 8.13 seconds (8.13s).
13:21:27  
13:21:27  Completed successfully
13:21:27  
13:21:27  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
$ dbt source freshness --select source:work_dbt.test_001 && dbt run --select work_dbt__test_002
13:24:35  
13:24:35  1 of 1 START freshness of work_dbt.test_001 .................................... [RUN]
13:24:36  1 of 1 ERROR STALE freshness of work_dbt.test_001 .............................. [ERROR STALE in 1.06s]
13:24:36  
13:24:36  Finished running 1 source in 0 hours 0 minutes and 7.41 seconds (7.41s).
13:24:36  
13:24:36  Done.