🛤️

TROCCOのカスタム変数を活用したdbtによる増分更新

2024/12/01に公開

この記事は TROCCO® Advent Calendar 2024 2日目dbt Advent Calendar 2024 2日目 シリーズ2 の記事です。

思いがけず2つのアドベントカレンダーにリンクことになった

はじめに

TROCCOのカスタム変数機能を活用したdbtによる増分更新の紹介をします。
※ dbtに関しての細かな説明は割愛しています、、、

増分更新の概要

増分更新では、データ全体を処理するのではなく、変更や追加された部分のみを対象とします。このアプローチにより、以下のメリットが得られます

  • 処理時間の短縮: 不必要な再処理を回避。
  • 計算コストの削減: クラウド環境でのクエリ実行コストが抑えられる。

TROCCOのカスタム変数機能とdbtの連携

カスタム変数機能の活用

TROCCOのカスタム変数機能を利用することで、dbt実行時に動的に値を設定できます。たとえば、増分更新期間(start_dateend_date)を変数で指定可能です。

https://documents.trocco.io/docs/about-dbt-integration
https://documents.trocco.io/docs/custom-variables

dbtの増分更新モード

  • dbtではis_incremental()という関数を用いて、増分更新用のクエリを定義できます。
  • TROCCOのカスタム変数を使用するメリット : TROCCOのカスタム変数で更新期間を指定していると、データが欠損した場合TROCCOからデータ更新期間を変更することでbackfill(埋め戻し)処理が可能です。

TROCCOのカスタム変数機能を活用することで、dbtで増分更新の期間を簡単に設定できます。ここでは、具体的な設定例を見ていきます。

使用例

  • CURRENT_DATEを使用パターン(Before)
{% if is_incremental() %}
  WHERE DATE(created_at) = CURRENT_DATE('Asia/Tokyo') - 1
{% endif %}
  • TROCCOのカスタム変数を使用パターン(After)
{% if is_incremental() %}
  WHERE DATE(created_at) BETWEEN '{{ var('start_date') }}' AND '{{ var('end_date') }}'
{% endif %}

検証パターン

増分更新において、考えられる典型的な検証パターンとして、以下の3つを確認します。それぞれのパターンに通して、TROCCOのカスタム変数機能を活用したdbtによる増分更新の紹介していきます。

  1. ソースデータの前日データが入ってない状態
  2. ソースデータの前日データが更新された状態
  3. ソースデータのデータ欠損が修正された状態

検証環境

dbt=1.8.2
dbt-bigquery=1.8.1
TROCCO
BigQuery

作業日

SELECT CURRENT_DATE('Asia/Tokyo') AS today
today
2024-11-07

1. ソースデータの前日データが入ってない状態(サンプルデータ作成)

  • 期待される結果 : ターゲットテーブルには変更がないこと。

データの特徴

  • データの欠損日:2024-11-02、2024-11-03
  • データの未更新日(前日):2024-11-06

サンプルデータ作成
以下のSQL文でサンプルデータを作成します。

BigQueryでサンプルデータ作成
CREATE OR REPLACE TABLE `work.customer` (
    name STRING
  , age INT64
  , created_at TIMESTAMP
) AS (
  SELECT *
  FROM UNNEST (
    ARRAY<STRUCT<
        name STRING
      , age INT64
      , created_at TIMESTAMP
    >> [
        ('田中太郎', 45, '2024-11-01 10:00:00')
      , ('佐藤花子', 25, '2024-11-01 14:30:00')
      , ('鈴木一郎', 30, '2024-11-02 09:15:00')
      , ('高橋美紀', 27, '2024-11-04 07:25:00')
      , ('渡辺陽一', null, '2024-11-04 16:50:00')
      , ('加藤隆', 22, '2024-11-05 10:05:00')
    ]
  )
)

customer テーブル(ソースデータ)

name age created_at
田中太郎 45 2024-11-01 10:00:00.000000 UTC
佐藤花子 25 2024-11-01 14:30:00.000000 UTC
鈴木一郎 30 2024-11-02 09:15:00.000000 UTC
高橋美紀 27 2024-11-04 07:25:00.000000 UTC
渡辺陽一 2024-11-04 16:50:00.000000 UTC
加藤隆 22 2024-11-05 10:05:00.000000 UTC

dbtの設定

customer.yml
version: 2

sources:
  - name: work       # データソースの名前
    schema: work    # スキーマ名
    tables:
      - name: customer      # テーブル名
customer_salary.sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
      'field': 'created_at',
      'data_type': 'timestamp',
      'granularity': 'day',
      'copy_partitions': true
    }
) }}

SELECT *
FROM {{ source('work', 'customer') }}
{% if is_incremental() %}
  WHERE DATE(created_at) BETWEEN '{{ var('start_date') }}' AND '{{ var('end_date') }}'
{% endif %}

TROCCO(dbt連携)の設定

customer テーブルから customer_salary テーブル作成

dbt run --select customer_salary --full-refresh

↓ 実行結果 ↓

customer_salary テーブル(ターゲットテーブル)

--full-refreshを指定しているため、{% if is_incremental() %}〜{% endif %}の中の条件が無視されて実行される。

name age created_at
田中太郎 45 2024-11-01 10:00:00.000000 UTC
佐藤花子 25 2024-11-01 14:30:00.000000 UTC
鈴木一郎 30 2024-11-02 09:15:00.000000 UTC
高橋美紀 27 2024-11-04 07:25:00.000000 UTC
渡辺陽一 2024-11-04 16:50:00.000000 UTC
加藤隆 22 2024-11-05 10:05:00.000000 UTC

2. ソースデータの前日データが更新された状態

  • 検証目的 : ソースデータが更新された場合に、増分更新処理で最新のデータが適切に反映されることを確認します。
  • 手順 : サンプルデータを再作成して、前日のデータが更新された状態を再現する
    • データの未更新日(前日):2024-11-06
  • 期待される結果 : ターゲットテーブルに前日データ追加されていること。
BigQueryでサンプルデータ作成
CREATE OR REPLACE TABLE `work.customer` (
    name STRING
  , age INT64
  , created_at TIMESTAMP
) AS (
  SELECT *
  FROM UNNEST (
    ARRAY<STRUCT<
        name STRING
      , age INT64
      , created_at TIMESTAMP
    >> [
        ('田中太郎', 45, '2024-11-01 10:00:00')
      , ('佐藤花子', 25, '2024-11-01 14:30:00')
      , ('鈴木一郎', 30, '2024-11-02 09:15:00')
      , ('高橋美紀', 27, '2024-11-04 07:25:00')
      , ('渡辺陽一', null, '2024-11-04 16:50:00')
      , ('加藤隆', 22, '2024-11-05 10:05:00')
      , ('小林健', 35, '2024-11-06 17:25:00')
      , ('佐々木裕', 28, '2024-11-06 15:40:00')
    ]
  )
)

customer テーブル(ソースデータ)

name age created_at
田中太郎 45 2024-11-01 10:00:00.000000 UTC
佐藤花子 25 2024-11-01 14:30:00.000000 UTC
鈴木一郎 30 2024-11-02 09:15:00.000000 UTC
高橋美紀 27 2024-11-04 07:25:00.000000 UTC
渡辺陽一 2024-11-04 16:50:00.000000 UTC
加藤隆 22 2024-11-05 10:05:00.000000 UTC
佐々木裕 28 2024-11-06 15:40:00.000000 UTC
小林健 35 2024-11-06 17:25:00.000000 UTC

dbtの設定

dbt_project.ymlに設定していないとエラーになるので設定しておく

dbt_project.yml
vars:
  start_date : NULL
  end_date : NULL

https://docs.getdbt.com/docs/build/project-variables

dbtジョブ設定(TROCCO)

dbt run --select customer_salary --vars '{"start_date": "$start_date$", "end_date": "$end_date$"}'

実行ログ

running... dbt run --select customer_salary --vars \{start_date:\ 2024-11-06,\ end_date:\ 2024-11-06\}

[参考情報]変数の参照優先順位

  1. コマンドラインで指定された --vars
  2. dbt_project.yml に指定されたスコープド変数
  3. dbt_project.yml に指定されたグローバル変数
  4. vars() 関数の第2引数に指定されたデフォルト値
    vars() 関数で参照した変数が見つからなかった場合は、 dbt run 実行時にコンパイルエラーとなります。

https://zenn.dev/foursue/books/31456a86de5bb4/viewer/4201af
↓ 実行結果 ↓

customer_salary テーブル(ターゲットテーブル)

前日データが追加された。

name age created_at
田中太郎 45 2024-11-01 10:00:00.000000 UTC
佐藤花子 25 2024-11-01 14:30:00.000000 UTC
鈴木一郎 30 2024-11-02 09:15:00.000000 UTC
高橋美紀 27 2024-11-04 07:25:00.000000 UTC
渡辺陽一 2024-11-04 16:50:00.000000 UTC
加藤隆 22 2024-11-05 10:05:00.000000 UTC
佐々木裕 28 2024-11-06 15:40:00.000000 UTC
小林健 35 2024-11-06 17:25:00.000000 UTC

3. ソースデータのデータ欠損が修正された状態

  • 検証目的 : データ欠損が修正された場合に、正しいデータがターゲットテーブルに反映されることを確認します。
  • 検証目的 : データ欠損が修正された場合に、正しいデータがターゲットテーブルに反映されることを確認します。サンプルデータを再作成して、データの欠損が修正された状態を再現する
    • データの欠損日:2024-11-02、2024-11-03
  • 期待される結果 : 修正後の正しいデータがターゲットテーブルに反映されること。
BigQueryでサンプルデータ作成
CREATE OR REPLACE TABLE `work.customer` (
    name STRING
  , age INT64
  , created_at TIMESTAMP
) AS (
  SELECT *
  FROM UNNEST (
    ARRAY<STRUCT<
        name STRING
      , age INT64
      , created_at TIMESTAMP
    >> [
        ('田中太郎', 45, '2024-11-01 10:00:00')
      , ('佐藤花子', 25, '2024-11-01 14:30:00')
      , ('鈴木一郎', 30, '2024-11-02 09:15:00')
      , ('山田優子', null, '2024-11-02 18:20:00')
      , ('伊藤健太', 32, '2024-11-03 12:45:00')
      , ('高橋美紀', 27, '2024-11-04 07:25:00')
      , ('渡辺陽一', null, '2024-11-04 16:50:00')
      , ('加藤隆', 22, '2024-11-05 10:05:00')
      , ('小林健', 35, '2024-11-06 17:25:00')
      , ('佐々木裕', 28, '2024-11-06 15:40:00')
    ]
  )
)

customer テーブル(ソースデータ)

name age created_at
田中太郎 45 2024-11-01 10:00:00.000000 UTC
佐藤花子 25 2024-11-01 14:30:00.000000 UTC
鈴木一郎 30 2024-11-02 09:15:00.000000 UTC
山田優子 2024-11-02 18:20:00.000000 UTC
伊藤健太 32 2024-11-03 12:45:00.000000 UTC
高橋美紀 27 2024-11-04 07:25:00.000000 UTC
渡辺陽一 2024-11-04 16:50:00.000000 UTC
加藤隆 22 2024-11-05 10:05:00.000000 UTC
佐々木裕 28 2024-11-06 15:40:00.000000 UTC
小林健 35 2024-11-06 17:25:00.000000 UTC

dbtジョブ設定(TROCCO)

データの欠損している期間をbackfill(埋め戻し)処理

dbt run --select customer_salary --vars '{"start_date": "2024-11-02", "end_date": "2024-11-03"}'

実行ログ

running... dbt run --select customer_salary --vars \{start_date:\ 2024-11-02,\ end_date:\ 2024-11-03\}

↓ 実行結果 ↓

customer_salary テーブル(ターゲットテーブル)

修正後の埋め戻しデータがターゲットテーブルに反映された。

name age created_at
田中太郎 45 2024-11-01 10:00:00.000000 UTC
佐藤花子 25 2024-11-01 14:30:00.000000 UTC
鈴木一郎 30 2024-11-02 09:15:00.000000 UTC
山田優子 2024-11-02 18:20:00.000000 UTC
伊藤健太 32 2024-11-03 12:45:00.000000 UTC
高橋美紀 27 2024-11-04 07:25:00.000000 UTC
渡辺陽一 2024-11-04 16:50:00.000000 UTC
加藤隆 22 2024-11-05 10:05:00.000000 UTC
佐々木裕 28 2024-11-06 15:40:00.000000 UTC
小林健 35 2024-11-06 17:25:00.000000 UTC

設定不良パターン

start_dateもしくはend_dateの未指定だった場合
エラー:Could not cast literal "None" to type DATE

WHERE DATE(created_at) BETWEEN 'None' AND 'None'

おわりに

TROCCOのカスタム変数を活用することで、dbtでの増分更新の設定が簡単になります。この仕組みを導入することで、データ処理の効率化やコスト削減を実現できます。興味があれば、TROCCOのカスタム変数の活用方法やdbtの応用例についてさらに探求してみてください!

参考

https://documents.trocco.io/docs/custom-variables
https://documents.trocco.io/docs/about-dbt-integration
https://docs.getdbt.com/docs/build/project-variables

Discussion