dbtからSnowflake Dynamic Tablesを作成してリアルタイムデータパイプラインを構築してみる
これは何?
こんにちは。
dely株式会社でデータエンジニアをしておりますharry(@gappy50)です。
この記事は、昨年書いた以下の記事の続きの記事になります。
SnowflakeではDynamic TablesのPuPrが開始されており、宣言的なデータパイプラインの全貌徐々に見え隠れしております。
また、これに追従する形でdbt1.6でもMaterialized View(SnowflakeではDynamic Table)をサポートしはじめました。
このDynamic Tablesのメリットとして一番わかりやすいのは、ニアリアルタイムなストリーミングパイプラインをクエリを書くだけで実現が可能になる面だと思います。
これまではモデルを作成したあとのワークロードの実行は dbt build
を実行するタイミングとなってしまうため、リアルタイムなデータパイプラインの構築が難しい側面がありました。
リアルタイムなデータを使いたい場合は昨年書いた記事のように、Snowflakeのstream+taskを用いたデータパイプラインの構築をするか、Lambda Viewと呼ばれる履歴テーブルのモデルとそのモデルと同様のロジックをもつ最新ビューをunion allをするような方法を取るなど少々煩雑かつ専門的な対応が必要になります。(もしくはdbt Jobをせっかちな間隔で更新し続ける方法もあるかもしれません)
今回は、dbtでモデリングされているLambda ViewでのリアルタイムなデータパイプラインをDynamic Tablesへ移行をするサンプルを動かしてみて、Dynamic Tablesとは何なのかを理解していきたいと思います。
Lambda Viewを実装する
簡単な例にてLambda Viewを実装してみます。最終的には以下のようなData Lineageになります。
まずは、ソースとなるテーブルをダミーデータとして作成します。
create or replace transient table src_table as
select
object_construct(
'session_id', uuid_string(),
'user_id', abs(random())%1000,
'num_of_conversion', abs(random())%100,
'_created_at', dateadd(sec, 0 - abs(random())%1000000, current_timestamp)::timestamp_ntz
)::variant as source
from
table(generator(rowcount => 1000000));
VARIANT列に半構造化データが1,000,000レコード格納されています。
{ "_created_at": "2023-08-07 17:32:00.099", "num_of_conversion": 67, "session_id": "9fdd66ac-fd1e-4278-80c9-369ff2df3bdb", "user_id": 861 }
{ "_created_at": "2023-08-07 10:21:21.099", "num_of_conversion": 36, "session_id": "c30db4a4-2c9e-4acd-a5e2-ee23ee85330a", "user_id": 309 }
{ "_created_at": "2023-07-30 08:25:34.099", "num_of_conversion": 1, "session_id": "f14a878b-a0f2-445e-8393-39524ea67a8f", "user_id": 658 }
{ "_created_at": "2023-08-05 17:16:14.099", "num_of_conversion": 85, "session_id": "8247c1e4-8736-40a8-a490-5e0640ff480c", "user_id": 768 }
{ "_created_at": "2023-08-04 14:14:46.099", "num_of_conversion": 27, "session_id": "1a8a4f3d-b3a7-427c-ad9b-cf28dd16b387", "user_id": 881 }
・・・
これらのソーステーブルを使ってdbtで履歴テーブルに該当するモデルを作ってみましょう。
version: 2
sources:
- name: dynamic_table
database: test
schema: test
tables:
- name: src_table
{{ config( materialized='table' ) }}
select
-- staging層で必要な構造化処理や不要なデータのclean upなどをここで行う
source:session_id::string as session_id,
source:user_id::string as user_id,
source:num_of_conversion::number as number_of_conversion,
source:_created_at::timestamp as created_at
from
{{ source('dynamic_table', 'src_table') }}
続いて、履歴テーブルと同様のロジックをもつ最新ビューも作成してみます。
{{ config( materialized='view' ) }}
select
-- staging層で必要な構造化処理や不要なデータのclean upなどをここで行う
source:session_id::string as session_id,
source:user_id::string as user_id,
source:num_of_conversion::number as number_of_conversion,
source:_created_at::timestamp as created_at
from
{{ source('dynamic_table', 'src_table') }}
これらの2つのモデルを使って、テーブル化された過去データによる性能を担保しながら、テーブル化されていない最新のデータをビューとして利用できるようにします。
{{ config( materialized='view' ) }}
with current_view as (
select * from {{ref('current_view')}}
-- 必要に応じてhistorical_tableのmax値などにするのもあり
where created_at >= '{{ run_started_at }}'
),
historical_table as (
select * from {{ref('historical_table')}}
where created_at < '{{ run_started_at }}'
),
final as (
select * from current_view
union all
select * from historical_table
)
select * from final
dbt build --select +lambda_view
を実行しLambda Viewを作成したあとに select * from lambda_view
をしてると、先程作成した100万レコードのデータが構造化された状態でクエリできていることが確認できます。
続いて、ソーステーブルに10万レコードのダミーデータを追加してみます。
insert into src_table
select
object_construct(
'session_id', uuid_string(),
'user_id', abs(random())%1000,
'num_of_conversion', abs(random())%100,
'_created_at', dateadd(sec, abs(random())%1000000, current_timestamp)::timestamp_ntz
)::variant as source
from
table(generator(rowcount => 100000));
データ追加後に再度 select * from lambda_view
を実行してみると、履歴テーブルの100万レコードと最新ビューの10万レコードの合計110万レコードが重複なしにクエリできることが確認できます。
このようにLambda Viewは最新のデータも含めた分析が可能となるモデルを作成することが可能な反面、同一のロジックを作成しなければならない煩雑性が常に発生することや、複雑なモデリングが必要な場合はより考えることが多くなってきます。
Dynamic Table
それではこのLambda ViewをDynamic Tableへ置換してみます。
{{
config(
materialized = 'dynamic_table',
snowflake_warehouse = 'PC_DBT_WH',
target_lag = '1 minute',
on_configuration_change = 'apply',
)
}}
with final as (
select * from {{ref('current_view')}}
)
select * from final
これだけで、ニアリアルタイムなデータパイプラインの実装が可能になります。
…めっちゃ楽や…
WarehouseやLagの設定は本番のデータ量やワークロードに対して許容できる設定にする必要があります。
コスト観点などから開発環境は本番環境でラグの時間を変更したい場合は以下ようなmacroを作成してlarget_lagに設定することもできるようです[1]。
{% macro target_lag_environment() %}
{% set lag = '1 minute' if target.name == "prod" else '35 days' %}
{{ return(lag) }}
{% endmacro %}
dbt build --select dynamic_table
を実行するとDynamic Tableとしてクエリが実行可能となります。
作成後にLambda Viewと同様に新たに10万件のダミーデータを追加してみたいと思います。
insert into src_table
select
object_construct(
'session_id', uuid_string(),
'user_id', abs(random())%1000,
'num_of_conversion', abs(random())%100,
'_created_at', dateadd(sec, abs(random())%1000000, current_timestamp)::timestamp_ntz
)::variant as source
from
table(generator(rowcount => 100000));
約1分後に追加されたデータがロードされ120万レコードのデータが参照できていることが確認できます。
あわせてダイナミックテーブルの履歴タブからも10万レコードが追加されたことが確認ができます。
最後に
いかがでしたでしょうか?
今回はLambda ViewをベースにDynamic Tablesのメリットを見ていきましたが、Dynamic Tableは複雑なワークロードを宣言的に書けるメリットがあるためかなりモデル〜デプロイ〜運用が楽になりそうですね。
また、マテリアライズド・ビューとは異なり結合やUNIONなどの複雑なモデリングにも対応をしていることから、Lambda View以外の既存のdbtモデル自体もDynamic Tableにすることでワークロードを考える負荷を下げながらデータエンジニアリングできるという意味でも検討の余地がありそうな気がしています。
引き続き、色々と情報収集をしながらみんなでDynamic Tableの良さを語りたいものですね。
Snowlfake データクラウドのユーザ会 SnowVillage のメンバーで運営しています。 Publication参加方法はこちらをご参照ください。 zenn.dev/dataheroes/articles/db5da0959b4bdd
Discussion