Zenn
🗂️

Snowflake 外部テーブルで S3 に蓄積された JSON をテーブル化する

に公開
1

はじめに

こんにちは!シンプルフォームの山岸です。

Snowflake 外部テーブルを使って S3 に蓄積された JSON ファイルをテーブル化し、さらにそれをマテリアライズしたテーブルを増分更新する方法について検討してみたので、本記事ではその内容についてご紹介できればと思います。

JSON + S3 を前提とした内容になっていますが、その他のファイル形式・オブジェクトストレージ(Azure Blob Storage, Google Cloud Storage)にも適用可能な内容かと思いますので、参考になるものがあれば幸いです。

Snowflake 外部テーブルとは

外部テーブル とは 公式ドキュメント にもある通り、外部ステージ に格納されているデータに対して、Snowflake テーブルのようにクエリできる機能です。読み取り専用ではありますが、データを Snowflake 内に取り込まなくても Snowflake から簡単にアクセスする手段を提供します。

外部テーブルを作成する際、ファイルの中身だけでなく、S3 オブジェクトキーの一部をカラムとして生成することができます。Glue データカタログ化することなどを見越して S3 上でパーティション分割している場合、これをそのまま外部テーブルのカラムとして持たせ、かつ外部テーブルの partition としても指定しておくことで、効率的なデータ更新・クエリ実行を実現できます。

注意点としては、クエリする度に外部ストレージへの参照が発生するため、通常テーブルと比較すると読み取りパフォーマンスは悪くなります。分析でよく利用する可能性がある場合は、外部テーブルをソースに別途 Materialized View や Table を作成しておくのが良さそうです。

Terraform による外部テーブルの作成

前提

データを格納する S3 とのストレージ統合 [1] が設定されていることを前提とします。

また、S3 フォルダ構成として以下のようなものを想定します。
拡張子を除くファイル名をレコード ID、更新日 ( update_date ) をパーティションキーとして Snowflake 外部テーブルに対応させ、JSON の中身を VALUE カラムに持たせてみます。

外部ステージ

まず、外部ステージを作成します。外部テーブル作成時の location 指定の起点になります。

stage.tf
locals {
  storage_integration = {
    name = "TEST_DB_S3INT"
  }
  url = {
    s3_bucket = "example-bucket"
    s3_prefix = "integrations/test_db/test_schema"
  }
}

resource "snowflake_stage" "default" {
  name = "DEFAULT_STAGE"

  database_name = "TEST_DB"
  schema_name   = "TEST_SCHEMA"

  storage_integration = local.storage_integration.name
  url                 = "s3://${var.url.s3_bucket}/${var.url.s3_prefix}"
}

外部テーブル

続いて、外部テーブルを作成します。

  • location ... 作成した外部ステージからの相対パスを指定します。
  • file_format ... 取り込み対象ファイルのフォーマットを指定します。今回は JSON を取り込むので、ネイティブの TYPE = JSON を指定します。
  • partition_by ... パーティションキーを構成するカラムリストを指定します。
  • auto_refresh, column ... (後述)
external_table.tf
locals {
  qualified_stage_name = "TEST_DB.TEST_SCHEMA_STAGING.DEFAULT_STAGE"
  path_parts           = ["demo_exttbl", "diff"]
}

resource "snowflake_external_table" "default" {
  name = "DEMO_EXTTBL_DIFF"

  database = "TEST_DB"
  schema   = "TEST_SCHEMA"
  location = "@${local.qualified_stage_name}/${join("/", local.path_parts)}/"

  file_format  = "TYPE = JSON"
  auto_refresh = false

  column {
    name = "record_id"
    type = "varchar"
    as   = "SPLIT_PART(SPLIT_PART(metadata$filename, '/', 7), '.', 1)"
  }
  column {
    name = "updated_date"
    type = "date"
    as   = "TO_DATE(SPLIT_PART(SPLIT_PART(metadata$filename, '/', 6), '=', 2))"
  }
  column {
    name = "last_modified"
    type = "timestamp_ntz"
    as   = "METADATA$FILE_LAST_MODIFIED"
  }

  partition_by = ["updated_date"]
}

auto_refresh

自動更新を有効にするかどうかを指定します。有効にする場合、S3 イベント通知または SNS トピックを構成する必要があります。[2]

無効の場合、新規に作成された S3 オブジェクトを参照するには、メタデータの手動 Refresh [3] を実行します。(データベース・スキーマ・ステージの USAGE 権限が必要です)

ALTER EXTERNAL TABLE TEST_DB.TEST_SCHEMA.DEMO_EXTTBL_DIFF REFRESH;

column (Block)

S3 オブジェクトキーは METADATA$FILENAME から取得できます。SPLIT_PART 関数などと組み合わせることで、パーティション情報などを取得できます。

external_table.tf - "updated_date" column
column {
  name = "updated_date"
  type = "date"
  as   = "TO_DATE(SPLIT_PART(SPLIT_PART(METADATA$FILENAME, '/', 6), '=', 2))"
}

METADATA$FILENAME 以外に利用可能なメタデータに関しては、以下をご確認ください。

https://docs.snowflake.com/ja/user-guide/querying-metadata#metadata-columns

動作確認

作成した外部テーブルに対して SELECT クエリを実行してみます。

SELECT * FROM TEST_DB.TEST_SCHEMA.DEMO_EXTTBL_DIFF LIMIT 100;

増分更新の実装

外部テーブルをマテリアライズする際、対象テーブルを増分更新する方法について考えてみます。

分析環境バケットへのオブジェクト複製

筆者のユースケースでは、JSON オブジェクトが生成されるソースバケットと、ストレージ統合を設定する分析環境バケットが異なっていたため、オブジェクトを複製する仕組みを実装しました。

具体的には、ソースバケットで所定のパスパターンに一致するオブジェクトが生成された際、S3 イベント通知を通じて、分析環境バケットの full/, diff/ に対して Copy Object を実行する Lambda 関数をトリガーするように設定しました。

外部テーブルの作成

作成手順は前述の通りです。ここでは増分データ用 _DIFF と全量データ用 _FULL をそれぞれ作成します。(過去に遡ってのバックフィルが不要の場合、全量 _FULL の方は省いても良いかもしれません)

全量 _FULL 用の S3 フォルダには更新日 update_date のパーティションキーを設けないため、外部テーブルの構成カラムにも含めていません。

dbt incremental モデル

以下のような dbt incremental モデルを作成してみました。

通常の増分更新では _DIFF 外部テーブルを参照し、直近のデータが格納されるパーティションを指定してターゲットテーブルに反映します。一方、--full-refresh オプションが指定された場合は、最新データ全量が格納される _FULL 外部テーブルをターゲットテーブルに反映しています。

demo_materialized.sql
{{
    config(
        materialized='incremental',
        unique_key='record_id',
    )
}}
WITH 
{% if is_incremental() %}
demo_exttbl_data_diff AS (
    SELECT 
        record_id, 
        value, 
        last_modified,
        ROW_NUMBER() OVER (PARTITION BY report_id ORDER BY last_modified DESC) AS row_num,
    FROM 
        {{ source('test_schema_staging', 'demo_exttbl_diff') }}
    WHERE updated_date >= 
    {% if var("updated_date_from", None) is not none %}
        TO_DATE('{{ var("updated_date_from") }}')  -- 更新開始日付を明示的に指定
    {% else %}
        DATEADD(day, -1, current_date())  -- 前日(デフォルト)
    {% endif %}
),
demo_exttbl_data AS (
    SELECT report_id, value, last_modified
    FROM demo_exttbl_data_diff
    WHERE row_num = 1
)
{% else %}
demo_exttbl_data AS (
    SELECT record_id, value, last_modified
    FROM {{ source('test_schema_staging', 'demo_exttbl_full') }}
)
{% endif %}
SELECT 
    record_id, 
    value,
    last_modified
FROM 
    demo_exttbl_data

更新用の dbt run コマンドは以下のようになります。

# 通常の増分更新
dbt run --select demo_materialized
# 増分更新(日付指定)
dbt run --select demo_materialized --vars '{"updated_date_from": "2025-03-30"}'
# 全件洗い替え
dbt run --select demo_materialized --full-refresh

SFN ワークフロー

外部テーブルの auto_refresh を無効にしているので、前述の通り dbt incremental モデルの build 実行前に手動 Refresh を実行する必要があります。

手動 Refresh の処理を Lambda 関数などで実装し、例えば以下のような Step Functions (SFN) ワークフローを構築しておくことで、一連の処理を自動化できます。


実装に関する説明は以上です。

最後まで読んで頂き、ありがとうございました。

脚注
  1. オプション1: Amazon S3にアクセスするためのSnowflakeストレージ統合の構成 ↩︎

  2. Amazon S3に対する外部テーブルの自動更新 - Snowflake Documentation ↩︎

  3. メタデータを手動でリフレッシュする - Snowflake Documentation ↩︎

1
Snowflake Data Heroes

Discussion

ログインするとコメントできます