dbt macro tips advent calendar 2022 day 22 - ref,source改造その2
便利なデータ変換ツールである dbt の中のmacroに関するtipsを書いていく dbt macro tips Advent Calendar 2022 22日目です。
昨日は、 staging
環境の切り替えのために条件部分作成をしていました。
背景や、昨日までの部分は こちらを見ていただくとして今日は、その続きです。
実際の切り替え
一口にprod
環境のデータを参照すると言っても、いくつか手段があります。
-
prod
環境のデータを文字通り別のテーブルとしてcopyする (deep copy?) -
prod
環境のRelationを参照するviewを作成する (shallow copy?) -
prod
環境のRelationをそのまま参照する (link?)
どの手段でも良いのですが、共通して必要となるmacroがあります。それはstaging
環境のRelationをprod
環境に書き換えるmacroです。
環境というものをどのように設定しているか?というのは個別のprojectによって変わるとは思います。
今回の例では schema名で切り替えてるものとしましょう。
環境はprofile名によって決まり、schemaを変えることで環境を変えてるという場合おそらく generate_schema_name
のmacroを上書きして実装することが多いでしょう。 例として以下のようなmacroを提示しようと思います。
{%- macro enviroment_name() %}
{{ return(target.name) }}
{%- endmacro %}
{%- macro is_stg() %}
{%- if enviroment_name() == 'stg' %}
{{ return(True) }}
{%- else %}
{{ return(False) }}
{%- endif %}
{%- endmacro %}
{% macro generate_schema_name(custom_schema_name, node) -%}
{{ return(generate_schema_name_for_env(enviroment_name(), custom_schema_name)) }}
{%- endmacro %}
{%- macro generate_schema_name_for_env(env_name, custom_schema_name) -%}
{%- set default_schema = 'advcal' -%}
{%- if env_name == 'prod' %}
{%- if custom_schema_name is none -%}
{{ return(default_schema) }}
{%- else -%}
{{ return(custom_schema_name | trim) }}
{%- endif %}
{%- endif %}
{%- if env_name == 'stg' %}
{%- if custom_schema_name is none -%}
{{ return('stg_'~default_schema) }}
{%- else -%}
{{ return('stg_'~custom_schema_name | trim) }}
{%- endif %}
{%- endif %}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ env_name | trim }}_{{ default_schema }}
{%- else -%}
{{ env_name | trim }}_{{ custom_schema_name | trim }}
{%- endif %}
{%- endmacro %}
このように環境を実現しているとき、staring
環境からprod
環境へのRelationの書き換えは以下のようになります。
{%- macro rewrite_relation_stg_to_prod(unique_id,relation) %}
{%- set node = graph.nodes | selectattr('unique_id', 'eq', unique_id) | first %}
{%- if node is not none and 'config' in node and (node.config.schema | trim | length) > 0 %}
{%- set custom_schema_name = (node.config.schema | trim)%}
{%- else %}
{%- set custom_schema_name = none %}
{%- endif %}
{{ return(relation.replace_path(schema=generate_schema_name_for_env('prod',custom_schema_name))) }}
{%- endmacro %}
Relationにはreplace_pathというメソッドがあるようなので、schemaを generate_schema_name_for_env
という用意したmacroでprod用のschemaを手に入れれば良いようです。
ここで、custom_schema_nameを取得するために、graph
コンテキスト変数にアクセスして、unique_idが一致する物を取得し、その結果から、configに指定されてるschemaを取得するということもやっています。
この辺は8日目の応用ですね。
書き換えの戦略には3種類くらいあると言いましたが、ここでは一旦一番簡単な 3番目で実装してみましょう。
ここまでのrefとsourceの改造の全貌はこちらとなります。
{%- macro ref() %}
{%- set relation = builtins.ref(*varargs) %}
{%- if not is_stg() %}
{{ return(relation) }}
{%- endif %}
{{ return(switch_relation_for_stg(model_unique_id(*varargs),relation)) }}
{%- endmacro %}
{%- macro source() %}
{%- set relation = builtins.source(*varargs) %}
{%- if not is_stg() %}
{{ return(relation) }}
{%- endif %}
{{ return(switch_relation_for_stg(source_unique_id(*varargs),relation)) }}
{%- endmacro %}
{%- macro model_unique_id() %}
{%- set package_name = model.package_name %}
{%- if (varargs | length) == 1 %}
{%- set model_name = varargs[0] %}
{%- else %}
{%- set package_name = varargs[0] %}
{%- set model_name = varargs[1] %}
{%- endif %}
{{ return('model.'~package_name~'.'~model_name) }}
{%- endmacro %}
{%- macro source_unique_id() %}
{%- set package_name = model.package_name %}
{%- if (varargs | length) == 1 %}
{%- set table_name = varargs[0] %}
{%- else %}
{%- set package_name = varargs[0] %}
{%- set table_name = varargs[1] %}
{%- endif %}
{{ return('source.'~package_name~'.'~model_name) }}
{%- endmacro %}
{%- macro switch_relation_for_stg(unique_id,relation) %}
{# 1.解析フェーズはそのまま帰す。 #}
{%- if not execute %}
{{ return(relation) }}
{%- endif %}
{# 2.実行対象に選択されてるリソースはそのまま帰す #}
{%- if unique_id in selected_resources %}
{{ return(relation) }}
{%- endif %}
{{ return(rewrite_relation_stg_to_prod(unique_id,relation)) }}
{%- endmacro %}
{%- macro rewrite_relation_stg_to_prod(unique_id,relation) %}
{%- set node = graph.nodes | selectattr('unique_id', 'eq', unique_id) | first %}
{%- if node is not none and 'config' in node and (node.config.schema | trim | length) > 0 %}
{%- set custom_schema_name = (node.config.schema | trim)%}
{%- else %}
{%- set custom_schema_name = none %}
{%- endif %}
{{ return(relation.replace_path(schema=generate_schema_name_for_env('prod',custom_schema_name))) }}
{%- endmacro %}
$ dbt build --target stg --select my_second_dbt_model
03:07:33 Running with dbt=1.3.1
03:07:33 Change detected to override macro used during parsing. Starting full parse.
03:07:34 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:07:34
03:07:34 Concurrency: 4 threads (target='stg')
03:07:34
03:07:34 1 of 3 START sql view model stg_advcal.my_second_dbt_model ......................... [RUN]
03:07:34 1 of 3 ERROR creating sql view model stg_advcal.my_second_dbt_model ................ [ERROR in 0.08s]
03:07:34 2 of 3 SKIP test not_null_my_second_dbt_model_id ............................... [SKIP]
03:07:34 3 of 3 SKIP test unique_my_second_dbt_model_id ................................. [SKIP]
03:07:34
03:07:34 Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.29 seconds (0.29s).
03:07:34
03:07:34 Completed with 1 error and 0 warnings:
03:07:34
03:07:34 Database Error in model my_second_dbt_model (models/example/my_second_dbt_model.sql)
03:07:34 relation "advcal.my_first_dbt_model" does not exist
03:07:34 LINE 6: from "postgres"."advcal"."my_first_dbt_model"
03:07:34 ^
03:07:34 compiled Code at target/run/macro_tips_advcal/models/example/my_second_dbt_model.sql
03:07:34
03:07:34 Done. PASS=0 WARN=0 ERROR=1 SKIP=2 TOTAL=3
$ dbt build --target prod --select +my_second_dbt_model
03:09:43 Running with dbt=1.3.1
03:09:43 Unable to do partial parsing because config vars, config profile, or config target have changed
03:09:44 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:09:44
03:09:44 Concurrency: 4 threads (target='prod')
03:09:44
03:09:44 1 of 6 START sql table model advcal.my_first_dbt_model ......................... [RUN]
03:09:44 1 of 6 OK created sql table model advcal.my_first_dbt_model .................... [SELECT 1 in 0.12s]
03:09:44 2 of 6 START test not_null_my_first_dbt_model_id ............................... [RUN]
03:09:44 3 of 6 START test unique_my_first_dbt_model_id ................................. [RUN]
03:09:44 2 of 6 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.10s]
03:09:44 3 of 6 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.10s]
03:09:44 4 of 6 START sql view model stg_advcal.my_second_dbt_model ......................... [RUN]
03:09:44 4 of 6 OK created sql view model stg_advcal.my_second_dbt_model .................... [CREATE VIEW in 0.11s]
03:09:44 5 of 6 START test not_null_my_second_dbt_model_id .............................. [RUN]
03:09:44 6 of 6 START test unique_my_second_dbt_model_id ................................ [RUN]
03:09:44 5 of 6 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.05s]
03:09:44 6 of 6 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.05s]
03:09:44
03:09:44 Finished running 1 table model, 4 tests, 1 view model in 0 hours 0 minutes and 0.58 seconds (0.58s).
03:09:44
03:09:44 Completed successfully
03:09:44
03:09:44 Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6
$ dbt build --target stg --select my_second_dbt_model
03:09:51 Running with dbt=1.3.1
03:09:51 Unable to do partial parsing because config vars, config profile, or config target have changed
03:09:52 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:09:52
03:09:52 Concurrency: 4 threads (target='stg')
03:09:52
03:09:52 1 of 3 START sql view model stg_advcal.my_second_dbt_model ......................... [RUN]
03:09:52 1 of 3 OK created sql view model stg_advcal.my_second_dbt_model .................... [CREATE VIEW in 0.12s]
03:09:52 2 of 3 START test not_null_my_second_dbt_model_id .............................. [RUN]
03:09:52 3 of 3 START test unique_my_second_dbt_model_id ................................ [RUN]
03:09:52 2 of 3 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.08s]
03:09:52 3 of 3 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.08s]
03:09:52
03:09:52 Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.43 seconds (0.43s).
03:09:52
03:09:52 Completed successfully
03:09:52
03:09:52 Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
挙動としては良さそうですね。
一旦これで基本的な部分は押さえました。23日目は更に実用的にするために改造していきます。
Discussion