🍊

dbt で参照しているテーブルのデータチェック行う

2024/07/15に公開

はじめに

サブタイトルは、「TROCCOのデータチェックをdbtでに置き換えする」です。
dbt Coreでは、dbt testを使用してモデルのテストが可能ですが、参照している読み取り先データのデータチェックをdbt testでは実装できないと理解しています。
TROCCOからdbt Core導入(dbt連携)の段階で、TROCCOのデータチェックをdbt testに置き換えできなかったので、dbt のマクロを実装しました。

ワークフローデータチェックとは

ワークフロー内に設定できるタスクのひとつです。
DWHに対するクエリ結果とエラー条件を突合し、条件に合致する場合に該当のタスクをエラーとする機能です。

https://documents.trocco.io/docs/workflow-data-check

  • 結構自由度が高く、クエリ結果によって、ワークフローをエラーで停止させることができます。
  • 素朴にクエリ書いて、エラー条件設定。

置き換えたいTROCCOのワークフロー

  • 読み込み先のデータチェックを行い、該当のデータが無かったらワークフローを止める
  • 読み込み先にデータ更新されてるかのタイミングがわからないので定期的に確認する
    • TROCCOのスケジュール設定で1日に何度も実行している
  • incremental_strategy='insert_overwrite'使えという話もあるが、その前にワークフローを止めたい。

▼ START → データチェック → データマート

実装

  • モデル作成(dbt build)前にデータチェックする用のマクロを実行して、エラー条件に合致したらデータマート作成(dbt build)を止める。

環境

  • dbt Core v1.8

モデル

  • モデル作成(dbt build)前にデータチェックする用のマクロを実行する実装。
    • モデルの作成前や作成後にクエリを実行できるhooksを利用します。
  • hooksには、下記の種類があります。
    • pre-hook : modelの作成直前に任意のクエリを実行できる
    • post-hook : modelの作成直後に任意のクエリを実行できる
{{
    config(
        pre_hook=[
          "{{ data_check_today_data() }}"
        ]
    )
}}

SELECT
~したい処理~
FROM {{ source("data", "event") }} -- 参照先データ
  • pre-hookでmodelの作成直前に実行するマクロを指定
    • 複数設定可能です。

https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook
https://dev.classmethod.jp/articles/dbt-hooks/

マクロ

  • データチェックのクエリと、その結果どの条件でエラーダウンするかマクロで設定
    • 同様の書き方でもう一つマクロを用意し、データ集計先のテーブルをチェックして、今日分のデータ取り込みがすでに完了していたらエラーダウンをする実装もできると思います。
      • その場合、おそらく参照が循環するからref使えないので、fromはテーブル直接指定書きする
  • ifのエラー条件は、仮設定しているので適宜修正してください
{% macro data_check_today_data() %}
    {% set check_query %}
        SELECT COUNT(*) AS event_count
        FROM {{ source("data", "event") }} -- 参照先データ
        WHERE _table_suffix = FORMAT_DATE("%Y%m%d", CURRENT_DATE("Asia/Tokyo"))
    {% endset %}

    {% set results = run_query(check_query) %}
    
    {% if execute %}
        {% set event_count = results.columns[0].values()[0] %}
        {% if event_count <= 10 %}
            {{ exceptions.raise_compiler_error('エラーメッセージ1') }}
        {% elif event_count is none %}
            {{ exceptions.raise_compiler_error('エラーメッセージ2') }}
        {% else %}
            {% do log("データチェックOK : " ~ event_count ~ "件", info=True) %}
        {% endif %}
    {% endif %}
{% endmacro %}

set check_query

  • check_queryでデータチェック用のクエリを記載します。
  • 参照先のデータチェック以外にも使用できます。
    {% set check_query %}
        SELECT COUNT(*) AS event_count
        FROM {{ source("data", "event") }} -- 参照先データ
        WHERE _table_suffix = FORMAT_DATE("%Y%m%d", CURRENT_DATE("Asia/Tokyo") - 1)
    {% endset %}

if execute

if executeは、dbtのマクロ内で実際にクエリを実行するかどうかを制御するために使用されます。これは、dbtのコンパイルフェーズと実行フェーズを区別するために使われ、次のような意味があります。

  • コンパイルフェーズ: dbtは全てのマクロとモデルをコンパイルし、SQLクエリを生成しますが、クエリ自体は実行しません。このフェーズではexecuteFalseになります。
  • 実行フェーズ: dbtはコンパイルされたSQLクエリを実際にデータベースで実行します。このフェーズではexecuteTrueになります。

if executeは、例えばrun_queryの結果を使用したログの記録やエラーの発生など、実行フェーズのみに実行したい処理に使用します。これにより、コンパイルフェーズ中にこれらの処理が実行される問題を回避できます。

{% if execute %}
~処理~
{% endif %}

raise_compiler_error

  • exceptions.raise_compiler_errorは、指定されたメッセージでコンパイラエラーを発生させます。
  • exceptions.warnという指定されたメッセージでコンパイラの警告を発生させるのもある。
    コンパイラエラーは発生せず、成功扱いになるが、dbtに--warn-errorフラグが指定されている場合、この警告は例外に昇格する。
    • 今回は、使用せず。
  • exceptions.warnのサンプル
    {% macro some_macro(arg1, arg2) %}
        {{ log("Running some_macro: " ~ arg1 ~ ", " ~ arg2) }}
    {% endmacro %}

https://docs.getdbt.com/reference/dbt-jinja-functions/exceptions

log

  • logは、指定されたメッセージを表示させます。
  • 引数 info:
    • Falseの場合(デフォルト)、ログファイルに書き込む。
    • Trueの場合、ログファイルと標準出力の両方に書き込みます。
        {% if event_count <= 10 %}
            {{ exceptions.raise_compiler_error('エラーメッセージ1') }}
        {% elif event_count is none %}
            {{ exceptions.raise_compiler_error('エラーメッセージ2') }}
        {% else %}
            {% do log("データチェックOK : " ~ event_count ~ "件", info=True) %}
        {% endif %}

https://docs.getdbt.com/reference/dbt-jinja-functions/log

実現したフロー

旧フロー(TROCCO)

新フロー(dbt)

おわりに

dbtでどのように機能を実現したいかについて取り組む前に、dbt Jinja functionsの知識が足りていないことに気づきました。
今回、ChatGPTで無闇に質問して遠回りになってしまいました。
dbt Jinja functionsをざっと読み、理解を深めるためGitHubで他の人の実装を見る。その実装を元にChatGPTに質問して、理解を深めるという学習ループをとりあえず進めてみようかと思いました。
まだまだ知らない機能も多いのでもっと良い方法があるとかあれば教えて欲しいです。

参考

https://docs.getdbt.com/reference/dbt-jinja-functions

Discussion