🛤️

TROCCO のワークフローデータチェックのクエリを dbt で管理する

に公開

はじめに

TROCCOにdbt Coreを導入(dbt連携)すると、「すべてのクエリをdbtで管理したい」という欲求が高まります。データマート定義は簡単に置き換えられますが、ワークフローの「データチェック」タスクをそのままdbtに移行するのは簡単ではありません。

そこで本記事では、ワークフローデータチェックと同等の機能を持つdbtマクロを作成し、dbtでデータチェックを管理する方法を紹介します。

ワークフローデータチェックとは

ワークフロー内に設定できるタスクのひとつです。DWHに対するクエリ結果とエラー条件を突合し、条件に合致する場合に該当のタスクをエラーとする機能です。

https://documents.trocco.io/docs/workflow-data-check

  • 結構自由度が高く、クエリ結果によって、ワークフローをエラーで停止させることができます。
  • 素朴にクエリ書いて、エラー条件設定ができます。

ワークフローデータチェックの設定項目

下記の設定項目でデータチェックを行います。

  • データチェックするクエリの定義
    • クエリ結果が1行1列の数値となるように、SELECT文のクエリを記述
  • エラー条件を指定
    • クエリ結果の定義
    • 選択できるエラー条件指定
      • 以上
      • 以下
      • より大きい
      • より小さい
      • 等しい
      • 等しくない
  • クエリ結果がNULLの場合に成功とする(ON/OFF)

dbtマクロによる再現

以下のマクロを用意し、dbt run-operation で呼び出すことで、ワークフローデータチェックと同じ動きをdbt上で実現します。
先ほどのワークフローデータチェックの設定項目と同様のオプションを設定できる dbt マクロになっていて、データチェックするクエリの定義は別途、データチェック用のモデルを作成する必要があります。

macros/data_check_with_conditions.sql
{% macro data_check_with_conditions(
    model_name, threshold, condition, null_is_success=False
) %}
    {#
        model_name: チェック対象のモデル名(ref指定)、 11列の数値を返すクエリを設定してください。それ以外の場合にはエラーとなります。小数点は切り捨てられます。 
        threshold: 数値
        condition: 比較条件("以上: >=", "以下: <=", "より大きい: >", "より小さい: <", "等しい: ==", "等しくない: !=")
        null_is_success: クエリ結果がNULLの場合に成功とする

        例: dbt run-operation data_check_with_conditions --args '{model_name: test_001, threshold: 1, condition: "<=" }'
    #}
    {% set model_ref = ref(model_name) %}
    {% set check_query %}
        select * from {{ model_ref }}
    {% endset %}

    {% set results = run_query(check_query) %}

    {% if execute %}
        {% set result_value = results.columns[0].values()[0] %}

        {% if result_value is none %}
            {% if null_is_success %}
                {% do log(
                    "✅ タスク成功: クエリ結果がNULLですが、成功と見なします (モデル名: "
                    ~ model_name
                    ~ ")",
                    info=True,
                ) %}
            {% else %}
                {{
                    exceptions.raise_compiler_error(
                        "❌ タスク失敗: クエリ結果がNULLのため、エラーとします (モデル名: "
                        ~ model_name
                        ~ ")"
                    )
                }}
            {% endif %}
        {% else %}
            {% set result_value = result_value | int %}
            {% set threshold = threshold | int %}

            {% set operators = {
                ">=": {"label": "以上", "check": result_value >= threshold},
                "<=": {"label": "以下", "check": result_value <= threshold},
                ">": {
                    "label": "より大きい",
                    "check": result_value > threshold,
                },
                "<": {
                    "label": "より小さい",
                    "check": result_value < threshold,
                },
                "==": {
                    "label": "に等しい",
                    "check": result_value == threshold,
                },
                "!=": {
                    "label": "に等しくない",
                    "check": result_value != threshold,
                },
            } %}

            {% if condition in operators %}
                {% set op = operators[condition] %}
                {% if op.check %}
                    {{
                        exceptions.raise_compiler_error(
                            "❌ タスク失敗: "
                            ~ "クエリ結果 "
                            ~ result_value
                            ~ " がエラー条件「"
                            ~ threshold
                            ~ " "
                            ~ op.label
                            ~ "」を満たしています (モデル名: "
                            ~ model_name
                            ~ ")"
                        )
                    }}
                {% else %}
                    {% do log(
                        "✅ タスク成功: "
                        ~ "クエリ結果 "
                        ~ result_value
                        ~ " がエラー条件「"
                        ~ threshold
                        ~ " "
                        ~ op.label
                        ~ "」を満たしません (モデル名: "
                        ~ model_name
                        ~ ")",
                        info=True,
                    ) %}
                {% endif %}
            {% else %}
                {{
                    exceptions.raise_compiler_error(
                        "❌ タスク失敗: 不正な条件が指定されました: "
                        ~ condition
                        ~ " (モデル名: "
                        ~ model_name
                        ~ ")"
                    )
                }}
            {% endif %}
        {% endif %}
    {% endif %}
{% endmacro %}

モデル例:取り込み有無チェック

ビューでモデルを作成します

models/records_check/ga4_import_existence.sql
/* 昨日分のGA4イベントが取り込まれているか */
select count(*) as count
from {{ source("analytics_999999999", "events_*") }}
where _table_suffix = format_date("%Y%m%d", current_date("Asia/Tokyo") - 1)

実行手順

下記の実行コマンドをTROCCOのdbt連携でdbtジョブ設定を行います。
ワークフローで失敗通知設定を行うことで、様々なデータのモニタリングで使用できます。
https://documents.trocco.io/docs/about-dbt-integration

  1. 対象モデルを一度ビルド(念の為)
dbt build --select ga4_import_existence
  1. マクロを実行
dbt run-operation data_check_with_conditions --args '{model_name: ga4_import_existence, threshold: 0, condition: "<=" }'

実行結果サンプル

成功

[INFO]: ------------------------------
[INFO]: running... dbt run-operation data_check_with_conditions --args \{model_name:\ ga4_import_existence,\ threshold:\ 0,\ condition:\ \"\<\=\"\ \}
[INFO]: 02:02:38  Running with dbt=1.9.10
[INFO]: 02:02:40  Registered adapter: bigquery=1.9.2
[INFO]: 02:02:45  ✅ タスク成功: クエリ結果 99999 がエラー条件「0 以下」を満たしません (モデル名: records_check__ga4_import_existence)
[INFO]: ------------------------------

失敗

[INFO]: ------------------------------
[INFO]: running... dbt run-operation data_check_with_conditions --args \{model_name:\ ga4_import_existence,\ threshold:\ 0,\ condition:\ \"\<\=\"\ \}
[INFO]: 01:01:56  Running with dbt=1.9.10
[INFO]: 01:01:58  Registered adapter: bigquery=1.9.2
[INFO]: 01:02:03  Encountered an error while running operation: Compilation Error in macro data_check_with_conditions (macros/data_check_with_conditions.sql)
[INFO]:   ❌ タスク失敗: クエリ結果 0 がエラー条件「0 以下」を満たしています (モデル名: ga4_import_existence)
[INFO]: 
[INFO]:   > in macro data_check_with_conditions (macros/data_check_with_conditions.sql)
[INFO]:   > called by macro data_check_with_conditions (macros/data_check_with_conditions.sql)
[INFO]: Deleting Containers...

おわりに

TROCCOワークフローのデータチェック機能をdbtマクロとして内製化することで、クエリ管理をdbtに一元化できます。「ワークフローはシンプルに、判定ロジックはdbtで管理」という運用が可能になります。

参考

https://documents.trocco.io/docs/about-dbt-integration
https://documents.trocco.io/docs/workflow-data-check
https://docs.getdbt.com/reference/dbt-jinja-functions

Discussion