🐕

dbt macro tips advent calendar 2022 day 22 - ref,source改造その2

2022/12/22に公開

便利なデータ変換ツールである dbt の中のmacroに関するtipsを書いていく dbt macro tips Advent Calendar 2022 22日目です。

昨日は、 staging 環境の切り替えのために条件部分作成をしていました。
背景や、昨日までの部分は こちらを見ていただくとして今日は、その続きです。

実際の切り替え

一口にprod環境のデータを参照すると言っても、いくつか手段があります。

  1. prod環境のデータを文字通り別のテーブルとしてcopyする (deep copy?)
  2. prod環境のRelationを参照するviewを作成する (shallow copy?)
  3. prod環境のRelationをそのまま参照する (link?)

どの手段でも良いのですが、共通して必要となるmacroがあります。それはstaging環境のRelationをprod 環境に書き換えるmacroです。
環境というものをどのように設定しているか?というのは個別のprojectによって変わるとは思います。
今回の例では schema名で切り替えてるものとしましょう。

環境はprofile名によって決まり、schemaを変えることで環境を変えてるという場合おそらく generate_schema_nameのmacroを上書きして実装することが多いでしょう。 例として以下のようなmacroを提示しようと思います。

macros/environment.sql
{%- macro enviroment_name() %}
    {{ return(target.name) }}
{%- endmacro %}

{%- macro is_stg() %}
	{%- if enviroment_name() == 'stg' %}
		{{ return(True) }}
	{%- else %}
		{{ return(False) }}
	{%- endif %}
{%- endmacro %}

{% macro generate_schema_name(custom_schema_name, node) -%}
    {{ return(generate_schema_name_for_env(enviroment_name(), custom_schema_name)) }}
{%- endmacro %}

{%- macro generate_schema_name_for_env(env_name, custom_schema_name) -%}
    {%- set default_schema = 'advcal' -%}
    {%- if env_name == 'prod' %}
        {%- if custom_schema_name is none -%}
            {{ return(default_schema) }}
        {%- else -%}
            {{ return(custom_schema_name | trim) }}
        {%- endif %}
    {%- endif %}

    {%- if env_name == 'stg' %}
        {%- if custom_schema_name is none -%}
            {{ return('stg_'~default_schema) }}
        {%- else -%}
            {{ return('stg_'~custom_schema_name | trim) }}
        {%- endif %}
    {%- endif %}

    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ env_name | trim }}_{{ default_schema }}
    {%- else -%}
        {{ env_name | trim }}_{{ custom_schema_name | trim }}
    {%- endif %}
{%- endmacro %}

このように環境を実現しているとき、staring環境からprod環境へのRelationの書き換えは以下のようになります。

{%- macro rewrite_relation_stg_to_prod(unique_id,relation) %}
    {%- set node = graph.nodes | selectattr('unique_id', 'eq', unique_id) | first %}
    {%- if node is not none and 'config' in node and (node.config.schema | trim | length) > 0 %}
        {%- set custom_schema_name = (node.config.schema | trim)%}
    {%- else %}
        {%- set custom_schema_name = none %}
    {%- endif %}

    {{ return(relation.replace_path(schema=generate_schema_name_for_env('prod',custom_schema_name))) }}
{%- endmacro %}

Relationにはreplace_pathというメソッドがあるようなので、schemaを generate_schema_name_for_env という用意したmacroでprod用のschemaを手に入れれば良いようです。

ここで、custom_schema_nameを取得するために、graph コンテキスト変数にアクセスして、unique_idが一致する物を取得し、その結果から、configに指定されてるschemaを取得するということもやっています。
この辺は8日目の応用ですね。

書き換えの戦略には3種類くらいあると言いましたが、ここでは一旦一番簡単な 3番目で実装してみましょう。

ここまでのrefとsourceの改造の全貌はこちらとなります。

macros/overrides.sql
{%- macro ref() %}
    {%- set relation = builtins.ref(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(switch_relation_for_stg(model_unique_id(*varargs),relation)) }}
{%- endmacro %}

{%- macro source() %}
    {%- set relation = builtins.source(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(switch_relation_for_stg(source_unique_id(*varargs),relation)) }}
{%- endmacro %}

{%- macro model_unique_id() %}
    {%- set package_name = model.package_name %}
    {%- if (varargs | length) == 1 %}
        {%- set model_name = varargs[0] %}
    {%- else %}
        {%- set package_name = varargs[0] %}
        {%- set model_name = varargs[1] %}
    {%- endif %}
    {{ return('model.'~package_name~'.'~model_name) }}
{%- endmacro %}

{%- macro source_unique_id() %}
    {%- set package_name = model.package_name %}
    {%- if (varargs | length) == 1 %}
        {%- set table_name = varargs[0] %}
    {%- else %}
        {%- set package_name = varargs[0] %}
        {%- set table_name = varargs[1] %}
    {%- endif %}
    {{ return('source.'~package_name~'.'~model_name) }}
{%- endmacro %}


{%- macro switch_relation_for_stg(unique_id,relation) %}
    {# 1.解析フェーズはそのまま帰す。 #}
    {%- if not execute %}
        {{ return(relation) }}
    {%- endif %}
    {# 2.実行対象に選択されてるリソースはそのまま帰す #}
    {%- if unique_id in selected_resources %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(rewrite_relation_stg_to_prod(unique_id,relation)) }}
{%- endmacro %}


{%- macro rewrite_relation_stg_to_prod(unique_id,relation) %}
    {%- set node = graph.nodes | selectattr('unique_id', 'eq', unique_id) | first %}
    {%- if node is not none and 'config' in node and (node.config.schema | trim | length) > 0 %}
        {%- set custom_schema_name = (node.config.schema | trim)%}
    {%- else %}
        {%- set custom_schema_name = none %}
    {%- endif %}

    {{ return(relation.replace_path(schema=generate_schema_name_for_env('prod',custom_schema_name))) }}
{%- endmacro %}
$ dbt build --target stg --select my_second_dbt_model
03:07:33  Running with dbt=1.3.1
03:07:33  Change detected to override macro used during parsing. Starting full parse.
03:07:34  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:07:34  
03:07:34  Concurrency: 4 threads (target='stg')
03:07:34  
03:07:34  1 of 3 START sql view model stg_advcal.my_second_dbt_model ......................... [RUN]
03:07:34  1 of 3 ERROR creating sql view model stg_advcal.my_second_dbt_model ................ [ERROR in 0.08s]
03:07:34  2 of 3 SKIP test not_null_my_second_dbt_model_id ............................... [SKIP]
03:07:34  3 of 3 SKIP test unique_my_second_dbt_model_id ................................. [SKIP]
03:07:34  
03:07:34  Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.29 seconds (0.29s).
03:07:34  
03:07:34  Completed with 1 error and 0 warnings:
03:07:34  
03:07:34  Database Error in model my_second_dbt_model (models/example/my_second_dbt_model.sql)
03:07:34    relation "advcal.my_first_dbt_model" does not exist
03:07:34    LINE 6: from "postgres"."advcal"."my_first_dbt_model"
03:07:34                 ^
03:07:34    compiled Code at target/run/macro_tips_advcal/models/example/my_second_dbt_model.sql
03:07:34  
03:07:34  Done. PASS=0 WARN=0 ERROR=1 SKIP=2 TOTAL=3

$ dbt build --target prod --select +my_second_dbt_model
03:09:43  Running with dbt=1.3.1
03:09:43  Unable to do partial parsing because config vars, config profile, or config target have changed
03:09:44  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:09:44  
03:09:44  Concurrency: 4 threads (target='prod')
03:09:44  
03:09:44  1 of 6 START sql table model advcal.my_first_dbt_model ......................... [RUN]
03:09:44  1 of 6 OK created sql table model advcal.my_first_dbt_model .................... [SELECT 1 in 0.12s]
03:09:44  2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]
03:09:44  3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]
03:09:44  2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.10s]
03:09:44  3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.10s]
03:09:44  4 of 6 START sql view model stg_advcal.my_second_dbt_model ......................... [RUN]
03:09:44  4 of 6 OK created sql view model stg_advcal.my_second_dbt_model .................... [CREATE VIEW in 0.11s]
03:09:44  5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN]
03:09:44  6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN]
03:09:44  5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.05s]
03:09:44  6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.05s]
03:09:44  
03:09:44  Finished running 1 table model, 4 tests, 1 view model in 0 hours 0 minutes and 0.58 seconds (0.58s).
03:09:44  
03:09:44  Completed successfully
03:09:44  
03:09:44  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

$ dbt build --target stg --select my_second_dbt_model  
03:09:51  Running with dbt=1.3.1
03:09:51  Unable to do partial parsing because config vars, config profile, or config target have changed
03:09:52  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:09:52  
03:09:52  Concurrency: 4 threads (target='stg')
03:09:52  
03:09:52  1 of 3 START sql view model stg_advcal.my_second_dbt_model ......................... [RUN]
03:09:52  1 of 3 OK created sql view model stg_advcal.my_second_dbt_model .................... [CREATE VIEW in 0.12s]
03:09:52  2 of 3 START test not_null_my_second_dbt_model_id .............................. [RUN]
03:09:52  3 of 3 START test unique_my_second_dbt_model_id ................................ [RUN]
03:09:52  2 of 3 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.08s]
03:09:52  3 of 3 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.08s]
03:09:52  
03:09:52  Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.43 seconds (0.43s).
03:09:52  
03:09:52  Completed successfully
03:09:52  
03:09:52  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3

挙動としては良さそうですね。


一旦これで基本的な部分は押さえました。23日目は更に実用的にするために改造していきます。

Discussion