🐡

dbt で materialization マクロを魔改造する

2025/03/31に公開

MeDiCUでは、医療機関から提供されるバイタル等の臨床データをクレンジングし、研究への活用を推進する事業を行っています。
データは事業のまさにコアであり、BigQuery + dbt を用いてパイプラインを整備していますが、データ量やモデル定義の変更頻度が高まるにつれ、dbt再実行によるBigQueryのコストが加速度的に増加しているという課題が顕在化してきました。
無駄なクエリの実行を防いでコストを抑えるために、materialization を魔改造したので、この記事ではそれについて紹介します。

課題そのものは MeDiCU の事業の特殊性からくるもので、あまり一般的な課題ではないと思いますが、

  • マクロは柔軟な機構だが、それでもマクロでは解決できない問題がある
  • そのような問題を解決する方法として、materialization をカスタマイズするという手段がある
    という2点が面白いと思ったのでまとめてみました。

課題: データが変わらない無駄なモデル更新を抑制したい

まず前提として、dbt には「特定のモデル定義を変更した際、そのモデル自身と、それに依存する子孫モデルたちのみを再実行する」といった機能が内包されています。
この機能を単に使えば十分にコストが抑えられるというケースも多いでしょう。

一方、MeDiCU での dbt の利用状況としては、「モデル定義に変更があっても、最終的なデータは変化しない」ことが期待される変更が頻発していました。
リファクタリングや、共通化されたUDF定義の変更などが主な要因です。
このようなケースにおいて、dbt の builtin 機構だけを使ってデータを更新すると、変更を行ったモデルと、そのモデルを祖先に持つすべてのモデルたちが再実行されます。

本来モデルを再実行するべき条件はしたのいずれかです。

  • モデルのSQL変更が加わった
  • モデルが参照しているテーブルの内容に変更が加わった

実際に変更を行ったモデルそのものについて再実行されるのは構わない(むしろ検証のために再実行すべき)のですが、再実行の結果データに変化が起きなかった場合であっても、子孫モデルたちが巻き込みで再実行されてしまいます。

MeDiCU では、データとその整形がコア価値なので、リファクタリング頻度は高くありたいのですが、データ量も膨大であり、無駄な再実行が乱発すると BigQuery のコストが馬鹿にならない金額になってしまいます。
また、通常の事業会社で取り扱うデータと比較して生データの更新頻度が低いという MeDiCU 特有の事情があり、この課題のインパクトが相対的に非常に大きくなっています。

このあたりの詳細はこちらの記事にも記載されています。
https://zenn.dev/medicu/articles/4d8234762c0696#今後の技術戦略

そこで、どうにかして「データが変化しているか」を厳密に判定し、変化がない場合は子孫モデルの再実行を行わない、ということを実現する必要がありました。

方針

方針としては、以下のような戦略を実装することにしました。

  • モデルごとに「最終更新時刻(最後に実際にデータが変化した時刻)」をメタデータとして保存する
    • モデルを更新するたびに、現在のデータの内容をハッシュ化したものをメタデータとしてどこかに保存する
    • 更新前と更新後のハッシュ値を比較し、実際にデータに変化があったかを判別できるようにする
  • モデルのクエリが変わっている場合は、更新を抑制しない
  • モデルの実行時に、そのモデルの親モデルたちすべてについて、 モデルの最終更新時刻 > 親モデルの最終更新時刻 が成り立つ場合、子孫モデルの更新を抑制する
    • 親モデルのデータが変わっていないなら、子モデルを更新してもデータが変化しないことが期待されるため

この問題はマクロで解けない

メタデータの計算や保存は、 run-on-end などに hook を差し込むことで実現可能です。
あとは、実行の抑制を実現できれば良さそうです。
実行を抑制したい条件に合致した場合は何もしないようなモデル定義ができれば理想的です。

dbt では、このように動的にモデル定義を書き換える必要がある場合に、マクロを使うことができます。
たとえば以下のような実装がパッと思いつきます。incremental な更新を abuse して、無理やり「全件リフレッシュ or nothing」 を実現する方法です。

with t as (
  select some_computation from some_source_table
)
{% if is_incremental() %}
    {% if condition %}
        -- 条件を満たした場合: 全件リフレッシュとして全データを返す
        select * from t
    {% else %}
        -- 条件を満たさない場合: 空の結果を返す(何も更新しない)
        select * from {{ this }} where false
    {% endif %}
{% else %}
    -- 初回実行時(非インクリメンタル実行)は全件取得
    select * from t
{% endif %}

この書き方だと、「全件リフレッシュ or nothing」は実現できます。何も更新しない場合には Read や Compute の無駄な消費も起きません。

が、実は今回の問題においては、このようなマクロでは問題が解決できません。
さきのマクロでは、 condition に先述した方針の中の

モデルの実行時に、そのモデルの親モデルたちすべてについて、 モデルの最終更新時刻 > 親モデルの最終更新時刻 が成り立つ場合

に相当する boolean 値が入っている想定ですが、実はこれを満たすような値をマクロで計算することはできません。

dbt において、マクロは実行するモデルのビルド時に展開されます。
そして、dbt は複数のモデルが実行される際、実行対象となるモデルすべてをビルドし、その後1モデルずつ走らせる、という動きをします。
condition を正しく要求を満たすように計算するには、親モデルの実行後 = 自分自身の実行直前にマクロが評価される必要があります。

このように、dbt のマクロは相当な柔軟性を持っているのですが、問題設定によっては利用できないことがあります。

Custom Materialization

マクロの展開がビルド時(!= 実行直前)に行われるため、通常のマクロでは実行直前にしか計算できない値をもとにした動的なモデルの変換が実現できないことがわかりました。

これを実現する方法が、Custom Materialization です。

通常の dbt 利用において materialization とは、 viewtable, incremental などを指定する部分です。
そのモデルをどのようにデータウェアハウスに実体化するか、を規定する設定項目で、たとえば view を bigquery-adapter で使った場合、BigQuery に CREATE VIEW で view が作成されます。

dbt では、materialization は事前に定義された固定値だけでなく、独自の materialization 方法を追加することができます。 (Custom Materialization)
実装としては単純で、 macros/ 以下に materialization マクロを記述するだけです。

{% materialization custom_name, "adapter-name" %}
  ...
{% endmaterialization %}

実は、我々が普段よく使う materialization も同じ方法で実装されています。
たとえば BigQuery の table であれば、 https://github.com/dbt-labs/dbt-bigquery/blob/main/dbt/include/bigquery/macros/materializations/table.sql を見れば実装が確認できます。
いろいろと細かいサポートが入っているのと更にマクロを呼んでいるので複雑に見えますが、コアとしては要するに CREATE OR REPLACE TABLE ... を叩いている感じです。
下記がマクロ定義を抜粋しつつコメントで説明を入れたものです。

-- materialization="table" を指定したときに実行されるクエリを構築するマクロ
{% materialization table, adapter='bigquery', supported_languages=['sql', 'python']-%}
  ......

  -- pre_hook に登録したマクロを実行する
  {{ run_hooks(pre_hooks) }}

  -- table でないものとしてすでに同名のなにかが存在していた場合のお掃除
  {%- if exists_not_as_table -%}
      {{ adapter.drop_relation(old_relation) }}
  {%- endif -%}

  ......

  -- 本丸。compiled_code にユーザーが定義したクエリが格納されている。
  -- create_table_as の内部では create or replace table クエリを構築して実行している。
  {%- call statement('main', language=language) -%}
    {{ create_table_as(False, target_relation, compiled_code, language) }}
  {%- endcall -%}

  -- post_hooks に登録したマクロを実行する
  {{ run_hooks(post_hooks) }}

  ......

  -- 権限を設定する
  {% do apply_grants(target_relation, grant_config, should_revoke) %}
  -- BigQuery のドキュメントに情報を反映
  {% do persist_docs(target_relation, model) %}

  {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}

見ていただくと分かる通り、このレイヤーをいじってしまえば、ほとんどありとあらゆることが実現できます。
hooks を実行するタイミングも自由自在だし、任意のクエリを任意の回数実行することもできます。クエリの実行結果をもとに分岐したりループしたりすることも可能です。adapter 経由で BigQuery の様々な情報にアクセス可能です。

pre_hooks / post_hooks が実行されていることからも分かる通り、ここでのマクロ評価は「真のモデル実行時」に行われます。というか、このマクロの評価こそが「モデルの実行」です。
したがって、今回の我々の問題であった「親モデルの実行直後に動的にモデルの定義を組み替えたい」は、このレイヤーを書き換えることで実現できます。

ここまで来てしまえば実装そのものは自明なので割愛しますが、ざっくりイメージとしては

-- materialization="table" を指定したときに実行されるクエリを構築するマクロ
{% materialization table, adapter='bigquery', supported_languages=['sql', 'python']-%}
  {%- set condition = メタデータを読み取って再実行が必要かを判定する -%}
  {%- call statement('main', language=language) -%}
    {%- if condition -%}
      {{ create_table_as(False, target_relation, compiled_code, language) }}
    {%- else -%}
      select 1  -- ここでなにかしらのクエリを実行しないと dbt 側でエラーになる
    {%- endif -%}
  {%- endcall -%}
{% endmaterialization %}

というようなことをするだけです。
メタデータを格納する場所はいくつか考えられますが、我々の場合は単純に BigQuery 上にメタデータを格納するためのテーブルを用意しました。materialization の末尾付近でメタデータテーブルに「最終更新時刻」などを insert し、condition はこのメタデータテーブルへのクエリで計算しています。

上記の例では materialization="table" として定義していますが、ここには任意の名前が使えるので、通常の table も残したい場合は別名の materialization として登録しておくとよいでしょう。
我々の場合は、最初はスモールスタートとして別名で登録していましたが、安定稼働を確認できたタイミングで table の定義自体を上書きする形にしてしまいました。

Disclaimer

この方法は正直にいえばかなりピーキーで不安定な方法です。
materialization をカスタマイズするのは adapter author であることが想定されていると思います。
我々の採用した方法は、BigQuery Adapter を使いながら materialization 部分だけを override するというもので、BigQuery Adapter の機構が変わるなどした場合に急に動かなくなるリスクをはらんでいます。

また、materialization をカスタマイズしたとしても結局のところ jinja ですから、実行タイミングが異なるだけでできること自体は普通の macro とかわりません。
さきほどは「ほとんどありとあらゆることができる」と書きましたが、あくまでそれは jinja の表現力や dbt が提供するマクロ解決の環境の範囲内でのことです。
その範囲を超える処理を記述したい場合には、macro からは adapter インスタンスやそこから macro 用に公開されているメソッドが利用できるので、adapter 自体を自ら書くことで対応できます。

今回は、要件の特殊性から、「最悪ふつうの table にもどせばいい」というアグレッシブさが許容できると判断しましたが、メンテナンス性やニアリアルタイムなデータの信頼性が求められる要件の場合には採用しづらい方法かもしれません。

が、macro以上の選択肢を知っておくとかなり世界が広がると思います。
また、これにたどり着くまでにドキュメントや dbt の実装を読んだところ、意外と読めるなという気持ちにもなってきました。
どういうところがカスタマイズ可能なのか、どういうカスタマイズは危険そうなのか、がわかってきて、より dbt を使いこなせそうな気持ちになれたので、おすすめです。

MeDiCU

Discussion