🐷

dbt macro tips advent calendar 2022 day 23 - ref,source改造その3

2022/12/23に公開

便利なデータ変換ツールである dbt の中のmacroに関するtipsを書いていく dbt macro tips Advent Calendar 2022 23日目です。

昨日は、 staging 環境への実際の切り替えを実装してました。
背景などはは こちらを見ていただくとして今日は、その最終的な仕上げです。

仕上げ

さて、ここまでの部分で最低限の実装としては使えるのですがいくつか問題があります。

  • prod環境のRelationが存在しない場合は? (まだ開発中だったりしてリリースされてないという状況)
  • prod環境のRelationが削除された場合、cascadeでviewが消される

1つ目は実際にRelattionが存在するか?を確認して存在しなかったら一旦警告を出しつつ stgのRelationにフォールバックすることにします。
2つ目はstg_relation_strategyというconfigを設定できるようにして、copyするのかそれともprodへのrelationをswitchするのみに留めるのかモデルごとに切り替えられるようにしてみましょう。

まず1つ目のフォールバックに関してです。
https://docs.getdbt.com/reference/dbt-jinja-functions/adapter#load_relation
adapterには実際のRelationの存在を確認するのに便利なメソッドが用意されてます。
こちらは、Relationキャッシュにアクセスして、そのキャッシュデータをとってくるものなのですが、ドキュメントの説明にあるように、実際のRelationがない場合はNoneが帰ってきます。

このことを利用して、load_relationによって、prod環境のrelationを確認してNoneであるならば、staging環境のものを返します。
実際のmacroを見てみましょう。

{%- macro switch_relation_for_stg(unique_id,relation) %}
    {# 1.解析フェーズはそのまま帰す。 #}
    {%- if not execute %}
        {{ return(relation) }}
    {%- endif %}
    {# 2.実行対象に選択されてるリソースはそのまま帰す #}
    {%- if unique_id in selected_resources %}
        {{ return(relation) }}
    {%- endif %}

    {%- set prod_relation = rewrite_relation_stg_to_prod(unique_id,relation) %}
    {%- set exists_relation = load_relation(prod_relation) %}
    {%- if exists_relation is none %}
        {% do exceptions.warn(prod_relation~' is not exists. fallback to ' ~ relation) %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(exists_relation) }}
{%- endmacro %}

実際に動かしてみると、警告メッセージがちゃんと出ていることが確認できます。

$ dbt build --target stg --select my_second_dbt_model
03:32:24  Running with dbt=1.3.1
03:32:24  Change detected to override macro used during parsing. Starting full parse.
03:32:25  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 310 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
03:32:25  
03:32:25  Concurrency: 4 threads (target='stg')
03:32:25  
03:32:25  1 of 3 START sql view model stg_advcal.my_second_dbt_model ..................... [RUN]
03:32:25  "postgres"."advcal"."my_first_dbt_model" is not exists. fallback to "postgres"."stg_advcal"."my_first_dbt_model"
03:32:25  1 of 3 ERROR creating sql view model stg_advcal.my_second_dbt_model ............ [ERROR in 0.10s]
03:32:25  2 of 3 SKIP test not_null_my_second_dbt_model_id ............................... [SKIP]
03:32:25  3 of 3 SKIP test unique_my_second_dbt_model_id ................................. [SKIP]
03:32:25  
03:32:25  Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.34 seconds (0.34s).
03:32:25  
03:32:25  Completed with 1 error and 0 warnings:
03:32:25  
03:32:25  Database Error in model my_second_dbt_model (models/example/my_second_dbt_model.sql)
03:32:25    relation "stg_advcal.my_first_dbt_model" does not exist
03:32:25    LINE 6: from "postgres"."stg_advcal"."my_first_dbt_model"
03:32:25                 ^
03:32:25    compiled Code at target/run/macro_tips_advcal/models/example/my_second_dbt_model.sql
03:32:25  
03:32:25  Done. PASS=0 WARN=0 ERROR=1 SKIP=2 TOTAL=3

次は、configによる切り替えです。
前回同様 graph コンテキスト変数を使ってconfigにアクセスしてみましょう。
2回目なので、macroとして切り出してしまいましょう。

{%- macro config_get_with_graph(unique_id,key, default=none) %}
    {%- set node = graph.nodes.values() | selectattr('unique_id', 'eq', unique_id | trim) | first %}
    {%- if node is none %}
        {{ return(default) }}
    {%- endif %}
    {%- if 'config' not in node or node.config is none %}
        {{ return(default) }}
    {%- endif %}
    {%- if key not in node.config or node.config[key] is none %}
        {{ return(default) }}
    {%- endif %}
    {{ return(node.config[key]) }}
{%- endmacro %}

そして、このmacroを使って

{%- macro switch_relation_for_stg(unique_id,relation) %}
    {# 1.解析フェーズはそのまま帰す。 #}
    {%- if not execute %}
        {{ return(relation) }}
    {%- endif %}
    {# 2.実行対象に選択されてるリソースはそのまま帰す #}
    {%- if unique_id in selected_resources %}
        {{ return(relation) }}
    {%- endif %}

    {%- set prod_relation = rewrite_relation_stg_to_prod(unique_id,relation) %}
    {%- set exists_relation = load_relation(prod_relation) %}
    {%- if exists_relation is none %}
        {% do exceptions.warn(prod_relation~' is not exists. fallback to ' ~ relation) %}
        {{ return(relation) }}
    {%- endif %}
    {%- set strategy = config_get_with_graph(unique_id, 'stg_relation_strategy', 'switch_relation') %}
    {%- if strategy == 'switch_relation' %}
        {{ return(exists_relation) }}
    {%- endif %}

    {%- if strategy == 'copy' %}
        {%- do log('copy '~ exists_relation ~ ' to ' ~ relation, info=True) %}
        {%- set exists_stg_relation = load_relation(relation) %}
        {%- if exists_stg_relation is not none %}
            {%- do drop_relation_if_exists(exists_stg_relation) %}
        {%- endif %}
        {%- do run_query(create_table_as(False, relation, 'select * from '~exists_relation)) %}
        {{ return(relation) }}
    {%- endif %}

    {{ exceptions.raise_compiler_error("Invalid `stg_relation_strategy`. Got: " ~ strategy) }}
{%- endmacro %}

このようになります。

models/example/my_first_dbt_model.sql
{{
    config(
        materialized='table',
        stg_relation_strategy='copy',
    )
}}

with source_data as (
    select 1 as id
)

select *
from source_data

このようにして実行すると以下のようになります。

$ dbt build --target stg --select my_second_dbt_model
04:06:34  Running with dbt=1.3.1
04:06:34  Change detected to override macro used during parsing. Starting full parse.
04:06:35  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 311 macros, 0 operations, 0 seed files, 3 sources, 0 exposures, 0 metrics
04:06:35  
04:06:35  Concurrency: 4 threads (target='stg')
04:06:35  
04:06:35  1 of 3 START sql view model stg_advcal.my_second_dbt_model ..................... [RUN]
04:06:35  copy "postgres"."advcal"."my_first_dbt_model" to "postgres"."stg_advcal"."my_first_dbt_model"
04:06:35  1 of 3 OK created sql view model stg_advcal.my_second_dbt_model ................ [CREATE VIEW in 0.14s]
04:06:35  2 of 3 START test not_null_my_second_dbt_model_id .............................. [RUN]
04:06:35  3 of 3 START test unique_my_second_dbt_model_id ................................ [RUN]
04:06:35  2 of 3 PASS not_null_my_second_dbt_model_id .................................... [PASS in 0.08s]
04:06:35  3 of 3 PASS unique_my_second_dbt_model_id ...................................... [PASS in 0.07s]
04:06:35  
04:06:35  Finished running 1 view model, 2 tests in 0 hours 0 minutes and 0.43 seconds (0.43s).
04:06:35  
04:06:35  Completed successfully
04:06:35  
04:06:35  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3

ログにprod環境からstaging環境へのデータコピーがされているということが出力されてますね。

このようにしてconfigに独自の設定を追加したり、その追加した設定をgraph経由で取得して挙動を変えたりすることができる例でした。
あとは各々のprojectで改変していくと使いやすいでしょう。

以下、ここ3日のmacroの全貌を貼ります。

{%- macro enviroment_name() %}
    {{ return(target.name) }}
{%- endmacro %}

{%- macro is_stg() %}
	{%- if enviroment_name() == 'stg' %}
		{{ return(True) }}
	{%- else %}
		{{ return(False) }}
	{%- endif %}
{%- endmacro %}

{% macro generate_schema_name(custom_schema_name, node) -%}
    {{ return(generate_schema_name_for_env(enviroment_name(), custom_schema_name)) }}
{%- endmacro %}

{%- macro generate_schema_name_for_env(env_name, custom_schema_name) -%}
    {%- set default_schema = 'advcal' -%}
    {%- if env_name == 'prod' %}
        {%- if custom_schema_name is none -%}
            {{ return(default_schema) }}
        {%- else -%}
            {{ return(custom_schema_name | trim) }}
        {%- endif %}
    {%- endif %}

    {%- if env_name == 'stg' %}
        {%- if custom_schema_name is none -%}
            {{ return('stg_'~default_schema) }}
        {%- else -%}
            {{ return('stg_'~custom_schema_name | trim) }}
        {%- endif %}
    {%- endif %}

    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ env_name | trim }}_{{ default_schema }}
    {%- else -%}
        {{ env_name | trim }}_{{ custom_schema_name | trim }}
    {%- endif %}
{%- endmacro %}
overrides.sql
{%- macro ref() %}
    {%- set relation = builtins.ref(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(switch_relation_for_stg(model_unique_id(*varargs),relation)) }}
{%- endmacro %}

{%- macro source() %}
    {%- set relation = builtins.source(*varargs) %}
    {%- if not is_stg() %}
        {{ return(relation) }}
    {%- endif %}
    {{ return(switch_relation_for_stg(source_unique_id(*varargs),relation)) }}
{%- endmacro %}

{%- macro model_unique_id() %}
    {%- set package_name = model.package_name %}
    {%- if (varargs | length) == 1 %}
        {%- set model_name = varargs[0] %}
    {%- else %}
        {%- set package_name = varargs[0] %}
        {%- set model_name = varargs[1] %}
    {%- endif %}
    {{ return('model.'~package_name~'.'~model_name) }}
{%- endmacro %}

{%- macro source_unique_id() %}
    {%- set package_name = model.package_name %}
    {%- if (varargs | length) == 1 %}
        {%- set table_name = varargs[0] %}
    {%- else %}
        {%- set package_name = varargs[0] %}
        {%- set table_name = varargs[1] %}
    {%- endif %}
    {{ return('source.'~package_name~'.'~model_name) }}
{%- endmacro %}


{%- macro switch_relation_for_stg(unique_id,relation) %}
    {# 1.解析フェーズはそのまま帰す。 #}
    {%- if not execute %}
        {{ return(relation) }}
    {%- endif %}
    {# 2.実行対象に選択されてるリソースはそのまま帰す #}
    {%- if unique_id in selected_resources %}
        {{ return(relation) }}
    {%- endif %}

    {%- set prod_relation = rewrite_relation_stg_to_prod(unique_id,relation) %}
    {%- set exists_relation = load_relation(prod_relation) %}
    {%- if exists_relation is none %}
        {% do exceptions.warn(prod_relation~' is not exists. fallback to ' ~ relation) %}
        {{ return(relation) }}
    {%- endif %}
    {%- set strategy = config_get_with_graph(unique_id, 'stg_relation_strategy', 'switch_relation') %}
    {%- if strategy == 'switch_relation' %}
        {{ return(exists_relation) }}
    {%- endif %}

    {%- if strategy == 'copy' %}
        {%- do log('copy '~ exists_relation ~ ' to ' ~ relation, info=True) %}
        {%- set exists_stg_relation = load_relation(relation) %}
        {%- if exists_stg_relation is not none %}
            {%- do drop_relation_if_exists(exists_stg_relation) %}
        {%- endif %}
        {%- do run_query(create_table_as(False, relation, 'select * from '~exists_relation)) %}
        {{ return(relation) }}
    {%- endif %}

    {{ exceptions.raise_compiler_error("Invalid `stg_relation_strategy`. Got: " ~ strategy) }}
{%- endmacro %}

{%- macro config_get_with_graph(unique_id,key, default=none) %}
    {%- set node = graph.nodes.values() | selectattr('unique_id', 'eq', unique_id | trim) | first %}
    {%- if node is none %}
        {{ return(default) }}
    {%- endif %}
    {%- if 'config' not in node or node.config is none %}
        {{ return(default) }}
    {%- endif %}
    {%- if key not in node.config or node.config[key] is none %}
        {{ return(default) }}
    {%- endif %}
    {{ return(node.config[key]) }}
{%- endmacro %}

{%- macro rewrite_relation_stg_to_prod(unique_id,relation) %}
    {%- set custom_schema_name = config_get_with_graph(unique_id,'schema') %}
    {{ return(relation.replace_path(schema=generate_schema_name_for_env('prod',custom_schema_name))) }}
{%- endmacro %}

皆様も是非staging環境を作ってみてください。


24日目と25日目はニッチな小ネタで締めたいと思います。

Discussion