Open12

dbtの増分更新

YuichiYuichi

https://github.com/PaddyAlton/DBT-GA4/blob/main/my_project/models/base/analytics/base_analytics__events.sql

{{
  config(
    materialized = 'incremental',
    partition_by = {
      'field': 'event_date', 
      'data_type': 'date', 
      'granularity': 'day'},
    incremental_strategy = 'insert_overwrite',
    unique_key='event_id',
    on_schema_change = 'fail',
    tags=['incremental', 'daily']
  )
}}
version: 2

sources:

  - name: google_analytics
    description: "Google Analytics 4 export"
    database: project-id
    schema: analytics_111111111
    tables:
      - name: events
        identifier: events_*
        loaded_at_field: "parse_timestamp(_table_suffix, '%Y%m%d')"
        freshness:
          warn_after:
            count: 24
            period: hour
          error_after:
            count: 36
            period: hour
YuichiYuichi

https://zenn.dev/raksul_data/articles/dbt_incremental_model_on_bq

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
      'field': 'created_at',
      'data_type': 'date',
      'granularity': 'day',
      'copy_partitions': true
    }
) }}

select
 *
from
  {{ source('test', 'ソーステーブル') }}
WHERE Ture
{% if is_incremental() %}
  {% if var('start_date') and var('end_date') %}
    AND created_at between '{{ var('start_date') }}' and '{{ var('end_date') }}'
  {% else %}
    AND created_at = CURRENT_DATE('Asia/Tokyo') - 1
  {% endif %}
{% endif %}
dbt run --models your_model_name --vars '{"start_date": "2024-11-01", "end_date": "2024-11-05"}'

過去1週間分の更新

1週間分のデータを更新したい場合、created_at が過去7日間に該当するレコードを抽出するように変更します。

AND created_at BETWEEN DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 7 DAY) AND CURRENT_DATE('Asia/Tokyo') - 1

これにより、created_at が「今日の1日前」から「今日の7日前」までのデータが取得されます。

過去1ヶ月分の更新

1ヶ月分のデータを更新する場合、以下のように過去30日間に該当するレコードを抽出します。

AND created_at BETWEEN DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 30 DAY) AND CURRENT_DATE('Asia/Tokyo') - 1

これにより、created_at が「今日の1日前」から「今日の30日前」までのデータが取得されます。

無いとエラーになる

dbt_project.yml
vars:
      start_date : NULL
      end_date : NULL
YuichiYuichi
CREATE OR REPLACE TABLE `work.customer` (
    name STRING
  , age INT64
  , created_at TIMESTAMP
) AS (
  SELECT *
  FROM UNNEST (
    ARRAY<STRUCT<
        name STRING
      , age INT64
      , created_at TIMESTAMP
    >> [
        ('田中太郎', 45, '2024-11-01 10:00:00')
      , ('佐藤花子', 25, '2024-11-01 14:30:00')
      , ('鈴木一郎', 30, '2024-11-02 09:15:00')
      -- , ('山田優子', null, '2024-11-02 18:20:00')
      -- , ('伊藤健太', 32, '2024-11-03 12:45:00')
      , ('高橋美紀', 27, '2024-11-04 07:25:00')
      , ('渡辺陽一', null, '2024-11-04 16:50:00')
      , ('加藤隆', 22, '2024-11-05 10:05:00')
      -- , ('小林健', 35, '2024-11-06 17:25:00')
      -- , ('佐々木裕', 28, '2024-11-06 15:40:00')
    ]
  )
)
select * from `work.customer` order by created_at
name age created_at
田中太郎 45 2024-11-01 10:00:00.000000 UTC
佐藤花子 25 2024-11-01 14:30:00.000000 UTC
鈴木一郎 30 2024-11-02 09:15:00.000000 UTC
高橋美紀 27 2024-11-04 07:25:00.000000 UTC
渡辺陽一 2024-11-04 16:50:00.000000 UTC
加藤隆 22 2024-11-05 10:05:00.000000 UTC
customer.yml
version: 2

sources:
  - name: work       # データソースの名前
    schema: work    # スキーマ名
    tables:
      - name: customer      # テーブル名
customer_salary.sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
      'field': 'created_at',
      'data_type': 'timestamp',
      'granularity': 'day',
      'copy_partitions': true
    }
) }}

SELECT *
FROM {{ source('work', 'customer') }}
WHERE TRUE
  {% if is_incremental() %}
    {% if var('target_start_date') and var('target_end_date') %}
    AND DATE(created_at) between '{{ var('target_start_date') }}' and '{{ var('target_end_date') }}'
  {% else %}
      AND DATE(created_at) = CURRENT_DATE('Asia/Tokyo') - 1
    {% endif %}
  {% endif %}

00:29:05 Concurrency: 4 threads (target='dev')
00:29:05
00:29:05 1 of 1 START sql incremental model work.customer_salary ........................ [RUN]
00:29:08 1 of 1 OK created sql incremental model work.customer_salary ................... [CREATE TABLE (8.0 rows, 221.0 Bytes processed) in 2.28s]
00:29:08
00:29:08 Finished running 1 incremental model in 0 hours 0 minutes and 5.06 seconds (5.06s).
00:29:08
00:29:08 Completed successfully

dbt run --select customer_salary

select CURRENT_DATE('Asia/Tokyo') - 1

2024-11-06

select * from `work.customer_salary` order by created_at
name age created_at
田中太郎 45 2024-11-01 10:00:00.000000 UTC
佐藤花子 25 2024-11-01 14:30:00.000000 UTC
鈴木一郎 30 2024-11-02 09:15:00.000000 UTC
山田優子 2024-11-02 18:20:00.000000 UTC
伊藤健太 32 2024-11-03 12:45:00.000000 UTC
高橋美紀 27 2024-11-04 07:25:00.000000 UTC
渡辺陽一 2024-11-04 16:50:00.000000 UTC
加藤隆 22 2024-11-05 10:05:00.000000 UTC
CREATE OR REPLACE TABLE `work.customer` (
    name STRING
  , age INT64
  , created_at TIMESTAMP
) AS (
  SELECT *
  FROM UNNEST (
    ARRAY<STRUCT<
        name STRING
      , age INT64
      , created_at TIMESTAMP
    >> [
        ('田中太郎', 45, '2024-11-01 10:00:00')
      , ('佐藤花子', 25, '2024-11-01 14:30:00')
      , ('鈴木一郎', 30, '2024-11-02 09:15:00')
      , ('山田優子', null, '2024-11-02 18:20:00')
      , ('伊藤健太', 32, '2024-11-03 12:45:00')
      , ('高橋美紀', 27, '2024-11-04 07:25:00')
      , ('渡辺陽一', null, '2024-11-04 16:50:00')
      , ('加藤隆', 22, '2024-11-05 10:05:00')
      , ('小林健', 35, '2024-11-06 17:25:00')
      , ('佐々木裕', 28, '2024-11-06 15:40:00')
    ]
  )
);

CURRENT_DATEすると非決定的になるので、backfillできない問題が。一定期間のsourceの履歴はデータレイク、ハウスに残してbackfillできるようにしたいと思うが、どうなんでしょう。

YuichiYuichi

https://zenn.dev/pei0804/articles/data-partitioning-in-dbt
https://qiita.com/Ayumu-y/items/1cb3a3feb8d3f5db0ae5
https://techblog.kayac.com/working-with-dbt-incremental-model

{{ config(
    materialized="incremental",
    unique_key=['ymd'],
    schema="kpi"
) }}

select
    {{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
    ,count(distinct user_id) daily_active_users
from {{ source('log', 'nginx_access_logs') }}
{%- if is_incremental() %}
where
    {%- set target_date = var('target_date','') %}
    {%- if target_date != '' %}
    {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.safe_cast("'"~target_date~"'", api.Column.translate_type("date")) }}
    {%- else %}
        {%- set get_last_ymd_sql %}
            select max(ymd) from {{ this }}
        {%- endset %}
        {%- set last_ymd = run_query(get_last_ymd_sql).columns[0].values() | first %}
        {%- if last_ymd is not none %}
            {{ dbt.date_trunc('day', '"time"') }} >= {{ dbt.safe_cast("'"~last_ymd~"'", api.Column.translate_type("date")) }}
        {%- else %}
            {{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
        {%- endif %}
    {%- endif %}
{%- endif %}
group by 1
        {%- if is_incremental() %}
        and {%- set target_date = var('target_date', '') %}
            {%- if target_date != '' %}
            {{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', "'"~target_date~"'") }}
            {%- else %}
            {{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', dbt.current_timestamp())}}
            {%- endif %}
        {%- endif %}

論理パーティションごとに、incremental insert

{{
  config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='_partition_hourly',
  )
}}

with source as (

  select * from {{ ref('impressions') }}
  where _partition_hourly = {{ dbt.safe_cast("'"~var('target_partition')~"'", api.Column.translate_type("timestamp")) }} -- var target_partitionは、dbt run時に渡される eg: 12-16T10:00

),

final as (
  select
    -- 様々な処理
  from source
)

select * from final
YuichiYuichi

https://x.com/legoboku/status/1854319834433253423

dbtの実装においてCURRENT_DATEを使用することが非決定的である意味について説明します。

非決定的な動作

CURRENT_DATEを使用すると、dbtモデルの実行結果が実行時によって変化する可能性があります。これは、以下の理由から非決定的と見なされます:

  1. 実行日時依存: CURRENT_DATEは実行時の日付を返すため、同じクエリでも実行日が異なれば結果が変わります。
  2. 再現性の問題: 過去の実行結果を正確に再現することが困難になります。
  3. テストの不安定性: テストを実行するたびに結果が変わる可能性があり、テストの信頼性が低下します。

dbtにおける影響

dbtでは、モデルの結果が予測可能で一貫していることが重要です。CURRENT_DATEの使用は以下の点で問題を引き起こす可能性があります:

  • 増分モデルの不整合: 日付に基づいて増分更新を行うモデルで、予期しない結果が生じる可能性があります。
  • データの整合性: 異なる時点で実行されたモデル間でデータの整合性が取れなくなる可能性があります。

推奨されるアプローチ

非決定的な動作を避けるため、dbtでは以下のアプローチが推奨されます:

  1. 変数の使用: dbtの{{ var() }}マクロを使用して、実行時に日付を指定します。
  2. 環境変数: 環境変数を通じて日付を設定し、クエリ内で参照します。
  3. 固定値: テストやデバッグ時には固定の日付値を使用します。

これらのアプローチにより、再現性が高く、予測可能な結果を得ることができます。

dbtで実行時に日付を指定する方法はいくつかあります。主な方法は以下の通りです:

1. varを使用する方法

--varオプションを使用して、コマンドライン上で日付を指定できます。

dbt run --var 'execution_date: 2023-11-11'

モデル内では以下のように使用します:

SELECT *
FROM my_table
WHERE date = {{ var('execution_date') }}

2. 環境変数を使用する方法

環境変数を設定し、env_var関数を使用してdbtモデル内で参照します。

export DBT_EXECUTION_DATE='2023-11-11'
dbt run

モデル内では:

SELECT *
FROM my_table
WHERE date = '{{ env_var("DBT_EXECUTION_DATE") }}'

3. プロファイルやプロジェクト設定を使用する方法

dbt_project.ymlprofiles.ymlにデフォルト値を設定し、必要に応じてオーバーライドします。dbt_project.yml:

vars:
  execution_date: '2023-11-11'

コマンドラインでオーバーライド:

dbt run --vars '{"execution_date": "2023-11-12"}'

4. カスタムマクロを作成する方法

日付処理用のカスタムマクロを作成し、必要に応じて呼び出します。

{% macro get_execution_date() %}
  {% set exec_date = var('execution_date', modules.datetime.date.today().isoformat()) %}
  {{ return(exec_date) }}
{% endmacro %}

モデル内での使用:

SELECT *
FROM my_table
WHERE date = '{{ get_execution_date() }}'

これらの方法を使用することで、dbtの実行時に柔軟に日付を指定できます。特にvarを使用する方法は、再現性と柔軟性のバランスが取れており、多くのケースで有効です。

YuichiYuichi

https://github.com/patkearns10/dbt_workspace/blob/62b2d6ec8c837dd7a132a2e12d5e1a6a0e6df249/dbt_snowflake/models/timezones.sql

current_date.sql
{% macro current_date_jst() %}
  {% set current_datetime_jst =  modules.datetime.datetime.now() + modules.datetime.timedelta(hours=9)  %}
  {% set current_date_jst = current_datetime_jst.strftime('%Y-%m-%d') %}
  {{ return("'" ~ current_datetime_jst ~ "'") }}
{% endmacro %}

{% macro current_date_utc() %}
  {% set current_datetime_utc = modules.datetime.datetime.now() %}
  {% set current_date_utc = current_datetime_utc.strftime('%Y-%m-%d') %}
  {{ return("'" ~ current_datetime_utc ~ "'") }}
{% endmacro %}
YuichiYuichi
{% set current_datetime_utc = modules.datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") %}
SELECT
  '{{ current_datetime_utc }}' AS now_utc1
  ,PARSE_DATETIME('%Y-%m-%d %H:%M:%S','{{ current_datetime_utc }}') AS now_utc2
  ,DATETIME('{{ current_datetime_utc }}') AS now_utc3
  ,DATETIME('{{ current_datetime_utc }}','Asia/Tokyo') AS now_utc4
  ,DATE('{{ current_datetime_utc }}','Asia/Tokyo') AS now_utc5
SELECT
  '2024-11-12 00:34:44' AS now_utc1
  ,PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2024-11-12 00:34:44') AS now_utc2
  ,DATETIME('2024-11-12 00:34:44') AS now_utc3
  ,DATETIME('2024-11-12 00:34:44','Asia/Tokyo') AS now_utc4
  ,DATE('2024-11-12 00:34:44','Asia/Tokyo') AS now_utc5

YuichiYuichi

BigQueryにおけるdbtのMERGE戦略(増分更新)について

CREATE OR REPLACE TABLE `work.test_001` (
  id INT64,
  name STRING,
  age INT64,
  created_at TIMESTAMP
) AS (
  SELECT *
  FROM (
    SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS created_at 
    UNION ALL SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') 
    UNION ALL SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') 
    -- UNION ALL SELECT 4, '山田優子', 28, TIMESTAMP('2024-08-10 08:15:00') 
    -- UNION ALL SELECT 5, '伊藤健太', 32, TIMESTAMP('2024-09-11 09:00:00')
  )
);

project_id.work.test_001

test_002.sql

{{
 config(
        materialized='incremental'
        ,incremental_strategy="merge"
    )
}}
-- merge: デフォルトなので記載しなくてもOK
SELECT * FROM `project_id.work.test_001`

1. テーブル作成

> Executing task: dbt build --select test_002
15:17:34  1 of 1 START sql incremental model work.test_002 ............................... [RUN]
15:17:36  1 of 1 OK created sql incremental model work.test_002 .......................... [CREATE TABLE (3.0 rows, 114.0 Bytes processed) in 2.02s]
CREATE OR REPLACE TABLE `project_id`.`work`.`test_002`
OPTIONS (
  description = """"""
)
AS (
  SELECT *
  FROM `project_id.work.test_001`
);

project_id.work.test_002

1-1. テーブル作成→

> Executing task: dbt build --select test_002
15:18:48  1 of 1 START sql incremental model work.test_002 ............................... [RUN]
15:18:53  1 of 1 OK created sql incremental model work.test_002 .......................... [MERGE (3.0 rows, 114.0 Bytes processed) in 4.56s]
CREATE OR REPLACE TABLE `project_id`.`work`.`test_002__dbt_tmp`
OPTIONS (
  description = """"""
  ,expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(),INTERVAL 12 HOUR)
)
AS (
  SELECT *
  FROM `project_id.work.test_001`
);

MERGE INTO `project_id`.`work`.`test_002` AS DBT_INTERNAL_DEST
USING (
  SELECT *
  FROM `project_id`.`work`.`test_002__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
  ON (FALSE)

WHEN NOT MATCHED THEN INSERT
  (`id`,`name`,`age`,`created_at`)
VALUES
  (`id`,`name`,`age`,`created_at`)

project_id.work.test_002

YuichiYuichi

MERGE INTO `project_id`.`work`.`test_002` AS DBT_INTERNAL_DEST
USING (
  SELECT *
  FROM `project_id`.`work`.`test_002__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
  ON (
    DBT_INTERNAL_SOURCE.ID = DBT_INTERNAL_DEST.ID
  )


WHEN MATCHED THEN
  UPDATE SET
    `id` = DBT_INTERNAL_SOURCE.`id`,`name` = DBT_INTERNAL_SOURCE.`name`,`age` 
        = DBT_INTERNAL_SOURCE.`age`,`created_at` = DBT_INTERNAL_SOURCE.`created_at`


WHEN NOT MATCHED THEN INSERT
  (`id`,`name`,`age`,`created_at`)
VALUES
  (`id`,`name`,`age`,`created_at`)

このSQLクエリは、BigQueryのMERGE文を使用しています。MERGE文は、条件に基づいてテーブルのデータを更新、挿入、または削除するための強力な操作です。このクエリの内容を詳しく説明します:

  1. 対象テーブル:
    project_id.work.test_002というテーブルが更新の対象です。
  2. ソーステーブル:
    project_id.work.test_002__dbt_tmpというテーブルからデータを取得しています。
  3. マッチング条件:
    両テーブルのIDカラムが一致する行を探します。
  4. マッチした場合の処理:
    一致する行が見つかった場合、id, name, age, created_atの各カラムを更新します。
  5. マッチしなかった場合の処理:
    一致する行が見つからなかった場合、新しい行を挿入します。挿入されるカラムはid, name, age, created_atです。

このクエリは、おそらくdbt(data build tool)によって生成されたものです。dbtは、データ変換のワークフローを管理するためのツールで、このようなMERGE文を自動生成することができます。このMERGE操作の目的は、既存のデータを更新しつつ、新しいデータを挿入することです。これにより、テーブルのデータを効率的に最新の状態に保つことができます。