🧨

dbtからSnowflake Dynamic Tablesを作成してリアルタイムデータパイプラインを構築してみる

2023/08/10に公開

これは何?

こんにちは。
dely株式会社でデータエンジニアをしておりますharry(@gappy50)です。

この記事は、昨年書いた以下の記事の続きの記事になります。
https://zenn.dev/gappy/articles/bcced35e8156ba

SnowflakeではDynamic TablesのPuPrが開始されており、宣言的なデータパイプラインの全貌徐々に見え隠れしております。
https://docs.snowflake.com/en/user-guide/dynamic-tables-about

また、これに追従する形でdbt1.6でもMaterialized View(SnowflakeではDynamic Table)をサポートしはじめました。
https://docs.getdbt.com/blog/announcing-materialized-views

このDynamic Tablesのメリットとして一番わかりやすいのは、ニアリアルタイムなストリーミングパイプラインをクエリを書くだけで実現が可能になる面だと思います。

これまではモデルを作成したあとのワークロードの実行は dbt build を実行するタイミングとなってしまうため、リアルタイムなデータパイプラインの構築が難しい側面がありました。

リアルタイムなデータを使いたい場合は昨年書いた記事のように、Snowflakeのstream+taskを用いたデータパイプラインの構築をするか、Lambda Viewと呼ばれる履歴テーブルのモデルとそのモデルと同様のロジックをもつ最新ビューをunion allをするような方法を取るなど少々煩雑かつ専門的な対応が必要になります。(もしくはdbt Jobをせっかちな間隔で更新し続ける方法もあるかもしれません)

今回は、dbtでモデリングされているLambda ViewでのリアルタイムなデータパイプラインをDynamic Tablesへ移行をするサンプルを動かしてみて、Dynamic Tablesとは何なのかを理解していきたいと思います。

Lambda Viewを実装する

簡単な例にてLambda Viewを実装してみます。最終的には以下のようなData Lineageになります。

Lambda View

まずは、ソースとなるテーブルをダミーデータとして作成します。

create or replace transient table src_table as 
select
    object_construct(
        'session_id', uuid_string(),
        'user_id', abs(random())%1000,
        'num_of_conversion', abs(random())%100,
        '_created_at', dateadd(sec, 0 - abs(random())%1000000, current_timestamp)::timestamp_ntz
    )::variant as source
from 
    table(generator(rowcount => 1000000));

VARIANT列に半構造化データが1,000,000レコード格納されています。

{   "_created_at": "2023-08-07 17:32:00.099",   "num_of_conversion": 67,   "session_id": "9fdd66ac-fd1e-4278-80c9-369ff2df3bdb",   "user_id": 861 }
{   "_created_at": "2023-08-07 10:21:21.099",   "num_of_conversion": 36,   "session_id": "c30db4a4-2c9e-4acd-a5e2-ee23ee85330a",   "user_id": 309 }
{   "_created_at": "2023-07-30 08:25:34.099",   "num_of_conversion": 1,   "session_id": "f14a878b-a0f2-445e-8393-39524ea67a8f",   "user_id": 658 }
{   "_created_at": "2023-08-05 17:16:14.099",   "num_of_conversion": 85,   "session_id": "8247c1e4-8736-40a8-a490-5e0640ff480c",   "user_id": 768 }
{   "_created_at": "2023-08-04 14:14:46.099",   "num_of_conversion": 27,   "session_id": "1a8a4f3d-b3a7-427c-ad9b-cf28dd16b387",   "user_id": 881 }
・・・

これらのソーステーブルを使ってdbtで履歴テーブルに該当するモデルを作ってみましょう。

models/staging/_dynamic_table__sources.yml
version: 2

sources:
  - name: dynamic_table
    database: test
    schema: test
    tables:
      - name: src_table
models/staging/historical_table.sql
{{ config( materialized='table' ) }}

select
    -- staging層で必要な構造化処理や不要なデータのclean upなどをここで行う
    source:session_id::string as session_id,
    source:user_id::string as user_id,
    source:num_of_conversion::number as number_of_conversion,
    source:_created_at::timestamp as created_at
from 
    {{ source('dynamic_table', 'src_table') }}

続いて、履歴テーブルと同様のロジックをもつ最新ビューも作成してみます。

models/staging/current_view.sql
{{ config( materialized='view' ) }}

select
    -- staging層で必要な構造化処理や不要なデータのclean upなどをここで行う
    source:session_id::string as session_id,
    source:user_id::string as user_id,
    source:num_of_conversion::number as number_of_conversion,
    source:_created_at::timestamp as created_at
from 
    {{ source('dynamic_table', 'src_table') }}

これらの2つのモデルを使って、テーブル化された過去データによる性能を担保しながら、テーブル化されていない最新のデータをビューとして利用できるようにします。

Lambda Viewの考え方

models/marts/lambda_view.sql
{{ config( materialized='view' ) }}

with current_view as (
  select * from {{ref('current_view')}}
  -- 必要に応じてhistorical_tableのmax値などにするのもあり
  where created_at >= '{{ run_started_at }}' 
),
historical_table as (
  select * from {{ref('historical_table')}}
  where created_at < '{{ run_started_at }}'
),
final as (
  select * from current_view
  union all
  select * from historical_table
)
select * from final

dbt build --select +lambda_view を実行しLambda Viewを作成したあとに select * from lambda_view をしてると、先程作成した100万レコードのデータが構造化された状態でクエリできていることが確認できます。

続いて、ソーステーブルに10万レコードのダミーデータを追加してみます。

insert into src_table  
select
    object_construct(
        'session_id', uuid_string(),
        'user_id', abs(random())%1000,
        'num_of_conversion', abs(random())%100,
        '_created_at', dateadd(sec, abs(random())%1000000, current_timestamp)::timestamp_ntz
    )::variant as source
from 
    table(generator(rowcount => 100000));

データ追加後に再度 select * from lambda_view を実行してみると、履歴テーブルの100万レコードと最新ビューの10万レコードの合計110万レコードが重複なしにクエリできることが確認できます。

リアルタイムデータも最新ビューから取得をすることでクエリが可能に

このようにLambda Viewは最新のデータも含めた分析が可能となるモデルを作成することが可能な反面、同一のロジックを作成しなければならない煩雑性が常に発生することや、複雑なモデリングが必要な場合はより考えることが多くなってきます。

Dynamic Table

それではこのLambda ViewをDynamic Tableへ置換してみます。

models/marts/dynamic_table.sql
{{
config(
    materialized = 'dynamic_table',
    snowflake_warehouse = 'PC_DBT_WH',
    target_lag = '1 minute',
    on_configuration_change = 'apply',
)
}}

with final as (
  select * from {{ref('current_view')}}
)
select * from final

これだけで、ニアリアルタイムなデータパイプラインの実装が可能になります。

…めっちゃ楽や…

WarehouseやLagの設定は本番のデータ量やワークロードに対して許容できる設定にする必要があります。
コスト観点などから開発環境は本番環境でラグの時間を変更したい場合は以下ようなmacroを作成してlarget_lagに設定することもできるようです[1]

{% macro target_lag_environment() %}
{% set lag = '1 minute' if target.name == "prod" else '35 days' %}
{{ return(lag) }}
{% endmacro %}

dbt build --select dynamic_table を実行するとDynamic Tableとしてクエリが実行可能となります。

Dynamic Table

作成後にLambda Viewと同様に新たに10万件のダミーデータを追加してみたいと思います。

insert into src_table  
select
    object_construct(
        'session_id', uuid_string(),
        'user_id', abs(random())%1000,
        'num_of_conversion', abs(random())%100,
        '_created_at', dateadd(sec, abs(random())%1000000, current_timestamp)::timestamp_ntz
    )::variant as source
from 
    table(generator(rowcount => 100000));

約1分後に追加されたデータがロードされ120万レコードのデータが参照できていることが確認できます。

あわせてダイナミックテーブルの履歴タブからも10万レコードが追加されたことが確認ができます。

最後に

いかがでしたでしょうか?
今回はLambda ViewをベースにDynamic Tablesのメリットを見ていきましたが、Dynamic Tableは複雑なワークロードを宣言的に書けるメリットがあるためかなりモデル〜デプロイ〜運用が楽になりそうですね。

また、マテリアライズド・ビューとは異なり結合やUNIONなどの複雑なモデリングにも対応をしていることから、Lambda View以外の既存のdbtモデル自体もDynamic Tableにすることでワークロードを考える負荷を下げながらデータエンジニアリングできるという意味でも検討の余地がありそうな気がしています。

https://docs.snowflake.com/en/user-guide/dynamic-tables-comparison

引き続き、色々と情報収集をしながらみんなでDynamic Tableの良さを語りたいものですね。

脚注
  1. https://docs.getdbt.com/blog/announcing-materialized-views#production ↩︎

Snowflake Data Heroes

Discussion