dbtの増分更新
{{
config(
materialized = 'incremental',
partition_by = {
'field': 'event_date',
'data_type': 'date',
'granularity': 'day'},
incremental_strategy = 'insert_overwrite',
unique_key='event_id',
on_schema_change = 'fail',
tags=['incremental', 'daily']
)
}}
version: 2
sources:
- name: google_analytics
description: "Google Analytics 4 export"
database: project-id
schema: analytics_111111111
tables:
- name: events
identifier: events_*
loaded_at_field: "parse_timestamp(_table_suffix, '%Y%m%d')"
freshness:
warn_after:
count: 24
period: hour
error_after:
count: 36
period: hour
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = {
'field': 'created_at',
'data_type': 'date',
'granularity': 'day',
'copy_partitions': true
}
) }}
select
*
from
{{ source('test', 'ソーステーブル') }}
WHERE Ture
{% if is_incremental() %}
{% if var('start_date') and var('end_date') %}
AND created_at between '{{ var('start_date') }}' and '{{ var('end_date') }}'
{% else %}
AND created_at = CURRENT_DATE('Asia/Tokyo') - 1
{% endif %}
{% endif %}
dbt run --models your_model_name --vars '{"start_date": "2024-11-01", "end_date": "2024-11-05"}'
過去1週間分の更新
1週間分のデータを更新したい場合、created_at
が過去7日間に該当するレコードを抽出するように変更します。
AND created_at BETWEEN DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 7 DAY) AND CURRENT_DATE('Asia/Tokyo') - 1
これにより、created_at
が「今日の1日前」から「今日の7日前」までのデータが取得されます。
過去1ヶ月分の更新
1ヶ月分のデータを更新する場合、以下のように過去30日間に該当するレコードを抽出します。
AND created_at BETWEEN DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 30 DAY) AND CURRENT_DATE('Asia/Tokyo') - 1
これにより、created_at
が「今日の1日前」から「今日の30日前」までのデータが取得されます。
無いとエラーになる
vars:
start_date : NULL
end_date : NULL
CREATE OR REPLACE TABLE `work.customer` (
name STRING
, age INT64
, created_at TIMESTAMP
) AS (
SELECT *
FROM UNNEST (
ARRAY<STRUCT<
name STRING
, age INT64
, created_at TIMESTAMP
>> [
('田中太郎', 45, '2024-11-01 10:00:00')
, ('佐藤花子', 25, '2024-11-01 14:30:00')
, ('鈴木一郎', 30, '2024-11-02 09:15:00')
-- , ('山田優子', null, '2024-11-02 18:20:00')
-- , ('伊藤健太', 32, '2024-11-03 12:45:00')
, ('高橋美紀', 27, '2024-11-04 07:25:00')
, ('渡辺陽一', null, '2024-11-04 16:50:00')
, ('加藤隆', 22, '2024-11-05 10:05:00')
-- , ('小林健', 35, '2024-11-06 17:25:00')
-- , ('佐々木裕', 28, '2024-11-06 15:40:00')
]
)
)
select * from `work.customer` order by created_at
name | age | created_at |
---|---|---|
田中太郎 | 45 | 2024-11-01 10:00:00.000000 UTC |
佐藤花子 | 25 | 2024-11-01 14:30:00.000000 UTC |
鈴木一郎 | 30 | 2024-11-02 09:15:00.000000 UTC |
高橋美紀 | 27 | 2024-11-04 07:25:00.000000 UTC |
渡辺陽一 | 2024-11-04 16:50:00.000000 UTC | |
加藤隆 | 22 | 2024-11-05 10:05:00.000000 UTC |
version: 2
sources:
- name: work # データソースの名前
schema: work # スキーマ名
tables:
- name: customer # テーブル名
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = {
'field': 'created_at',
'data_type': 'timestamp',
'granularity': 'day',
'copy_partitions': true
}
) }}
SELECT *
FROM {{ source('work', 'customer') }}
WHERE TRUE
{% if is_incremental() %}
{% if var('target_start_date') and var('target_end_date') %}
AND DATE(created_at) between '{{ var('target_start_date') }}' and '{{ var('target_end_date') }}'
{% else %}
AND DATE(created_at) = CURRENT_DATE('Asia/Tokyo') - 1
{% endif %}
{% endif %}
00:29:05 Concurrency: 4 threads (target='dev')
00:29:05
00:29:05 1 of 1 START sql incremental model work.customer_salary ........................ [RUN]
00:29:08 1 of 1 OK created sql incremental model work.customer_salary ................... [CREATE TABLE (8.0 rows, 221.0 Bytes processed) in 2.28s]
00:29:08
00:29:08 Finished running 1 incremental model in 0 hours 0 minutes and 5.06 seconds (5.06s).
00:29:08
00:29:08 Completed successfully
dbt run --select customer_salary
select CURRENT_DATE('Asia/Tokyo') - 1
2024-11-06
select * from `work.customer_salary` order by created_at
name | age | created_at |
---|---|---|
田中太郎 | 45 | 2024-11-01 10:00:00.000000 UTC |
佐藤花子 | 25 | 2024-11-01 14:30:00.000000 UTC |
鈴木一郎 | 30 | 2024-11-02 09:15:00.000000 UTC |
山田優子 | 2024-11-02 18:20:00.000000 UTC | |
伊藤健太 | 32 | 2024-11-03 12:45:00.000000 UTC |
高橋美紀 | 27 | 2024-11-04 07:25:00.000000 UTC |
渡辺陽一 | 2024-11-04 16:50:00.000000 UTC | |
加藤隆 | 22 | 2024-11-05 10:05:00.000000 UTC |
CREATE OR REPLACE TABLE `work.customer` (
name STRING
, age INT64
, created_at TIMESTAMP
) AS (
SELECT *
FROM UNNEST (
ARRAY<STRUCT<
name STRING
, age INT64
, created_at TIMESTAMP
>> [
('田中太郎', 45, '2024-11-01 10:00:00')
, ('佐藤花子', 25, '2024-11-01 14:30:00')
, ('鈴木一郎', 30, '2024-11-02 09:15:00')
, ('山田優子', null, '2024-11-02 18:20:00')
, ('伊藤健太', 32, '2024-11-03 12:45:00')
, ('高橋美紀', 27, '2024-11-04 07:25:00')
, ('渡辺陽一', null, '2024-11-04 16:50:00')
, ('加藤隆', 22, '2024-11-05 10:05:00')
, ('小林健', 35, '2024-11-06 17:25:00')
, ('佐々木裕', 28, '2024-11-06 15:40:00')
]
)
);
CURRENT_DATEすると非決定的になるので、backfillできない問題が。一定期間のsourceの履歴はデータレイク、ハウスに残してbackfillできるようにしたいと思うが、どうなんでしょう。
{{ config(
materialized="incremental",
unique_key=['ymd'],
schema="kpi"
) }}
select
{{ dbt.safe_cast(dbt.date_trunc('day', '"time"'), api.Column.translate_type("date")) }} as ymd
,count(distinct user_id) daily_active_users
from {{ source('log', 'nginx_access_logs') }}
{%- if is_incremental() %}
where
{%- set target_date = var('target_date','') %}
{%- if target_date != '' %}
{{ dbt.date_trunc('day', '"time"') }} = {{ dbt.safe_cast("'"~target_date~"'", api.Column.translate_type("date")) }}
{%- else %}
{%- set get_last_ymd_sql %}
select max(ymd) from {{ this }}
{%- endset %}
{%- set last_ymd = run_query(get_last_ymd_sql).columns[0].values() | first %}
{%- if last_ymd is not none %}
{{ dbt.date_trunc('day', '"time"') }} >= {{ dbt.safe_cast("'"~last_ymd~"'", api.Column.translate_type("date")) }}
{%- else %}
{{ dbt.date_trunc('day', '"time"') }} = {{ dbt.date_trunc('day', dbt.current_timestamp()) }}
{%- endif %}
{%- endif %}
{%- endif %}
group by 1
{%- if is_incremental() %}
and {%- set target_date = var('target_date', '') %}
{%- if target_date != '' %}
{{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', "'"~target_date~"'") }}
{%- else %}
{{ dbt.date_trunc('day', 'date') }} = {{ dbt.date_trunc('day', dbt.current_timestamp())}}
{%- endif %}
{%- endif %}
論理パーティションごとに、incremental insert
{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key='_partition_hourly',
)
}}
with source as (
select * from {{ ref('impressions') }}
where _partition_hourly = {{ dbt.safe_cast("'"~var('target_partition')~"'", api.Column.translate_type("timestamp")) }} -- var target_partitionは、dbt run時に渡される eg: 12-16T10:00
),
final as (
select
-- 様々な処理
from source
)
select * from final
dbtの実装においてCURRENT_DATEを使用することが非決定的である意味について説明します。
非決定的な動作
CURRENT_DATEを使用すると、dbtモデルの実行結果が実行時によって変化する可能性があります。これは、以下の理由から非決定的と見なされます:
- 実行日時依存: CURRENT_DATEは実行時の日付を返すため、同じクエリでも実行日が異なれば結果が変わります。
- 再現性の問題: 過去の実行結果を正確に再現することが困難になります。
- テストの不安定性: テストを実行するたびに結果が変わる可能性があり、テストの信頼性が低下します。
dbtにおける影響
dbtでは、モデルの結果が予測可能で一貫していることが重要です。CURRENT_DATEの使用は以下の点で問題を引き起こす可能性があります:
- 増分モデルの不整合: 日付に基づいて増分更新を行うモデルで、予期しない結果が生じる可能性があります。
- データの整合性: 異なる時点で実行されたモデル間でデータの整合性が取れなくなる可能性があります。
推奨されるアプローチ
非決定的な動作を避けるため、dbtでは以下のアプローチが推奨されます:
-
変数の使用: dbtの
{{ var() }}
マクロを使用して、実行時に日付を指定します。 - 環境変数: 環境変数を通じて日付を設定し、クエリ内で参照します。
- 固定値: テストやデバッグ時には固定の日付値を使用します。
これらのアプローチにより、再現性が高く、予測可能な結果を得ることができます。
dbtで実行時に日付を指定する方法はいくつかあります。主な方法は以下の通りです:
1. varを使用する方法
--var
オプションを使用して、コマンドライン上で日付を指定できます。
dbt run --var 'execution_date: 2023-11-11'
モデル内では以下のように使用します:
SELECT *
FROM my_table
WHERE date = {{ var('execution_date') }}
2. 環境変数を使用する方法
環境変数を設定し、env_var
関数を使用してdbtモデル内で参照します。
export DBT_EXECUTION_DATE='2023-11-11'
dbt run
モデル内では:
SELECT *
FROM my_table
WHERE date = '{{ env_var("DBT_EXECUTION_DATE") }}'
3. プロファイルやプロジェクト設定を使用する方法
dbt_project.yml
やprofiles.yml
にデフォルト値を設定し、必要に応じてオーバーライドします。dbt_project.yml
:
vars:
execution_date: '2023-11-11'
コマンドラインでオーバーライド:
dbt run --vars '{"execution_date": "2023-11-12"}'
4. カスタムマクロを作成する方法
日付処理用のカスタムマクロを作成し、必要に応じて呼び出します。
{% macro get_execution_date() %}
{% set exec_date = var('execution_date', modules.datetime.date.today().isoformat()) %}
{{ return(exec_date) }}
{% endmacro %}
モデル内での使用:
SELECT *
FROM my_table
WHERE date = '{{ get_execution_date() }}'
これらの方法を使用することで、dbtの実行時に柔軟に日付を指定できます。特にvar
を使用する方法は、再現性と柔軟性のバランスが取れており、多くのケースで有効です。
{% macro current_date_jst() %}
{% set current_datetime_jst = modules.datetime.datetime.now() + modules.datetime.timedelta(hours=9) %}
{% set current_date_jst = current_datetime_jst.strftime('%Y-%m-%d') %}
{{ return("'" ~ current_datetime_jst ~ "'") }}
{% endmacro %}
{% macro current_date_utc() %}
{% set current_datetime_utc = modules.datetime.datetime.now() %}
{% set current_date_utc = current_datetime_utc.strftime('%Y-%m-%d') %}
{{ return("'" ~ current_datetime_utc ~ "'") }}
{% endmacro %}
{% set current_datetime_utc = modules.datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") %}
SELECT
'{{ current_datetime_utc }}' AS now_utc1
,PARSE_DATETIME('%Y-%m-%d %H:%M:%S','{{ current_datetime_utc }}') AS now_utc2
,DATETIME('{{ current_datetime_utc }}') AS now_utc3
,DATETIME('{{ current_datetime_utc }}','Asia/Tokyo') AS now_utc4
,DATE('{{ current_datetime_utc }}','Asia/Tokyo') AS now_utc5
SELECT
'2024-11-12 00:34:44' AS now_utc1
,PARSE_DATETIME('%Y-%m-%d %H:%M:%S','2024-11-12 00:34:44') AS now_utc2
,DATETIME('2024-11-12 00:34:44') AS now_utc3
,DATETIME('2024-11-12 00:34:44','Asia/Tokyo') AS now_utc4
,DATE('2024-11-12 00:34:44','Asia/Tokyo') AS now_utc5
BigQueryにおけるdbtのMERGE戦略(増分更新)について
CREATE OR REPLACE TABLE `work.test_001` (
id INT64,
name STRING,
age INT64,
created_at TIMESTAMP
) AS (
SELECT *
FROM (
SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS created_at
UNION ALL SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00')
UNION ALL SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00')
-- UNION ALL SELECT 4, '山田優子', 28, TIMESTAMP('2024-08-10 08:15:00')
-- UNION ALL SELECT 5, '伊藤健太', 32, TIMESTAMP('2024-09-11 09:00:00')
)
);
project_id.work.test_001
test_002.sql
{{
config(
materialized='incremental'
,incremental_strategy="merge"
)
}}
-- merge: デフォルトなので記載しなくてもOK
SELECT * FROM `project_id.work.test_001`
1. テーブル作成
> Executing task: dbt build --select test_002
15:17:34 1 of 1 START sql incremental model work.test_002 ............................... [RUN]
15:17:36 1 of 1 OK created sql incremental model work.test_002 .......................... [CREATE TABLE (3.0 rows, 114.0 Bytes processed) in 2.02s]
CREATE OR REPLACE TABLE `project_id`.`work`.`test_002`
OPTIONS (
description = """"""
)
AS (
SELECT *
FROM `project_id.work.test_001`
);
project_id.work.test_002
1-1. テーブル作成→
> Executing task: dbt build --select test_002
15:18:48 1 of 1 START sql incremental model work.test_002 ............................... [RUN]
15:18:53 1 of 1 OK created sql incremental model work.test_002 .......................... [MERGE (3.0 rows, 114.0 Bytes processed) in 4.56s]
CREATE OR REPLACE TABLE `project_id`.`work`.`test_002__dbt_tmp`
OPTIONS (
description = """"""
,expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(),INTERVAL 12 HOUR)
)
AS (
SELECT *
FROM `project_id.work.test_001`
);
MERGE INTO `project_id`.`work`.`test_002` AS DBT_INTERNAL_DEST
USING (
SELECT *
FROM `project_id`.`work`.`test_002__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
ON (FALSE)
WHEN NOT MATCHED THEN INSERT
(`id`,`name`,`age`,`created_at`)
VALUES
(`id`,`name`,`age`,`created_at`)
project_id.work.test_002
MERGE INTO `project_id`.`work`.`test_002` AS DBT_INTERNAL_DEST
USING (
SELECT *
FROM `project_id`.`work`.`test_002__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
ON (
DBT_INTERNAL_SOURCE.ID = DBT_INTERNAL_DEST.ID
)
WHEN MATCHED THEN
UPDATE SET
`id` = DBT_INTERNAL_SOURCE.`id`,`name` = DBT_INTERNAL_SOURCE.`name`,`age`
= DBT_INTERNAL_SOURCE.`age`,`created_at` = DBT_INTERNAL_SOURCE.`created_at`
WHEN NOT MATCHED THEN INSERT
(`id`,`name`,`age`,`created_at`)
VALUES
(`id`,`name`,`age`,`created_at`)
このSQLクエリは、BigQueryのMERGE文を使用しています。MERGE文は、条件に基づいてテーブルのデータを更新、挿入、または削除するための強力な操作です。このクエリの内容を詳しく説明します:
- 対象テーブル:
project_id
.work
.test_002
というテーブルが更新の対象です。 - ソーステーブル:
project_id
.work
.test_002__dbt_tmp
というテーブルからデータを取得しています。 - マッチング条件:
両テーブルのID
カラムが一致する行を探します。 - マッチした場合の処理:
一致する行が見つかった場合、id
,name
,age
,created_at
の各カラムを更新します。 - マッチしなかった場合の処理:
一致する行が見つからなかった場合、新しい行を挿入します。挿入されるカラムはid
,name
,age
,created_at
です。
このクエリは、おそらくdbt(data build tool)によって生成されたものです。dbtは、データ変換のワークフローを管理するためのツールで、このようなMERGE文を自動生成することができます。このMERGE操作の目的は、既存のデータを更新しつつ、新しいデータを挿入することです。これにより、テーブルのデータを効率的に最新の状態に保つことができます。