Snowflake Streams :テーブル変更履歴(CDC)を捉える仕組みと活用法
🚀 はじめに
先日の記事で 「ダイナミックテーブル」とStreams & Tasks について取り上げました。
今回は Stream にフォーカスを当てていきます。
Snowflakeでデータパイプラインを構築する際、「テーブルに加えられた変更(INSERT/UPDATE/DELETE)だけを効率的に処理したい」という要求は非常に一般的です。
例えば、
- Rawテーブルにロードされた新規データだけ をStagingテーブルに反映したい。
- ユーザーテーブルの更新・削除 履歴を監査ログとして記録したい。
このようなChange Data Capture (CDC) を実現するSnowflakeのネイティブ機能が「Streams」です。
この記事では、Streamsがどのような仕組みで変更を追跡し、どのように設定・活用するのか、そして注意すべき点は何かを、Snowflake公式ドキュメントに基づいて解説していきたいと思います。
1. Snowflake Streamsとは?:テーブルの「差分」を読む仕組み
Streamsは、テーブル(やビューなど)に加えられたDML変更(INSERT, UPDATE, DELETE)の履歴を行レベルで追跡 するSnowflakeオブジェクトです。
ブックマーク(オフセット)のようなもの
Streamsは、対象オブジェクト(例:my_table)に対する 「ブックマーク(読み取りオフセット)」 のようなものだと考えると理解しやすいです。
- Stream作成時点でのテーブルの状態(タイムスタンプ=オフセット)を記録します。
- その後、
my_tableにDML変更が行われると、Streamはその変更内容(どの行がどう変わったか)を記録します。 - ユーザーがDMLトランザクション内でStreamを消費(後述)すると、ブックマーク(オフセット)が現在のテーブルの状態まで前進し、Streamは(そのトランザクションから見た場合)空になります。
SELECTでStreamを読むだけではオフセットは進みません。
イメージは以下のようなイメージになります。(内部ステージ(ディレクトリテーブル)のStreamの例)

メタデータカラム:変更の種類を教えてくれる
Streamを SELECT すると、元のテーブルのカラムに加えて、以下のメタデータカラムが付与されます。これらが変更の種類や詳細を教えてくれます。
-
METADATA$ACTION: その行に対する操作 (INSERTまたはDELETE) を示します。 -
METADATA$ISUPDATE: その行がUPDATE操作の一部であるか (TRUE/FALSE) を示します。-
通常:
UPDATEは、古い行のDELETEと新しい行のINSERTのペアとして記録され、両方の行でMETADATA$ISUPDATE = TRUEになります。 -
例外: 同一オフセット内で
INSERTの直後に同じ行がUPDATEされた場合、差分は最終的な状態のINSERT行1行として記録され、その行のMETADATA$ISUPDATEはFALSEになります(「更新」ではなく「新規挿入」として表現されるため)。
-
通常:
-
METADATA$ROW_ID: 変更された行の一意なIDを示します。-
注意:
ROW_IDは各ストリーム内では一意かつ不変ですが、テーブル上のStreamとビュー上のStreamで同じ行に同じROW_IDが付与される保証はありません(相互比較には使わないでください)。また、ベースオブジェクトのChange Trackingが無効化 再有効化されると\rightarrow ROW_IDは変わり得ます。
-
注意:
2. Streamの種類(モード):何を追跡したい?
Streamを作成する際には、追跡したい変更の種類や対象オブジェクトに応じて「モード」を選択できます。
-
Standard Stream (デフォルト):
-
INSERT,UPDATE,DELETEのすべてを追跡します。TRUNCATEによる削除も追跡対象です(削除が記録されます)。 - テーブルまたはビューに対して作成できます。
- 最も一般的なモードです。
-
-
Append-only Stream:
-
INSERT操作のみを追跡します。UPDATEやDELETEは無視されます。 - テーブルまたはビューに対して作成できます。
- 単純な追記型データ(ログデータなど)の処理に適しています。
-
-
Insert-only Stream (挿入のみ):
- 外部テーブルおよび外部カタログ管理の Iceberg テーブルに対して必須のモードです。
- 追加された新しいファイルの行のみを追跡します。
3. Streamの設定方法・注意点
Streamは様々なオブジェクトの上に作成できますが、対象によって挙動や注意点が異なります。
3.1. テーブルに対するStream (Standard/Append-only)
最も基本的な使い方です。
作成方法:
-- my_table の変更を追跡する standard stream を作成
CREATE OR REPLACE STREAM my_stream ON TABLE my_table;
-- my_log_table の INSERT のみを追跡する append-only stream を作成
CREATE OR REPLACE STREAM my_append_only_stream ON TABLE my_log_table
APPEND_ONLY = TRUE;
権限(作成時):
- 対象スキーマに対する
CREATE STREAM権限。 - ベーステーブルに対する
SELECT権限。 - データベースとスキーマに対する
USAGE権限。 -
重要 (Change Tracking自動有効化): ベーステーブルで Change Tracking が未有効の場合、初回のストリーム作成はテーブル所有ロール(OWNERSHIP)でのみ可能です(作成時に自動で Change Tracking を有効化するため)。
-
回避策: 所有者でないロールで作成したい場合は、先に所有ロールで
ALTER TABLE ... SET CHANGE_TRACKING = TRUE;を実行します。
-
回避策: 所有者でないロールで作成したい場合は、先に所有ロールで
権限(読み取り時):
- Stream自体に対する
SELECT権限。 - ベーステーブルに対する
SELECT権限。 - データベース/スキーマに対する
USAGE権限。
注意点 (Stale StreamとTime Travel):
- Time Travel 依存: StreamはSnowflakeのTime Travel(Change Tracking)機能を利用して変更履歴を取得します。対象テーブルの Time Travel 保持期間(データ保持期間)が過ぎると、Streamは「Stale(古くなった)」状態になり、読み取り不能になります。
-
保持期間の拡張: Snowflakeは未消費のストリームに対して、テーブルの保持期間を超えて 最大 14 日(アカウントパラメータ
MAX_DATA_EXTENSION_TIME_IN_DAYSで設定可能)まで一時的に延長することがあります。延長期間中は追加のストレージコストが発生します。原則として、テーブルの保持期間内に消費してください。 -
Stale化の防止: Streamを消費する処理(例:Task)は、テーブルのTime Travel期間内に確実に実行されるように設計・監視する必要があります。
SYSTEM$STREAM_HAS_DATA('<stream_name>')は未消費データの有無を確認する関数です。空のストリームに対してFALSEを返す呼び出しは stale 化を防ぐ効果がありますが、オフセット自体は前進しません。オフセットを確実に前進させる(そしてStale化を防ぐ)には、DMLを伴う消費(コミット)が必要です。オフセットを明示的に進めたい場合は、DMLを伴う消費(例:INSERT INTO dummy_table SELECT * FROM my_stream WHERE 1=0;)が有効です。
3.2. ビューに対するStream (Standard/Append-only)
ビュー(Secure Viewを含む)に対してもStreamを作成できますが、ビューの定義内容に厳格な制約があります。
作成できるビュー:
- 許可されるのは以下の要素のみで構成されるビューです:
- 射影 (Projection)
- フィルター (Filter)
- 内部結合 (Inner Join) またはクロス結合 (Cross Join)
- UNION ALL
作成できないビュー:
- マテリアライズドビュー
- 上記以外の構文を含むビュー (例: 外部結合,
GROUP BY,QUALIFY,HAVING,ORDER BY,LIMIT,DISTINCT, 集合演算子 EXCEPT/INTERSECT,LATERALビュー,CONNECT BY, UDFs/UDTFs, ストアドプロシージャ, (FROM句以外での)サブクエリ, 相関サブクエリなど)
作成方法:
-- my_view (JOINを含む可能性あり) の変更を追跡
CREATE OR REPLACE STREAM stream_on_view ON VIEW my_view;
権限(作成時):
- 対象スキーマに対する
CREATE STREAM権限。 - ベースビューに対する
SELECT権限。 - データベースとスキーマに対する
USAGE権限。 - 重要 (Change Tracking自動有効化): ビューまたはその基盤テーブルで Change Tracking が未有効の場合、初回のストリーム作成にはビューとその基盤テーブルの**所有権(OWNERSHIP)**が必要になります。
権限(読み取り時):
- Stream自体に対する
SELECT権限。 - ビューに対する
SELECT権限。 - データベース/スキーマに対する
USAGE権限。
注意点:
- Change Tracking: ビューおよび、そのビューが参照する全てのベーステーブルでChange Trackingが有効である必要があります。
- 複雑なビューへの非対応: Snowflakeがビューの変更差分を正確に追跡できないため、多くの一般的なビュー構文はサポートされません。
3.3. ディレクトリテーブルに対するStream (Standard/Append-only)
ステージ上のファイル一覧を提供するディレクトリテーブルにもStreamを作成できます。
作成方法:
-- my_directory_table (ディレクトリテーブル) の変更を追跡
CREATE OR REPLACE STREAM stream_on_directory ON TABLE my_directory_table;
権限(読み取り時):
- Streamへの
SELECT権限。 - ステージに対する権限:外部ステージは
USAGE、内部ステージはREAD。-
(補足) ディレクトリテーブルそのものを直接
SELECTする場合は権限要件が異なる場合があります。
-
(補足) ディレクトリテーブルそのものを直接
注意点:
-
METADATA$ROW_IDは常にNULLになります。 - ステージの種類により、変更検知の仕組み(イベント通知 or 定期スキャン)が異なります。
3.4. 外部テーブルに対するStream (Insert-only 必須)
データレイク(S3等)上のファイルを参照する外部テーブル、および外部カタログ管理の Iceberg テーブルには Insert-only モード(挿入のみ)のStreamを作成します。
作成方法:
-- my_external_table の変更を追跡 (Insert-only モード)
CREATE OR REPLACE STREAM stream_on_ext_table ON EXTERNAL TABLE my_external_table
INSERT_ONLY = TRUE;
権限(読み取り時):
- Streamへの
SELECT権限。 - 外部テーブルへの
SELECT権限。 - (DB/Schema への
USAGE)
注意点:
-
INSERT_ONLY = TRUEが必須です(Standard/Append-onlyはサポートされません)。 - Streamが変更を検知するには、外部テーブル自体がリフレッシュされている必要があります(自動リフレッシュ設定または手動
ALTER EXTERNAL TABLE ... REFRESH)。
4. Streamの消費:変更データを読み取る仕組み
Streamに記録された変更データは、DMLトランザクション内で読み取られる(消費される)とそのStreamのオフセット(ブックマーク)が進みます。
消費の基本パターン (Taskとの連携)
最も一般的なのは、Taskを使って定期的にStreamを消費するパターンです。
CREATE OR REPLACE TASK my_consume_task
WAREHOUSE = my_wh
SCHEDULE = '5 MINUTE'
WHEN
SYSTEM$STREAM_HAS_DATA('my_stream') -- Streamにデータがある時だけ実行
AS
-- StreamのデータをTargetテーブルにMERGEする (このDMLがStreamを消費する)
MERGE INTO target_table t
USING my_stream s
ON t.id = s.id
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' AND NOT s.METADATA$ISUPDATE THEN -- UPDATEのDELETE側を除外
DELETE
WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN -- INSERT または UPDATEのINSERT側 (METADATA$ACTION='INSERT')
UPDATE SET col1 = s.col1, col2 = s.col2 -- UPDATE処理
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN -- 新規INSERTのみ
INSERT (id, col1, col2) VALUES (s.id, s.col1, s.col2); -- INSERT処理
-
SYSTEM$STREAM_HAS_DATA(): Streamに未消費のデータがあるかを確認する関数。TaskのWHEN句で使うことで、無駄なTask実行を防げます。 -
トランザクション:
MERGE文(またはINSERT,UPDATE,DELETE,CTAS,COPY INTO <location>)がコミットされると、そのトランザクションで読み取られた範囲までStreamのオフセットが進みます。SELECT文でStreamを読むだけでは消費されません。
注意点:Stale Stream(古くなったStream)
前述の通り、Streamのオフセットが指す時点が、テーブルのTime Travel保持期間(+最大14日等の拡張期間)を超えてしまうと、Streamは Stale 状態になります。
- Staleになると、Streamからの読み取りは一切できなくなります(エラーが発生)。
- 回復不能: 一度Staleになると元に戻せません。Streamを再作成する必要があります。
- 対策: Streamを消費する処理(Taskなど)が、テーブルの保持期間内に必ず成功するように設計・監視することが極めて重要です。
5. Streamの活用方法
Streamsは様々なデータパイプラインやアーキテクチャで活用できます。
-
増分ELT/ETL (Streams & Tasks):
Rawテーブル Stagingテーブルへの差分マージなど、従来からの基本的な使い方です。\rightarrow -
監査ログ・変更履歴の記録:
重要なテーブルの変更内容(METADATA$*を含む)を監査ログテーブルにINSERTすることで、変更履歴を簡単に記録できます。 -
データレプリケーション・同期:
あるテーブルの変更を、Streamを使って別のテーブルや外部システム(Taskから外部APIを呼び出すなど)に準リアルタイムで反映させることができます。 -
ダイナミックテーブルとの使い分け:
Snowflake内で完結する単純な増分パイプラインであれば、現在ではダイナミックテーブルの方がシンプルに実装できることが多いです。しかし、Snowflake外部との連携や、複雑な処理ロジック(ストアドプロシージャ呼び出しなど)をTaskで実行したい場合は、依然としてStreams & Tasksパターンが有効です。
😌 おわりに
Snowflake Streamsは、テーブルの変更履歴(CDC)を効率的に捉えるための強力かつ柔軟な機能となっています。
-
オフセット(ブックマーク) で変更を追跡し、メタデータカラムで変更内容を詳細に把握できます(
METADATA$ISUPDATEの例外挙動やMETADATA$ROW_IDの注意点を理解)。 - テーブルだけでなく、ビュー(厳格な制約あり)や外部テーブル(Insert-only必須)など様々なオブジェクトに適用できます。
-
Time Travelへの依存とStale化のリスク(および延長期間とコスト)を理解し、適切な消費サイクルを設計することが重要です (
SYSTEM$STREAM_HAS_DATAの効果を正しく理解)。 - 単純なELTだけでなく、監査やデータ同期など、幅広い用途で活用できます。
ダイナミックテーブルが登場した現在でも、Streams & Tasksはその手続き的な制御力とCDC機能の外部利用の可能性から、依然として重要な選択肢です。両者の特性を理解し、適切に使い分けられると良いでしょう
📚 参考出典
- Snowflakeドキュメント | Introduction to Streams (概要、オフセット、Stale、保持期間延長、消費、TRUNCATE)
- Snowflakeドキュメント | CREATE STREAM (モード、外部テーブル/IcebergのInsert-only必須、Change Tracking自動有効化の権限)
- Snowflakeドキュメント | Access control privileges for streams (読み取り権限詳細)
- Snowflakeドキュメント | SYSTEM$STREAM_HAS_DATA (Stale化防止効果の正確な説明)
- Snowflakeドキュメント | Example: Using Streams in MERGE Statements Within Tasks (MERGE文の条件式)
Discussion