🍊

dbt の incremental model を用いた BigQuery の差分更新

に公開

はじめに

dbtのインクリメンタルモデルは、日々追加・更新されるデータを効率よく取り込むための基本的な仕組みです。
しかし実際の運用では、

  • 過去データが後から更新される
  • 障害時に複数日分を再計算したい
  • スキャン量をできるだけ減らしたい

といった要件に応じて、単純な「最新レコードだけ追加」では不十分なケースが少なくありません。
本記事では、実装パターンを整理します。
自社のデータ基盤に最適な戦略を検討する際のヒントになれば幸いです。

insert_overwrite戦略のCopying partitionsを利用

  • insert_overwrite戦略
    • 宛先テーブルの 指定したパーティションのみを置き換える
    • 宛先テーブルにパーティション設定が必要
    • テーブル全体のフルスキャンが不要なため、処理時間とコストの両面で有利
  • Copying partitions
    • 利用するには、partition_by の設定で copy_partitions: true を指定する必要がある
    • 公式ドキュメントによると Copy Table API を利用しており、データの再挿入にコストがかからないため、大規模データセットでも時間・コストを大幅に節約可能

    ⚠️ 注意:
    パーティション設定の粒度と、差分取得の粒度が一致している必要があります。
    例:日単位フィルターを月単位パーティションに対して指定すると期待通り動作しません。

https://zenn.dev/raksul_data/articles/dbt_incremental_model_on_bq

差分更新マクロ

再利用しやすいよう、差分抽出条件をマクロ化します。

1. マクロの定義

{% macro filter_incremental_max_days(update_date_column="updated_at") %}
  • マクロ名は filter_incremental_max_days
  • デフォルト引数として update_date_column="updated_at" を受け取る
    → 更新日時を持つカラム名を柔軟に切り替え可能

2. 開始日・終了日を指定する場合

過去分のデータ障害が発生した場合に、期間指定で洗い替えできると便利です。

{% if var("target_start_date") and var("target_end_date") %}
    and date({{ update_date_column }})
    between '{{ var("target_start_date") }}' and '{{ var("target_end_date") }}'
  • dbt run の際に --vars 'target_start_date: 2025-09-01, target_end_date: 2025-09-03' のように指定したときに適用
  • 日付範囲で抽出したいときに便利(リカバリ処理や過去データ再処理用)

3. それ以外の場合(通常のインクリメンタル処理)

事例を参考に、下記の設定を採用する。

    and date({{ update_date_column }})
    > (select max(date({{ update_date_column }})) from {{ this }})

insert_overwrite 戦略で特定パーティションのみを更新する場合、過去データの再取得や障害復旧などの要件に応じて「どこまでの期間を再スキャンするか」を決める必要があります。ここでは、2つのアプローチを紹介します。

1️⃣ 複数日を固定的に再スキャンする方式(タイミー事例)

https://tech.timee.co.jp/entry/2023/12/08/121616

  • 概要: 直近 N 日分を常に再取得して差分を反映。
  • 背景: 過去データが後から修正される可能性があるテーブルや、
    障害復旧時に前日以前を補完したいケースで有効。
  • ポイント
    • バッチ遅延や一時的なダウン時も、最新〜日分が確実に再処理される。
    • 過去の値が変わりうるテーブルなど、データの性質を考慮して設定するパーティションを決定する。おそらくデータ基盤エラーダウン時に備えて複数日設定して、翌営業日の復旧のSLOとかに設定しているんじゃないかと想像
dbt_project.yml
last_2_days : [
      current_date('Asia/Tokyo')
      , date_sub(current_date('Asia/Tokyo'), interval 1 day)
  ]
{{config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partitions=var('last_2_days'),

2️⃣ 既存テーブルの最大日付を基準に差分抽出(GitLab事例)

https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt/models

  • 概要: 宛先テーブル ({{ this }}) の最大更新日時より新しい行だけを抽出。
  • ポイント:
    • 宛先テーブルの最新値を参照するため、遅延レコードや欠損の補完には向かない。
    • 実装がシンプル

実装を確認すると>=,>だったりまちまちで一貫性はなさそう?

  • WHERE updated_at > (SELECT MAX(updated_at) FROM {{this}})
  • WHERE updated_at >= (SELECT MAX(updated_at) FROM {{this}})
{{ config({
    "materialized": "incremental",
    "unique_key": "dim_note_id",
    "on_schema_change": "sync_all_columns"
    })
}}
〜
    {% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{this}})
    {% endif %}
〜

アイデア: よりスキャン量を抑える工夫として、過去 7 日分に限定する例

    {% if is_incremental() %}
    WHERE updated_at > (
        SELECT MAX(updated_at) FROM {{this}}
        WHERE DATE(updated_at) >= DATE_SUB(
            CURRENT_DATE('Asia/Tokyo'), INTERVAL 7 DAY))
    {% endif %}

4. 常に追加される条件

and date({{ update_date_column }}) < current_date('Asia/Tokyo')
  • 当日データは対象外(集計途中・不安定なデータを除外する目的)
  • タイムゾーンは 'Asia/Tokyo' に固定されている

5. マクロ全体

/macros/filter_incremental/filter_incremental_max_days.sql
{% macro filter_incremental_max_days(update_date_column="updated_at") %}
    true  -- filter_incremental_max_days
    {% if is_incremental() %}
        {% if var("target_start_date") and var("target_end_date") %}
            and date({{ update_date_column }})
            between '{{ var("target_start_date") }}' and '{{ var("target_end_date") }}'
        {% else %}
            and date({{ update_date_column }})
            > (select max(date({{ update_date_column }})) from {{ this }})
        {% endif %}
    {% endif %}
    and date({{ update_date_column }}) < current_date('Asia/Tokyo')
{% endmacro %}

動作確認ログ

検証用テーブル作成

CREATE OR REPLACE TABLE work.user_prefecture_history
PARTITION BY DATE(created_at) AS
SELECT *
FROM UNNEST([
  STRUCT(1 AS id, '佐藤太郎' AS user_name, '東京都' AS current_prefecture, DATETIME '2025-08-11 09:00:00' AS created_at),
  STRUCT(2, '鈴木花子', '大阪府', DATETIME '2025-08-11 11:30:00'),
  STRUCT(3, '田中一郎', '北海道', DATETIME '2025-08-12 14:15:00'),
  STRUCT(4, '佐藤太郎', '石川県', DATETIME '2025-08-12 08:45:00')
]);

データ確認

select * from work.user_prefecture_history order by 1

モデル定義

user_prefecture_history_incremental.sql
{{
    config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        partition_by={
            "field": "created_at",
            "data_type": "datetime",
            "granularity": "day",
            "copy_partitions": true,
        },
    )
}}

select *
from work.user_prefecture_history
where {{ filter_incremental_max_days("created_at") }}

実行例

初回実行

dbt build --select user_prefecture_history_incremental
  • 既存データを日単位パーティションごとに丸ごと作成
  • and date(created_at) < current_date('Asia/Tokyo') が常に適用

実行される処理

create or replace table ``.`work`.`user_prefecture_history_incremental`
partition by datetime_trunc(created_at, day)
OPTIONS(
  description=""""""
)
as (
  select *
  from work.user_prefecture_history
  where 
      true  -- filter_incremental_max_days 
      and date(created_at) < current_date('Asia/Tokyo')
);

データ確認

select * from work.user_prefecture_history_incremental order by 1

差分レコード追加

id = 5 のレコードが追加された想定

CREATE OR REPLACE TABLE work.user_prefecture_history
PARTITION BY DATE(created_at) AS
SELECT *
FROM UNNEST([
  STRUCT(1 AS id, '佐藤太郎' AS user_name, '東京都' AS current_prefecture, DATETIME '2025-08-11 09:00:00' AS created_at),
  STRUCT(2, '鈴木花子', '大阪府', DATETIME '2025-08-11 11:30:00'),
  STRUCT(3, '田中一郎', '北海道', DATETIME '2025-08-12 14:15:00'),
  STRUCT(4, '佐藤太郎', '石川県', DATETIME '2025-08-12 08:45:00'),
  -- 追加
  STRUCT(5, '鈴木花子', '京都府', DATETIME '2025-08-13 00:20:00')
]);

データ確認

select * from work.user_prefecture_history order by 1


id = 5 のレコードを追加後に再実行すると…

dbt build --select user_prefecture_history_incremental
  • 一時テーブルを作成
  • 差分パーティション(この場合 2025-08-13)だけ WRITE_TRUNCATE でコピー
  • 既存パーティションはノータッチ

実行される処理

  1. 一時テーブル作成
create or replace table ``.`work`.`user_prefecture_history_incremental__dbt_tmp`
partition by datetime_trunc(created_at, day)
OPTIONS(
  description="""""",
  expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
  select *
  from work.user_prefecture_history
  where 
    true  -- filter_incremental_max_days
            and date(created_at)
            > (select max(date(created_at)) from ``.`work`.`user_prefecture_history_incremental`)
    and date(created_at) < current_date('Asia/Tokyo')
);
  1. 差分の日単位確認
select distinct datetime_trunc(created_at, day)
from ``.`work`.`user_prefecture_history_incremental__dbt_tmp`
  1. パーティション単位で上書き
Copying table(s) 
"/projects/〜/datasets/work/tables/user_prefecture_history_incremental__dbt_tmp$20250813" 
to "/projects/〜/datasets/work/tables/user_prefecture_history_incremental$20250813" 
with disposition: "WRITE_TRUNCATE"
  1. 一時テーブル削除
-- Clean up the temp table
drop table if exists ``.`work`.`user_prefecture_history_incremental__dbt_tmp`

更新なしで再実行

  • 一時テーブルは作成されるが、差分がないためコピー処理は実行されない
dbt build --select user_prefecture_history_incremental
  1. 一時テーブル作成
create or replace table ``.`work`.`user_prefecture_history_incremental__dbt_tmp`
partition by datetime_trunc(created_at, day)
OPTIONS(
  description="""""",
  expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
  select *
  from work.user_prefecture_history
  where 
    true  -- filter_incremental_max_days
            and date(created_at)
            > (select max(date(created_at)) from ``.`work`.`user_prefecture_history_incremental`)
    and date(created_at) < current_date('Asia/Tokyo')
);
  1. 差分の日単位確認
select distinct datetime_trunc(created_at, day)
from ``.`work`.`user_prefecture_history_incremental__dbt_tmp`
  1. パーティション単位で上書き
    実行されない
  2. 一時テーブル削除
-- Clean up the temp table
drop table if exists ``.`work`.`user_prefecture_history_incremental__dbt_tmp`

過去日にレコード追加

target_start_date / target_end_date を指定すると、任意期間のパーティションを強制的に再コピーできる。

CREATE OR REPLACE TABLE work.user_prefecture_history
PARTITION BY DATE(created_at) AS
SELECT *
FROM UNNEST([
  STRUCT(1 AS id, '佐藤太郎' AS user_name, '東京都' AS current_prefecture, DATETIME '2025-08-11 09:00:00' AS created_at),
  STRUCT(2, '鈴木花子', '大阪府', DATETIME '2025-08-11 11:30:00'),
  STRUCT(3, '田中一郎', '北海道', DATETIME '2025-08-12 14:15:00'),
  STRUCT(4, '佐藤太郎', '石川県', DATETIME '2025-08-12 08:45:00'),
  STRUCT(5, '鈴木花子', '京都府', DATETIME '2025-08-13 00:20:00'),
  -- 追加
  STRUCT(6, '山本健一', '徳島県', DATETIME '2025-08-11 11:00:00'),
  STRUCT(7, '小林彩', '静岡県', DATETIME '2025-08-12 10:00:00')
]);

データ確認

select * from work.user_prefecture_history order by 1

dbt build --select user_prefecture_history_incremental --vars '{"target_start_date": "2025-08-11", "target_end_date": "2025-08-12"}'

実行される処理

  1. 一時テーブル作成
create or replace table ``.`work`.`user_prefecture_history_incremental__dbt_tmp`
partition by datetime_trunc(created_at, day)
OPTIONS(
  description="""""",
  expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
  select *
  from work.user_prefecture_history
  where 
    true  -- filter_incremental_max_days
        and date(created_at)
        between '2025-08-11' and '2025-08-12'
    and date(created_at) < current_date('Asia/Tokyo')
);
  1. 差分の日単位確認
select distinct datetime_trunc(created_at, day)
from ``.`work`.`user_prefecture_history_incremental__dbt_tmp`
  1. パーティション単位で上書き
Copying table(s) 
"/projects/〜/datasets/work/tables/user_prefecture_history_incremental__dbt_tmp$20250811" 
to "/projects/〜/datasets/work/tables/user_prefecture_history_incremental$20250811" 
with disposition: "WRITE_TRUNCATE"

Copying table(s) 
"/projects/〜/datasets/work/tables/user_prefecture_history_incremental__dbt_tmp$20250812" 
to "/projects/〜/datasets/work/tables/user_prefecture_history_incremental$20250812" 
with disposition: "WRITE_TRUNCATE"
  1. 一時テーブル削除
-- Clean up the temp table
drop table if exists ``.`work`.`user_prefecture_history_incremental__dbt_tmp`

データ確認

select * from work.user_prefecture_history_incremental order by 1

サブクエリを使うなど定数でないものが入るとフルスキャンが走ってしまう問題

記事書いた後に気づいたが、記事の方法だとフルスキャンが2度かかってしまう挙動になるため、多少工夫が必要です。詳しくは下記の記事を参考にしてください。
https://x.com/tanuhack/status/1968576347607023663

-- ❌: Bigqueryは、サブクエリで動的に取得した値で比較した場合、パーティション分割が適用されない
-- ソーステーブル:フルスキャン、宛先テーブル:フルスキャン
select * from {{ ref('model') }}
{% if is_incremental () %}
    -- dbtのドキュメントによく紹介してある方法
    where load_ts > (select max(load_ts) from {{ this }})
{% endif %}
-- ✅
-- ソーステーブル:部分スキャン、宛先テーブル:フルスキャン
select * from {{ ref('model') }}
{% if is_incremental () %}
    {% set statement %}select max(load_ts) as max_load_ts from {{ this }}{% endset %}
    {% set result = run_query (statement) %}
    {% set max_load_ts = result [0]['max_load_ts'] %}
    where load_ts > '{{ max_load_ts }}'
{% endif %}

https://www.yasuhisay.info/entry/2023/05/30/190616

{% set results = run_query('SELECT MAX(_table_suffix) FROM `my-project.my_dataset.my_table_*`') %}

{% if execute %}
{% set max_table_suffix = results.columns[0].values()[0] %}
{% else %}
{% set max_table_suffix = none %}
{% endif %}

SELECT
  *
FROM
  `my-project.my_dataset.my_table_*`
WHERE
  _table_suffix = "{{ max_table_suffix }}"

おわりに

インクリメンタルモデルは単なる「最新レコードだけ取り込む」ではなく、

  • テーブル特性(過去値が更新されるか)
  • 復旧方針・SLO(何日分を再計算するか)
  • コスト(スキャン量削減)

といった観点で最適解が変わります。

「最新日時より新しいデータを抽出」するシンプルな実装もあれば、「複数日分を必ず再計算」する運用もあります。自社のデータ更新頻度や障害復旧ポリシーに合わせて、検討し、チーム内でガイドライン化しておくと安心です。

参考

https://docs.getdbt.com/reference/resource-configs/bigquery-configs
https://zenn.dev/raksul_data/articles/dbt_incremental_model_on_bq
https://docs.getdbt.com/blog/bigquery-ingestion-time-partitioning-and-partition-copy-with-dbt
https://tech.timee.co.jp/entry/2023/12/08/121616
https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt/models

Discussion