dbtのincrementalモデルで作成したテーブルの環境差異をなくす方法
はじめに
dbtのincrementalモデルは大規模データに対して様々な方法での増分更新を実現できる便利さの反面、無数の落とし穴がある。
そのうちの1つが、改修による影響を開発環境で確認することの難しさである。
普通のtableやviewなら、開発環境でdbt buildするだけで検証できるが、incrementalモデルの場合は開発環境でテストにパスしても、本番環境にリリースした際のbuildでエラーとなって頭を抱えることも多い。
だいたいのケースでは、本番環境と同じデータでテストしていないことによるエラーであり、同じデータを用意できていれば防げる。
だが、多くのincrementalモデルは定期的なbuild実行を前提としているため、開発時に1回buildし直しただけでは、同じデータにならないことも多い。
確実に同じデータを作成するには、本番環境からコピーしてくれば良いので、本記事ではその方法を紹介する。
Pythonで作成する
サクッと実装できるので、まずはPythonで作成する方法から。
dbt側で使用する環境
- dbt project evaluator: 0.8.0
- DWH: BigQuery
まずは対象となるテーブル一覧を取得するSQLを作成する。
先日紹介した、dbt project evaluatorの出力結果からincrementalモデルで作成しているテーブル一覧を取得する方法を使う。
SELECT
database AS project,
schema AS dataset,
COALESCE(alias, name) AS table_name
FROM
`{project}.{dataset}.stg_nodes`
WHERE
materialized = "incremental"
AND package_name = {your_package_name}
AND is_enabled
取得できたデータはDataframeに変換しておく。
次に、コピー先となる開発環境のデータセットを指定する。
DESTINATION_DATASETS = ["tanuki", "tiger", "rabbit"]
指定したデータセットごとに以下のようにループさせる。
from google.cloud import bigquery
client = bigquery.Client(project = {your_gcp_dev_project_id})
for destination_dataset in DESTINATION_DATASETS:
print(f"copying tables/views to {destination_dataset}")
# loop by dataframe rows
for _, table in df.iterrows():
# Copy the filtered tables/views to the destination dataset
source_table_id = f"{table['project']}.{table['dataset']}.{table['table_name']}"
destination_table_id = f"{your_gcp_dev_project_id}.{destination_dataset}.{table['table_name']}"
query = f"""
CREATE OR REPLACE TABLE `{destination_table_id}`
CLONE `{source_table_id}`
"""
job_config = bigquery.job.QueryJobConfig()
job = client.query(query, job_config=job_config)
# Wait for the copy job to complete
job.result()
print(f"Table/view {source_table_id} copied to {destination_table_id}")
ポイントとして、比較的最近にGAとなったテーブルクローンを使用している。この機能を使うと、元テーブルとの差分に対してのみストレージコストが発生するだけなので、コストを抑えた形で環境同期ができる。
クラウドDWHはいくらストレージ費用が安いといっても、TB級のテーブルをいくつものデータセットに物理テーブルとしてコピーしていると、
👮🏼♀️「最近このプロジェクトのストレージ費用多くない?」
とクラウドの管理者から指摘される(実体験)ので、大規模データを複製する際はそれなりの注意が必要である。
他の方法
Pythonだと簡単に作れるが、dbtのエコシステムとは独立した環境を用意する必要がある。dbt内で完結したい場合は、pre_hook機能を使い、buildする前にmacroをcallすることもできそうだ。開発環境でだけ動作させるには、target変数など、dbtでの環境分離の仕組みをそのまま使うと良い。
サンプルコード
{% macro clone_table(source_table_id, destination_table_id) %}
{% if target.name == 'dev' %}
CREATE OR REPLACE TABLE `{{ destination_table_id }}`
CLONE `{{ source_table_id }}`
{% endif %}
{% endmacro %}
また、dbt core 1.6 よりdbt clone コマンドが追加されている。リンク先でも紹介されているように、Pull Request作成時のCI環境を構築する場合は、このコマンドで十分だろう。
Discussion