🤖

dbt v1.9.0から使えるようになったmicrobatch incremental modelがとても良さそう

2024/12/11に公開

この記事はFinatextグループ Advent Calendar 2024の11日目の記事です。

はじめに

先日、dbt-core v1.9.0がリリースになり、新たに microbatch incremental modelが使えるようになりました。この機能は、特にサイズの大きな時系列データに対して有効で、これまでdelete+insertやappend strategyで工夫しながら大きなデータを運用してきたデータエンジニアにとって、コスト、パフォーマンス、運用の観点で非常に嬉しい機能となりそうです。ナウキャストでも、サイズの大きなデータの運用に頭を悩ませていたので、積極的に活用していきたいと思ってます。

本記事では、microbatchの概要を解説しつつ、実際に動かしながらその挙動を理解することを試みます。主にdbtを使っている方々、特にDWHとしてSnowflakeを使っている方々に参考になればと思います。

microbatch inremental modelとは?

microbatchは incremental modelにおける新しいincremental strategyです。
時系列データの更新において、日付ごとにデータを更新するクエリー(「バッチ」と呼んでいる)を並列で作成・実行してくれる仕組みです。microbatchをうまく使うことで、特に大規模な時系列データセットにおけるデータ更新処理の効率化が期待できます。

従来のincremental strategyとの違い

従来のincremental strategyには、appenddelete+insertmergeinsert_overwrite がありますが、それぞれ課題を抱えていました。例えばappend strategyはレコードを追加するのみなので、古いデータの削除・更新ができませんし、delete+insert strategyでは古いデータの削除・更新をするにはSQL文を工夫する必要がありました。if is_incremental() で頑張って分岐を書いている方も多いと思います(私も書いてます)
microbatchでは、その取り回しを基本的にはdbt側がやってくれます。

仕組み

microbatchでは、ソース元テーブルと作成するテーブルにおけるevent_timeとなるカラムと、バッチの粒度(時間別/日別/月別/年別)を指定します。buildすると、バッチの粒度ごとにフィルタしてテーブルを更新するクエリを生成・実行します。

例えば以下のようなconfigでモデルを定義します

{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='date',
        begin='2024-12-01',
        batch_size='day',
        lookback=1,
        full_refresh=false,
        cluster_by=['date'],
    )
}}

実行日が 2024-12-10 だとして、このmodelをbuildすると、初回実行では、2024-12-01から2024-12-10までの日付ごとに、データ元テーブルから日別のレコードを取得してモデルのテーブルに挿入するクエリ(=これが一つのバッチ)が生成・実行されます。

翌日2024-12-11 のbuildでは、2024-12-092024-12-10 の2日分のデータをデータ元から取得してモデルのテーブルに挿入する、といった具合です。

どう嬉しいか

日次で実行する場合は直近の数日分だけのレコードを参照して格納するので、処理するデータ量が最小限で済むため、コストとパフォーマンスの面でメリットがあります。
このメリット自体は、他のincremental modelでも実現できますし、すでにincremental modelを運用してきた方々にとっては馴染みがあるものだと思います。

microbatchの嬉しい点は、過去に漏れた分の実行や、過去分を再実行したい場合(backfill)も、 dbt run --event-time-start "YYYY-MM-DD" --event-time-end "YYYY-MM-DD" のようなコマンドを実行することで簡単に行える点です。
delete+insertだと、uniqueキーを指定した上で、WHERE句に --vars で渡す変数を入れたり is_incremental()で分岐したりと、細かい工夫をして行う必要がありますが、microbatchだと単純に対象のテーブルに対してSELECTするのみで良くなります。(実際に動かしてみるパートで詳しく見ます)

更に、バッチを並列で実行できたり、Failしたバッチだけをretryできたりといった便利機能もあります。

運用がすごくしやすくなる、というイメージを私は持ちました。

実際に動かしてみる

実際に動かしながら、microbatchの挙動を理解していきます。

前提条件

まず、dbt-core ver1.9にアップデートします。

事前準備

データ元となるテーブルとしてスクショのようなものを用意します。ここでは商品の売上テーブルと商品マスタということで、transactionとitem_masterというテーブルを用意します。
transaction

item_master

これらのテーブルをsourceとして、microbatch incremental strategyを使ったモデルを作ります。

設定コード

※適宜割愛してます

_sources.yml
version 2:

sources:
  - name: workspace
    schema: workspace
    tables:
      - name: transaction
        config:
          event_time: date
      - name: item_master
enriched_transaction.sql
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='date',
        begin='2024-12-01',
        batch_size='day',
        lookback=1,
        full_refresh=false,
        cluster_by=['date'],
    )
}}
with
t as (
    select
        *
    from
        {{ source('workspace', 'transaction') }}
),
m as (
    select
        *
    from
        {{ source('workspace', 'item_master') }}
)

select
    t.*,
    m.item_name
from
    t left join m using(item_id)
order by
    date

実行してみる/クエリも深ぼってみる

初期実行

$ dbt build を実行すると、11日分のtransactionデータが取り込まれます。
11日分のバッチが実行されていることがログからわかります

結果テーブル

実際にSnowflakeで実行されるクエリを見てみます。

  • 最初のバッチ(2024-12-01分)で create or replace transient table 〜 で定義してテーブルを作成
    create or replace transient table XXX.WORKSPACE.enriched_transaction
             as
            (select * from (
                  
    with
    t as (
        select
            *
        from
            (select * from XXX.workspace.transaction where date >= '2024-12-01 00:00:00+00:00' and date < '2024-12-02 00:00:00+00:00')
    ),
    m as (
        select
            *
        from
            XXX.workspace.item_master
    )
    
    select
        t.*,
        m.item_name
    from
        t left join m using(item_id)
    order by
        date
                  ) order by (date)
            )
    
  • cluster byの設定
    alter  table XXX.WORKSPACE.enriched_transaction cluster by (date)
    

それ以降は、以下のクエリを1バッチ分として、対象期間のバッチの数だけ処理を繰り返しています。
この挙動自体はdelete+insertに近いですね

  • 格納対象のレコードを持つ一時VIEWの作成
    create or replace  temporary view XXX.WORKSPACE.enriched_transaction__dbt_tmp_20241202
        
    (
        "ITEM_ID" COMMENT $$$$,  
        "DATE" COMMENT $$$$, 
        "SALES" COMMENT $$$$,
        "QUANTITY" COMMENT $$$$, 
        "ITEM_NAME" COMMENT $$$$
    )
       as (    
    with
    t as (
        select
            *
        from
            (select * from XXX.workspace.transaction where date >= '2024-12-02 00:00:00+00:00' and date < '2024-12-03 00:00:00+00:00')
    ),
    m as (
        select
            *
        from
            XXX.workspace.item_master
    )
    
    select
        t.*,
        m.item_name
    from
        t left join m using(item_id)
    order by
        date
      )
    
  • モデルからWHERE句でバッチの対象日付のレコードをDELETE
    delete from XXX.WORKSPACE.enriched_transaction DBT_INTERNAL_TARGET
        where (
        DBT_INTERNAL_TARGET.date >= to_timestamp_tz('2024-12-02 00:00:00+00:00')
        and DBT_INTERNAL_TARGET.date < to_timestamp_tz('2024-12-03 00:00:00+00:00')
        )
    
  • 一時VIEWからINSERT
    insert into XXX.WORKSPACE.enriched_transaction ("ITEM_ID", "DATE", "SALES", "QUANTITY", "ITEM_NAME")
        (
            select "ITEM_ID", "DATE", "SALES", "QUANTITY", "ITEM_NAME"
            from XXX.WORKSPACE.enriched_transaction__dbt_tmp_20241202
        )
    
  • 一時VIEWの削除
    drop view if exists XXX.WORKSPACE.enriched_transaction__dbt_tmp_20241202 cascade
    

特に推したいポイントとしては、
INSERT用のデータを作成するクエリの

(select * from XXX.workspace.transaction where date >= '2024-12-02 00:00:00+00:00' and date < '2024-12-03 00:00:00+00:00')
    )

やレコードを削除するクエリの

where (
        DBT_INTERNAL_TARGET.date >= to_timestamp_tz('2024-12-02 00:00:00+00:00')
        and DBT_INTERNAL_TARGET.date < to_timestamp_tz('2024-12-03 00:00:00+00:00')
        )

あたりの条件は、モデルのSQLには一切書いていない点です。

enriched_transaction.sql
t as (
    select
        *
    from
        {{ source('workspace', 'transaction') }}
),

SQLがスッキリして感動しました。

2回目以降の実行

$ dbt build

  • 初回実行以降の実行では、実行日とその1日前を対象としたバッチが実行されます。
    • 2024-12-11に実行すると、2024-12-10と2024-12-11分のバッチが実行される。
  • configのlookback の値で対象日数は決まります。最新日から何日前までを対象にするか、ということで、1(デフォルト値)なら実行日とその1日前(2024-12-11と2024-12-10)が更新対象です。

並列実行と直列実行

並列実行と直列実行を試してみます。

特に何も設定しない場合は並列実行になり、実行順序がバラバラになります。(ただし、対象期間の最初の日と最後の日の実行タイミングは必ず最初と最後になります。)
※一度tableをdropしてから実行

↑初日(2024-12-01)と最後(2024-12-12)以外はバラバラに完了していることがわかります。

直列実行にするには concurrent_batches: true を設定します。すると、以下のように実行順序が固定されます。

※一度tableをdropしてから実行

↑順次良く実行されていることがわかります

backfill

$ dbt run --event-time-start '2024-12-05' --event-time-end '2024-12-10'
backfillを行うことで、過去のデータを再度入れ直すことができます。

パーティションはどうなる?

microbatchを使用した際、パーティションがどのように切られるのかはまだ正確に理解しきれていません。データ量やデータ元としているテーブルの定義などでも変わるのかもしれず、このあたりは今後もう少し検証しながら理解を深めていきたいと思います。

ただ、時系列で大きなサイズのテーブルから作成したデータモデルも時系列で大きなサイズになりがちなので、cluster_byを付けてpruningが適切に効くような工夫は必要になりそうです。

microbatchが使えるケースと使いづらいケース

実際に試してみて、サイズの大きな時系列データの管理においてはとても便利だと思いました。

ただし、時系列データであっても、更新処理の「実行順序」が重要な場合、「使えない」とはまではいかずとも注意が必要です。例えば、テーブル自身のカラムを使って何らかの計算や処理を行う(例えば「前日売上との比率」「期間中の累積売上」など)場合、並列実行をすると値がおかしくなってしまいます。
configで直列実行を強制することで回避できるかもしれません。

また、Airflowなどで運用する場合、 dbt build だと実行タイミングでの日付(しかもUTC)をベースにした処理となるため、例えばある日に失敗した処理を翌日に処理すると対象期間が異なる問題が出てしまいます。この場合、dbt build ではなくdbt run --event-time-start ...を使う方が運用はしやすそうだと思いました。

まとめ

microbatch incremental modelを使うことで、

  • クエリがシンプルになる
  • backfillやretryが簡単にできる
  • 並列実行で実行時間を短縮できる

というメリットがありそうです。弊社でも使っていきたいと思います。
皆さんもぜひ試してみてください!

参考

Finatext Tech Blog

Discussion