🤖

コードからdbtを理解する

2024/09/29に公開

パッケージを作りたいとき、バグっぽい挙動に出くわしたとき、単なる知的好奇心…などなど、dbtの裏側のコードを読む機会があるかもしれません。dbtのコードを読むときに、どこを見ればいいのか、どのような構造になっているのか、といったことをいくつかの例と一緒にまとめてみました。

要約

dbt-coreではなく、dbt-bigqueryなどadapter側の実装の方が重要なことが多いです。

アーキテクチャ

まずはdbtのアーキテクチャをざっくりと説明します。dbtでは環境構築時にcoreと各DWH用のadapterをインストールしますが、それぞれ以下のような役割を担っています。

  • dbt-core: CLI等のユーザーインターフェースを担当
  • dbt-adapters: DWHとの接続を担当する抽象化レイヤー
  • dbt-bigquery など各種adapter: dbt-adaptersを継承した具体的な実装


https://docs.getdbt.com/guides/adapter-creation?step=1 より引用

そのため、DWHのレスポンスを含む結果を調査したいケースでは、adapter側のコードを読むと具体的な実装が書いてあり、原因の究明に役立つことが多いです。

では、よく参照しそうな箇所をいくつか挙げてみます。

macros/materializationでは、materializedで指定するタイプごとの具体的な実装が書かれています。

https://github.com/dbt-labs/dbt-bigquery/blob/v1.8.2/dbt/include/bigquery/macros/materializations/view.sql

ここでは、{% materialization view, adapter='bigquery' -%} と書くことで、BigQuery独自のviewの挙動を定義しています。[1]

BigQueryでは、既に同名のテーブルが存在する場合はviewを作成することはできませんが、冒頭のbigquery__handle_existing_tableの実装を見ると、full_refreshが有効の場合は先にテーブルを消去する仕組みになっていることが分かります。

もう1つはAdapterクラスです。BigQueryとのやり取りを行う具体的なメソッドを集めたクラスで、dbtの主要な処理がこのAdapterクラスを通して行われています。dbt-adapters repositoryで定義されているBaseAdapterを継承して実装していることが読み取れます。

https://github.com/dbt-labs/dbt-bigquery/blob/v1.8.2/dbt/adapters/bigquery/impl.py#L106-L113

とはいえこれだけだとまだあまり面白くないので、実際に遭遇した不思議な挙動を例にとりつつ、実際のコードを読んで理解を深めましょう。

Case1: パーティションを変更するテーブル上書き処理がエラーにならない

BigQueryでは、パーティションを変更する際には、元々のテーブルを削除してから新しいテーブルを作成する必要があります。

元々dayでパーティションを切っていたテーブルに対して、以下のようなDDLステートメントを使ってmonthでパーティションを切り直そうとすると、以下のようなエラーが出ます。

Cannot replace a table with a different partitioning spec. Instead, DROP the table, and then recreate it. New partitioning spec is interval(type:month,field:date) and existing spec is interval(type:day,field:date)

-- DDLステートメント
CREATE OR REPLACE TABLE
  `your-project.your-dataset.sample_partitioned_table`
PARTITION BY
  DATE_TRUNC(date, MONTH) AS
SELECT
  ...

ところが、dbtで同じ処理を書いてdbt runを行うとエラーになりません。これはなぜでしょうか。

-- これはエラーにならない
{{ config(
    materialized='table',
    partition_by={
      "field": "date",
      "data_type": "date",
      "granularity": "month"
    }
)}}

select
...

先ほど紹介したmaterializationの具体的な実装の中に答えがあります。

https://github.com/dbt-labs/dbt-bigquery/blob/v1.8.2/dbt/include/bigquery/macros/materializations/table.sql#L23-L35

ここでは条件分岐が実装されており、adapter.is_replaceableがfalseだと、dbt runで指定したテーブルを作成する前に、元のテーブルをdrop_relation経由で削除する処理がAPI経由で実行されていることが分かります。

ではadapterクラスのコードにあるis_replaceableも見てみましょう。

https://github.com/dbt-labs/dbt-bigquery/blob/v1.8.2/dbt/adapters/bigquery/impl.py#L540-L566

is_replaceableメソッドはpartitionとclusterの差分がある場合はfalseを返すように実装されています。

ユーザー側はただdbt runをしているだけですが、このようにdbtはadapterを通じて裏側でデータを比較したり、テーブルを削除していることが分かります。これが、dbt runで異なるパーティションのテーブルを作成してもエラーにならない仕組みでした。

Case2: metadataを使ったsource freshnessで現在時刻が返ってくる

もう1つの例として、source freshnessの実装を見てみましょう。

source freshnessはBigQueryでは2通りの計算方法が用意されており、loaded_at_field の有無によって挙動が変わります。

  • If a loaded_at_field is provided, dbt will calculate freshness via a select query (behavior prior to v1.7).
  • If a loaded_at_field is not provided, dbt will calculate freshness via warehouse metadata tables when possible (new in v1.7 on supported adapters).

参考:https://docs.getdbt.com/reference/resource-properties/freshness#definition

今回はloaded_at_fieldを定義せず、シャーディングテーブルのsource freshnessをメタデータから取得してみます。

version: 2

sources:

  - name: dummy_source
    database: your-project
    schema: your-dataset
    freshness:
      warn_after: {count: 1, period: day}
      error_after: {count: 2, period: day}
    tables:
      - name: dummy_*

source freshnessを実行した結果のsource.jsonは以下のようになりました。

{
    "metadata": {
        "dbt_schema_version": "https://schemas.getdbt.com/dbt/sources/v3.json",
        "dbt_version": "1.8.3",
        "generated_at": "2024-09-27T10:10:58.326203Z",
        "invocation_id": "xxxxxx",
        "env": {}
    },
    "results": [
        {
            "unique_id": "source.my_new_project.dummy_source.dummy_*",
            "max_loaded_at": "2024-09-27T10:10:57.753000+00:00",
            "snapshotted_at": "2024-09-27T10:10:58.320901+00:00",
            "max_loaded_at_time_ago_in_s": 0.567901,
            "status": "pass",
            "criteria": {
                "warn_after": {
                    "count": 1,
                    "period": "day"
                },
                "error_after": {
                    "count": 2,
                    "period": "day"
                },
                "filter": null
            },
            ...
        }
    ],
    "elapsed_time": 7.178150415420532
}

dbt source freshness コマンド自体は正常終了しましたが、おかしなことにmax_loaded_at_time_ago_in_sがわずか0.57秒、つまり max_loaded_at が現在時刻らしき値になっており、正常にデータが取得できていないように見えます。

このメタデータ経由のsource freshnessはadapterクラスで定義されており、以下のようになっています。

https://github.com/dbt-labs/dbt-bigquery/blob/v1.8.2/dbt/adapters/bigquery/impl.py#L726-L744

ソースコードを見るまでINFORMATION_SCHEMA辺りから取得しているのかと推測していたのですが、実はBigQuery APIからget_tableメソッドを使ってTableインスタンスを取得しており、max_loaded_atTable.modifiedを使用していることが分かります。

また、テーブルをパスを保持するBaseRelationクラスの渡し方からも、このメソッドは単一テーブルのメタデータから更新時間を取得するためのものであるため、ワイルドカードつきのsourceを渡すと正常に動作しないことが推測できます。

そしてなぜ現在時刻が返されるのか、という点についてはBigQuery APIの仕様のようです。
試しに手元で以下のようなコードを実行してみると、現在時刻が返ってくることが再現できます。

from google.cloud import bigquery

client = bigquery.Client(project="your-project")

table_ref = client.dataset("your_dataset").table("dummy_*")
table = client.get_table(table_ref)

print(table.modified)

dbt-bigqueryの開発者がこの挙動を知っているのかは不明ですが、とにかくメタデータ経由でのsource freshnessの計算はワイルドカードつきのsourceに対しては正常に動作しないことがコードから分かりました。

追記(2024/10/03)

BigQueryクライアント側にissueを作って質問したところ、get_table メソッドは、ワイルドカートつきの文字列を渡すとマッチしたテーブル群をマージした一時テーブルを作るという仕様であることを教えていただきました。つまり、modified はリクエスト時に作成された一時テーブルの更新時刻として現在時刻を返しているとのことです。

あまり直感的な実装ではないと感じつつも、dbt-bigquery側でこの挙動を考慮していないことが原因なため、この件についてもissueとして報告しています。

最後に

今回はdbt-core側の具体的な実装は完全に無視しましたが、コンポーネント間の依存関係を簡単に理解しておくだけでも、ソースコードの解読がスムーズになりました。また、dbtのような大規模なオープンソースのコードは可読性に優れているので、コードを読んでいくことは自身で良いコードを書く際にも役立つでしょう。

脚注
  1. このmaterialization blockはmacro block同様、ユーザーでも定義できるようになっているため、自分だけのmaterializationを作ることも可能です。こちらを参考にしてみてください。 ↩︎

Discussion