🤖

dbt macro tips advent calendar 2022 day 21 - ref,source改造その1

2022/12/21に公開

便利なデータ変換ツールである dbt の中のmacroに関するtipsを書いていく dbt macro tips Advent Calendar 2022 21日目です。

prod環境からstaging環境にデータをコピーしたい

皆様、dbtで複数環境区切っていますか?

データエンジニアリング界隈では、 staging というとソースシステムからDWHにデータを引っ張ってきたときの生データを置く場所というイメージが強いと思います。
一方で、サーバーサイドエンジニアやフロントサイドエンジニアなど、いわゆるプロダクトサイドのソフトウェアエンジニアが staging というと、 prod 環境へリリースする前に最終チェックを行うような環境のことをイメージすることが多いと思います。

『ソフトウェア・エンジニアリングのベストプラクティスを!』 というテーマがあるようにdbtを使ったデータパイプラインを築くときにはプロダクトサイドのソフトウェア・エンジニアリングの良いプラクティスを取り入れることは多いでしょう。
prod,staging,dev という3環境を用意して、staging環境を用意することはデータパイプラインの品質を保つ上で非常に有用です。

ところで、この staging 環境のデータってどうするのでしょうか?
おそらく、一般的には prod環境のデータをstaging環境にコピーします。
prod環境のデータと同じものを使うというのが大事ですので。

(※ dbtのModelを冪等に作っていれば、ソースデータが同じであれば同じものが出るはずという話でもあります。そのため、コピーするのはソースデータだけでもよく、それなら参照切り替えでも良いという話もあります。)

これをdbtで実現する場合はどうすればよいのでしょうか?
1つの手段として、『refsourceのbuiltinのmacroを改造して、stg環境の特定条件下のみprod環境のRelationを取得するように切り替える。』という方法があります。

ここでは、staging環境の実現のために refsourceを改造する例を出します。

準備 環境特定用のmacro

どういう条件で環境を認識するのか? というのはおそらく個々のprojectによって変わってくるでしょう。
そこで、project間の際をなくすために以下のようなmacroを用意することにしましょう。

{%- macro is_stg() %}
	{%- if target.name == 'stg' %}
		{{ return(True) }}
	{%- else %}
		{{ return(False) }}
	{%- endif %}
{%- endmacro %}

これは、profileのtarget名がstgであればstaging環境であるとみなすというmacroになります。
if の条件を envvar などを使ったものに変えたり等、個々のprojectでの環境特定の手段に置き換えることは容易だと思います。

{%- macro is_stg() %}
	{%- if target.name == 'stg' %}
		{{ return(True) }}
	{%- else %}
		{{ return(False) }}
	{%- endif %}
{%- endmacro %}

この macroを使って、stgの条件のときだけ エラーになるようにしてみましょう。

{%- macro ref() %}
    {%- set relation = builtins.ref(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ exceptions.raise_compiler_error("ref() for stg not implement yet") }}
{%- endmacro %}

{%- macro source() %}
    {%- set relation = builtins.source(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ exceptions.raise_compiler_error("source() for stg not implement yet") }}
{%- endmacro %}

ここで使われている exceptionsについては以下のdocumentを示しておきます。

https://docs.getdbt.com/reference/dbt-jinja-functions/exceptions

エラーを意図的に起こしたいときに、よく使います。

$ dbt build               
01:27:51  Running with dbt=1.3.1
01:27:51  Unable to do partial parsing because profile has changed
01:27:52  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 303 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
01:27:52  
01:27:53  Concurrency: 4 threads (target='dev')
01:27:53  
01:27:53  1 of 6 START sql table model dev.my_first_dbt_model ............................ [RUN]
01:27:53  1 of 6 OK created sql table model dev.my_first_dbt_model ....................... [SELECT 1 in 0.15s]
01:27:53  2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]
01:27:53  3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]
01:27:53  3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.10s]
01:27:53  2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.12s]
01:27:53  4 of 6 START sql view model dev.my_second_dbt_model ............................ [RUN]
01:27:53  4 of 6 OK created sql view model dev.my_second_dbt_model ....................... [CREATE VIEW in 0.09s]
01:27:53  5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN]
01:27:53  6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN]
01:27:53  5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.08s]
01:27:53  6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.09s]
01:27:53  
01:27:53  Finished running 1 table model, 4 tests, 1 view model in 0 hours 0 minutes and 0.81 seconds (0.81s).
01:27:53  
01:27:53  Completed successfully
01:27:53  
01:27:53  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6 
$ dbt build --target stg
01:28:56  Running with dbt=1.3.1
01:28:56  Unable to do partial parsing because config vars, config profile, or config target have changed
01:28:58  Encountered an error:
Compilation Error in model my_second_dbt_model (models/example/my_second_dbt_model.sql)
  ref() for stg not implement yet
  
  > in macro ref (macros/overrides.sql)
  > called by model my_second_dbt_model (models/example/my_second_dbt_model.sql)

ちゃんと書き換えられてるようですね。

参照切り替え対象を特定する。

話はstaging環境での実行に移ります。
どういう条件で参照の切り替えが必要でしょうか?

  1. execute phaseである。
  2. modelが実行時のリソースとして指定されていない。
     (今回の実行でmodelが実行されないということはその実行においては、他のmodelはソースデータになる)

今回はこのような条件を考えてみました。

1つ目はおなじみのexecute コンテキスト変数を使えばいいですね。
2つ目はどうしたら良いでしょう。 こちらは selected_resources コンテキスト変数というものがあり、そちらを調べることで実装できそうです。

https://docs.getdbt.com/reference/dbt-jinja-functions/selected_resources

selected_resourcesはunique_idというdbt中のresourceを一意に特定するIDが入っているので、そこの変換はちょっと考える必要がありそうです。

manifestを覗いてみると model.<package_name>.<model_name>source.<package_name>.<table_name> というような形になっているようです。

実際に上記の条件判定を実装したmacroを見てみましょう。

{%- macro ref() %}
    {%- set relation = builtins.ref(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(switch_relation_for_stg(model_unique_id(*varargs),relation)) }}
{%- endmacro %}

{%- macro source() %}
    {%- set relation = builtins.source(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(switch_relation_for_stg(source_unique_id(*varargs),relation)) }}
{%- endmacro %}

{%- macro model_unique_id() %}
    {%- set package_name = model.package_name %}
    {%- if (varargs | length) == 1 %}
        {%- set model_name = varargs[0] %}
    {%- else %}
        {%- set package_name = varargs[0] %}
        {%- set model_name = varargs[1] %}
    {%- endif %}
    {{ return('model.'~package_name~'.'~model_name) }}
{%- endmacro %}

{%- macro source_unique_id() %}
    {%- set package_name = model.package_name %}
    {%- if (varargs | length) == 1 %}
        {%- set table_name = varargs[0] %}
    {%- else %}
        {%- set package_name = varargs[0] %}
        {%- set table_name = varargs[1] %}
    {%- endif %}
    {{ return('source.'~package_name~'.'~model_name) }}
{%- endmacro %}


{%- macro switch_relation_for_stg(unique_id,relation) %}
    {# 1.解析フェーズはそのまま帰す。 #}
    {%- if not execute %}
        {{ return(relation) }}
    {%- endif %}
    {# 2.実行対象に選択されてるリソースはそのまま帰す #}
    {%- if unique_id in selected_resources %}
        {{ return(relation) }}
    {%- endif %}

    {{ exceptions.raise_compiler_error(unique_id ~ "のRelation "~relation~" をstaging環境向けに書き換える部分はまだ未実装です。") }}
{%- endmacro %}

ちょっと長くなってきましたね。これをstgで実行してみましょう。

$ dbt build --target stg --select my_first_dbt_model 
02:11:02  Running with dbt=1.3.1
02:11:02  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
02:11:02  
02:11:02  Concurrency: 4 threads (target='stg')
02:11:02  
02:11:02  1 of 3 START sql table model stg.my_first_dbt_model ............................ [RUN]
02:11:02  1 of 3 OK created sql table model stg.my_first_dbt_model ....................... [SELECT 1 in 0.16s]
02:11:02  2 of 3 START test not_null_my_first_dbt_model_id ............................... [RUN]
02:11:02  3 of 3 START test unique_my_first_dbt_model_id ................................. [RUN]
02:11:02  2 of 3 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.09s]
02:11:02  3 of 3 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.09s]
02:11:02  
02:11:02  Finished running 1 table model, 2 tests in 0 hours 0 minutes and 0.58 seconds (0.58s).
02:11:02  
02:11:02  Completed successfully
02:11:02  
02:11:02  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3

dbtの初期プロジェクトにあるexample、 my_first_dbt_model では何も依存がないので成功します。

$ dbt build --target stg --select my_second_dbt_model
02:11:11  Running with dbt=1.3.1
02:11:11  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
02:11:11  
02:11:11  Concurrency: 4 threads (target='stg')
02:11:11  
02:11:11  1 of 3 START sql view model stg.my_second_dbt_model ............................ [RUN]
02:11:11  1 of 3 ERROR creating sql view model stg.my_second_dbt_model ................... [ERROR in 0.02s]
02:11:11  2 of 3 SKIP test not_null_my_second_dbt_model_id ............................... [SKIP]
02:11:11  3 of 3 SKIP test unique_my_second_dbt_model_id ................................. [SKIP]
02:11:11  
02:11:11  Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.31 seconds (0.31s).
02:11:11  
02:11:11  Completed with 1 error and 0 warnings:
02:11:11  
02:11:11  Compilation Error in model my_second_dbt_model (models/example/my_second_dbt_model.sql)
02:11:11    model.macro_tips_advcal.my_first_dbt_modelのRelation "postgres"."stg"."my_first_dbt_model" をstaging環境向けに書き換える部分はまだ未実装です。
02:11:11    
02:11:11    > in macro switch_relation_for_stg (macros/overrides.sql)
02:11:11    > called by macro ref (macros/overrides.sql)
02:11:11    > called by model my_second_dbt_model (models/example/my_second_dbt_model.sql)
02:11:11    > called by model my_second_dbt_model (models/example/my_second_dbt_model.sql)
02:11:11  
02:11:11  Done. PASS=0 WARN=0 ERROR=1 SKIP=2 TOTAL=3

my_second_dbt_modelmy_first_dbt_model に依存していて、 my_second_dbt_modelだけを実行するならば、これはprod環境への参照書き換え対象になります。

$ dbt build --target stg --select +my_second_dbt_model
02:16:28  Running with dbt=1.3.1
02:16:28  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
02:16:28  
02:16:28  Concurrency: 4 threads (target='stg')
02:16:28  
02:16:28  1 of 6 START sql table model stg.my_first_dbt_model ............................ [RUN]
02:16:29  1 of 6 OK created sql table model stg.my_first_dbt_model ....................... [SELECT 1 in 0.17s]
02:16:29  2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]
02:16:29  3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]
02:16:29  2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.08s]
02:16:29  3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.08s]
02:16:29  4 of 6 START sql view model stg.my_second_dbt_model ............................ [RUN]
02:16:29  4 of 6 OK created sql view model stg.my_second_dbt_model ....................... [CREATE VIEW in 0.08s]
02:16:29  6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN]
02:16:29  5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN]
02:16:29  5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.05s]
02:16:29  6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.05s]
02:16:29  
02:16:29  Finished running 1 table model, 4 tests, 1 view model in 0 hours 0 minutes and 0.60 seconds (0.60s).
02:16:29  
02:16:29  Completed successfully
02:16:29  
02:16:29  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

my_second_dbt_modelの依存も含めて実行するように --select +my_second_dbt_model と記述して実行すると、問題なく成功します。


ここまでで、staging 環境実現のための refsource の書き換え条件判定の部分は実装完了しました。
22日目は、実際の切替部分の実装を行っていきます。

Discussion