dbt macroのロジックを自動でテストしたい
モチベーション
dbtで随所で使われるマクロがあると思います。
generate_schema_name[1]などは色々な企業で該当するのでは?と勝手に思っています。
(ヒアリングはしていないのであくまで個人的な感想です)
このように広く使われるマクロでバグや意図しない変更がデプロイされると影響範囲が大きく、バックフィルやデータ修正に大きく時間が取られることになります。
またテストが入ってないことで、動作確認や検証、レビューにも時間がかかり積極的に触りたくないマクロになります。
これを解決するために自動テストを導入して安定性を高められないか?と考えて、実装してみたのでこれを共有します。
(dbtのマクロは変数が定義されていない場合にnot definedエラーを出さないでNoneで通る[2]など罠を踏んで痛い思いをしたのがきっかけです)
実装について
乗り越えるべき課題
- どうやって自動テストを実行するのか?
- dbtの組み込みの機能には無さそう
- データに対するテストはあるけど、macroの出力結果をassertする機能は無さそう
- dbtの組み込みの機能には無さそう
- run-operationを使ってassertすると良さそうだが、出力が標準出力に出ない
-
{{ return () }}
、{{ hoge }}
だけ書いても標準出力に出てこない
-
- run-operationの実行が遅い
調べた限りではダメそうだと判断したのですが、何か良い方法があったら教えて欲しいです。
出来上がった構成
- dbtのマクロを呼び出すマクロを作成する(log_macro_results)
- マクロの出力結果を一意に定める役割を持つ
- dbtのProgrammatic invocations[3]を使ってPythonからコマンドを実行できるラッパーを作成する
- 汎用的に使えるように独自のparse機能を作っておく
- pytestのテストファイルを作成する
実装方法の詳細
テスト用マクロ
ここはelementaryさんのテストってどうなっているのかな?みたいなのを見てヒントを得ました。
elementaryさんのロジックをマルパクリするのが良さそうだなぁ...と思いつつ、必要最小限な部分だけ抜いて実装しました。
このmacroを経由して実行させることで、macroでコンパイルされた {{ hoge }}
や {{ return }}
を標準出力に出しつつ、macro results:
というプレフィックスが付いている場合は、マクロの結果とみなすことで次のinvocationのparseをやりやすくしています。
{#
elementaryのログutilを参考にmacroの返り値を出力できるutilを作成
see. https://github.com/elementary-data/dbt-data-reliability/blob/master/macros/utils/log_macro_results.sql
#}
{% macro log_macro_results(macro_name, macro_args=none) %}
{% if macro_args is none %} {% set macro_args = {} %} {% endif %}
{% set macro = context[macro_name] %}
{% set results = macro(**macro_args) %}
{% do log("macro results: " ~ tojson(results), info=True) %}
{% endmacro %}
dbtのProgrammatic invocationsを使ったラッパー
何も考えずに dbtRunner().invoke()
を実行するとテスト時間が結構長いです。
環境にもよるとは思いますが、筆者の場合は5つのテストケースで26秒ぐらいかかりました。
これを改善するために何が問題なのか検証したところ、初期化(特にmanifestのparse)に時間がかかっていそうなことが分かりました。
そのため、parseを初回だけ行うようにしてインスタンスをpytest全体で使い回すようにシングルトンで実装したところ26秒 -> 7秒くらいに改善しました。
またparse部分は以下の記事も参考にさせていただきました。いつもありがとうございます。
from dbt.cli.main import dbtRunner
from dbt.events.base_types import EventMsg
import json
class DbtTestClient:
"""dbtをPythonから呼び出すためのRunnerをラップしたクラス
呼び出す時には、get_instance経由で使うように.
get_instance経由でインスタンス生成をしないと実行速度が遅くなることに注意する.
see. https://docs.getdbt.com/reference/programmatic-invocations
"""
@classmethod
def get_instance(cls):
"""既にインスタンスが生成済みなら、それを返す
Clientの初期化処理がテストを行う上でボトルネックになる.
これを回避するために、シングルトンを実装している.
実装方法の参考
see. https://qiita.com/ttsubo/items/c4af71ceba15b5b213f8
"""
if not hasattr(cls, "_instance"):
cls._instance = cls()
return cls._instance
def __init__(self):
self.results = []
self.error_messages = []
# 何度もparseするのを防ぐために初期化のタイミングでparseを一度行う
res = dbtRunner().invoke(["parse"])
manifest = res.result
self.dbt_runner = dbtRunner(manifest=manifest)
def run_macro(
self,
macro_name: str,
macro_args: dict | None = None,
extend_cli_args: list[str] | None = None,
) -> list[str]:
"""macroを実行して、その結果を取得するための関数
内部はdbtのrun-operationコマンドをPythonで実行しているだけ. 以下のコマンドと等価.
`dbt run-operation --log-level error log_macro_results --args {macro_name: $macro_name, macro_args: $macro_args} $extend_cli_args`
Args:
macro_name (str): 実行したいmacro
macro_args (dict | None, optional): macroに渡す引数. Defaults to None.
extend_cli_args (list[str] | None, optional): CLIの引数に追加する引数. Defaults to None.
Raises:
RuntimeError
Returns:
list[str]: parseした出力結果.(log_macro_resutls経由で出力されたlogのみ入っている)
"""
if macro_args is None:
macro_args = {}
if extend_cli_args is None:
extend_cli_args = []
cli_args = [
"--log-level",
"error", # 不要なログを出さないためにinfoは消す
"run-operation",
"log_macro_results",
"--args",
f"{{macro_name: {macro_name}, macro_args: {json.dumps(macro_args)}}}",
]
cli_args += extend_cli_args
result = []
error = []
def _macro_results_callback(event: EventMsg) -> None:
"""出力結果をparseするcallback
本当はprivateなstaticメソッドなどにしてresultとerrorを返すようにしたいがdbt側の制約で難しい.
Args:
event (EventMsg): dbtの出力結果
"""
# JinjaLogInfoかつlog_macro_resultsマクロの出力結果にはprefixが付いているので、
# prefixを除いてreturn値のみ格納する
result_log_name = "JinjaLogInfo"
macro_results_prefix = "macro results: "
if (
event.info.name == result_log_name
and event.data.msg.startswith(macro_results_prefix)
):
result.append(event.data.msg.replace(macro_results_prefix, ""))
# エラーハンドリングのparseの書き方は結構微妙かも(dbt-coreで想定しない使い方かも)
# 特にエラーメッセージの情報はinfo.msgに入っているが、dbt-coreで型として認識されていない(そのためignoreをつけている)
if event.info.level == "error":
error.append(event.info.msg) # type: ignore
self.dbt_runner.callbacks = [_macro_results_callback]
# run the command
res = self.dbt_runner.invoke(cli_args)
if not res.success:
raise RuntimeError("\n".join(error))
return result
Pytestでの呼び出し
使っているマクロは大分適当に作りました。
開発環境か、本番環境かを判断するようなマクロです。
{% macro test_hoge() -%}
{%- set release_server = env_var("RELEASE_SERVER", "") -%}
{% if release_server != "0" and release_server != "1" %}
{% do exceptions.raise_compiler_error(
"RELEASE_SERVER変数がセットされていないのは異常です."
) %}
{% endif %}
{{ return(release_server != "1") }}
{%- endmacro %}
import pytest
from pytest import MonkeyPatch
from macro_integration_test.util import DbtTestClient
@pytest.fixture()
def client() -> DbtTestClient:
return DbtTestClient.get_instance()
@pytest.mark.parametrize(
("release_server", "expected"),
[
("0", "true"),
("1", "false"),
],
)
def test_output(
client: DbtTestClient,
monkeypatch: MonkeyPatch,
release_server: str,
expected: str,
) -> None:
monkeypatch.setenv("RELEASE_SERVER", release_server)
result = client.run_macro(
macro_name="test_hoge", extend_cli_args=["--target", "dev"]
)
assert (
len(result) == 1
) # 何かしらの変更でlogが増えた場合には適切にテスト出来ていない可能性が高いのでチェック
assert result[0] == expected
def test_raise_error_when_release_server_is_not_seted(
client: DbtTestClient,
monkeypatch: MonkeyPatch,
) -> None:
monkeypatch.setenv("RELEASE_SERVER", "hoge")
with pytest.raises(RuntimeError) as e:
client.run_macro(
macro_name="test_hoge", extend_cli_args=["--target", "dev"]
)
assert (
e.value.args[0].find(
"RELEASE_SERVER変数がセットされていないのは異常です."
)
!= -1
)
実行結果
./poetry run pytest macro_integration_test/test_hoge.py
============================================================= test session starts =============================================================
platform darwin -- Python 3.11.3, pytest-8.2.2, pluggy-1.5.0
rootdir: /Users/kashira/Documents/work/dbt
configfile: pyproject.toml
collected 3 items
macro_integration_test/test_hoge.py ... [100%]
============================================================== 3 passed in 5.39s ==============================================================
感想
自動でテストが出来るようになったので、大分安心して影響範囲の大きい共通ロジックを触ることが出来るので良かったなと思っています。
自動テストとは関係ないですが、モデルのバリデーションも同じくinnvocationを使うとやりやすそうなのでどこかでやっていきたいなと思っています。
もっとこうしたら良さそうとか、良い実装方法があるなどあればぜひ教えてください!!
Discussion