🐕

Snowflake ストリーム機能について

2022/11/10に公開約7,200字

Snowflakeのストリーム機能について動作を確認したいと思います。

ドキュメントより抜粋
ストリームオブジェクトは、挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録し、変更されたデータを使用してアクションを実行できるようにします。
このプロセスは、変更データキャプチャ(CDC)と呼ばれます。
個々のテーブルストリームは、 ソーステーブルの行に加えられた変更を追跡します。
テーブルストリーム(単に「ストリーム」とも呼ばれます)は、テーブル内の2つのトランザクションポイント間で行レベルで変更された内容の「変更テーブル」を利用可能にします。
これにより、トランザクション形式で一連の変更記録をクエリおよび使用できます。

ドキュメントからではあまり意味が分からないのが正直なところですね。。。。
とりあえずハンズオンでイメージをつかんでみます。

環境準備

DBの作成

CREATE OR REPLACE TRANSIENT DATABASE STREAMS_DB;

記録用テーブルの作成

このテーブルは挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録する用のテーブルです。

create or replace table sales_raw_staging(
  id varchar,
  product varchar,
  price varchar,
  amount varchar,
  store_id varchar);

sales_raw_stagingにサンプルデータを入れ込みます

insert into sales_raw_staging 
    values
        (1,'Banana',1.99,1,1),
        (2,'Lemon',0.99,1,1),
        (3,'Apple',1.79,1,2),
        (4,'Orange Juice',1.89,1,2),
        (5,'Cereals',5.98,2,1); 

ストア用テーブルの作成

記録用テーブル内の'store_id'に関連した情報を持つテーブルとしてstore_tableを作成します。

create or replace table store_table(
  store_id number,
  location varchar,
  employees number);

ストア用テーブルにサンプルデータを入れ込みます

INSERT INTO STORE_TABLE VALUES(1,'Chicago',33);
INSERT INTO STORE_TABLE VALUES(2,'London',12);

記録用テーブルとストア用テーブルを結合させたようなテーブルを作成

ターゲットテーブルとしての役割です。

create or replace table sales_final_table(
  id int,
  product varchar,
  price number,
  amount int,
  store_id int,
  location varchar,
  employees int);

sales_final_tableにサンプルデータを入れ込みます

INSERT INTO sales_final_table 
    SELECT 
    SA.id,
    SA.product,
    SA.price,
    SA.amount,
    ST.STORE_ID,
    ST.LOCATION, 
    ST.EMPLOYEES 
    FROM SALES_RAW_STAGING SA
    JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;

ストリーム機能の動作確認

ストリームオブジェクトの作成

create or replace stream {ストリーム名} on table {対象テーブル名};
対象テーブルで起きた変更全てをキャプチャします。

create or replace stream sales_stream on table sales_raw_staging;

ストリーム情報を取得できます。

SHOW STREAMS;
created_on name database_name schema_name owner comment table_name source_type base_tables type stale mode stale_after invalid_reason
2022-10-28 06:07:43.371 -0700 SALES_STREAM STREAMS_DB PUBLIC ACCOUNTADMIN STREAMS_DB.PUBLIC.SALES_RAW_STAGING Table STREAMS_DB.PUBLIC.SALES_RAW_STAGING DELTA FALSE DEFAULT 02:43.4 N/A

ストリームオブジェクトに含まれる変更データを確認する

select * from sales_stream;

現在は変更データはありません。

sales_raw_stagingのデータを確認する

select * from sales_raw_staging;

現状5つのデータが存在します。

sales_raw_stagingにデータをINSERTした時の動作

INSERT句でデータを追加してみる

insert into sales_raw_staging  
    values
        (6,'Mango',1.99,1,2),
        (7,'Garlic',0.99,1,1);

もちろんsales_raw_stagingステージは7レコードになります。

select * from sales_raw_staging;

再度ストリームオブジェクトを確認してみる

select * from sales_stream;

先ほどsales_raw_stagingステージに追加した2レコードの情報が格納されています。
つまり変更履歴のデータをキャプチャしています。

sales_final_tableを確認してみる

 select * from sales_final_table;   

このテーブルを更新したわけではないので変更なしで5レコードのままです。

sales_streamを使ってsales_final_tableに2レコードを追加する

INSERT INTO sales_final_table 
    SELECT 
    SA.id,
    SA.product,
    SA.price,
    SA.amount,
    ST.STORE_ID,
    ST.LOCATION, 
    ST.EMPLOYEES 
    FROM SALES_STREAM SA
    JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;

sales_streamのキャプチャデータを消費したため空になります

select * from sales_stream;


以上がストリーム機能の動作になります。

もう一度INSERTからの動作を確認してみる

INSERTでデータを追加する
insert into sales_raw_staging  
    values
        (8,'Paprika',4.99,1,2),
        (9,'Tomato',3.99,1,2);
SALES_STREAMがキャプチャした変更履歴を使ってsales_final_tableにデータを追加する
INSERT INTO sales_final_table 
    SELECT 
    SA.id,
    SA.product,
    SA.price,
    SA.amount,
    ST.STORE_ID,
    ST.LOCATION, 
    ST.EMPLOYEES 
    FROM SALES_STREAM SA
    JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
SALES_FINAL_TABLEテーブルを確認すると9レコードとなっている
SELECT * FROM SALES_FINAL_TABLE;  

sales_raw_stagingデータをUPDATEした時の動作

先ほどはINSERTの動作確認でしたが、今回はUPDATEの動作確認をしてみます。

現状確認

SELECT * FROM SALES_RAW_STAGING;     

9レコードが格納されています。

SELECT * FROM SALES_STREAM;

ストリームオブジェクト上のキャプチャされたデータはありません。

UPDATE句でデータの変更をおこなう

変更箇所はPRODUCTカラムの'Banana'を'Potato'に変更します。

UPDATE SALES_RAW_STAGING
SET PRODUCT ='Potato' WHERE PRODUCT = 'Banana'

テーブルを確認してみると'Banana'から'Potato'に変わっています。

SELECT * FROM SALES_RAW_STAGING;     


ストリームオブジェクトを確認してみると2の変更履歴がキャプチャされています。

SELECT * FROM SALES_STREAM;

さらにINSERTと違う点はUPDATE箇所が1点しかなかったが、変更履歴は2レコードあるということです。
これは'Banana'をDELETEし'Potato'をINSERTしたため2レコード(DELETEとINSERT)存在しています。
そして'METADATA$ISUPDATE'も'True'となっています。なので更新を意味しています。

SALES_FINAL_TABLEにUPDATEをマージする

merge into SALES_FINAL_TABLE F
table
using SALES_STREAM S
   on  f.id = s.id                 
when matched 
    and S.METADATA$ACTION ='INSERT'
    and S.METADATA$ISUPDATE ='TRUE' 
    then update 
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id;

SALES_FINAL_TABLEを確認する

SELECT * FROM SALES_FINAL_TABLE;

マージした通りの結果となっています。

sales_raw_stagingデータをDELETEした時の動作

現状確認

SALES_FINAL_TABLEもSALES_RAW_STAGINGも下記通り同じテーブル状態で、SALES_STREAMは空です。

DELETE句でデータを削除する

DELETE FROM SALES_RAW_STAGING WHERE PRODUCT = 'Lemon';

削除されたことを確認する

SELECT * FROM SALES_RAW_STAGING; 


さらにストリームオブジェクトを確認する

DELETE FROM SALES_RAW_STAGING WHERE PRODUCT = 'Lemon';

SALES_FINAL_TABLEにDELETEをマージする

merge into SALES_FINAL_TABLE F
using SALES_STREAM S
   on  f.id = s.id          
when matched 
    and S.METADATA$ACTION ='DELETE' 
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete 

SALES_FINAL_TABLEを確認する

SELECT * FROM SALES_FINAL_TABLE;

マージした通りの結果となっています。

最後に

これまでINSERT,UPDATE,DELETEとそれぞれ動作を見来ました。
次回はこれらの各動作を一緒に処理する方法を見ていきたいと思います。

Discussion

ログインするとコメントできます