How to incrementally load data into Snowflake with dbt - dbtにCOPY

2022/10/12に公開

がく@ちゅらデータエンジニア です。
※2022年9月よりちゅらデータ株式会社にJoinしました

dbtは、ELTのうちのTを司るツール・・・・という話ではあるのですが、某案件で S3に上げたParquetファイルをdbtでSTAGEから読み込んで、増分を取り込む仕組みがあったんです。
で、dbtでも、SnowflakeのCOPY INTOをつかって取り込みができないか?
SQLが実行できるんだから、dbtでもできんじゃね?
ってことで、調べてたらこんな記事にたどり着きました

http://mamykin.com/posts/fast-data-load-snowflake-dbt/

TLDR

dbtは基本的に、すでにデータがDWHにロードされていて、単純なSELECTで色々マートを作ったり、モデリングすることが想定されてる。

この記事ではS3からSnowflakeにデータを効率的にロードする方法と、この方法をカスタムメタライゼーションマクロを使ってdbtに統合する方法について説明する

はじめに

最近、S3に保存されているテラバイトの生データをdbtで効率的に新しいSnowflakeアカウントにロードする方法を模索している。

dbtは、データ変換やデータモデリングを司るツール

SnowflakeのSTAGEのオブジェクトの基本

Snowflakeには、STAGEと呼ばれるデータベースオブジェクトがある。
これは、S3やGCSなどにデータファイルにアクセスできるようにする仕組み
その中でも、S3においてるファイルにアクセスするので、「外部ステージ」を参照することになる

create or replace stage my_s3_stage url='s3://mybucket/raw_files/'
  credentials=(aws_key_id='...' aws_secret_key='...')
  file_format = (type = PARQUET);

※STAGEが参照するURLやCredentialsは、別途Integrationと呼ばれるSnowflakeオブジェクトで設定するほうがいいと思う。こちらは、とりあえずわかりやすく書いてみただけとおもう。

STAGEを作成したら、STAGEの中のファイルをSnowflakeのLIST句で一覧できる。

list @my_s3_stage/path

内容を見ることもできる

select * from @my_s3_stage

※見ることがはできるが、パフォーマンスは期待できない。Snowflakeのマイクロパーティションといったパフォーマンスを最適化する仕組みが使えないので、あくまでも「単に見るだけ」の方法がこちら

最初の試み:dbtテーブルのマテリアライゼーション

外部ステージから実体化した簡単なdbtモデルは次のようになります。

{{
  config(materialized='table')
}}

select
  $1:field_one::int as field_one,
  $1:field_two::string as field_two
from @my_s3_stage

dbt runで簡単なテストを行った後、ステージから読み込まれたデータでモデルが作られた。
小さなサイズのテーブルでは、これ十分。
しかし、読み込むデータがかなりの大きさになると、この実体化はすぐに欠点が見えてきます。最初のロードの後、dbtを実行すると、テーブル全体が取り込まれてしまう。

なぜでしょうか?dbtのテーブルマテリアライズはCTAS(create table as select)ステートメントを使用しているからです。上のモデルの場合、それは次のようになります。

create table my_model as (
  select
    $1:field_one::int as field_one,
    $1:field_two::string as field_two
  from @my_s3_stage
)

大きなRAWテーブルの場合、ステージS3のロケーションで作成された新しいファイルのみを検出し、ロードすることが必要になります。

2回めの試み:dbtによるインクリメンタルなマテリアライゼーション

生データにevent_timeフィールドがあると仮定して、欠落しているレコードだけを段階的にロードするために、次のようにモデルを書き直すことにしましょう。
※ただ、この方法だと event_timeが常に増えるとか・・・そういう部分で足をすくわれてしまう

{{
  config(materialized='incremental')
}}

select
  $1:field_one::int as field_one,
  $1:field_two::string as field_two,
  $1:event_time::timestamp as event_time
from @my_s3_stage
{% if is_incremental() %}
  -- this filter will only be applied on an incremental run
  where event_time > (select max(event_time) from {{ this }})
{% endif %}

このモデルを2回目に実行すると、増分された負荷が元のフル負荷とほぼ同じ時間を要することがわかります。なぜでしょうか?
Snowflakeのステージはテーブルではないので、

select ... from @my_s3_stage where event_time > ...

はステージ内のすべてのrawファイルを読み込み、それをパースして、event_timeに基づいてフィルタリングするからです。

これは、Snowflakeが計算時間に対して課金することを考えると、まだまだ無駄が多い

もっと良い方法はないだろうか?S3のパーティションを使って何かスマートな方法を試すこともできますし(データをパーティショニングする ことは常に良い習慣です)、
リスト @my_s3_stageを使って新しいファイルを明示的に検出し、それらのファイルだけを@my_s3_stage/path/to/new/file.parquet からロードすることも可能です。

しかし、SnowflakeのCOPY INTOコマンドを使うほうが良きです

SnowflakeのCOPY INTOテーブル

COPY INTOは、冪等性が保証(?)されてます。
同じSTAGEのファイルであれば、複数回実行してもすでに取り込んだファイルは取り込まれません。
再度同じファイルを取り込む場合は、

  • すでにロードされたファイルのチェックサムが変更された場合
  • ロードされたメタデータは64日後に期限切れになる

3回目の試み:COPY INTOを使ったカスタムマテリアライゼーション

幸運なことに、dbtでは独自のメタライゼーションを定義することができるので、COPY INTOを使うマテリア来ゼーションを作成してみます。
※  from_externai_stage っていうマクロ名。個人的には、COPY INTO してる!って主張するマクロがいいと思うなー

{{
  config(
    materialized='from_external_stage',
    stage_url = 's3://bucket/path/to/model/raw/data/'
  )
}}

select
  $1:field_one::int as field_one,
  $1:field_two::string as field_two
from {{ external_stage() }}

macros/from_external_stage_materialization.sql
として下記のマクロを作成する

{% macro external_stage(path='') %}
    @__STAGE_TOKEN__{{path}}
{% endmacro %}

{% macro ensure_external_stage(stage_name, s3_url, file_format, temporary=False) %}
    {{ log('Making external stage: ' ~ [stage_name, s3_url, file_format, temporary] | join(', ')) }}
    create or replace stage {{ 'temporary' if temporary }} {{ stage_name }}
        url='{{ s3_url }}'
        credentials=(aws_key_id='{{ env_var("SNOWFLAKE_AWS_ACCESS_KEY_ID") }}' aws_secret_key='{{ env_var("SNOWFLAKE_AWS_SECRET_ACCESS_KEY") }}')
        file_format = {{ file_format }};
{% endmacro %}

{% materialization from_external_stage, adapter='snowflake' -%}
    {%- set identifier = model['alias'] -%}
    {%- set stage_name = namespace_stage_name(config.get('stage_name', default=identifier ~ '_stage')) -%}
    {%- set stage_url = config.require('stage_url') -%}
    {%- set stage_file_format = config.get('stage_file_format', default='(type = PARQUET)') -%}
    {%- call statement() -%}
        {{ ensure_external_stage(stage_name, stage_url, stage_file_format, temporary=False) }}
    {%- endcall -%}

    {%- set old_relation = adapter.get_relation(schema=schema, identifier=identifier) -%}
    {%- set target_relation = api.Relation.create(schema=schema, identifier=identifier, type='table') -%}

    {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
    {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
    {%- set should_drop = (full_refresh_mode or not exists_as_table) -%}

    -- setup
    {% if old_relation is none -%}
        -- noop
    {%- elif should_drop -%}
        {{ adapter.drop_relation(old_relation) }}
        {%- set old_relation = none -%}
    {%- endif %}

    {{ run_hooks(pre_hooks, inside_transaction=False) }}

    -- `BEGIN` happens here:
    {{ run_hooks(pre_hooks, inside_transaction=True) }}

    -- build model
    {% if full_refresh_mode or old_relation is none -%}
        {#
            -- Create an empty table with columns as specified in sql.
            -- We append a unique invocation_id to ensure no files are actually loaded, and an empty row set is returned,
            -- which serves as a template to create the table.
        #}
        {%- call statement() -%}
            CREATE OR REPLACE TABLE {{ target_relation }} AS (
                {{ sql | replace('__STAGE_TOKEN__', stage_name ~ '/' ~ invocation_id) }}
            )
        {%- endcall -%}
    {%- endif %}

    {# Perform the main load operation using COPY INTO #}
    {# See https://docs.snowflake.net/manuals/user-guide/data-load-considerations-load.html #}
    {# See https://docs.snowflake.net/manuals/user-guide/data-load-transform.html #}
    {%- call statement('main') -%}
        {# TODO: Figure out how to deal with the ordering of columns changing in the model sql... #}
        COPY INTO {{ target_relation }}
        FROM (
            {{ sql | replace('__STAGE_TOKEN__', stage_name)}}
        )
    {% endcall %}

    {{ run_hooks(post_hooks, inside_transaction=True) }}

    -- `COMMIT` happens here
    {{ adapter.commit() }}

    {{ run_hooks(post_hooks, inside_transaction=False) }}

{%- endmaterialization %}

Discussion