🙆

dbt test をdagster asset checksとして読み込む新機能を使ってみた

2024/03/10に公開

dbt test をdagster asset checksとして読み込む新機能を使ってみた

はじめに

dagster version 1.5.1 / 0.21.1 (libraries) からdbt testをassets checksとして
読み込む機能がEXPERIMENTAL(実験的機能)として使えるようになっていたので試してみる。

https://docs.dagster.io/integrations/dbt/reference#loading-dbt-tests-as-asset-checks

dagster asset checksの概要

公式

https://dagster.io/blog/dagster-asset-checks

dagster asset checkは、データアセットの品質を確認するための機能で、
データアセットが期待される品質を満たしているかどうかを自動的にチェックできるとのこと。

  • アセットの品質を確認するためのチェックを定義できる
  • チェックはアセットの materialization 後に実行される
  • チェックの結果は、アセットのメタデータに記録される
  • チェックに失敗した場合、アセットの materialization は失敗とマークされる

Asset Checksは、dbtのテストとも統合できる。dbtのテストをAsset Checksとして登録することで、dbtとDagsterを組み合わせたデータパイプラインの品質管理が容易になると紹介されていたので早速使ってみる。

Loading dbt tests as asset checks を有効にする

公式に記載の通りDagsterDbtTranslatorの中でDagsterDbtTranslatorSettingsを有効にしてあげるだけでOK。

from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import (
    DagsterDbtTranslator,
    DagsterDbtTranslatorSettings,
    DbtCliResource,
    dbt_assets,
)

manifest_path = Path("path/to/dbt_project/target/manifest.json")
dagster_dbt_translator = DagsterDbtTranslator(
    settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
)

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=dagster_dbt_translator,
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

もしなんらかのCustomDagsterDbtTranslatorを作成している場合は

# 例
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_group_name(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[str]:
        return "snowflake"

dagster_dbt_translator = CustomDagsterDbtTranslator(
    settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
)
# 以下略

としてDagsterDbtTranslatorを継承したCustomDagsterDbtTranslatorのsettingsを有効にする。

dbt tests as asset checksのUI確認

例として、jaffle_shop のプロジェクトを使用

(ビルド前)リネージュ図の各アセットに「Checks」という欄が追加されている
dbt assets checks before

(ビルド後)dbt testがNGだったものは赤くハイライトされ、マテリアライズの結果も「Failed」になっている
dbt asset checks after

アセット詳細では「Checks」というタブが増えており、各テストの履歴やアドホックな実行ができる
dbt asset checks attribute

dbt tests as asset checksを使うメリット

個人的に感じたメリットは以下になります。

  1. データパイプラインの品質管理がdagsterに一元化される
    1. 以前はdbt testを実装していても、dagster上ではSuccess/Failedと実行ログがあるのみでしたが、asset checksとして読み込むことで、どのテストがNGだったかなどのUI上での視認がより簡便になります
    2. またdbt project以外のアセットに関しても集約して管理することが容易になる印象です
  2. テスト結果とデータの関連付けが明確になる
    1. Asset Checksの結果は、データアセットのメタデータとして記録されます
    2. これにより、どのデータアセットがどのようなテストに合格したのかが明確になり、データの品質に関する情報がわかりやすくなります

注意点

DbtCliResourceのソースコード を見ると以下のようなコメントがありました

# It's nice to use the default dbt selection arguments when not subsetting for readability.
# However with asset checks, we make a tradeoff between readability and functionality for the user.
# This is because we use an explicit selection of each individual dbt resource we execute to ensure
# we control which models *and tests* run. By using explicit selection, we allow the user to
# also subset the execution of dbt tests.

日本語訳

デフォルトのdbt選択引数を使用することは、サブセットを使用しない場合の可読性の観点から好ましいです。
しかし、アセットチェックを使用する際には、可読性とユーザーの機能性とのトレードオフが発生します。
これは、実行する個々のdbtリソースを明示的に選択することで、どのモデルとテストを実行するかを正確に制御するためです。
明示的な選択を使用することで、ユーザーはdbtテストの実行のサブセットを作成することもできます。

実際に実行されるコマンドをログから見ると以下のようなことが起こっていました。

dbt tests as asset checks 無し

Running dbt command: `dbt build --target prod_athena --select fqn:*`.

dbt tests as asset checks 有効

A dbt subsetted execution is being performed. Overriding default dbt selection arguments `['--select', 'fqn:*']` with arguments: `['--select', 'fqn:jaffle_shop.staging.stg_customers fqn:jaffle_shop.staging.not_null_stg_customers_customer_id fqn:jaffle_shop.staging.unique_stg_customers_customer_id']`

Running dbt command: `dbt build --select fqn:jaffle_shop.staging.stg_customers fqn:jaffle_shop.staging.not_null_stg_customers_customer_id fqn:jaffle_shop.staging.unique_stg_customers_customer_id`.

つまりコメントにあるように、ジョブ定義時にfqn:*としていてもdbt tests as asset checks 有効時には各モデルが明示的にselectされるようでした。

なお、筆者の職場の環境では3000を超えるモデルのdbt projectをfqn:*で実行していたため、
dbt tests as asset checksを有効にした際に
OSError: [Errno 7] Argument list too long: 'dbt'
とコマンドの引数が長すぎて怒られてしまいました。

まとめ

本記事では、dagster version 1.5.1 / 0.21.1 (libraries) から導入された、dbt testをasset checksとして読み込む新機能について紹介しました。この機能を使うことで、データパイプラインの品質管理がdagsterに一元化され、テスト結果とデータの関連付けが明確になるなどのメリットがあります。

ただし、現時点ではEXPERIMENTAL(実験的機能)ですので今後のアップデートに期待しつつ、適切な規模のプロジェクトで試してみると良いでしょう。

参考資料

https://docs.dagster.io/integrations/dbt/reference#loading-dbt-tests-as-asset-checks

https://docs.dagster.io/_modules/dagster_dbt/core/resources_v2

Discussion