❄️

Snowflake Streams :テーブル変更履歴(CDC)を捉える仕組みと活用法

に公開

🚀 はじめに

https://zenn.dev/yujmatsu/articles/20251026_sf_dt

先日の記事で 「ダイナミックテーブル」とStreams & Tasks について取り上げました。
今回は Stream にフォーカスを当てていきます。

Snowflakeでデータパイプラインを構築する際、「テーブルに加えられた変更(INSERT/UPDATE/DELETE)だけを効率的に処理したい」という要求は非常に一般的です。
例えば、

  • Rawテーブルにロードされた新規データだけ をStagingテーブルに反映したい。
  • ユーザーテーブルの更新・削除 履歴を監査ログとして記録したい。

このようなChange Data Capture (CDC) を実現するSnowflakeのネイティブ機能が「Streams」です。

この記事では、Streamsがどのような仕組みで変更を追跡し、どのように設定・活用するのか、そして注意すべき点は何かを、Snowflake公式ドキュメントに基づいて解説していきたいと思います。

1. Snowflake Streamsとは?:テーブルの「差分」を読む仕組み

Streamsは、テーブル(やビューなど)に加えられたDML変更(INSERT, UPDATE, DELETE)の履歴を行レベルで追跡 するSnowflakeオブジェクトです。

ブックマーク(オフセット)のようなもの

Streamsは、対象オブジェクト(例:my_table)に対する 「ブックマーク(読み取りオフセット)」 のようなものだと考えると理解しやすいです。

  1. Stream作成時点でのテーブルの状態(タイムスタンプ=オフセット)を記録します。
  2. その後、my_table にDML変更が行われると、Streamはその変更内容(どの行がどう変わったか)を記録します。
  3. ユーザーがDMLトランザクション内でStreamを消費(後述)すると、ブックマーク(オフセット)が現在のテーブルの状態まで前進し、Streamは(そのトランザクションから見た場合)空になります。SELECT でStreamを読むだけではオフセットは進みません

イメージは以下のようなイメージになります。(内部ステージ(ディレクトリテーブル)のStreamの例)

メタデータカラム:変更の種類を教えてくれる

Streamを SELECT すると、元のテーブルのカラムに加えて、以下のメタデータカラムが付与されます。これらが変更の種類や詳細を教えてくれます。

  • METADATA$ACTION: その行に対する操作 (INSERT または DELETE) を示します。
  • METADATA$ISUPDATE: その行が UPDATE 操作の一部であるか (TRUE/FALSE) を示します。
    • 通常: UPDATE は、古い行の DELETE と新しい行の INSERTペアとして記録され、両方の行で METADATA$ISUPDATE = TRUE になります。
    • 例外: 同一オフセット内で INSERT直後に同じ行が UPDATE された場合、差分は最終的な状態の INSERT1行として記録され、その行の METADATA$ISUPDATEFALSE になります(「更新」ではなく「新規挿入」として表現されるため)。
  • METADATA$ROW_ID: 変更された行の一意なIDを示します。
    • 注意: ROW_ID は各ストリーム内では一意かつ不変ですが、テーブル上のStreamとビュー上のStreamで同じ行に同じ ROW_ID が付与される保証はありません(相互比較には使わないでください)。また、ベースオブジェクトのChange Trackingが無効化 \rightarrow 再有効化されると ROW_ID変わり得ます

2. Streamの種類(モード):何を追跡したい?

Streamを作成する際には、追跡したい変更の種類や対象オブジェクトに応じて「モード」を選択できます。

  1. Standard Stream (デフォルト):

    • INSERT, UPDATE, DELETEすべてを追跡します。TRUNCATE による削除も追跡対象です(削除が記録されます)。
    • テーブルまたはビューに対して作成できます。
    • 最も一般的なモードです。
  2. Append-only Stream:

    • INSERT 操作のみを追跡します。UPDATEDELETE は無視されます。
    • テーブルまたはビューに対して作成できます。
    • 単純な追記型データ(ログデータなど)の処理に適しています。
  3. Insert-only Stream (挿入のみ):

    • 外部テーブルおよび外部カタログ管理の Iceberg テーブルに対して必須のモードです。
    • 追加された新しいファイルの行のみを追跡します。

3. Streamの設定方法・注意点

Streamは様々なオブジェクトの上に作成できますが、対象によって挙動や注意点が異なります。

3.1. テーブルに対するStream (Standard/Append-only)

最も基本的な使い方です。

作成方法:

-- my_table の変更を追跡する standard stream を作成
CREATE OR REPLACE STREAM my_stream ON TABLE my_table;

-- my_log_table の INSERT のみを追跡する append-only stream を作成
CREATE OR REPLACE STREAM my_append_only_stream ON TABLE my_log_table
  APPEND_ONLY = TRUE;

権限(作成時):

  • 対象スキーマに対する CREATE STREAM 権限。
  • ベーステーブルに対する SELECT 権限。
  • データベースとスキーマに対する USAGE 権限。
  • 重要 (Change Tracking自動有効化): ベーステーブルで Change Tracking が未有効の場合初回のストリーム作成はテーブル所有ロール(OWNERSHIP)でのみ可能です(作成時に自動で Change Tracking を有効化するため)。
    • 回避策: 所有者でないロールで作成したい場合は、先に所有ロールで ALTER TABLE ... SET CHANGE_TRACKING = TRUE; を実行します。

権限(読み取り時):

  • Stream自体に対する SELECT 権限。
  • ベーステーブルに対する SELECT 権限。
  • データベース/スキーマに対する USAGE 権限。

注意点 (Stale StreamとTime Travel):

  • Time Travel 依存: StreamはSnowflakeのTime Travel(Change Tracking)機能を利用して変更履歴を取得します。対象テーブルの Time Travel 保持期間(データ保持期間)が過ぎると、Streamは「Stale(古くなった)」状態になり、読み取り不能になります。
  • 保持期間の拡張: Snowflakeは未消費のストリームに対して、テーブルの保持期間を超えて 最大 14 日(アカウントパラメータ MAX_DATA_EXTENSION_TIME_IN_DAYS で設定可能)まで一時的に延長することがあります。延長期間中は追加のストレージコストが発生します。原則として、テーブルの保持期間内に消費してください。
  • Stale化の防止: Streamを消費する処理(例:Task)は、テーブルのTime Travel期間内に確実に実行されるように設計・監視する必要があります。SYSTEM$STREAM_HAS_DATA('<stream_name>')未消費データの有無を確認する関数です。空のストリームに対して FALSE を返す呼び出しは stale 化を防ぐ効果がありますが、オフセット自体は前進しません。オフセットを確実に前進させる(そしてStale化を防ぐ)には、DMLを伴う消費(コミット)が必要です。オフセットを明示的に進めたい場合は、DMLを伴う消費(例: INSERT INTO dummy_table SELECT * FROM my_stream WHERE 1=0;)が有効です。

3.2. ビューに対するStream (Standard/Append-only)

ビュー(Secure Viewを含む)に対してもStreamを作成できますが、ビューの定義内容に厳格な制約があります。

作成できるビュー:

  • 許可されるのは以下の要素のみで構成されるビューです:
    • 射影 (Projection)
    • フィルター (Filter)
    • 内部結合 (Inner Join) またはクロス結合 (Cross Join)
    • UNION ALL

作成できないビュー:

  • マテリアライズドビュー
  • 上記以外の構文を含むビュー (例: 外部結合, GROUP BY, QUALIFY, HAVING, ORDER BY, LIMIT, DISTINCT, 集合演算子 EXCEPT/INTERSECT, LATERAL ビュー, CONNECT BY, UDFs/UDTFs, ストアドプロシージャ, (FROM句以外での)サブクエリ, 相関サブクエリなど)

作成方法:

-- my_view (JOINを含む可能性あり) の変更を追跡
CREATE OR REPLACE STREAM stream_on_view ON VIEW my_view;

権限(作成時):

  • 対象スキーマに対する CREATE STREAM 権限。
  • ベースビューに対する SELECT 権限。
  • データベースとスキーマに対する USAGE 権限。
  • 重要 (Change Tracking自動有効化): ビューまたはその基盤テーブルで Change Tracking が未有効の場合、初回のストリーム作成にはビューとその基盤テーブルの**所有権(OWNERSHIP)**が必要になります。

権限(読み取り時):

  • Stream自体に対する SELECT 権限。
  • ビューに対する SELECT 権限。
  • データベース/スキーマに対する USAGE 権限。

注意点:

  • Change Tracking: ビューおよび、そのビューが参照する全てのベーステーブルでChange Trackingが有効である必要があります。
  • 複雑なビューへの非対応: Snowflakeがビューの変更差分を正確に追跡できないため、多くの一般的なビュー構文はサポートされません。

3.3. ディレクトリテーブルに対するStream (Standard/Append-only)

ステージ上のファイル一覧を提供するディレクトリテーブルにもStreamを作成できます。

作成方法:

-- my_directory_table (ディレクトリテーブル) の変更を追跡
CREATE OR REPLACE STREAM stream_on_directory ON TABLE my_directory_table;

権限(読み取り時):

  • Streamへの SELECT 権限。
  • ステージに対する権限:外部ステージは USAGE内部ステージは READ
    • (補足) ディレクトリテーブルそのものを直接 SELECT する場合は権限要件が異なる場合があります。

注意点:

  • METADATA$ROW_ID常にNULLになります。
  • ステージの種類により、変更検知の仕組み(イベント通知 or 定期スキャン)が異なります。

3.4. 外部テーブルに対するStream (Insert-only 必須)

データレイク(S3等)上のファイルを参照する外部テーブル、および外部カタログ管理の Iceberg テーブルには Insert-only モード(挿入のみ)のStreamを作成します。

作成方法:

-- my_external_table の変更を追跡 (Insert-only モード)
CREATE OR REPLACE STREAM stream_on_ext_table ON EXTERNAL TABLE my_external_table
  INSERT_ONLY = TRUE;

権限(読み取り時):

  • Streamへの SELECT 権限。
  • 外部テーブルへの SELECT 権限。
  • (DB/Schema への USAGE

注意点:

  • INSERT_ONLY = TRUE必須です(Standard/Append-onlyはサポートされません)。
  • Streamが変更を検知するには、外部テーブル自体がリフレッシュされている必要があります(自動リフレッシュ設定または手動 ALTER EXTERNAL TABLE ... REFRESH)。

4. Streamの消費:変更データを読み取る仕組み

Streamに記録された変更データは、DMLトランザクション内で読み取られる(消費される)とそのStreamのオフセット(ブックマーク)が進みます。

消費の基本パターン (Taskとの連携)

最も一般的なのは、Taskを使って定期的にStreamを消費するパターンです。

CREATE OR REPLACE TASK my_consume_task
  WAREHOUSE = my_wh
  SCHEDULE = '5 MINUTE'
WHEN
  SYSTEM$STREAM_HAS_DATA('my_stream') -- Streamにデータがある時だけ実行
AS
  -- StreamのデータをTargetテーブルにMERGEする (このDMLがStreamを消費する)
  MERGE INTO target_table t
  USING my_stream s
  ON t.id = s.id
  WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' AND NOT s.METADATA$ISUPDATE THEN -- UPDATEのDELETE側を除外
    DELETE
  WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN -- INSERT または UPDATEのINSERT側 (METADATA$ACTION='INSERT')
    UPDATE SET col1 = s.col1, col2 = s.col2 -- UPDATE処理
  WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN -- 新規INSERTのみ
    INSERT (id, col1, col2) VALUES (s.id, s.col1, s.col2); -- INSERT処理
  • SYSTEM$STREAM_HAS_DATA(): Streamに未消費のデータがあるかを確認する関数。Taskの WHEN 句で使うことで、無駄なTask実行を防げます。
  • トランザクション: MERGE 文(または INSERT, UPDATE, DELETE, CTAS, COPY INTO <location>)がコミットされると、そのトランザクションで読み取られた範囲までStreamのオフセットが進みます。SELECT 文でStreamを読むだけでは消費されません。

注意点:Stale Stream(古くなったStream)

前述の通り、Streamのオフセットが指す時点が、テーブルのTime Travel保持期間(+最大14日等の拡張期間)を超えてしまうと、Streamは Stale 状態になります。

  • Staleになると、Streamからの読み取りは一切できなくなります(エラーが発生)。
  • 回復不能: 一度Staleになると元に戻せません。Streamを再作成する必要があります。
  • 対策: Streamを消費する処理(Taskなど)が、テーブルの保持期間内に必ず成功するように設計・監視することが極めて重要です。

5. Streamの活用方法

Streamsは様々なデータパイプラインやアーキテクチャで活用できます。

  1. 増分ELT/ETL (Streams & Tasks):
    Rawテーブル \rightarrow Stagingテーブルへの差分マージなど、従来からの基本的な使い方です。

  2. 監査ログ・変更履歴の記録:
    重要なテーブルの変更内容(METADATA$* を含む)を監査ログテーブルに INSERT することで、変更履歴を簡単に記録できます。

  3. データレプリケーション・同期:
    あるテーブルの変更を、Streamを使って別のテーブルや外部システム(Taskから外部APIを呼び出すなど)に準リアルタイムで反映させることができます。

  4. ダイナミックテーブルとの使い分け:
    Snowflake内で完結する単純な増分パイプラインであれば、現在ではダイナミックテーブルの方がシンプルに実装できることが多いです。しかし、Snowflake外部との連携や、複雑な処理ロジック(ストアドプロシージャ呼び出しなど)をTaskで実行したい場合は、依然としてStreams & Tasksパターンが有効です。

😌 おわりに

Snowflake Streamsは、テーブルの変更履歴(CDC)を効率的に捉えるための強力かつ柔軟な機能となっています。

  • オフセット(ブックマーク) で変更を追跡し、メタデータカラムで変更内容を詳細に把握できます(METADATA$ISUPDATEの例外挙動やMETADATA$ROW_IDの注意点を理解)。
  • テーブルだけでなく、ビュー(厳格な制約あり)や外部テーブルInsert-only必須)など様々なオブジェクトに適用できます。
  • Time Travelへの依存Stale化のリスク(および延長期間とコスト)を理解し、適切な消費サイクルを設計することが重要です (SYSTEM$STREAM_HAS_DATA の効果を正しく理解)。
  • 単純なELTだけでなく、監査やデータ同期など、幅広い用途で活用できます。

ダイナミックテーブルが登場した現在でも、Streams & Tasksはその手続き的な制御力とCDC機能の外部利用の可能性から、依然として重要な選択肢です。両者の特性を理解し、適切に使い分けられると良いでしょう

📚 参考出典

Discussion