😎

【dbt Docs】Building a dbt ProjecModels - Configuring incremental models

2022/03/11に公開

Configuring incremental models

インクリメンタルモデルの構成
https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models

What is an incremental model?

インクリメンタルモデルとは?

インクリメンタルモデルは、テーブルとして作成する
最初に作るときは、ソースデータ全部を変換して、テーブルを作る
以降の実行では、「指定条件にあった」テーブルを作成済みのテーブルに追記する。

インクリメンタルモデルと使うと、変換対象のデータ量が減って、変換の実行時間が大幅に短縮できる。

How do I use the incremental materialization?

インクリメンタルマテリアライゼーションをつかうにはどうしたらいい?

.sqlのなかのconfigブロックで指定できる。

{{
    config(
        materialized='incremental'
    )
}}

select ...

dbtでインクルメンタルモデルにするためには

  • 増分実行のための行のフィルタリング方法
  • モデルの一意制約(存在する場合)

Filtering rows on an incremental run

増分実行での行フィルタリング

増分実行でどの行を変換するか?をdbtにて指定するには、is_incremental()マクロでどうフィルタリングするか?をSQLで記載する。

前回更新時から「新しい」行をフィルタリングする必要がある。その場合{{this}}を使うと簡単にできる。

models/stg_events.sql
{{
    config(
        materialized='incremental'
    )
}}

select
    *,
    my_slow_function(my_column)

from raw_app_data.events

{% if is_incremental() %}

  -- this filter will only be applied on an incremental run
  where event_time > (select max(event_time) from {{ this }})

{% endif %}

Defining a uniqueness constraint (optional)

一意性制約の定義

unique_key : モデル内で一意である必要のあるフィールドを指定する。増分更新分に既存のキーがあればそこを更新し、なければ追記する。(=UPSERTですね)
一部のデータベースでは、式( concat(user_id, session_number)も使えるけど、すべてのDWHで使えるわけでもないので、非推奨。やる場合は、モデルにそのフィールドを追加するほうが良さそう。

例として「イベントストリームに基づいて1日あたりのアクティブユーザー(DAU)の数を計算するモデル」ソースデータが到着したら、dbtが最後に実行された日の両方とそれ以降の任意の日のDAUの数を再計算する

models/staging/fct_daily_active_users.sql
{{
    config(
        materialized='incremental',
        unique_key='date_day'
    )
}}

select
    date_trunc('day', event_at) as date_day,
    count(distinct user_id) as daily_active_users

from raw_app_data.events


{% if is_incremental() %}

  -- this filter will only be applied on an incremental run
  where date_day >= (select max(date_day) from {{ this }})

{% endif %}

group by 1

このモデルを unique_keyパラメータ無しで実行すると、ターゲットテーブルに同じ日の行が複数生成されて、どんどん増えていってしまう。 unique_keyを指定していると、既存日のデータは更新される。

How do I rebuild an incremental model?

インクリメンタルモデルを再構築するにはどうすればいい?

dbtにインクリメンタルモデル全体を最初から再構築させる--full-refreshには、コマンドラインでフラグを使用します。このフラグにより、dbtは、データベース内の既存のターゲットテーブルを、常に再構築する前にドロップします。

$ dbt run --full-refresh --select my_incremental_model+

--full-refresh を使うと全部作り直してくれる。
末尾に示されているような、ダウンストリームモデルを再構築することをおすすめします。

Understanding incremental models

インクリメンタルモデルを理解する

When should I use an incremental model?

インクリメンタルモデルはいつ使うのがよい?

  • ソースデータのデーブルに、数百万、数十億の行がある場合
  • ソースデータの変換の実行時間がかかる場合(複雑な正規表現関数やUDFを使ってる場合)

インクリメンタルモデルは、viewやtable(全部作り直し)よりは、仕組みとしては複雑だけど、パフォーマンスは必要な分だけ処理なので、dbt実行のパフォーマンスは大幅に向上する

Understanding the is_incremental() macro

is_incremental()が、trueとなるのは

  • 宛先テーブルが既に存在する
  • dbtがフルリフレッシュモード(--full-refreshオプション)では実行されていない
  • meterialized=incrementalが指定されている

How do incremental models work behind the scenes?

インクリメンタルは裏でどう動く?

dbtのincrementalマテリアライゼーションは、DWHごとに動作が異なる。サポートしている場合はmergeを実行する。
mergeをサポートしていないDWHの場合は、更新分をdeleteしてから、insertという手順を踏む

What if the columns of my incremental model change?

インクリメンタルモデルの列が変更した場合はどうなる?

dbt_project.yml
models:
  +on_schema_change: "sync_all_columns"
models/staging/fct_daily_active_users.sql
{{
    config(
        materialized='incremental',
        unique_key='date_day',
        on_schema_change='fail'
    )
}}

on_schema_changeは次の値を指定できる。

ignore デフォルトの動作
fail ソーススキーマとターゲットスキーマが異なる場合にエラー
append_new_columns 既存のテーブルに新しい列を追加します。この設定では、新しいデータに存在しない既存のテーブルの列は削除されないことに注意してください。
sync_all_columns 既存のテーブルに新しい列を追加し、現在欠落している列を削除します。これにはデータ型の変更が含まれることに注意してください。BigQueryでは、列タイプを変更するには全表スキャンが必要です。実装するときは、トレードオフに注意してください。

※注意 on_schema_changeには、新しく追加された行を追加前のレコードに埋め戻す動作はない。(対応された行以降)
なので、全部の行に「カラムの追加と値の登録」が必要な場合は、--full-refreshを使うこと

Default behavior

デフォルトの挙動

古いバージョンの挙動で、何も指定しない場合は ignoreの挙動を行う。
ignoreの挙動は、モデルを変更しても、ターゲットテーブルにそのカラムがない場合は「無視」して、モデルを構築する。
同時に、モデルからカラムを削除しても、DWH上のテーブルからそのカラムが消えることは無い。なのでこれを反映させたい場合は、--full-refresh を使うこと

What is an incremental_strategy?

DWHごとにインクリメンタル実行はどう動く?

incremental_strategyを指定可能

  • snowflake : mergeがデフォルト。delete+insertも可能
  • BigQuery : mergeがデフォルト, insert_overwriteも可能
  • Spark : appendがデフォルト、insert_overwriteも可能。mergeも可能だがデルタのみ

Configuring incremental strategy

incremantal_strategyの指定は、他の指定と一緒

dbt_project.yml
models:
  +incremental_strategy: "insert_overwrite"
models/my_model/sql
{{
  config(
    materialized='incremental',
    unique_key='date_day',
    incremental_strategy='insert_overwrite',
    ...
  )
}}

select ...

Strategy-specific configs

incremental_strategymergeを指定する場合、unique_keyにマッチしたものを新しい値で上書きする。
merge_update_columsを指定すると、指定した列のみ変更する

models/my_model.sql
{{
  config(
    materialized = 'incremental',
    unique_key = 'id',
    merge_update_columns = ['email', 'ip_address'],
    ...
  )
}}

select ...

Discussion