🐡

dbtのincrementalモデルで作成したテーブルの環境差異をなくす方法

2023/12/18に公開

はじめに

dbtのincrementalモデルは大規模データに対して様々な方法での増分更新を実現できる便利さの反面、無数の落とし穴がある。

そのうちの1つが、改修による影響を開発環境で確認することの難しさである。

普通のtableやviewなら、開発環境でdbt buildするだけで検証できるが、incrementalモデルの場合は開発環境でテストにパスしても、本番環境にリリースした際のbuildでエラーとなって頭を抱えることも多い。

だいたいのケースでは、本番環境と同じデータでテストしていないことによるエラーであり、同じデータを用意できていれば防げる。

だが、多くのincrementalモデルは定期的なbuild実行を前提としているため、開発時に1回buildし直しただけでは、同じデータにならないことも多い。

確実に同じデータを作成するには、本番環境からコピーしてくれば良いので、本記事ではその方法を紹介する。

Pythonで作成する

サクッと実装できるので、まずはPythonで作成する方法から。

dbt側で使用する環境

  • dbt project evaluator: 0.8.0
  • DWH: BigQuery

まずは対象となるテーブル一覧を取得するSQLを作成する。
先日紹介した、dbt project evaluatorの出力結果からincrementalモデルで作成しているテーブル一覧を取得する方法を使う。
https://zenn.dev/analytics_eng/articles/7c46c815398b5f

SELECT
    database AS project,
    schema AS dataset,
    COALESCE(alias, name) AS table_name 
FROM
    `{project}.{dataset}.stg_nodes`
WHERE
    materialized = "incremental"
    AND package_name = {your_package_name}
    AND is_enabled

取得できたデータはDataframeに変換しておく。

次に、コピー先となる開発環境のデータセットを指定する。

DESTINATION_DATASETS = ["tanuki", "tiger", "rabbit"]

指定したデータセットごとに以下のようにループさせる。

from google.cloud import bigquery

client = bigquery.Client(project = {your_gcp_dev_project_id})
 
for destination_dataset in DESTINATION_DATASETS:
    print(f"copying tables/views to {destination_dataset}")
 
    # loop by dataframe rows
    for _, table in df.iterrows():
     
        # Copy the filtered tables/views to the destination dataset
        source_table_id = f"{table['project']}.{table['dataset']}.{table['table_name']}"
        destination_table_id = f"{your_gcp_dev_project_id}.{destination_dataset}.{table['table_name']}"

        query = f"""
            CREATE OR REPLACE TABLE `{destination_table_id}`
            CLONE `{source_table_id}`
        """

        job_config = bigquery.job.QueryJobConfig()
        job = client.query(query, job_config=job_config)
        
        # Wait for the copy job to complete
        job.result()
        print(f"Table/view {source_table_id} copied to {destination_table_id}")

ポイントとして、比較的最近にGAとなったテーブルクローンを使用している。この機能を使うと、元テーブルとの差分に対してのみストレージコストが発生するだけなので、コストを抑えた形で環境同期ができる。

https://cloud.google.com/bigquery/docs/table-clones-intro?hl=ja

クラウドDWHはいくらストレージ費用が安いといっても、TB級のテーブルをいくつものデータセットに物理テーブルとしてコピーしていると、

👮🏼‍♀️「最近このプロジェクトのストレージ費用多くない?」

とクラウドの管理者から指摘される(実体験)ので、大規模データを複製する際はそれなりの注意が必要である。

他の方法

Pythonだと簡単に作れるが、dbtのエコシステムとは独立した環境を用意する必要がある。dbt内で完結したい場合は、pre_hook機能を使い、buildする前にmacroをcallすることもできそうだ。開発環境でだけ動作させるには、target変数など、dbtでの環境分離の仕組みをそのまま使うと良い。

サンプルコード

{% macro clone_table(source_table_id, destination_table_id) %}
    {% if target.name == 'dev' %}
        CREATE OR REPLACE TABLE `{{ destination_table_id }}`
        CLONE `{{ source_table_id }}`
    {% endif %}
{% endmacro %}

また、dbt core 1.6 よりdbt clone コマンドが追加されている。リンク先でも紹介されているように、Pull Request作成時のCI環境を構築する場合は、このコマンドで十分だろう。

https://docs.getdbt.com/best-practices/clone-incremental-models

Discussion