🍊

dbt モデルに監査カラムを追加する

に公開

はじめに

データエンジニア界隈ではおなじみの GitLab 社 Data Team Handbook
https://handbook.gitlab.com/handbook/enterprise-data/

さらに、同社が公開している dbt プロジェクトのリポジトリ
https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt

を参考にしている方も多いのではないでしょうか。

今回はその中でも、全モデルに適用されている dbt_audit マクロ に着目します。このマクロを利用し、監査カラムを自動付与しながら BigQuery でインクリメンタル更新を行う手順を検証します。

dbt_audit マクロ概要

dbt_auditモデルに監査用カラムを追加するマクロ です。
モデル内の 最後のステートメントが「CTE からの SELECT *」であること を前提とします。
最終的な SQL は SELECT * のままですが、6つの監査カラム が付与されます。
このマクロ呼び出し後に ORDER BYGROUP BY といった追加の SQL DML を記述することも可能です。

https://gitlab.com/gitlab-data/analytics/-/blob/master/transform/snowflake-dbt/macros/utils/dbt_audit.sql

監査カラム付与例

dbt_audit マクロの必要性

1. 監査ログの一元化・変更履歴の追跡

  • どのモデルが いつ、誰によって更新されたか を自動記録
  • インクリメンタルモデルでは 初回作成時刻と更新時刻 を保持
  • これにより、データのライフサイクルを可視化でき、更新履歴を簡単に追跡できる
  • チーム運用時に、モデル変更によるデータ影響範囲も把握しやすい

2. モデル作成・更新とテーブル更新の管理

  • クエリ(dbtモデル)の作成・更新と、生成されるテーブルの更新を 紐づけて管理可能
  • ⭐️ 増分更新時でも、途中でモデルが変更されても 旧モデルと編集後モデルによる更新レコードの区別が容易

追加される監査カラム

  1. created_by
    • 説明: モデル作成者を記録するカラム。(手動記入)
      マクロ呼び出し時の引数で渡した値が設定されます。
    • 型: VARCHAR
    • 更新タイミング: 初回作成時のみ設定、インクリメンタル更新時は変更されません。
  2. updated_by
    • 説明: モデル更新者を記録するカラム。(手動記入)
      マクロ呼び出し時の引数で渡した値が設定されます。
    • 型: VARCHAR
    • 更新タイミング: モデル実行ごとに更新されます。
  3. model_created_date
    • 説明: モデル作成日を固定で保持するカラム。(手動記入)
      マクロ呼び出し時の created_date が設定されます。
    • 型: DATE
    • 更新タイミング: 初回作成時に設定、以後保持されます。
  4. model_updated_date
    • 説明: モデル更新日を記録するカラム。(手動記入)
      マクロ呼び出し時の updated_date が設定されます。
    • 型: DATE
    • 更新タイミング: モデル実行ごとに更新されます。
  5. dbt_updated_at
    • 説明: 実際に dbt がモデルを更新した日時を記録するカラム。(自動付与)
      CURRENT_TIMESTAMP() が設定されます。
    • 型: TIMESTAMP
    • 更新タイミング: モデル実行ごとに更新されます。
  6. dbt_created_at
    • 説明: モデルの初回作成時刻を記録するカラム。(自動付与)
    • 型: TIMESTAMP
    • 設定ロジック:
      • incremental モデルかつ FULL_REFRESH でない場合
        • 既存テーブルがある → MIN(dbt_created_at)CURRENT_TIMESTAMP() の小さい方
        • 既存テーブルがない → CURRENT_TIMESTAMP()
      • FULL_REFRESH / non-incremental の場合 → CURRENT_TIMESTAMP()

日付計算のポイント

テーブル作成時刻インクリメンタルモデルでデータが挿入された時刻 の2つを内部的に計算して格納します。

使用例

WITH my_cte AS (
  ...
)

{{ dbt_audit(
    cte_ref="my_cte",
    created_by="@gitlab_user1",
    updated_by="@gitlab_user2",
    created_date="2019-02-12",
    updated_date="2020-08-20"
) }}
ORDER BY updated_at

処理フロー

動作確認ログ

検証用テーブル作成

検証用テーブルを作成します。

CREATE OR REPLACE TABLE work.user_name_history
PARTITION BY DATE(created_at) AS
SELECT *
FROM UNNEST([
  STRUCT(1 AS id, '佐藤太郎' AS user_name, DATETIME '2025-08-11 09:00:00' AS created_at),
  STRUCT(2, '鈴木花子', DATETIME '2025-08-11 11:30:00'),
  STRUCT(3, '田中一郎', DATETIME '2025-08-12 14:15:00'),
  STRUCT(4, '佐藤太郎', DATETIME '2025-08-12 08:45:00')
]);

データ確認

select * from work.user_name_history order by 1

dbt マクロ定義

元々のマクロは、Snowflake用のクエリですが、BigQueryで動くように変えてます。

macros/dbt_audit.sql
{%- macro dbt_audit(cte_ref, created_by, updated_by, created_date, updated_date) -%}

    select
        *,
        cast('{{ created_by }}' as string) as created_by,
        cast('{{ updated_by }}' as string) as updated_by,
        cast('{{ created_date }}' as date) as model_created_date,
        cast('{{ updated_date }}' as date) as model_updated_date,
        current_timestamp() as dbt_updated_at,

        {% if execute %}

            {% if not flags.FULL_REFRESH and config.get(
                "materialized"
            ) == "incremental" %}

                {%- set source_relation = adapter.get_relation(
                    database=target.database,
                    schema=this.schema,
                    identifier=this.table,
                ) -%}

                {% if source_relation is not none %}

                    {% set min_created_date %}
                    SELECT
                        LEAST(MIN(dbt_created_at), CURRENT_TIMESTAMP()) AS min_ts
                    FROM {{ this }}
                    {% endset %}

                    {% set results = run_query(min_created_date) %}

                    cast(
                        '{{ results.columns[0].values()[0] }}' as timestamp
                    ) as dbt_created_at

                {% else %} current_timestamp() as dbt_created_at

                {% endif %}

            {% else %} current_timestamp() as dbt_created_at

            {% endif %}
        {% endif %}

    from {{ cte_ref }}

{%- endmacro -%}

dbt モデル定義

insert_overwrite 戦略でパーティションを日単位で更新するモデルを作成します。
on_schema_change="sync_all_columns により監査カラム追加後も列同期を保証

user_name_history_incremental.sql
{{
    config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        on_schema_change="sync_all_columns",
        partition_by={
            "field": "created_at",
            "data_type": "datetime",
            "granularity": "day",
            "copy_partitions": true,
        },
    )
}}

with
    final as (
        select *
        from work.user_name_history
        {% if is_incremental() %}
            where date(created_at) > (select max(date(created_at)) from {{ this }})
        {% endif %}
    )

    {{
        dbt_audit(
            cte_ref="final",
            created_by="@yuichi",
            updated_by="@yuichi",
            created_date="2025-09-10",
            updated_date="2025-09-10",
        )
    }}

実行結果

1. 初回 dbt build 実行

  • モデルが初めて実行されたタイミングで、ソーステーブルの全データが BigQuery にロードされ、同時に 監査カラムが自動付与 されます。
  • 監査カラムの値はマクロ呼び出し時の引数や CURRENT_TIMESTAMP() に基づいて設定されます。
dbt build --select user_name_history_incremental

データ確認

select * from work.user_name_history_incremental order by 1

  • created_by / updated_by / model_created_date / model_updated_date はマクロ引数から設定
  • dbt_created_at / dbt_updated_at は実行時のタイムスタンプで設定

2. ソーステーブルに新レコードを追加

  • 新しいデータをソースに追加して、インクリメンタル更新を確認します。
INSERT INTO work.user_name_history
VALUES (5, '鈴木花子', DATETIME '2025-08-13 00:20:00');

3. dbt モデル編集

  • 新しい列(name_length)を追加し、更新者(updated_by="@taro")・更新日(updated_date="2025-09-11")を新しい値に変更しています。モデルを編集した状態で再度 dbt build を実行します。
  • インクリメンタル更新により、新しいデータのみが追加され、既存データの created_by / model_created_date保持 されます。
  • updated_by / model_updated_date / dbt_updated_at更新 されます。
  • dbt_created_at は既存テーブルと現在時刻を比較して、最小値を保持します。
user_name_history_incremental.sql
{{
    config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        on_schema_change="sync_all_columns",
        partition_by={
            "field": "created_at",
            "data_type": "datetime",
            "granularity": "day",
            "copy_partitions": true,
        },
    )
}}

with
    final as (
        select *, char_length(user_name) as name_length -- カラム追加
        from work.user_name_history
        {% if is_incremental() %}
            where date(created_at) > (select max(date(created_at)) from {{ this }})
        {% endif %}
    )

    {{
        dbt_audit(
            cte_ref="final",
            created_by="@yuichi",
            updated_by="@taro", -- 更新
            created_date="2025-09-10",
            updated_date="2025-09-11", -- 更新
        )
    }}

4. dbt build 再実行

dbt build --select user_name_history_incremental

データ確認

select * from work.user_name_history_incremental order by 1

  • 新規追加行は監査カラム付きで追加されます
  • 既存行は created_by / model_created_date を保持しつつ、更新カラムのみ更新
  • これにより、誰がいつ作成・更新したか が常に追跡可能

おわりに

本記事では、GitLab 社の dbt_audit マクロを利用して、BigQuery でインクリメンタルモデルを更新する際に 監査カラムを自動付与する方法 を紹介しました。

チーム開発においては、モデル運用の 透明性と信頼性 を高める有効な仕組みだと感じます。
GitLab 社のプロジェクトでは設定されていませんでした?が、dbt_audit の適用有無や、マクロ引数に正しい値が設定されているかを CI でチェックする仕組み があれば、さらに安心して運用できるでしょう。

今後、実務で導入する機会があれば、検討したいと思います。

参考

https://handbook.gitlab.com/handbook/enterprise-data/
https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt
https://gitlab.com/gitlab-data/analytics/-/blob/master/transform/snowflake-dbt/macros/utils/dbt_audit.sql
https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change

Discussion