dbt で参照しているテーブルのデータチェック行う
はじめに
サブタイトルは、「TROCCOのデータチェックをdbtでに置き換えする」です。
dbt Coreでは、dbt test
を使用してモデルのテストが可能ですが、参照している読み取り先データのデータチェックをdbt test
では実装できないと理解しています。
TROCCOからdbt Core導入(dbt連携)の段階で、TROCCOのデータチェックをdbt test
に置き換えできなかったので、dbt のマクロを実装しました。
ワークフローデータチェックとは
ワークフロー内に設定できるタスクのひとつです。
DWHに対するクエリ結果とエラー条件を突合し、条件に合致する場合に該当のタスクをエラーとする機能です。
- 結構自由度が高く、クエリ結果によって、ワークフローをエラーで停止させることができます。
- 素朴にクエリ書いて、エラー条件設定。
置き換えたいTROCCOのワークフロー
- 読み込み先のデータチェックを行い、該当のデータが無かったらワークフローを止める
- 読み込み先にデータ更新されてるかのタイミングがわからないので定期的に確認する
- TROCCOのスケジュール設定で1日に何度も実行している
-
incremental_strategy='insert_overwrite'
使えという話もあるが、その前にワークフローを止めたい。
▼ START → データチェック → データマート
実装
- モデル作成(
dbt build
)前にデータチェックする用のマクロを実行して、エラー条件に合致したらデータマート作成(dbt build
)を止める。
環境
- dbt Core v1.8
モデル
- モデル作成(
dbt build
)前にデータチェックする用のマクロを実行する実装。- モデルの作成前や作成後にクエリを実行できるhooksを利用します。
- hooksには、下記の種類があります。
- pre-hook : modelの作成直前に任意のクエリを実行できる
- post-hook : modelの作成直後に任意のクエリを実行できる
{{
config(
pre_hook=[
"{{ data_check_today_data() }}"
]
)
}}
SELECT
~したい処理~
FROM {{ source("data", "event") }} -- 参照先データ
- pre-hookでmodelの作成直前に実行するマクロを指定
- 複数設定可能です。
マクロ
- データチェックのクエリと、その結果どの条件でエラーダウンするかマクロで設定
- 同様の書き方でもう一つマクロを用意し、データ集計先のテーブルをチェックして、今日分のデータ取り込みがすでに完了していたらエラーダウンをする実装もできると思います。
- その場合、おそらく参照が循環するからref使えないので、fromはテーブル直接指定書きする
- 同様の書き方でもう一つマクロを用意し、データ集計先のテーブルをチェックして、今日分のデータ取り込みがすでに完了していたらエラーダウンをする実装もできると思います。
- ifのエラー条件は、仮設定しているので適宜修正してください
{% macro data_check_today_data() %}
{% set check_query %}
SELECT COUNT(*) AS event_count
FROM {{ source("data", "event") }} -- 参照先データ
WHERE _table_suffix = FORMAT_DATE("%Y%m%d", CURRENT_DATE("Asia/Tokyo"))
{% endset %}
{% set results = run_query(check_query) %}
{% if execute %}
{% set event_count = results.columns[0].values()[0] %}
{% if event_count <= 10 %}
{{ exceptions.raise_compiler_error('エラーメッセージ1') }}
{% elif event_count is none %}
{{ exceptions.raise_compiler_error('エラーメッセージ2') }}
{% else %}
{% do log("データチェックOK : " ~ event_count ~ "件", info=True) %}
{% endif %}
{% endif %}
{% endmacro %}
set check_query
-
check_query
でデータチェック用のクエリを記載します。 - 参照先のデータチェック以外にも使用できます。
{% set check_query %}
SELECT COUNT(*) AS event_count
FROM {{ source("data", "event") }} -- 参照先データ
WHERE _table_suffix = FORMAT_DATE("%Y%m%d", CURRENT_DATE("Asia/Tokyo") - 1)
{% endset %}
if execute
if execute
は、dbtのマクロ内で実際にクエリを実行するかどうかを制御するために使用されます。これは、dbtのコンパイルフェーズと実行フェーズを区別するために使われ、次のような意味があります。
- コンパイルフェーズ: dbtは全てのマクロとモデルをコンパイルし、SQLクエリを生成しますが、クエリ自体は実行しません。このフェーズでは
execute
はFalse
になります。 - 実行フェーズ: dbtはコンパイルされたSQLクエリを実際にデータベースで実行します。このフェーズでは
execute
はTrue
になります。
if execute
は、例えばrun_query
の結果を使用したログの記録やエラーの発生など、実行フェーズのみに実行したい処理に使用します。これにより、コンパイルフェーズ中にこれらの処理が実行される問題を回避できます。
{% if execute %}
~処理~
{% endif %}
raise_compiler_error
-
exceptions.raise_compiler_error
は、指定されたメッセージでコンパイラエラーを発生させます。 -
exceptions.warn
という指定されたメッセージでコンパイラの警告を発生させるのもある。
コンパイラエラーは発生せず、成功扱いになるが、dbtに--warn-errorフラグが指定されている場合、この警告は例外に昇格する。- 今回は、使用せず。
-
exceptions.warn
のサンプル
{% macro some_macro(arg1, arg2) %}
{{ log("Running some_macro: " ~ arg1 ~ ", " ~ arg2) }}
{% endmacro %}
log
-
log
は、指定されたメッセージを表示させます。 - 引数 info:
-
False
の場合(デフォルト)、ログファイルに書き込む。 -
True
の場合、ログファイルと標準出力の両方に書き込みます。
-
{% if event_count <= 10 %}
{{ exceptions.raise_compiler_error('エラーメッセージ1') }}
{% elif event_count is none %}
{{ exceptions.raise_compiler_error('エラーメッセージ2') }}
{% else %}
{% do log("データチェックOK : " ~ event_count ~ "件", info=True) %}
{% endif %}
実現したフロー
旧フロー(TROCCO)
新フロー(dbt)
おわりに
dbtでどのように機能を実現したいかについて取り組む前に、dbt Jinja functions
の知識が足りていないことに気づきました。
今回、ChatGPTで無闇に質問して遠回りになってしまいました。
dbt Jinja functions
をざっと読み、理解を深めるためGitHubで他の人の実装を見る。その実装を元にChatGPTに質問して、理解を深めるという学習ループをとりあえず進めてみようかと思いました。
まだまだ知らない機能も多いのでもっと良い方法があるとかあれば教えて欲しいです。
参考
Discussion