📝

dbt macro tips advent calendar 2022 day 9 - statmentとrun_query

2022/12/09に公開約6,000字

便利なデータ変換ツールである dbt の中のmacroに関するtipsを書いていく dbt macro tips Advent Calendar 2022 9日目です。

statement blocks

そろそろ、dbtの実行時にDWHにアクセスして、その結果を元にsqlを書き換えたくなってきてる頃合いではないでしょうか?

それを実現するのが statement blocksです。

https://docs.getdbt.com/reference/dbt-jinja-functions/statement-blocks

こちらは単純に、macroやmodelなどJinjaテンプレート中からDWHへクエリを発行するものとなっております。
簡単な例として、DWH上の現在時刻を取得するマクロを作成してみましょう。

macros/db_now.sql
{%- macro db_now() %}
    {%- if execute %}
    {%- call statement('db_now', fetch_result=True) -%}
        select {{ dbt.current_timestamp() }}
    {%- endcall -%}

    {%- set db_now = load_result('db_now') -%}
    {{ return(db_now['table'].rows[0][0]) }}
    {%- endif %}
{%- endmacro %}
models/db_now.sql
{{
    config(
        materialized='table',
    )
}}

{% do log(db_now(), info=True) %}
SELECT 1
$ dbt build --select db_now
05:11:39  Running with dbt=1.3.1
05:11:40  
05:11:40  Found 3 models, 4 tests, 0 snapshots, 0 analyses, 290 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
05:11:40  
05:11:40  Concurrency: 4 threads (target='dev')
05:11:40  
05:11:40  1 of 1 START sql table model dev.db_now ........................................ [RUN]
05:11:40  2022-12-05 05:11:40.364279+00:00
05:11:40  1 of 1 OK created sql table model dev.db_now ................................... [SELECT 1 in 0.17s]
05:11:40  
05:11:40  Finished running 1 table model in 0 hours 0 minutes and 0.44 seconds (0.44s).
05:11:40  
05:11:40  Completed successfully
05:11:40  
05:11:40  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

どうでしょうか? 実行できたでしょうか。 それっぽいものがlogに出力されてますね。

run_query

statements blocks の使い方について説明しましたが、少し使い方が複雑です。
ドキュメントを読むと、トランザクションについてや結果の取得について等色々と細かく制御できるというメリットがありますが、コレを毎回書くのは手間です。
そこで、簡易的にクエリを実行してその結果を取得するためのラッパーとして run_queryというmacroが提供されています。

https://docs.getdbt.com/reference/dbt-jinja-functions/run_query

先程のクエリを以下のように書き換えてみます。

macros/db_now.sql
{%- macro db_now() %}
    {%- if execute %}
    {%- set sql-%}
        select {{ dbt.current_timestamp() }}
    {%- endset -%}
    {{ return(run_query(sql).rows[0][0]) }}
    {%- endif %}
{%- endmacro %}

少し短くなりましたね。実用上は、run_query macroを使うだけでいいと思います。

ちなみにですが、run_queryの戻り値は agateと呼ばれるPythonモジュールのTableオブジェクトになります。

https://agate.readthedocs.io/en/latest/api/table.html

色々便利なインタフェースが用意されているので、ドキュメントを読み解けば便利に使えます。

例えば以下のようなものはどうでしょう。

macros/printer.sql
{%- macro print_model(this) %}
    {%- if execute %}
    {%- set sql-%}
            select * from {{ this }}
    {%- endset -%}
    {%- do log(this ~ ' is following:', info=True) %}
    {%- do run_query(sql).print_table(**kwargs) %}
    {%- endif %}
{%- endmacro %}

{%- macro print_model_as_bar(this,key) %}
    {%- if execute %}
    {%- set sql-%}
            select * from {{ this }}
    {%- endset -%}
    {%- do log(this ~ ' show bar graph:', info=True) %}
    {%- do run_query(sql).pivot(key).print_bars(key) %}
    {%- endif %}
{%- endmacro %}
models/example/my_first_dbt_model.sql
{{
    config(
        materialized='table',
        post_hook=[
            after_commit(print_model(this)),
            after_commit(print_model_as_bar(this,'region')),
        ],
    )
}}

{%- set data =[
    [1, 'hoge', 'us'],
    [2, 'fuga', 'us'],
    [3, 'piyo', 'jp'],
    [4, 'tora', 'jp'],
    [5, 'alice', 'us'],
    [6, 'smith', 'us'],
] %}
with source_data as (
    {%- for record in data %}
    select {{ record[0] }} as id, '{{ record[1] }}' as name, '{{ record[2] }}' as region
    {%- if not loop.last %} union all{%- endif %}
    {%- endfor %}
)

select *
from source_data
$ dbt build --select my_first_dbt_model
07:17:51  Running with dbt=1.3.1
07:17:52  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 291 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
07:17:52  
07:17:52  Concurrency: 4 threads (target='dev')
07:17:52  
07:17:52  1 of 3 START sql table model dev.my_first_dbt_model ............................ [RUN]
07:17:52  "postgres"."dev"."my_first_dbt_model" is following:
| id | name  | region |
| -- | ----- | ------ |
|  1 | hoge  | us     |
|  2 | fuga  | us     |
|  3 | piyo  | jp     |
|  4 | tora  | jp     |
|  5 | alice | us     |
|  6 | smith | us     |
07:17:52  "postgres"."dev"."my_first_dbt_model" show bar graph:
region Count
us         4 ▓░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
jp         2 ▓░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░                                                     
             +-------------------------+--------------------------+--------------------------+-------------------------+
             0                         1                          2                          3                         4
07:17:52  1 of 3 OK created sql table model dev.my_first_dbt_model ....................... [SELECT 6 in 0.47s]
07:17:52  2 of 3 START test not_null_my_first_dbt_model_id ............................... [RUN]
07:17:52  3 of 3 START test unique_my_first_dbt_model_id ................................. [RUN]
07:17:53  2 of 3 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.13s]
07:17:53  3 of 3 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.13s]
07:17:53  
07:17:53  Finished running 1 table model, 2 tests in 0 hours 0 minutes and 1.00 seconds (1.00s).
07:17:53  
07:17:53  Completed successfully
07:17:53  
07:17:53  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3

便利ですね!
ところで皆様お気づきでしょうか? executeのif文があります。

そう、実はrun_queryもstatment blocksも実はexecute phaseでのみ使えるという制限があります。

https://docs.getdbt.com/reference/dbt-jinja-functions/execute

Any Jinja that relies on a result being returned from the database will error during the parse phase.

このようにあります。応用的なマクロを書く際には動作モードの意識が非常に重要ですね。


10日目は、より実用的なマクロを書くために on-run-end Context について書きます。

Discussion

ログインするとコメントできます