🛤️
TROCCO のワークフローデータチェックのクエリを dbt で管理する
はじめに
TROCCOにdbt Coreを導入(dbt連携)すると、「すべてのクエリをdbtで管理したい」という欲求が高まります。データマート定義は簡単に置き換えられますが、ワークフローの「データチェック」タスクをそのままdbtに移行するのは簡単ではありません。
そこで本記事では、ワークフローデータチェックと同等の機能を持つdbtマクロを作成し、dbtでデータチェックを管理する方法を紹介します。
ワークフローデータチェックとは
ワークフロー内に設定できるタスクのひとつです。DWHに対するクエリ結果とエラー条件を突合し、条件に合致する場合に該当のタスクをエラーとする機能です。

- 結構自由度が高く、クエリ結果によって、ワークフローをエラーで停止させることができます。
- 素朴にクエリ書いて、エラー条件設定ができます。
ワークフローデータチェックの設定項目
下記の設定項目でデータチェックを行います。
- データチェックするクエリの定義
- クエリ結果が1行1列の数値となるように、SELECT文のクエリを記述
- エラー条件を指定
- クエリ結果の定義
- 選択できるエラー条件指定
- 以上
- 以下
- より大きい
- より小さい
- 等しい
- 等しくない
- クエリ結果がNULLの場合に成功とする(ON/OFF)

dbtマクロによる再現
以下のマクロを用意し、dbt run-operation で呼び出すことで、ワークフローデータチェックと同じ動きをdbt上で実現します。
先ほどのワークフローデータチェックの設定項目と同様のオプションを設定できる dbt マクロになっていて、データチェックするクエリの定義は別途、データチェック用のモデルを作成する必要があります。
macros/data_check_with_conditions.sql
{% macro data_check_with_conditions(
model_name, threshold, condition, null_is_success=False
) %}
{#
model_name: チェック対象のモデル名(ref指定)、 1行1列の数値を返すクエリを設定してください。それ以外の場合にはエラーとなります。小数点は切り捨てられます。
threshold: 数値
condition: 比較条件("以上: >=", "以下: <=", "より大きい: >", "より小さい: <", "等しい: ==", "等しくない: !=")
null_is_success: クエリ結果がNULLの場合に成功とする
例: dbt run-operation data_check_with_conditions --args '{model_name: test_001, threshold: 1, condition: "<=" }'
#}
{% set model_ref = ref(model_name) %}
{% set check_query %}
select * from {{ model_ref }}
{% endset %}
{% set results = run_query(check_query) %}
{% if execute %}
{% set result_value = results.columns[0].values()[0] %}
{% if result_value is none %}
{% if null_is_success %}
{% do log(
"✅ タスク成功: クエリ結果がNULLですが、成功と見なします (モデル名: "
~ model_name
~ ")",
info=True,
) %}
{% else %}
{{
exceptions.raise_compiler_error(
"❌ タスク失敗: クエリ結果がNULLのため、エラーとします (モデル名: "
~ model_name
~ ")"
)
}}
{% endif %}
{% else %}
{% set result_value = result_value | int %}
{% set threshold = threshold | int %}
{% set operators = {
">=": {"label": "以上", "check": result_value >= threshold},
"<=": {"label": "以下", "check": result_value <= threshold},
">": {
"label": "より大きい",
"check": result_value > threshold,
},
"<": {
"label": "より小さい",
"check": result_value < threshold,
},
"==": {
"label": "に等しい",
"check": result_value == threshold,
},
"!=": {
"label": "に等しくない",
"check": result_value != threshold,
},
} %}
{% if condition in operators %}
{% set op = operators[condition] %}
{% if op.check %}
{{
exceptions.raise_compiler_error(
"❌ タスク失敗: "
~ "クエリ結果 "
~ result_value
~ " がエラー条件「"
~ threshold
~ " "
~ op.label
~ "」を満たしています (モデル名: "
~ model_name
~ ")"
)
}}
{% else %}
{% do log(
"✅ タスク成功: "
~ "クエリ結果 "
~ result_value
~ " がエラー条件「"
~ threshold
~ " "
~ op.label
~ "」を満たしません (モデル名: "
~ model_name
~ ")",
info=True,
) %}
{% endif %}
{% else %}
{{
exceptions.raise_compiler_error(
"❌ タスク失敗: 不正な条件が指定されました: "
~ condition
~ " (モデル名: "
~ model_name
~ ")"
)
}}
{% endif %}
{% endif %}
{% endif %}
{% endmacro %}
モデル例:取り込み有無チェック
ビューでモデルを作成します
models/records_check/ga4_import_existence.sql
/* 昨日分のGA4イベントが取り込まれているか */
select count(*) as count
from {{ source("analytics_999999999", "events_*") }}
where _table_suffix = format_date("%Y%m%d", current_date("Asia/Tokyo") - 1)
実行手順
下記の実行コマンドをTROCCOのdbt連携でdbtジョブ設定を行います。
ワークフローで失敗通知設定を行うことで、様々なデータのモニタリングで使用できます。
- 対象モデルを一度ビルド(念の為)
dbt build --select ga4_import_existence
- マクロを実行
dbt run-operation data_check_with_conditions --args '{model_name: ga4_import_existence, threshold: 0, condition: "<=" }'
実行結果サンプル
成功
[INFO]: ------------------------------
[INFO]: running... dbt run-operation data_check_with_conditions --args \{model_name:\ ga4_import_existence,\ threshold:\ 0,\ condition:\ \"\<\=\"\ \}
[INFO]: 02:02:38 Running with dbt=1.9.10
[INFO]: 02:02:40 Registered adapter: bigquery=1.9.2
[INFO]: 02:02:45 ✅ タスク成功: クエリ結果 99999 がエラー条件「0 以下」を満たしません (モデル名: records_check__ga4_import_existence)
[INFO]: ------------------------------
失敗
[INFO]: ------------------------------
[INFO]: running... dbt run-operation data_check_with_conditions --args \{model_name:\ ga4_import_existence,\ threshold:\ 0,\ condition:\ \"\<\=\"\ \}
[INFO]: 01:01:56 Running with dbt=1.9.10
[INFO]: 01:01:58 Registered adapter: bigquery=1.9.2
[INFO]: 01:02:03 Encountered an error while running operation: Compilation Error in macro data_check_with_conditions (macros/data_check_with_conditions.sql)
[INFO]: ❌ タスク失敗: クエリ結果 0 がエラー条件「0 以下」を満たしています (モデル名: ga4_import_existence)
[INFO]:
[INFO]: > in macro data_check_with_conditions (macros/data_check_with_conditions.sql)
[INFO]: > called by macro data_check_with_conditions (macros/data_check_with_conditions.sql)
[INFO]: Deleting Containers...
おわりに
TROCCOワークフローのデータチェック機能をdbtマクロとして内製化することで、クエリ管理をdbtに一元化できます。「ワークフローはシンプルに、判定ロジックはdbtで管理」という運用が可能になります。
参考
Discussion