BigQuery dbt で MERGEだけ戦略
この記事はdbt Advent Calendar 2024 5日目 シリーズ2の記事です。
はじめに
現在TROCCOで実装している処理からdbtの処理に移行段階で、まずは、TROCCOで実装している処理をdbtで再現してから、dbt snapshotなどのdbtの便利機能に置き換える段階的な手順をとっています。
その移行段階で、困った「マージ処理のみ」の実装を解説します。
TROCCO: 書き込みモード(追記)
→ dbt:「マージ処理のみ」
やりたいこと : - TROCCO: 書き込みモード(追記)
- 追記設定 : 既存のテーブルのレコードの後に、クエリ実行結果が追記されます。
- dbt:「マージ処理のみ」
- 日次による増分更新なら簡単だが、条件マッチするレコードを追記する単純な追記処理ができないっぽい(そんなことはなかった)
- MERGE戦略があるが条件によって、マージ処理だけではなくアップデート処理も実行されてしまう → マージ処理のみ行いたい
結論
ユニークキーを指定せず、MERGE戦略を実装する。以上です。
環境
- dbt core
- dbt=1.8.3
- adapter: bigquery=1.8.0
BigQueryにおけるdbtの増分更新処理について
dbtでは、BigQuery向けの増分更新処理において、merge と insert_overwrite の2種類の incremental_strategy(増分更新の方法)を選択できます。
- 今回は、「マージ処理のみ」の実現可能なmerge(デフォルト) を使います。
MERGE戦略処理(通常)の流れ
まずは、公式ドキュメントに沿った。基本的な処理の紹介です。
- データの比較: ユニークキーを比較し、行が一致するかどうかを判定。
- 一致した場合: 宛先テーブルの該当行が、ソースデータで更新されます。
- 一致しなかった場合: 宛先テーブルに存在しない行が、新たに追加されます。
注意: merge 増分更新戦略を選択する場合、unique_key 設定が必須です。
ユニークキーの指定が重要:ID
はユニークキーであることが前提です。この条件が満たされないと、意図しない更新や挿入が発生する可能性があります。
merge
増分更新戦略では、以下のような MERGE
文を生成します
{{
config(
materialized="incremental"
,incremental_strategy="merge"
,unique_key="ユニークキー"
)
}}
初回実行時処理
CREATE OR REPLACE TABLE `プロジェクトID`.`データセットID`.`テーブルID`
OPTIONS (description = """""")
AS (
{{モデルSQL}}
);
次回実行時処理
-- 一時テーブル作成
CREATE OR REPLACE TABLE `プロジェクトID`.`データセットID`.`テーブルID__dbt_tmp`
OPTIONS (
description = """"""
,expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(),INTERVAL 12 HOUR)
)
AS (
{{モデルSQL}}
);
MERGE INTO `プロジェクトID`.`データセットID`.`テーブルID` AS DBT_INTERNAL_DEST
USING (
SELECT * FROM `プロジェクトID`.`データセットID`.`テーブルID__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
-- ソースデータの ID と宛先データの ID が一致する場合に「行が一致している」と判断。
-- これにより、更新と挿入の処理を適切に振り分ける。
ON ( DBT_INTERNAL_SOURCE.ユニークキー = DBT_INTERNAL_DEST.ユニークキー)
-- ユニーク指定キーがマッチングした場合
WHEN MATCHED THEN UPDATE SET
-- 一致する行が見つかった場合、宛先テーブルの該当行をソースデータで上書きします。
-- 宛先テーブルにユニーク指定キーが存在しなかった場合
WHEN NOT MATCHED THEN INSERT
-- 宛先テーブルに存在しない行を、新しいデータとして挿入します。
「MERGEだけ戦略」の実装
-
公式ドキュメントには、
注意: merge 増分更新戦略を選択する場合、unique_key 設定が必須です。
とありますが、unique_key 設定しないことで、単純なマージ処理を行うことが可能です。
-
full_refresh = false
基本フルリフレッシュしても再現しないテーブルになるケースが多いと思うので、
dbt run --full-refresh
実行しても初回実行時処理
ではなく次回実行時処理
が行われる設定のfull_refresh = false
にしておく -
今回フルリフレッシュしない想定なので
is_incremental()
は使わない
dbtのconfigブロックの定義
{{
config(
materialized="incremental"
,incremental_strategy="merge"
,full_refresh = false
)
}}
初回実行時処理
CREATE OR REPLACE TABLE `プロジェクトID`.`データセットID`.`テーブルID`
OPTIONS (description = """""")
AS (
{{モデルSQL}}
);
次回実行時処理
-- 一時テーブル作成
CREATE OR REPLACE TABLE `プロジェクトID`.`データセットID`.`テーブルID__dbt_tmp`
OPTIONS (
description = """"""
,expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(),INTERVAL 12 HOUR)
)
AS (
{{モデルSQL}}
);
MERGE INTO `プロジェクトID`.`データセットID`.`テーブルID` AS DBT_INTERNAL_DEST
USING (
SELECT * FROM `プロジェクトID`.`データセットID`.`テーブルID__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
-- ユニークキーの指定がないため、FALSEになる
ON (FALSE)
WHEN NOT MATCHED THEN INSERT
-- {{モデルSQL}}をそのまま、新しいデータとして挿入します。
今回実装した処理の使い所
- TROCCOの書き込みモード(追記)の処理をそのまま移管したい
- ユニークキーレコードのアップデートはせず、単純なマージ処理を行いたい
[おまけ] 実際の処理確認
project_id.work.test_001
)
サンプルテーブル作成(CREATE OR REPLACE TABLE `work.test_001` (
id INT64,name STRING,age INT64,created_at TIMESTAMP
) AS (
SELECT *
FROM (
SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS created_at
UNION ALL SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00')
UNION ALL SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00')
)
);
初回実行時処理(test_002.sql)
CREATE OR REPLACE TABLE `project_id`.`work`.`test_002`
OPTIONS (
description = """"""
)
AS (
SELECT *
FROM `project_id.work.test_001`
);
> Executing task: dbt build --select test_002
15:17:34 1 of 1 START sql incremental model work.test_002 ............................... [RUN]
15:17:36 1 of 1 OK created sql incremental model work.test_002 .......................... [CREATE TABLE (3.0 rows, 114.0 Bytes processed) in 2.02s]
project_id.work.test_002
初回実行時処理(test_002.sql) → 次回実行時処理:unique_key指定なし(test_002.sql)
{{
config(
materialized='incremental'
,incremental_strategy="merge"
)
}}
-- merge: デフォルトなので記載しなくてもOK
SELECT *
FROM `project_id.work.test_001`
> Executing task: dbt build --select test_002
15:18:48 1 of 1 START sql incremental model work.test_002 ............................... [RUN]
15:18:53 1 of 1 OK created sql incremental model work.test_002 .......................... [MERGE (3.0 rows, 114.0 Bytes processed) in 4.56s]
CREATE OR REPLACE TABLE `project_id`.`work`.`test_002__dbt_tmp`
OPTIONS (
description = """"""
,expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(),INTERVAL 12 HOUR)
)
AS (
SELECT *
FROM `project_id.work.test_001`
);
MERGE INTO `project_id`.`work`.`test_002` AS DBT_INTERNAL_DEST
USING (
SELECT *
FROM `project_id`.`work`.`test_002__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
ON (FALSE)
WHEN NOT MATCHED THEN INSERT
(`id`,`name`,`age`,`created_at`)
VALUES
(`id`,`name`,`age`,`created_at`)
project_id.work.test_002
初回実行時処理(test_002.sql) → 次回実行時処理:unique_key指定あり(test_002.sql)
{{
config(
materialized='incremental'
,incremental_strategy="merge"
,unique_key="id"
)
}}
SELECT *
FROM `cookbiz-data.work.test_001`
> Executing task: dbt build --select test_002
14:10:38 1 of 1 START sql incremental model work.test_002 ............................... [RUN]
14:10:43 1 of 1 OK created sql incremental model work.test_002 .......................... [MERGE (3.0 rows, 228.0 Bytes processed) in 4.79s]
MERGE INTO `project_id`.`work`.`test_002` AS DBT_INTERNAL_DEST
USING (
SELECT *
FROM `project_id`.`work`.`test_002__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
ON (
DBT_INTERNAL_SOURCE.ID = DBT_INTERNAL_DEST.ID
)
WHEN MATCHED THEN
UPDATE SET
`id` = DBT_INTERNAL_SOURCE.`id`,`name` = DBT_INTERNAL_SOURCE.`name`,`age`
= DBT_INTERNAL_SOURCE.`age`,`created_at` = DBT_INTERNAL_SOURCE.`created_at`
WHEN NOT MATCHED THEN INSERT
(`id`,`name`,`age`,`created_at`)
VALUES
(`id`,`name`,`age`,`created_at`)
project_id.work.test_002
おわりに
データ分析基盤導入時からdbtを使っていれば、幸せなのでは、、、現場から以上です。
参考
Discussion