コードからdbtを理解する
パッケージを作りたいとき、バグっぽい挙動に出くわしたとき、単なる知的好奇心…などなど、dbtの裏側のコードを読む機会があるかもしれません。dbtのコードを読むときに、どこを見ればいいのか、どのような構造になっているのか、といったことをいくつかの例と一緒にまとめてみました。
要約
dbt-coreではなく、dbt-bigqueryなどadapter側の実装の方が重要なことが多いです。
アーキテクチャ
まずはdbtのアーキテクチャをざっくりと説明します。dbtでは環境構築時にcoreと各DWH用のadapterをインストールしますが、それぞれ以下のような役割を担っています。
- dbt-core: CLI等のユーザーインターフェースを担当
- dbt-adapters: DWHとの接続を担当する抽象化レイヤー
- dbt-bigquery など各種adapter: dbt-adaptersを継承した具体的な実装
https://docs.getdbt.com/guides/adapter-creation?step=1 より引用
そのため、DWHのレスポンスを含む結果を調査したいケースでは、adapter側のコードを読むと具体的な実装が書いてあり、原因の究明に役立つことが多いです。
では、よく参照しそうな箇所をいくつか挙げてみます。
macros/materialization
では、materializedで指定するタイプごとの具体的な実装が書かれています。
ここでは、{% materialization view, adapter='bigquery' -%}
と書くことで、BigQuery独自のviewの挙動を定義しています。[1]
BigQueryでは、既に同名のテーブルが存在する場合はviewを作成することはできませんが、冒頭のbigquery__handle_existing_table
の実装を見ると、full_refreshが有効の場合は先にテーブルを消去する仕組みになっていることが分かります。
もう1つはAdapterクラスです。BigQueryとのやり取りを行う具体的なメソッドを集めたクラスで、dbtの主要な処理がこのAdapterクラスを通して行われています。dbt-adapters repositoryで定義されているBaseAdapterを継承して実装していることが読み取れます。
とはいえこれだけだとまだあまり面白くないので、実際に遭遇した不思議な挙動を例にとりつつ、実際のコードを読んで理解を深めましょう。
Case1: パーティションを変更するテーブル上書き処理がエラーにならない
BigQueryでは、パーティションを変更する際には、元々のテーブルを削除してから新しいテーブルを作成する必要があります。
元々dayでパーティションを切っていたテーブルに対して、以下のようなDDLステートメントを使ってmonthでパーティションを切り直そうとすると、以下のようなエラーが出ます。
Cannot replace a table with a different partitioning spec. Instead, DROP the table, and then recreate it. New partitioning spec is interval(type:month,field:date) and existing spec is interval(type:day,field:date)
-- DDLステートメント
CREATE OR REPLACE TABLE
`your-project.your-dataset.sample_partitioned_table`
PARTITION BY
DATE_TRUNC(date, MONTH) AS
SELECT
...
ところが、dbtで同じ処理を書いてdbt runを行うとエラーになりません。これはなぜでしょうか。
-- これはエラーにならない
{{ config(
materialized='table',
partition_by={
"field": "date",
"data_type": "date",
"granularity": "month"
}
)}}
select
...
先ほど紹介したmaterializationの具体的な実装の中に答えがあります。
ここでは条件分岐が実装されており、adapter.is_replaceable
がfalseだと、dbt runで指定したテーブルを作成する前に、元のテーブルをdrop_relation
経由で削除する処理がAPI経由で実行されていることが分かります。
ではadapterクラスのコードにあるis_replaceable
も見てみましょう。
is_replaceable
メソッドはpartitionとclusterの差分がある場合はfalseを返すように実装されています。
ユーザー側はただdbt runをしているだけですが、このようにdbtはadapterを通じて裏側でデータを比較したり、テーブルを削除していることが分かります。これが、dbt runで異なるパーティションのテーブルを作成してもエラーにならない仕組みでした。
Case2: metadataを使ったsource freshnessで現在時刻が返ってくる
もう1つの例として、source freshnessの実装を見てみましょう。
source freshnessはBigQueryでは2通りの計算方法が用意されており、loaded_at_field
の有無によって挙動が変わります。
- If a loaded_at_field is provided, dbt will calculate freshness via a select query (behavior prior to v1.7).
- If a loaded_at_field is not provided, dbt will calculate freshness via warehouse metadata tables when possible (new in v1.7 on supported adapters).
参考:https://docs.getdbt.com/reference/resource-properties/freshness#definition
今回はloaded_at_field
を定義せず、シャーディングテーブルのsource freshnessをメタデータから取得してみます。
version: 2
sources:
- name: dummy_source
database: your-project
schema: your-dataset
freshness:
warn_after: {count: 1, period: day}
error_after: {count: 2, period: day}
tables:
- name: dummy_*
source freshnessを実行した結果のsource.jsonは以下のようになりました。
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/sources/v3.json",
"dbt_version": "1.8.3",
"generated_at": "2024-09-27T10:10:58.326203Z",
"invocation_id": "xxxxxx",
"env": {}
},
"results": [
{
"unique_id": "source.my_new_project.dummy_source.dummy_*",
"max_loaded_at": "2024-09-27T10:10:57.753000+00:00",
"snapshotted_at": "2024-09-27T10:10:58.320901+00:00",
"max_loaded_at_time_ago_in_s": 0.567901,
"status": "pass",
"criteria": {
"warn_after": {
"count": 1,
"period": "day"
},
"error_after": {
"count": 2,
"period": "day"
},
"filter": null
},
...
}
],
"elapsed_time": 7.178150415420532
}
dbt source freshness コマンド自体は正常終了しましたが、おかしなことにmax_loaded_at_time_ago_in_s
がわずか0.57秒、つまり max_loaded_at
が現在時刻らしき値になっており、正常にデータが取得できていないように見えます。
このメタデータ経由のsource freshnessはadapterクラスで定義されており、以下のようになっています。
ソースコードを見るまでINFORMATION_SCHEMA辺りから取得しているのかと推測していたのですが、実はBigQuery APIからget_table
メソッドを使ってTableインスタンスを取得しており、max_loaded_at
はTable.modified
を使用していることが分かります。
また、テーブルをパスを保持するBaseRelationクラスの渡し方からも、このメソッドは単一テーブルのメタデータから更新時間を取得するためのものであるため、ワイルドカードつきのsourceを渡すと正常に動作しないことが推測できます。
そしてなぜ現在時刻が返されるのか、という点についてはBigQuery APIの仕様のようです。
試しに手元で以下のようなコードを実行してみると、現在時刻が返ってくることが再現できます。
from google.cloud import bigquery
client = bigquery.Client(project="your-project")
table_ref = client.dataset("your_dataset").table("dummy_*")
table = client.get_table(table_ref)
print(table.modified)
dbt-bigqueryの開発者がこの挙動を知っているのかは不明ですが、とにかくメタデータ経由でのsource freshnessの計算はワイルドカードつきのsourceに対しては正常に動作しないことがコードから分かりました。
追記(2024/10/03)
BigQueryクライアント側にissueを作って質問したところ、get_table
メソッドは、ワイルドカートつきの文字列を渡すとマッチしたテーブル群をマージした一時テーブルを作るという仕様であることを教えていただきました。つまり、modified
はリクエスト時に作成された一時テーブルの更新時刻として現在時刻を返しているとのことです。
あまり直感的な実装ではないと感じつつも、dbt-bigquery側でこの挙動を考慮していないことが原因なため、この件についてもissueとして報告しています。
最後に
今回はdbt-core側の具体的な実装は完全に無視しましたが、コンポーネント間の依存関係を簡単に理解しておくだけでも、ソースコードの解読がスムーズになりました。また、dbtのような大規模なオープンソースのコードは可読性に優れているので、コードを読んでいくことは自身で良いコードを書く際にも役立つでしょう。
Discussion