dbt macro tips advent calendar 2022 day 21 - ref,source改造その1
便利なデータ変換ツールである dbt の中のmacroに関するtipsを書いていく dbt macro tips Advent Calendar 2022 21日目です。
prod環境からstaging環境にデータをコピーしたい
皆様、dbtで複数環境区切っていますか?
データエンジニアリング界隈では、 staging
というとソースシステムからDWHにデータを引っ張ってきたときの生データを置く場所というイメージが強いと思います。
一方で、サーバーサイドエンジニアやフロントサイドエンジニアなど、いわゆるプロダクトサイドのソフトウェアエンジニアが staging
というと、 prod
環境へリリースする前に最終チェックを行うような環境のことをイメージすることが多いと思います。
『ソフトウェア・エンジニアリングのベストプラクティスを!』 というテーマがあるようにdbtを使ったデータパイプラインを築くときにはプロダクトサイドのソフトウェア・エンジニアリングの良いプラクティスを取り入れることは多いでしょう。
prod
,staging
,dev
という3環境を用意して、staging
環境を用意することはデータパイプラインの品質を保つ上で非常に有用です。
ところで、この staging
環境のデータってどうするのでしょうか?
おそらく、一般的には prod
環境のデータをstaging
環境にコピーします。
prod
環境のデータと同じものを使うというのが大事ですので。
(※ dbtのModelを冪等に作っていれば、ソースデータが同じであれば同じものが出るはずという話でもあります。そのため、コピーするのはソースデータだけでもよく、それなら参照切り替えでも良いという話もあります。)
これをdbtで実現する場合はどうすればよいのでしょうか?
1つの手段として、『ref
とsource
のbuiltinのmacroを改造して、stg環境の特定条件下のみprod
環境のRelationを取得するように切り替える。』という方法があります。
ここでは、staging
環境の実現のために ref
とsource
を改造する例を出します。
準備 環境特定用のmacro
どういう条件で環境を認識するのか? というのはおそらく個々のprojectによって変わってくるでしょう。
そこで、project間の際をなくすために以下のようなmacroを用意することにしましょう。
{%- macro is_stg() %}
{%- if target.name == 'stg' %}
{{ return(True) }}
{%- else %}
{{ return(False) }}
{%- endif %}
{%- endmacro %}
これは、profileのtarget名がstgであればstaging
環境であるとみなすというmacroになります。
if
の条件を env
や var
などを使ったものに変えたり等、個々のprojectでの環境特定の手段に置き換えることは容易だと思います。
{%- macro is_stg() %}
{%- if target.name == 'stg' %}
{{ return(True) }}
{%- else %}
{{ return(False) }}
{%- endif %}
{%- endmacro %}
この macroを使って、stgの条件のときだけ エラーになるようにしてみましょう。
{%- macro ref() %}
{%- set relation = builtins.ref(*varargs) %}
{%- if not is_stg() %}
{{ return(relation) }}
{%- endif %}
{{ exceptions.raise_compiler_error("ref() for stg not implement yet") }}
{%- endmacro %}
{%- macro source() %}
{%- set relation = builtins.source(*varargs) %}
{%- if not is_stg() %}
{{ return(relation) }}
{%- endif %}
{{ exceptions.raise_compiler_error("source() for stg not implement yet") }}
{%- endmacro %}
ここで使われている exceptionsについては以下のdocumentを示しておきます。
エラーを意図的に起こしたいときに、よく使います。
$ dbt build
01:27:51 Running with dbt=1.3.1
01:27:51 Unable to do partial parsing because profile has changed
01:27:52 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 303 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
01:27:52
01:27:53 Concurrency: 4 threads (target='dev')
01:27:53
01:27:53 1 of 6 START sql table model dev.my_first_dbt_model ............................ [RUN]
01:27:53 1 of 6 OK created sql table model dev.my_first_dbt_model ....................... [SELECT 1 in 0.15s]
01:27:53 2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]
01:27:53 3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]
01:27:53 3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.10s]
01:27:53 2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.12s]
01:27:53 4 of 6 START sql view model dev.my_second_dbt_model ............................ [RUN]
01:27:53 4 of 6 OK created sql view model dev.my_second_dbt_model ....................... [CREATE VIEW in 0.09s]
01:27:53 5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN]
01:27:53 6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN]
01:27:53 5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.08s]
01:27:53 6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.09s]
01:27:53
01:27:53 Finished running 1 table model, 4 tests, 1 view model in 0 hours 0 minutes and 0.81 seconds (0.81s).
01:27:53
01:27:53 Completed successfully
01:27:53
01:27:53 Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6
$ dbt build --target stg
01:28:56 Running with dbt=1.3.1
01:28:56 Unable to do partial parsing because config vars, config profile, or config target have changed
01:28:58 Encountered an error:
Compilation Error in model my_second_dbt_model (models/example/my_second_dbt_model.sql)
ref() for stg not implement yet
> in macro ref (macros/overrides.sql)
> called by model my_second_dbt_model (models/example/my_second_dbt_model.sql)
ちゃんと書き換えられてるようですね。
参照切り替え対象を特定する。
話はstaging
環境での実行に移ります。
どういう条件で参照の切り替えが必要でしょうか?
- execute phaseである。
- modelが実行時のリソースとして指定されていない。
(今回の実行でmodelが実行されないということはその実行においては、他のmodelはソースデータになる)
今回はこのような条件を考えてみました。
1つ目はおなじみのexecute
コンテキスト変数を使えばいいですね。
2つ目はどうしたら良いでしょう。 こちらは selected_resources
コンテキスト変数というものがあり、そちらを調べることで実装できそうです。
selected_resourcesはunique_idというdbt中のresourceを一意に特定するIDが入っているので、そこの変換はちょっと考える必要がありそうです。
manifestを覗いてみると model.<package_name>.<model_name>
や source.<package_name>.<table_name>
というような形になっているようです。
実際に上記の条件判定を実装したmacroを見てみましょう。
{%- macro ref() %}
{%- set relation = builtins.ref(*varargs) %}
{%- if not is_stg() %}
{{ return(relation) }}
{%- endif %}
{{ return(switch_relation_for_stg(model_unique_id(*varargs),relation)) }}
{%- endmacro %}
{%- macro source() %}
{%- set relation = builtins.source(*varargs) %}
{%- if not is_stg() %}
{{ return(relation) }}
{%- endif %}
{{ return(switch_relation_for_stg(source_unique_id(*varargs),relation)) }}
{%- endmacro %}
{%- macro model_unique_id() %}
{%- set package_name = model.package_name %}
{%- if (varargs | length) == 1 %}
{%- set model_name = varargs[0] %}
{%- else %}
{%- set package_name = varargs[0] %}
{%- set model_name = varargs[1] %}
{%- endif %}
{{ return('model.'~package_name~'.'~model_name) }}
{%- endmacro %}
{%- macro source_unique_id() %}
{%- set package_name = model.package_name %}
{%- if (varargs | length) == 1 %}
{%- set table_name = varargs[0] %}
{%- else %}
{%- set package_name = varargs[0] %}
{%- set table_name = varargs[1] %}
{%- endif %}
{{ return('source.'~package_name~'.'~model_name) }}
{%- endmacro %}
{%- macro switch_relation_for_stg(unique_id,relation) %}
{# 1.解析フェーズはそのまま帰す。 #}
{%- if not execute %}
{{ return(relation) }}
{%- endif %}
{# 2.実行対象に選択されてるリソースはそのまま帰す #}
{%- if unique_id in selected_resources %}
{{ return(relation) }}
{%- endif %}
{{ exceptions.raise_compiler_error(unique_id ~ "のRelation "~relation~" をstaging環境向けに書き換える部分はまだ未実装です。") }}
{%- endmacro %}
ちょっと長くなってきましたね。これをstgで実行してみましょう。
$ dbt build --target stg --select my_first_dbt_model
02:11:02 Running with dbt=1.3.1
02:11:02 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
02:11:02
02:11:02 Concurrency: 4 threads (target='stg')
02:11:02
02:11:02 1 of 3 START sql table model stg.my_first_dbt_model ............................ [RUN]
02:11:02 1 of 3 OK created sql table model stg.my_first_dbt_model ....................... [SELECT 1 in 0.16s]
02:11:02 2 of 3 START test not_null_my_first_dbt_model_id ............................... [RUN]
02:11:02 3 of 3 START test unique_my_first_dbt_model_id ................................. [RUN]
02:11:02 2 of 3 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.09s]
02:11:02 3 of 3 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.09s]
02:11:02
02:11:02 Finished running 1 table model, 2 tests in 0 hours 0 minutes and 0.58 seconds (0.58s).
02:11:02
02:11:02 Completed successfully
02:11:02
02:11:02 Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
dbtの初期プロジェクトにあるexample、 my_first_dbt_model
では何も依存がないので成功します。
$ dbt build --target stg --select my_second_dbt_model
02:11:11 Running with dbt=1.3.1
02:11:11 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
02:11:11
02:11:11 Concurrency: 4 threads (target='stg')
02:11:11
02:11:11 1 of 3 START sql view model stg.my_second_dbt_model ............................ [RUN]
02:11:11 1 of 3 ERROR creating sql view model stg.my_second_dbt_model ................... [ERROR in 0.02s]
02:11:11 2 of 3 SKIP test not_null_my_second_dbt_model_id ............................... [SKIP]
02:11:11 3 of 3 SKIP test unique_my_second_dbt_model_id ................................. [SKIP]
02:11:11
02:11:11 Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.31 seconds (0.31s).
02:11:11
02:11:11 Completed with 1 error and 0 warnings:
02:11:11
02:11:11 Compilation Error in model my_second_dbt_model (models/example/my_second_dbt_model.sql)
02:11:11 model.macro_tips_advcal.my_first_dbt_modelのRelation "postgres"."stg"."my_first_dbt_model" をstaging環境向けに書き換える部分はまだ未実装です。
02:11:11
02:11:11 > in macro switch_relation_for_stg (macros/overrides.sql)
02:11:11 > called by macro ref (macros/overrides.sql)
02:11:11 > called by model my_second_dbt_model (models/example/my_second_dbt_model.sql)
02:11:11 > called by model my_second_dbt_model (models/example/my_second_dbt_model.sql)
02:11:11
02:11:11 Done. PASS=0 WARN=0 ERROR=1 SKIP=2 TOTAL=3
my_second_dbt_model
は my_first_dbt_model
に依存していて、 my_second_dbt_model
だけを実行するならば、これはprod環境への参照書き換え対象になります。
$ dbt build --target stg --select +my_second_dbt_model
02:16:28 Running with dbt=1.3.1
02:16:28 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
02:16:28
02:16:28 Concurrency: 4 threads (target='stg')
02:16:28
02:16:28 1 of 6 START sql table model stg.my_first_dbt_model ............................ [RUN]
02:16:29 1 of 6 OK created sql table model stg.my_first_dbt_model ....................... [SELECT 1 in 0.17s]
02:16:29 2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]
02:16:29 3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]
02:16:29 2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.08s]
02:16:29 3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.08s]
02:16:29 4 of 6 START sql view model stg.my_second_dbt_model ............................ [RUN]
02:16:29 4 of 6 OK created sql view model stg.my_second_dbt_model ....................... [CREATE VIEW in 0.08s]
02:16:29 6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN]
02:16:29 5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN]
02:16:29 5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.05s]
02:16:29 6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.05s]
02:16:29
02:16:29 Finished running 1 table model, 4 tests, 1 view model in 0 hours 0 minutes and 0.60 seconds (0.60s).
02:16:29
02:16:29 Completed successfully
02:16:29
02:16:29 Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6
my_second_dbt_model
の依存も含めて実行するように --select +my_second_dbt_model
と記述して実行すると、問題なく成功します。
ここまでで、staging
環境実現のための ref
と source
の書き換え条件判定の部分は実装完了しました。
22日目は、実際の切替部分の実装を行っていきます。
Discussion