dbt macro tips advent calendar 2022 day 9 - statmentとrun_query
便利なデータ変換ツールである dbt の中のmacroに関するtipsを書いていく dbt macro tips Advent Calendar 2022 9日目です。
statement blocks
そろそろ、dbtの実行時にDWHにアクセスして、その結果を元にsqlを書き換えたくなってきてる頃合いではないでしょうか?
それを実現するのが statement blocks
です。
こちらは単純に、macroやmodelなどJinjaテンプレート中からDWHへクエリを発行するものとなっております。
簡単な例として、DWH上の現在時刻を取得するマクロを作成してみましょう。
{%- macro db_now() %}
{%- if execute %}
{%- call statement('db_now', fetch_result=True) -%}
select {{ dbt.current_timestamp() }}
{%- endcall -%}
{%- set db_now = load_result('db_now') -%}
{{ return(db_now['table'].rows[0][0]) }}
{%- endif %}
{%- endmacro %}
{{
config(
materialized='table',
)
}}
{% do log(db_now(), info=True) %}
SELECT 1
$ dbt build --select db_now
05:11:39 Running with dbt=1.3.1
05:11:40
05:11:40 Found 3 models, 4 tests, 0 snapshots, 0 analyses, 290 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
05:11:40
05:11:40 Concurrency: 4 threads (target='dev')
05:11:40
05:11:40 1 of 1 START sql table model dev.db_now ........................................ [RUN]
05:11:40 2022-12-05 05:11:40.364279+00:00
05:11:40 1 of 1 OK created sql table model dev.db_now ................................... [SELECT 1 in 0.17s]
05:11:40
05:11:40 Finished running 1 table model in 0 hours 0 minutes and 0.44 seconds (0.44s).
05:11:40
05:11:40 Completed successfully
05:11:40
05:11:40 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
どうでしょうか? 実行できたでしょうか。 それっぽいものがlogに出力されてますね。
run_query
statements blocks の使い方について説明しましたが、少し使い方が複雑です。
ドキュメントを読むと、トランザクションについてや結果の取得について等色々と細かく制御できるというメリットがありますが、コレを毎回書くのは手間です。
そこで、簡易的にクエリを実行してその結果を取得するためのラッパーとして run_query
というmacroが提供されています。
先程のクエリを以下のように書き換えてみます。
{%- macro db_now() %}
{%- if execute %}
{%- set sql-%}
select {{ dbt.current_timestamp() }}
{%- endset -%}
{{ return(run_query(sql).rows[0][0]) }}
{%- endif %}
{%- endmacro %}
少し短くなりましたね。実用上は、run_query
macroを使うだけでいいと思います。
ちなみにですが、run_query
の戻り値は agateと呼ばれるPythonモジュールのTableオブジェクトになります。
色々便利なインタフェースが用意されているので、ドキュメントを読み解けば便利に使えます。
例えば以下のようなものはどうでしょう。
{%- macro print_model(this) %}
{%- if execute %}
{%- set sql-%}
select * from {{ this }}
{%- endset -%}
{%- do log(this ~ ' is following:', info=True) %}
{%- do run_query(sql).print_table(**kwargs) %}
{%- endif %}
{%- endmacro %}
{%- macro print_model_as_bar(this,key) %}
{%- if execute %}
{%- set sql-%}
select * from {{ this }}
{%- endset -%}
{%- do log(this ~ ' show bar graph:', info=True) %}
{%- do run_query(sql).pivot(key).print_bars(key) %}
{%- endif %}
{%- endmacro %}
{{
config(
materialized='table',
post_hook=[
after_commit(print_model(this)),
after_commit(print_model_as_bar(this,'region')),
],
)
}}
{%- set data =[
[1, 'hoge', 'us'],
[2, 'fuga', 'us'],
[3, 'piyo', 'jp'],
[4, 'tora', 'jp'],
[5, 'alice', 'us'],
[6, 'smith', 'us'],
] %}
with source_data as (
{%- for record in data %}
select {{ record[0] }} as id, '{{ record[1] }}' as name, '{{ record[2] }}' as region
{%- if not loop.last %} union all{%- endif %}
{%- endfor %}
)
select *
from source_data
$ dbt build --select my_first_dbt_model
07:17:51 Running with dbt=1.3.1
07:17:52 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 291 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
07:17:52
07:17:52 Concurrency: 4 threads (target='dev')
07:17:52
07:17:52 1 of 3 START sql table model dev.my_first_dbt_model ............................ [RUN]
07:17:52 "postgres"."dev"."my_first_dbt_model" is following:
| id | name | region |
| -- | ----- | ------ |
| 1 | hoge | us |
| 2 | fuga | us |
| 3 | piyo | jp |
| 4 | tora | jp |
| 5 | alice | us |
| 6 | smith | us |
07:17:52 "postgres"."dev"."my_first_dbt_model" show bar graph:
region Count
us 4 ▓░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
jp 2 ▓░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
+-------------------------+--------------------------+--------------------------+-------------------------+
0 1 2 3 4
07:17:52 1 of 3 OK created sql table model dev.my_first_dbt_model ....................... [SELECT 6 in 0.47s]
07:17:52 2 of 3 START test not_null_my_first_dbt_model_id ............................... [RUN]
07:17:52 3 of 3 START test unique_my_first_dbt_model_id ................................. [RUN]
07:17:53 2 of 3 PASS not_null_my_first_dbt_model_id ..................................... [PASS in 0.13s]
07:17:53 3 of 3 PASS unique_my_first_dbt_model_id ....................................... [PASS in 0.13s]
07:17:53
07:17:53 Finished running 1 table model, 2 tests in 0 hours 0 minutes and 1.00 seconds (1.00s).
07:17:53
07:17:53 Completed successfully
07:17:53
07:17:53 Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
便利ですね!
ところで皆様お気づきでしょうか? executeのif文があります。
そう、実はrun_queryもstatment blocksも実はexecute phaseでのみ使えるという制限があります。
Any Jinja that relies on a result being returned from the database will error during the parse phase.
このようにあります。応用的なマクロを書く際には動作モードの意識が非常に重要ですね。
10日目は、より実用的なマクロを書くために on-run-end Context について書きます。
Discussion