Snowflake 外部テーブルで S3 に蓄積された JSON をテーブル化する
はじめに
こんにちは!シンプルフォームの山岸です。
Snowflake 外部テーブルを使って S3 に蓄積された JSON ファイルをテーブル化し、さらにそれをマテリアライズしたテーブルを増分更新する方法について検討してみたので、本記事ではその内容についてご紹介できればと思います。
JSON + S3 を前提とした内容になっていますが、その他のファイル形式・オブジェクトストレージ(Azure Blob Storage, Google Cloud Storage)にも適用可能な内容かと思いますので、参考になるものがあれば幸いです。
Snowflake 外部テーブルとは
外部テーブル とは 公式ドキュメント にもある通り、外部ステージ に格納されているデータに対して、Snowflake テーブルのようにクエリできる機能です。読み取り専用ではありますが、データを Snowflake 内に取り込まなくても Snowflake から簡単にアクセスする手段を提供します。
外部テーブルを作成する際、ファイルの中身だけでなく、S3 オブジェクトキーの一部をカラムとして生成することができます。Glue データカタログ化することなどを見越して S3 上でパーティション分割している場合、これをそのまま外部テーブルのカラムとして持たせ、かつ外部テーブルの partition としても指定しておくことで、効率的なデータ更新・クエリ実行を実現できます。
注意点としては、クエリする度に外部ストレージへの参照が発生するため、通常テーブルと比較すると読み取りパフォーマンスは悪くなります。分析でよく利用する可能性がある場合は、外部テーブルをソースに別途 Materialized View や Table を作成しておくのが良さそうです。
Terraform による外部テーブルの作成
前提
データを格納する S3 とのストレージ統合 [1] が設定されていることを前提とします。
また、S3 フォルダ構成として以下のようなものを想定します。
拡張子を除くファイル名をレコード ID、更新日 ( update_date
) をパーティションキーとして Snowflake 外部テーブルに対応させ、JSON の中身を VALUE カラムに持たせてみます。
外部ステージ
まず、外部ステージを作成します。外部テーブル作成時の location 指定の起点になります。
locals {
storage_integration = {
name = "TEST_DB_S3INT"
}
url = {
s3_bucket = "example-bucket"
s3_prefix = "integrations/test_db/test_schema"
}
}
resource "snowflake_stage" "default" {
name = "DEFAULT_STAGE"
database_name = "TEST_DB"
schema_name = "TEST_SCHEMA"
storage_integration = local.storage_integration.name
url = "s3://${var.url.s3_bucket}/${var.url.s3_prefix}"
}
外部テーブル
続いて、外部テーブルを作成します。
-
location
... 作成した外部ステージからの相対パスを指定します。 -
file_format
... 取り込み対象ファイルのフォーマットを指定します。今回は JSON を取り込むので、ネイティブのTYPE = JSON
を指定します。 -
partition_by
... パーティションキーを構成するカラムリストを指定します。 -
auto_refresh
,column
... (後述)
locals {
qualified_stage_name = "TEST_DB.TEST_SCHEMA_STAGING.DEFAULT_STAGE"
path_parts = ["demo_exttbl", "diff"]
}
resource "snowflake_external_table" "default" {
name = "DEMO_EXTTBL_DIFF"
database = "TEST_DB"
schema = "TEST_SCHEMA"
location = "@${local.qualified_stage_name}/${join("/", local.path_parts)}/"
file_format = "TYPE = JSON"
auto_refresh = false
column {
name = "record_id"
type = "varchar"
as = "SPLIT_PART(SPLIT_PART(metadata$filename, '/', 7), '.', 1)"
}
column {
name = "updated_date"
type = "date"
as = "TO_DATE(SPLIT_PART(SPLIT_PART(metadata$filename, '/', 6), '=', 2))"
}
column {
name = "last_modified"
type = "timestamp_ntz"
as = "METADATA$FILE_LAST_MODIFIED"
}
partition_by = ["updated_date"]
}
auto_refresh
自動更新を有効にするかどうかを指定します。有効にする場合、S3 イベント通知または SNS トピックを構成する必要があります。[2]
無効の場合、新規に作成された S3 オブジェクトを参照するには、メタデータの手動 Refresh [3] を実行します。(データベース・スキーマ・ステージの USAGE 権限が必要です)
ALTER EXTERNAL TABLE TEST_DB.TEST_SCHEMA.DEMO_EXTTBL_DIFF REFRESH;
column
(Block)
S3 オブジェクトキーは METADATA$FILENAME
から取得できます。SPLIT_PART 関数などと組み合わせることで、パーティション情報などを取得できます。
column {
name = "updated_date"
type = "date"
as = "TO_DATE(SPLIT_PART(SPLIT_PART(METADATA$FILENAME, '/', 6), '=', 2))"
}
METADATA$FILENAME
以外に利用可能なメタデータに関しては、以下をご確認ください。
動作確認
作成した外部テーブルに対して SELECT クエリを実行してみます。
SELECT * FROM TEST_DB.TEST_SCHEMA.DEMO_EXTTBL_DIFF LIMIT 100;
増分更新の実装
外部テーブルをマテリアライズする際、対象テーブルを増分更新する方法について考えてみます。
分析環境バケットへのオブジェクト複製
筆者のユースケースでは、JSON オブジェクトが生成されるソースバケットと、ストレージ統合を設定する分析環境バケットが異なっていたため、オブジェクトを複製する仕組みを実装しました。
具体的には、ソースバケットで所定のパスパターンに一致するオブジェクトが生成された際、S3 イベント通知を通じて、分析環境バケットの full/
, diff/
に対して Copy Object を実行する Lambda 関数をトリガーするように設定しました。
外部テーブルの作成
作成手順は前述の通りです。ここでは増分データ用 _DIFF
と全量データ用 _FULL
をそれぞれ作成します。(過去に遡ってのバックフィルが不要の場合、全量 _FULL
の方は省いても良いかもしれません)
全量 _FULL
用の S3 フォルダには更新日 update_date
のパーティションキーを設けないため、外部テーブルの構成カラムにも含めていません。
dbt incremental モデル
以下のような dbt incremental モデルを作成してみました。
通常の増分更新では _DIFF
外部テーブルを参照し、直近のデータが格納されるパーティションを指定してターゲットテーブルに反映します。一方、--full-refresh
オプションが指定された場合は、最新データ全量が格納される _FULL
外部テーブルをターゲットテーブルに反映しています。
{{
config(
materialized='incremental',
unique_key='record_id',
)
}}
WITH
{% if is_incremental() %}
demo_exttbl_data_diff AS (
SELECT
record_id,
value,
last_modified,
ROW_NUMBER() OVER (PARTITION BY report_id ORDER BY last_modified DESC) AS row_num,
FROM
{{ source('test_schema_staging', 'demo_exttbl_diff') }}
WHERE updated_date >=
{% if var("updated_date_from", None) is not none %}
TO_DATE('{{ var("updated_date_from") }}') -- 更新開始日付を明示的に指定
{% else %}
DATEADD(day, -1, current_date()) -- 前日(デフォルト)
{% endif %}
),
demo_exttbl_data AS (
SELECT report_id, value, last_modified
FROM demo_exttbl_data_diff
WHERE row_num = 1
)
{% else %}
demo_exttbl_data AS (
SELECT record_id, value, last_modified
FROM {{ source('test_schema_staging', 'demo_exttbl_full') }}
)
{% endif %}
SELECT
record_id,
value,
last_modified
FROM
demo_exttbl_data
更新用の dbt run コマンドは以下のようになります。
# 通常の増分更新
dbt run --select demo_materialized
# 増分更新(日付指定)
dbt run --select demo_materialized --vars '{"updated_date_from": "2025-03-30"}'
# 全件洗い替え
dbt run --select demo_materialized --full-refresh
SFN ワークフロー
外部テーブルの auto_refresh
を無効にしているので、前述の通り dbt incremental モデルの build 実行前に手動 Refresh を実行する必要があります。
手動 Refresh の処理を Lambda 関数などで実装し、例えば以下のような Step Functions (SFN) ワークフローを構築しておくことで、一連の処理を自動化できます。
実装に関する説明は以上です。
最後まで読んで頂き、ありがとうございました。
-
Amazon S3に対する外部テーブルの自動更新 - Snowflake Documentation ↩︎
-
メタデータを手動でリフレッシュする - Snowflake Documentation ↩︎

Snowlfake データクラウドのユーザ会 SnowVillage のメンバーで運営しています。 Publication参加方法はこちらをご参照ください。 zenn.dev/dataheroes/articles/db5da0959b4bdd
Discussion