【dbt Docs】Building a dbt ProjecModels - Configuring incremental models
Configuring incremental models
インクリメンタルモデルの構成
What is an incremental model?
インクリメンタルモデルとは?
インクリメンタルモデルは、テーブルとして作成する
最初に作るときは、ソースデータ全部を変換して、テーブルを作る
以降の実行では、「指定条件にあった」テーブルを作成済みのテーブルに追記する。
インクリメンタルモデルと使うと、変換対象のデータ量が減って、変換の実行時間が大幅に短縮できる。
How do I use the incremental materialization?
インクリメンタルマテリアライゼーションをつかうにはどうしたらいい?
.sql
のなかのconfig
ブロックで指定できる。
{{
config(
materialized='incremental'
)
}}
select ...
dbtでインクルメンタルモデルにするためには
- 増分実行のための行のフィルタリング方法
- モデルの一意制約(存在する場合)
Filtering rows on an incremental run
増分実行での行フィルタリング
増分実行でどの行を変換するか?をdbtにて指定するには、is_incremental()
マクロでどうフィルタリングするか?をSQLで記載する。
前回更新時から「新しい」行をフィルタリングする必要がある。その場合{{this}}
を使うと簡単にできる。
{{
config(
materialized='incremental'
)
}}
select
*,
my_slow_function(my_column)
from raw_app_data.events
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where event_time > (select max(event_time) from {{ this }})
{% endif %}
Defining a uniqueness constraint (optional)
一意性制約の定義
unique_key
: モデル内で一意である必要のあるフィールドを指定する。増分更新分に既存のキーがあればそこを更新し、なければ追記する。(=UPSERTですね)
一部のデータベースでは、式( concat(user_id, session_number
)も使えるけど、すべてのDWHで使えるわけでもないので、非推奨。やる場合は、モデルにそのフィールドを追加するほうが良さそう。
例として「イベントストリームに基づいて1日あたりのアクティブユーザー(DAU)の数を計算するモデル」ソースデータが到着したら、dbtが最後に実行された日の両方とそれ以降の任意の日のDAUの数を再計算する
{{
config(
materialized='incremental',
unique_key='date_day'
)
}}
select
date_trunc('day', event_at) as date_day,
count(distinct user_id) as daily_active_users
from raw_app_data.events
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where date_day >= (select max(date_day) from {{ this }})
{% endif %}
group by 1
このモデルを unique_key
パラメータ無しで実行すると、ターゲットテーブルに同じ日の行が複数生成されて、どんどん増えていってしまう。 unique_key
を指定していると、既存日のデータは更新される。
How do I rebuild an incremental model?
インクリメンタルモデルを再構築するにはどうすればいい?
dbtにインクリメンタルモデル全体を最初から再構築させる--full-refreshには、コマンドラインでフラグを使用します。このフラグにより、dbtは、データベース内の既存のターゲットテーブルを、常に再構築する前にドロップします。
$ dbt run --full-refresh --select my_incremental_model+
--full-refresh
を使うと全部作り直してくれる。
末尾に示されているような、ダウンストリームモデルを再構築することをおすすめします。
Understanding incremental models
インクリメンタルモデルを理解する
When should I use an incremental model?
インクリメンタルモデルはいつ使うのがよい?
- ソースデータのデーブルに、数百万、数十億の行がある場合
- ソースデータの変換の実行時間がかかる場合(複雑な正規表現関数やUDFを使ってる場合)
インクリメンタルモデルは、viewやtable(全部作り直し)よりは、仕組みとしては複雑だけど、パフォーマンスは必要な分だけ処理なので、dbt実行のパフォーマンスは大幅に向上する
Understanding the is_incremental() macro
is_incremental()
が、true
となるのは
- 宛先テーブルが既に存在する
- dbtがフルリフレッシュモード(
--full-refresh
オプション)では実行されていない -
meterialized=incremental
が指定されている
How do incremental models work behind the scenes?
インクリメンタルは裏でどう動く?
dbtのincremental
マテリアライゼーションは、DWHごとに動作が異なる。サポートしている場合はmerge
を実行する。
merge
をサポートしていないDWHの場合は、更新分をdelete
してから、insert
という手順を踏む
What if the columns of my incremental model change?
インクリメンタルモデルの列が変更した場合はどうなる?
models:
+on_schema_change: "sync_all_columns"
{{
config(
materialized='incremental',
unique_key='date_day',
on_schema_change='fail'
)
}}
on_schema_change
は次の値を指定できる。
ignore |
デフォルトの動作 |
fail |
ソーススキーマとターゲットスキーマが異なる場合にエラー |
append_new_columns |
既存のテーブルに新しい列を追加します。この設定では、新しいデータに存在しない既存のテーブルの列は削除されないことに注意してください。 |
sync_all_columns |
既存のテーブルに新しい列を追加し、現在欠落している列を削除します。これにはデータ型の変更が含まれることに注意してください。BigQueryでは、列タイプを変更するには全表スキャンが必要です。実装するときは、トレードオフに注意してください。 |
※注意 on_schema_change
には、新しく追加された行を追加前のレコードに埋め戻す動作はない。(対応された行以降)
なので、全部の行に「カラムの追加と値の登録」が必要な場合は、--full-refresh
を使うこと
Default behavior
デフォルトの挙動
古いバージョンの挙動で、何も指定しない場合は ignore
の挙動を行う。
ignore
の挙動は、モデルを変更しても、ターゲットテーブルにそのカラムがない場合は「無視」して、モデルを構築する。
同時に、モデルからカラムを削除しても、DWH上のテーブルからそのカラムが消えることは無い。なのでこれを反映させたい場合は、--full-refresh
を使うこと
What is an incremental_strategy?
DWHごとにインクリメンタル実行はどう動く?
incremental_strategy
を指定可能
- snowflake :
merge
がデフォルト。delete+insert
も可能 - BigQuery :
merge
がデフォルト,insert_overwrite
も可能 - Spark :
append
がデフォルト、insert_overwrite
も可能。merge
も可能だがデルタのみ
Configuring incremental strategy
incremantal_strategy
の指定は、他の指定と一緒
models:
+incremental_strategy: "insert_overwrite"
{{
config(
materialized='incremental',
unique_key='date_day',
incremental_strategy='insert_overwrite',
...
)
}}
select ...
Strategy-specific configs
incremental_strategy
にmerge
を指定する場合、unique_key
にマッチしたものを新しい値で上書きする。
merge_update_colums
を指定すると、指定した列のみ変更する
{{
config(
materialized = 'incremental',
unique_key = 'id',
merge_update_columns = ['email', 'ip_address'],
...
)
}}
select ...
Discussion