📝

dbt macroのロジックを自動でテストしたい

2024/06/11に公開

モチベーション

dbtで随所で使われるマクロがあると思います。
generate_schema_name[1]などは色々な企業で該当するのでは?と勝手に思っています。
(ヒアリングはしていないのであくまで個人的な感想です)

このように広く使われるマクロでバグや意図しない変更がデプロイされると影響範囲が大きく、バックフィルやデータ修正に大きく時間が取られることになります。
またテストが入ってないことで、動作確認や検証、レビューにも時間がかかり積極的に触りたくないマクロになります。

これを解決するために自動テストを導入して安定性を高められないか?と考えて、実装してみたのでこれを共有します。

(dbtのマクロは変数が定義されていない場合にnot definedエラーを出さないでNoneで通る[2]など罠を踏んで痛い思いをしたのがきっかけです)

実装について

乗り越えるべき課題

  1. どうやって自動テストを実行するのか?
    • dbtの組み込みの機能には無さそう
      • データに対するテストはあるけど、macroの出力結果をassertする機能は無さそう
  2. run-operationを使ってassertすると良さそうだが、出力が標準出力に出ない
    • {{ return () }}{{ hoge }}だけ書いても標準出力に出てこない
  3. run-operationの実行が遅い

調べた限りではダメそうだと判断したのですが、何か良い方法があったら教えて欲しいです。

出来上がった構成

  1. dbtのマクロを呼び出すマクロを作成する(log_macro_results)
    • マクロの出力結果を一意に定める役割を持つ
  2. dbtのProgrammatic invocations[3]を使ってPythonからコマンドを実行できるラッパーを作成する
    • 汎用的に使えるように独自のparse機能を作っておく
  3. pytestのテストファイルを作成する

実装方法の詳細

テスト用マクロ

ここはelementaryさんのテストってどうなっているのかな?みたいなのを見てヒントを得ました。
elementaryさんのロジックをマルパクリするのが良さそうだなぁ...と思いつつ、必要最小限な部分だけ抜いて実装しました。

このmacroを経由して実行させることで、macroでコンパイルされた {{ hoge }}{{ return }}を標準出力に出しつつ、macro results: というプレフィックスが付いている場合は、マクロの結果とみなすことで次のinvocationのparseをやりやすくしています。

macro/log_macro_results.sql
{# 
    elementaryのログutilを参考にmacroの返り値を出力できるutilを作成
    see. https://github.com/elementary-data/dbt-data-reliability/blob/master/macros/utils/log_macro_results.sql 
#}
{% macro log_macro_results(macro_name, macro_args=none) %}
    {% if macro_args is none %} {% set macro_args = {} %} {% endif %}
    {% set macro = context[macro_name] %}
    {% set results = macro(**macro_args) %}
    {% do log("macro results: " ~ tojson(results), info=True) %}
{% endmacro %}

dbtのProgrammatic invocationsを使ったラッパー

何も考えずに dbtRunner().invoke()を実行するとテスト時間が結構長いです。
環境にもよるとは思いますが、筆者の場合は5つのテストケースで26秒ぐらいかかりました。

これを改善するために何が問題なのか検証したところ、初期化(特にmanifestのparse)に時間がかかっていそうなことが分かりました。
そのため、parseを初回だけ行うようにしてインスタンスをpytest全体で使い回すようにシングルトンで実装したところ26秒 -> 7秒くらいに改善しました。

またparse部分は以下の記事も参考にさせていただきました。いつもありがとうございます。
https://www.yasuhisay.info/entry/2024/05/17/024412

macro_integration_test/util.py
from dbt.cli.main import dbtRunner
from dbt.events.base_types import EventMsg
import json


class DbtTestClient:
    """dbtをPythonから呼び出すためのRunnerをラップしたクラス

    呼び出す時には、get_instance経由で使うように.
    get_instance経由でインスタンス生成をしないと実行速度が遅くなることに注意する.

    see. https://docs.getdbt.com/reference/programmatic-invocations
    """

    @classmethod
    def get_instance(cls):
        """既にインスタンスが生成済みなら、それを返す

        Clientの初期化処理がテストを行う上でボトルネックになる.
        これを回避するために、シングルトンを実装している.

        実装方法の参考
        see. https://qiita.com/ttsubo/items/c4af71ceba15b5b213f8
        """
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        self.results = []
        self.error_messages = []

        # 何度もparseするのを防ぐために初期化のタイミングでparseを一度行う
        res = dbtRunner().invoke(["parse"])
        manifest = res.result

        self.dbt_runner = dbtRunner(manifest=manifest)

    def run_macro(
        self,
        macro_name: str,
        macro_args: dict | None = None,
        extend_cli_args: list[str] | None = None,
    ) -> list[str]:
        """macroを実行して、その結果を取得するための関数

        内部はdbtのrun-operationコマンドをPythonで実行しているだけ. 以下のコマンドと等価.


        `dbt run-operation --log-level error log_macro_results --args {macro_name: $macro_name, macro_args: $macro_args} $extend_cli_args`

        Args:
            macro_name (str): 実行したいmacro
            macro_args (dict | None, optional): macroに渡す引数. Defaults to None.
            extend_cli_args (list[str] | None, optional): CLIの引数に追加する引数. Defaults to None.

        Raises:
            RuntimeError

        Returns:
            list[str]: parseした出力結果.(log_macro_resutls経由で出力されたlogのみ入っている)
        """

        if macro_args is None:
            macro_args = {}
        if extend_cli_args is None:
            extend_cli_args = []

        cli_args = [
            "--log-level",
            "error",  # 不要なログを出さないためにinfoは消す
            "run-operation",
            "log_macro_results",
            "--args",
            f"{{macro_name: {macro_name}, macro_args: {json.dumps(macro_args)}}}",
        ]

        cli_args += extend_cli_args

        result = []
        error = []

        def _macro_results_callback(event: EventMsg) -> None:
            """出力結果をparseするcallback

            本当はprivateなstaticメソッドなどにしてresultとerrorを返すようにしたいがdbt側の制約で難しい.

            Args:
                event (EventMsg): dbtの出力結果
            """

            # JinjaLogInfoかつlog_macro_resultsマクロの出力結果にはprefixが付いているので、
            # prefixを除いてreturn値のみ格納する
            result_log_name = "JinjaLogInfo"
            macro_results_prefix = "macro results: "
            if (
                event.info.name == result_log_name
                and event.data.msg.startswith(macro_results_prefix)
            ):
                result.append(event.data.msg.replace(macro_results_prefix, ""))

            # エラーハンドリングのparseの書き方は結構微妙かも(dbt-coreで想定しない使い方かも)
            # 特にエラーメッセージの情報はinfo.msgに入っているが、dbt-coreで型として認識されていない(そのためignoreをつけている)
            if event.info.level == "error":
                error.append(event.info.msg)  # type: ignore

        self.dbt_runner.callbacks = [_macro_results_callback]

        # run the command
        res = self.dbt_runner.invoke(cli_args)
        if not res.success:
            raise RuntimeError("\n".join(error))
        return result

Pytestでの呼び出し

使っているマクロは大分適当に作りました。
開発環境か、本番環境かを判断するようなマクロです。

macro/test_hoge.sql
{% macro test_hoge() -%}
    {%- set release_server = env_var("RELEASE_SERVER", "") -%}
    {% if release_server != "0" and release_server != "1" %}
        {% do exceptions.raise_compiler_error(
            "RELEASE_SERVER変数がセットされていないのは異常です."
        ) %}
    {% endif %}
    {{ return(release_server != "1") }}
{%- endmacro %}
macro_integration_test/test_hoge.py
import pytest
from pytest import MonkeyPatch
from macro_integration_test.util import DbtTestClient


@pytest.fixture()
def client() -> DbtTestClient:
    return DbtTestClient.get_instance()


@pytest.mark.parametrize(
    ("release_server", "expected"),
    [
        ("0", "true"),
        ("1", "false"),
    ],
)
def test_output(
    client: DbtTestClient,
    monkeypatch: MonkeyPatch,
    release_server: str,
    expected: str,
) -> None:
    monkeypatch.setenv("RELEASE_SERVER", release_server)
    result = client.run_macro(
        macro_name="test_hoge", extend_cli_args=["--target", "dev"]
    )
    assert (
        len(result) == 1
    )  # 何かしらの変更でlogが増えた場合には適切にテスト出来ていない可能性が高いのでチェック
    assert result[0] == expected


def test_raise_error_when_release_server_is_not_seted(
    client: DbtTestClient,
    monkeypatch: MonkeyPatch,
) -> None:
    monkeypatch.setenv("RELEASE_SERVER", "hoge")
    with pytest.raises(RuntimeError) as e:
        client.run_macro(
            macro_name="test_hoge", extend_cli_args=["--target", "dev"]
        )

    assert (
        e.value.args[0].find(
            "RELEASE_SERVER変数がセットされていないのは異常です."
        )
        != -1
    )

実行結果

./poetry run pytest macro_integration_test/test_hoge.py
============================================================= test session starts =============================================================
platform darwin -- Python 3.11.3, pytest-8.2.2, pluggy-1.5.0
rootdir: /Users/kashira/Documents/work/dbt
configfile: pyproject.toml
collected 3 items                                                                                                                             

macro_integration_test/test_hoge.py ...                                                                                                 [100%]

============================================================== 3 passed in 5.39s ==============================================================

感想

自動でテストが出来るようになったので、大分安心して影響範囲の大きい共通ロジックを触ることが出来るので良かったなと思っています。

自動テストとは関係ないですが、モデルのバリデーションも同じくinnvocationを使うとやりやすそうなのでどこかでやっていきたいなと思っています。

もっとこうしたら良さそうとか、良い実装方法があるなどあればぜひ教えてください!!

脚注
  1. https://docs.getdbt.com/docs/build/custom-schemas#understanding-custom-schemas ↩︎

  2. https://github.com/dbt-labs/dbt-core/issues/3320 ↩︎

  3. https://docs.getdbt.com/reference/programmatic-invocations ↩︎

Discussion