🍊

BigQuery dbt で MERGEだけ戦略

2024/12/04に公開

この記事はdbt Advent Calendar 2024 5日目 シリーズ2の記事です。

はじめに

現在TROCCOで実装している処理からdbtの処理に移行段階で、まずは、TROCCOで実装している処理をdbtで再現してから、dbt snapshotなどのdbtの便利機能に置き換える段階的な手順をとっています。
その移行段階で、困った「マージ処理のみ」の実装を解説します。

やりたいこと : TROCCO: 書き込みモード(追記)dbt:「マージ処理のみ」

  • TROCCO: 書き込みモード(追記)
    • 追記設定 : 既存のテーブルのレコードの後に、クエリ実行結果が追記されます。
  • dbt:「マージ処理のみ」
    • 日次による増分更新なら簡単だが、条件マッチするレコードを追記する単純な追記処理ができないっぽい(そんなことはなかった)
    • MERGE戦略があるが条件によって、マージ処理だけではなくアップデート処理も実行されてしまう → マージ処理のみ行いたい

https://documents.trocco.io/docs/datamart-bigquery

結論

ユニークキーを指定せず、MERGE戦略を実装する。以上です。

環境

  • dbt core
    • dbt=1.8.3
    • adapter: bigquery=1.8.0

BigQueryにおけるdbtの増分更新処理について

dbtでは、BigQuery向けの増分更新処理において、merge と insert_overwrite の2種類の incremental_strategy(増分更新の方法)を選択できます。

  • 今回は、「マージ処理のみ」の実現可能なmerge(デフォルト) を使います。

MERGE戦略処理(通常)の流れ

まずは、公式ドキュメントに沿った。基本的な処理の紹介です。

  1. データの比較: ユニークキーを比較し、行が一致するかどうかを判定。
  2. 一致した場合: 宛先テーブルの該当行が、ソースデータで更新されます。
  3. 一致しなかった場合: 宛先テーブルに存在しない行が、新たに追加されます。

注意: merge 増分更新戦略を選択する場合、unique_key 設定が必須です。
ユニークキーの指定が重要:IDはユニークキーであることが前提です。この条件が満たされないと、意図しない更新や挿入が発生する可能性があります。

https://docs.getdbt.com/reference/resource-configs/bigquery-configs
merge 増分更新戦略では、以下のような MERGE 文を生成します

dbtのconfigブロックの定義
{{
 config(
        materialized="incremental"
        ,incremental_strategy="merge"
        ,unique_key="ユニークキー"
    )
}}

初回実行時処理

CREATE OR REPLACE TABLE `プロジェクトID`.`データセットID`.`テーブルID`
OPTIONS (description = """""")
AS (
  {{モデルSQL}}
);

次回実行時処理

-- 一時テーブル作成
CREATE OR REPLACE TABLE `プロジェクトID`.`データセットID`.`テーブルID__dbt_tmp`
OPTIONS (
  description = """"""
  ,expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(),INTERVAL 12 HOUR)
)
AS (
  {{モデルSQL}}
);

MERGE INTO `プロジェクトID`.`データセットID`.`テーブルID` AS DBT_INTERNAL_DEST
USING (
  SELECT * FROM `プロジェクトID`.`データセットID`.`テーブルID__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
 -- ソースデータの ID と宛先データの ID が一致する場合に「行が一致している」と判断。
 -- これにより、更新と挿入の処理を適切に振り分ける。
  ON ( DBT_INTERNAL_SOURCE.ユニークキー = DBT_INTERNAL_DEST.ユニークキー)

-- ユニーク指定キーがマッチングした場合
WHEN MATCHED THEN UPDATE SET
    -- 一致する行が見つかった場合、宛先テーブルの該当行をソースデータで上書きします。

-- 宛先テーブルにユニーク指定キーが存在しなかった場合
WHEN NOT MATCHED THEN INSERT
    -- 宛先テーブルに存在しない行を、新しいデータとして挿入します。

「MERGEだけ戦略」の実装

  • 公式ドキュメントには、

    注意: merge 増分更新戦略を選択する場合、unique_key 設定が必須です。

    とありますが、unique_key 設定しないことで、単純なマージ処理を行うことが可能です。

  • full_refresh = false

    基本フルリフレッシュしても再現しないテーブルになるケースが多いと思うので、dbt run --full-refresh実行しても初回実行時処理ではなく次回実行時処理が行われる設定のfull_refresh = falseにしておく

  • 今回フルリフレッシュしない想定なのでis_incremental()は使わない

dbtのconfigブロックの定義

{{
 config(
        materialized="incremental"
        ,incremental_strategy="merge"
        ,full_refresh = false
    )
}}

初回実行時処理

CREATE OR REPLACE TABLE `プロジェクトID`.`データセットID`.`テーブルID`
OPTIONS (description = """""")
AS (
  {{モデルSQL}}
);

次回実行時処理

-- 一時テーブル作成
CREATE OR REPLACE TABLE `プロジェクトID`.`データセットID`.`テーブルID__dbt_tmp`
OPTIONS (
  description = """"""
  ,expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(),INTERVAL 12 HOUR)
)
AS (
  {{モデルSQL}}
);

MERGE INTO `プロジェクトID`.`データセットID`.`テーブルID` AS DBT_INTERNAL_DEST
USING (
  SELECT * FROM `プロジェクトID`.`データセットID`.`テーブルID__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
 -- ユニークキーの指定がないため、FALSEになる
  ON (FALSE)

WHEN NOT MATCHED THEN INSERT
    -- {{モデルSQL}}をそのまま、新しいデータとして挿入します。

今回実装した処理の使い所

  • TROCCOの書き込みモード(追記)の処理をそのまま移管したい
  • ユニークキーレコードのアップデートはせず、単純なマージ処理を行いたい

[おまけ] 実際の処理確認

サンプルテーブル作成(project_id.work.test_001)

CREATE OR REPLACE TABLE `work.test_001` (
  id INT64,name STRING,age INT64,created_at TIMESTAMP
) AS (
  SELECT *
  FROM (
    SELECT 1 AS id, '田中太郎' AS name, 30 AS age, TIMESTAMP('2024-01-01 00:00:00') AS created_at 
    UNION ALL SELECT 2, '佐藤花子', 25, TIMESTAMP('2024-01-01 10:00:00') 
    UNION ALL SELECT 3, '鈴木一郎', 35, TIMESTAMP('2024-07-15 14:30:00') 
  )
);

初回実行時処理(test_002.sql)

CREATE OR REPLACE TABLE `project_id`.`work`.`test_002`
OPTIONS (
  description = """"""
)
AS (
  SELECT *
  FROM `project_id.work.test_001`
);
> Executing task: dbt build --select test_002
15:17:34  1 of 1 START sql incremental model work.test_002 ............................... [RUN]
15:17:36  1 of 1 OK created sql incremental model work.test_002 .......................... [CREATE TABLE (3.0 rows, 114.0 Bytes processed) in 2.02s]

project_id.work.test_002

初回実行時処理(test_002.sql) → 次回実行時処理:unique_key指定なし(test_002.sql)

{{
 config(
        materialized='incremental'
        ,incremental_strategy="merge"
    )
}}
-- merge: デフォルトなので記載しなくてもOK
SELECT *
FROM `project_id.work.test_001`
> Executing task: dbt build --select test_002
15:18:48  1 of 1 START sql incremental model work.test_002 ............................... [RUN]
15:18:53  1 of 1 OK created sql incremental model work.test_002 .......................... [MERGE (3.0 rows, 114.0 Bytes processed) in 4.56s]
CREATE OR REPLACE TABLE `project_id`.`work`.`test_002__dbt_tmp`
OPTIONS (
  description = """"""
  ,expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(),INTERVAL 12 HOUR)
)
AS (
  SELECT *
  FROM `project_id.work.test_001`
);

MERGE INTO `project_id`.`work`.`test_002` AS DBT_INTERNAL_DEST
USING (
  SELECT *
  FROM `project_id`.`work`.`test_002__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
  ON (FALSE)

WHEN NOT MATCHED THEN INSERT
  (`id`,`name`,`age`,`created_at`)
VALUES
  (`id`,`name`,`age`,`created_at`)

project_id.work.test_002

初回実行時処理(test_002.sql) → 次回実行時処理:unique_key指定あり(test_002.sql)

{{
 config(
        materialized='incremental'
        ,incremental_strategy="merge"
        ,unique_key="id"
    )
}}

SELECT *
FROM `cookbiz-data.work.test_001`
> Executing task: dbt build --select test_002
14:10:38  1 of 1 START sql incremental model work.test_002 ............................... [RUN]
14:10:43  1 of 1 OK created sql incremental model work.test_002 .......................... [MERGE (3.0 rows, 228.0 Bytes processed) in 4.79s]
MERGE INTO `project_id`.`work`.`test_002` AS DBT_INTERNAL_DEST
USING (
  SELECT *
  FROM `project_id`.`work`.`test_002__dbt_tmp`
) AS DBT_INTERNAL_SOURCE
  ON (
    DBT_INTERNAL_SOURCE.ID = DBT_INTERNAL_DEST.ID
  )


WHEN MATCHED THEN
  UPDATE SET
    `id` = DBT_INTERNAL_SOURCE.`id`,`name` = DBT_INTERNAL_SOURCE.`name`,`age` 
        = DBT_INTERNAL_SOURCE.`age`,`created_at` = DBT_INTERNAL_SOURCE.`created_at`


WHEN NOT MATCHED THEN INSERT
  (`id`,`name`,`age`,`created_at`)
VALUES
  (`id`,`name`,`age`,`created_at`)

project_id.work.test_002

おわりに

データ分析基盤導入時からdbtを使っていれば、幸せなのでは、、、現場から以上です。

参考

https://tech.timee.co.jp/entry/2023/12/08/121616
https://docs.getdbt.com/reference/resource-configs/full_refresh
https://docs.getdbt.com/reference/resource-configs/bigquery-configs
https://dev.classmethod.jp/articles/dbt-materialization-incremental/

Discussion