🐕

dbt macro tips advent calendar 2022 day 22 - ref,source改造その2

に公開

便利なデータ変換ツールである dbt の中のmacroに関するtipsを書いていく dbt macro tips Advent Calendar 2022 22日目です。

昨日は、 staging 環境の切り替えのために条件部分作成をしていました。
背景や、昨日までの部分は こちらを見ていただくとして今日は、その続きです。

実際の切り替え

一口にprod環境のデータを参照すると言っても、いくつか手段があります。

  1. prod環境のデータを文字通り別のテーブルとしてcopyする (deep copy?)
  2. prod環境のRelationを参照するviewを作成する (shallow copy?)
  3. prod環境のRelationをそのまま参照する (link?)

どの手段でも良いのですが、共通して必要となるmacroがあります。それはstaging環境のRelationをprod 環境に書き換えるmacroです。
環境というものをどのように設定しているか?というのは個別のprojectによって変わるとは思います。
今回の例では schema名で切り替えてるものとしましょう。

環境はprofile名によって決まり、schemaを変えることで環境を変えてるという場合おそらく generate_schema_nameのmacroを上書きして実装することが多いでしょう。 例として以下のようなmacroを提示しようと思います。

macros/environment.sql
{%- macro enviroment_name() %}
    {{ return(target.name) }}
{%- endmacro %}

{%- macro is_stg() %}
	{%- if enviroment_name() == 'stg' %}
		{{ return(True) }}
	{%- else %}
		{{ return(False) }}
	{%- endif %}
{%- endmacro %}

{% macro generate_schema_name(custom_schema_name, node) -%}
    {{ return(generate_schema_name_for_env(enviroment_name(), custom_schema_name)) }}
{%- endmacro %}

{%- macro generate_schema_name_for_env(env_name, custom_schema_name) -%}
    {%- set default_schema = 'advcal' -%}
    {%- if env_name == 'prod' %}
        {%- if custom_schema_name is none -%}
            {{ return(default_schema) }}
        {%- else -%}
            {{ return(custom_schema_name | trim) }}
        {%- endif %}
    {%- endif %}

    {%- if env_name == 'stg' %}
        {%- if custom_schema_name is none -%}
            {{ return('stg_'~default_schema) }}
        {%- else -%}
            {{ return('stg_'~custom_schema_name | trim) }}
        {%- endif %}
    {%- endif %}

    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ env_name | trim }}_{{ default_schema }}
    {%- else -%}
        {{ env_name | trim }}_{{ custom_schema_name | trim }}
    {%- endif %}
{%- endmacro %}

このように環境を実現しているとき、staring環境からprod環境へのRelationの書き換えは以下のようになります。

{%- macro rewrite_relation_stg_to_prod(unique_id,relation) %}
    {%- set node = graph.nodes | selectattr('unique_id', 'eq', unique_id) | first %}
    {%- if node is not none and 'config' in node and (node.config.schema | trim | length) > 0 %}
        {%- set custom_schema_name = (node.config.schema | trim)%}
    {%- else %}
        {%- set custom_schema_name = none %}
    {%- endif %}

    {{ return(relation.replace_path(schema=generate_schema_name_for_env('prod',custom_schema_name))) }}
{%- endmacro %}

Relationにはreplace_pathというメソッドがあるようなので、schemaを generate_schema_name_for_env という用意したmacroでprod用のschemaを手に入れれば良いようです。

ここで、custom_schema_nameを取得するために、graph コンテキスト変数にアクセスして、unique_idが一致する物を取得し、その結果から、configに指定されてるschemaを取得するということもやっています。
この辺は8日目の応用ですね。

書き換えの戦略には3種類くらいあると言いましたが、ここでは一旦一番簡単な 3番目で実装してみましょう。

ここまでのrefとsourceの改造の全貌はこちらとなります。

macros/overrides.sql
{%- macro ref() %}
    {%- set relation = builtins.ref(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(switch_relation_for_stg(model_unique_id(*varargs),relation)) }}
{%- endmacro %}

{%- macro source() %}
    {%- set relation = builtins.source(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(switch_relation_for_stg(source_unique_id(*varargs),relation)) }}
{%- endmacro %}

{%- macro model_unique_id() %}
    {%- set package_name = model.package_name %}
    {%- if (varargs | length) == 1 %}
        {%- set model_name = varargs[0] %}
    {%- else %}
        {%- set package_name = varargs[0] %}
        {%- set model_name = varargs[1] %}
    {%- endif %}
    {{ return('model.'~package_name~'.'~model_name) }}
{%- endmacro %}

{%- macro source_unique_id() %}
    {%- set package_name = model.package_name %}
    {%- if (varargs | length) == 1 %}
        {%- set table_name = varargs[0] %}
    {%- else %}
        {%- set package_name = varargs[0] %}
        {%- set table_name = varargs[1] %}
    {%- endif %}
    {{ return('source.'~package_name~'.'~model_name) }}
{%- endmacro %}


{%- macro switch_relation_for_stg(unique_id,relation) %}
    {# 1.解析フェーズはそのまま帰す。 #}
    {%- if not execute %}
        {{ return(relation) }}
    {%- endif %}
    {# 2.実行対象に選択されてるリソースはそのまま帰す #}
    {%- if unique_id in selected_resources %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(rewrite_relation_stg_to_prod(unique_id,relation)) }}
{%- endmacro %}


{%- macro rewrite_relation_stg_to_prod(unique_id,relation) %}
    {%- set node = graph.nodes | selectattr('unique_id', 'eq', unique_id) | first %}
    {%- if node is not none and 'config' in node and (node.config.schema | trim | length) > 0 %}
        {%- set custom_schema_name = (node.config.schema | trim)%}
    {%- else %}
        {%- set custom_schema_name = none %}
    {%- endif %}

    {{ return(relation.replace_path(schema=generate_schema_name_for_env('prod',custom_schema_name))) }}
{%- endmacro %}
$ dbt build --target stg --select my_second_dbt_model
03:07:33  Running with dbt=1.3.1
03:07:33  Change detected to override macro used during parsing. Starting full parse.
03:07:34  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:07:34  
03:07:34  Concurrency: 4 threads (target='stg')
03:07:34  
03:07:34  1 of 3 START sql view model stg_advcal.my_second_dbt_model ......................... [RUN]
03:07:34  1 of 3 ERROR creating sql view model stg_advcal.my_second_dbt_model ................ [ERROR in 0.08s]
03:07:34  2 of 3 SKIP test not_null_my_second_dbt_model_id ............................... [SKIP]
03:07:34  3 of 3 SKIP test unique_my_second_dbt_model_id ................................. [SKIP]
03:07:34  
03:07:34  Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.29 seconds (0.29s).
03:07:34  
03:07:34  Completed with 1 error and 0 warnings:
03:07:34  
03:07:34  Database Error in model my_second_dbt_model (models/example/my_second_dbt_model.sql)
03:07:34    relation "advcal.my_first_dbt_model" does not exist
03:07:34    LINE 6: from "postgres"."advcal"."my_first_dbt_model"
03:07:34                 ^
03:07:34    compiled Code at target/run/macro_tips_advcal/models/example/my_second_dbt_model.sql
03:07:34  
03:07:34  Done. PASS=0 WARN=0 ERROR=1 SKIP=2 TOTAL=3

$ dbt build --target prod --select +my_second_dbt_model
03:09:43  Running with dbt=1.3.1
03:09:43  Unable to do partial parsing because config vars, config profile, or config target have changed
03:09:44  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:09:44  
03:09:44  Concurrency: 4 threads (target='prod')
03:09:44  
03:09:44  1 of 6 START sql table model advcal.my_first_dbt_model ......................... [RUN]
03:09:44  1 of 6 OK created sql table model advcal.my_first_dbt_model .................... [SELECT 1 in 0.12s]
03:09:44  2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]
03:09:44  3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]
03:09:44  2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.10s]
03:09:44  3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.10s]
03:09:44  4 of 6 START sql view model stg_advcal.my_second_dbt_model ......................... [RUN]
03:09:44  4 of 6 OK created sql view model stg_advcal.my_second_dbt_model .................... [CREATE VIEW in 0.11s]
03:09:44  5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN]
03:09:44  6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN]
03:09:44  5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.05s]
03:09:44  6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.05s]
03:09:44  
03:09:44  Finished running 1 table model, 4 tests, 1 view model in 0 hours 0 minutes and 0.58 seconds (0.58s).
03:09:44  
03:09:44  Completed successfully
03:09:44  
03:09:44  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

$ dbt build --target stg --select my_second_dbt_model  
03:09:51  Running with dbt=1.3.1
03:09:51  Unable to do partial parsing because config vars, config profile, or config target have changed
03:09:52  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:09:52  
03:09:52  Concurrency: 4 threads (target='stg')
03:09:52  
03:09:52  1 of 3 START sql view model stg_advcal.my_second_dbt_model ......................... [RUN]
03:09:52  1 of 3 OK created sql view model stg_advcal.my_second_dbt_model .................... [CREATE VIEW in 0.12s]
03:09:52  2 of 3 START test not_null_my_second_dbt_model_id .............................. [RUN]
03:09:52  3 of 3 START test unique_my_second_dbt_model_id ................................ [RUN]
03:09:52  2 of 3 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.08s]
03:09:52  3 of 3 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.08s]
03:09:52  
03:09:52  Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.43 seconds (0.43s).
03:09:52  
03:09:52  Completed successfully
03:09:52  
03:09:52  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3

挙動としては良さそうですね。


一旦これで基本的な部分は押さえました。23日目は更に実用的にするために改造していきます。

Discussion