🦁

Snowflake Stream機能の振り返り

2023/09/16に公開

Snowflakeストリーム機能について

過去の記事より

Snowflakeのストリーム機能について動作を確認したいと思います。

ドキュメントより抜粋
ストリームオブジェクトは、挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録し、変更されたデータを使用してアクションを実行できるようにします。
このプロセスは、変更データキャプチャ(CDC)と呼ばれます。
個々のテーブルストリームは、 ソーステーブルの行に加えられた変更を追跡します。
テーブルストリーム(単に「ストリーム」とも呼ばれます)は、テーブル内の2つのトランザクションポイント間で行レベルで変更された内容の「変更テーブル」を利用可能にします。
これにより、トランザクション形式で一連の変更記録をクエリおよび使用できます。

ドキュメントからではあまり意味が分からないのが正直なところですね。。。。
とりあえずハンズオンでイメージをつかんでみます。

環境準備

DBの作成

CREATE OR REPLACE TRANSIENT DATABASE STREAMS_DB;

記録用テーブルの作成

このテーブルは挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録する用のテーブルです。

create or replace table sales_raw_staging(
  id varchar,
  product varchar,
  price varchar,
  amount varchar,
  store_id varchar);

sales_raw_stagingにサンプルデータを入れ込みます

insert into sales_raw_staging
    values
        (1,'Banana',1.99,1,1),
        (2,'Lemon',0.99,1,1),
        (3,'Apple',1.79,1,2),
        (4,'Orange Juice',1.89,1,2),
        (5,'Cereals',5.98,2,1);

ストア用テーブルの作成

記録用テーブル内の'store_id'に関連した情報を持つテーブルとしてstore_tableを作成します。

create or replace table store_table(
  store_id number,
  location varchar,
  employees number);

ストア用テーブルにサンプルデータを入れ込みます

INSERT INTO STORE_TABLE VALUES(1,'Chicago',33);
INSERT INTO STORE_TABLE VALUES(2,'London',12);

記録用テーブルとストア用テーブルを結合させたようなテーブルを作成

ターゲットテーブルとしての役割です。

create or replace table sales_final_table(
  id int,
  product varchar,
  price number,
  amount int,
  store_id int,
  location varchar,
  employees int);

sales_final_tableにサンプルデータを入れ込みます

INSERT INTO sales_final_table
    SELECT
    SA.id,
    SA.product,
    SA.price,
    SA.amount,
    ST.STORE_ID,
    ST.LOCATION,
    ST.EMPLOYEES
    FROM SALES_RAW_STAGING SA
    JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;

ストリーム機能の動作確認

ストリームオブジェクトの作成

create or replace stream {ストリーム名} on table {対象テーブル名};
対象テーブルで起きた変更全てをキャプチャします。

create or replace stream sales_stream on table sales_raw_staging;

ストリーム情報を取得できます。

SHOW STREAMS;
created_on name database_name schema_name owner comment table_name source_type base_tables type stale mode stale_after invalid_reason
2022-10-28 06:07:43.371 -0700 SALES_STREAM STREAMS_DB PUBLIC ACCOUNTADMIN STREAMS_DB.PUBLIC.SALES_RAW_STAGING Table STREAMS_DB.PUBLIC.SALES_RAW_STAGING DELTA FALSE DEFAULT 02:43.4 N/A

ストリームオブジェクトに含まれる変更データを確認する

select * from sales_stream;

現在は変更データはありません。

sales_raw_stagingのデータを確認する

select * from sales_raw_staging;

現状5つのデータが存在します。

sales_raw_stagingにデータをINSERTした時の動作

INSERT句でデータを追加してみる

insert into sales_raw_staging
    values
        (6,'Mango',1.99,1,2),
        (7,'Garlic',0.99,1,1);

もちろんsales_raw_stagingステージは7レコードになります。

select * from sales_raw_staging;

再度ストリームオブジェクトを確認してみる

select * from sales_stream;

先ほどsales_raw_stagingステージに追加した2レコードの情報が格納されています。
つまり変更履歴のデータをキャプチャしています。

sales_final_tableを確認してみる

 select * from sales_final_table;

このテーブルを更新したわけではないので変更なしで5レコードのままです。

sales_streamを使ってsales_final_tableに2レコードを追加する

INSERT INTO sales_final_table
    SELECT
    SA.id,
    SA.product,
    SA.price,
    SA.amount,
    ST.STORE_ID,
    ST.LOCATION, 
    ST.EMPLOYEES 
    FROM SALES_STREAM SA
    JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;

sales_streamのキャプチャデータを消費したため空になります

select * from sales_stream;


以上がストリーム機能の動作になります。

もう一度INSERTからの動作を確認してみる

INSERTでデータを追加する
insert into sales_raw_staging  
    values
        (8,'Paprika',4.99,1,2),
        (9,'Tomato',3.99,1,2);
SALES_STREAMがキャプチャした変更履歴を使ってsales_final_tableにデータを追加する
INSERT INTO sales_final_table 
    SELECT 
    SA.id,
    SA.product,
    SA.price,
    SA.amount,
    ST.STORE_ID,
    ST.LOCATION,
    ST.EMPLOYEES
    FROM SALES_STREAM SA
    JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
SALES_FINAL_TABLEテーブルを確認すると9レコードとなっている
SELECT * FROM SALES_FINAL_TABLE;

sales_raw_stagingデータをUPDATEした時の動作

先ほどはINSERTの動作確認でしたが、今回はUPDATEの動作確認をしてみます。

現状確認

SELECT * FROM SALES_RAW_STAGING;

9レコードが格納されています。

SELECT * FROM SALES_STREAM;

ストリームオブジェクト上のキャプチャされたデータはありません。

UPDATE句でデータの変更をおこなう

変更箇所はPRODUCTカラムの'Banana'を'Potato'に変更します。

UPDATE SALES_RAW_STAGING
SET PRODUCT ='Potato' WHERE PRODUCT = 'Banana'

テーブルを確認してみると'Banana'から'Potato'に変わっています。

SELECT * FROM SALES_RAW_STAGING;


ストリームオブジェクトを確認してみると2の変更履歴がキャプチャされています。

SELECT * FROM SALES_STREAM;

さらにINSERTと違う点はUPDATE箇所が1点しかなかったが、変更履歴は2レコードあるということです。
これは'Banana'をDELETEし'Potato'をINSERTしたため2レコード(DELETEとINSERT)存在しています。
そして'METADATA$ISUPDATE'も'True'となっています。なので更新を意味しています。

SALES_FINAL_TABLEにUPDATEをマージする

merge into SALES_FINAL_TABLE F
table
using SALES_STREAM S
   on  f.id = s.id
when matched
    and S.METADATA$ACTION ='INSERT'
    and S.METADATA$ISUPDATE ='TRUE'
    then update
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id;

SALES_FINAL_TABLEを確認する

SELECT * FROM SALES_FINAL_TABLE;

マージした通りの結果となっています。

sales_raw_stagingデータをDELETEした時の動作

現状確認

SALES_FINAL_TABLEもSALES_RAW_STAGINGも下記通り同じテーブル状態で、SALES_STREAMは空です。

DELETE句でデータを削除する

DELETE FROM SALES_RAW_STAGING WHERE PRODUCT = 'Lemon';

削除されたことを確認する

SELECT * FROM SALES_RAW_STAGING;


さらにストリームオブジェクトを確認する

DELETE FROM SALES_RAW_STAGING WHERE PRODUCT = 'Lemon';

SALES_FINAL_TABLEにDELETEをマージする

merge into SALES_FINAL_TABLE F
using SALES_STREAM S
   on  f.id = s.id
when matched
    and S.METADATA$ACTION ='DELETE'
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete

SALES_FINAL_TABLEを確認する

SELECT * FROM SALES_FINAL_TABLE;

マージした通りの結果となっています。

最後に

これまでINSERT,UPDATE,DELETEとそれぞれ動作を見来ました。
次回はこれらの各動作を一緒に処理する方法を見ていきたいと思います。

Snowflake ストリーム機能スマートな処理について

過去の記事より

前回記事ではストリーム機能を使って、INSERT,UPDATE,DELETEの動作確認をそれぞれ確認しました。
今回はそれらの動作をスマートに実施する方法を見ていきます。

環境の確認

SALES_RAW_STAGINGの確認

前回記事で最後に実施した状態でID2が欠落している状態です。

SALES_FINAL_TABLEの確認

SALES_RAW_STAGINGと同じ状態なので結果は割愛します。

SELECT * FROM SALES_FINAL_TABLE;

SALES_STREAMの確認

空なので結果は割愛します。

SELECT * FROM SALES_STREAM;

INSERTしてデータを追加する

前回記事で削除したID2をここで追加します。
※特に深い意味はありません。

INSERT INTO SALES_RAW_STAGING VALUES (2,'Lemon',0.99,1,1);

ストリームオブジェクトの確認
INSERT履歴が格納されていることを確認

SELECT * FROM SALES_STREAM;

スマートにSALES_FINAL_TABLEにMERGEする

ちなみにですが下記SQLでINERT,UPDATE,DELETEすべてに対応できます。

merge into SALES_FINAL_TABLE F
USING ( SELECT STRE.*,ST.location,ST.employees
        FROM SALES_STREAM STRE
        JOIN STORE_TABLE ST
        ON STRE.store_id = ST.store_id
       ) S
ON F.id=S.id
when matched
    and S.METADATA$ACTION ='DELETE'
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete
when matched
and S.METADATA$ACTION ='INSERT'
    and S.METADATA$ISUPDATE  = 'TRUE'
    then update
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id
when not matched
    and S.METADATA$ACTION ='INSERT'
    then insert
    (id,product,price,store_id,amount,employees,location)
    values
    (s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location)

SALES_FINAL_TABLEのデータを確認してみると、追加したID2がマージされています。

SELECT * FROM SALES_FINAL_TABLE;

他の操作も見ていきます

UPDATEの動作

'Lemon'を'Lemonade'に更新します。

UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Lemonade'
WHERE PRODUCT ='Lemon'

先ほど実行した複雑なMERGE句を実行します。
そしてSALES_FINAL_TABLEを確認してみる。

SELECT * FROM SALES_FINAL_TABLE;

DELETEの動作

'Lemonade'を削除します。

DELETE FROM SALES_RAW_STAGING WHERE PRODUCT = 'Lemonade';

再度先ほど実行した複雑なMERGE句を実行します。
そしてSALES_FINAL_TABLEを確認してみる。
'Lemonade'が消えマージされています。

SELECT * FROM SALES_FINAL_TABLE;


マージするSQLは複雑ですが、操作によってSQLを変更しなくてもよいことは素晴らしいです。

Snowflake ストリーム機能とタスク連携について

過去の記事より

前回前々回とストリーム機能について動作を見てきましたが、どうやらタスク機能と連携することで真のストリーム機能の発揮できるようです。
それでは実際にどのように連携するのか見ていきましょう。

タスク機能との連携

タスクのトリガーとしてSTREAM_HAS_DATA{対象のオブジェクト名}を利用しています。
今回はSALES_STREAMにキャプチャデータあると実行されるようになっています。
そしてタスクのスケジュールは1分間隔で設定しています。
これで1分間隔でSALES_STREAMを参照しデータがあればMERGE句を実行します。

CREATE OR REPLACE TASK all_data_changes
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('SALES_STREAM')
    AS
merge into SALES_FINAL_TABLE F
USING ( SELECT STRE.*,ST.location,ST.employees
        FROM SALES_STREAM STRE
        JOIN STORE_TABLE ST
        ON STRE.store_id = ST.store_id
       ) S
ON F.id=S.id
when matched                        -- DELETE condition
    and S.METADATA$ACTION ='DELETE'
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete
when matched                        -- UPDATE condition
    and S.METADATA$ACTION ='INSERT'
    and S.METADATA$ISUPDATE  = 'TRUE'
    then update
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id
when not matched
    and S.METADATA$ACTION ='INSERT'
    then insert
    (id,product,price,store_id,amount,employees,location)
    values
    (s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location)

Streamのデータクリア処理について

Streamのデータクリア処理について

業務でStreamとTaskを使ったデータ更新処理を行っているのですが、Streamのデータをクリアする処理について動作を確認してみました。

検証用テーブルの作成
tony#COMPUTE_WH@DEMO_DB.PUBLIC>CREATE table test_table as SELECT 1 as no;
+----------------------------------------+
| status                                 |
|----------------------------------------|
| Table TEST_TABLE successfully created. |
+----------------------------------------+
1 Row(s) produced. Time Elapsed: 0.894s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from TEST_TABLE;
+----+
| NO |
|----|
|  1 |
+----+
1 Row(s) produced. Time Elapsed: 0.182s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>
ストリームの設定
tony#COMPUTE_WH@DEMO_DB.PUBLIC>CREATE STREAM sample_stream ON TABLE TEST_TABLE;
+--------------------------------------------+
| status                                     |
|--------------------------------------------|
| Stream SAMPLE_STREAM successfully created. |
+--------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.338s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>
test_tableへレコードを追加
tony#COMPUTE_WH@DEMO_DB.PUBLIC>INSERT into TEST_TABLE SELECT 2 as no;
+-------------------------+
| number of rows inserted |
|-------------------------|
|                       1 |
+-------------------------+
1 Row(s) produced. Time Elapsed: 0.483s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>
Streamが保持している追跡データの確認
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from sample_stream;
+----+-----------------+-------------------+------------------------------------------+
| NO | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----+-----------------+-------------------+------------------------------------------|
|  2 | INSERT          | False             | 047d883040b6aebcbba56a63ca86318d29fa17b2 |
+----+-----------------+-------------------+------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.310s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>

以上より、SELECTではStreamでデータはクリアされません。

INSERTすれば消えるのか?

2つのテーブルを作成
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from FIRST_TABLE;
+----+
| NO |
|----|
|  1 |
+----+
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from SECOND_TABLE;
+----+
| NO |
|----|
|  1 |
+----+
FIRST_TABLEにStream設定をする
tony#COMPUTE_WH@DEMO_DB.PUBLIC>create or replace stream test_stream on table FIRST_TABLE;
+------------------------------------------+
| status                                   |
|------------------------------------------|
| Stream TEST_STREAM successfully created. |
+------------------------------------------+
1 Row(s) produced. Time Elapsed: 1.846s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>INSERT into FIRST_TABLE SELECT 2 as no;
+-------------------------+
| number of rows inserted |
|-------------------------|
|                       1 |
+-------------------------+
1 Row(s) produced. Time Elapsed: 0.422s
FIRST/SECONDA_TABLEの状態を確認
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from FIRST_TABLE;
+----+
| NO |
|----|
|  1 |
|  2 |
+----+
2 Row(s) produced. Time Elapsed: 0.326s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from SECOND_TABLE;
+----+
| NO |
|----|
|  1 |
+----+
1 Row(s) produced. Time Elapsed: 0.141s
INSERTを使いTEST_STREAMからデータを投入
tony#COMPUTE_WH@DEMO_DB.PUBLIC>INSERT INTO SECOND_TABLE SELECT no from test_stream;
+-------------------------+
| number of rows inserted |
|-------------------------|
|                       1 |
+-------------------------+
1 Row(s) produced. Time Elapsed: 0.525s
SECOND_TABLEもレコードが追加されStreamのデータも消えている
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from SECOND_TABLE;
+----+
| NO |
|----|
|  1 |
|  2 |
+----+
2 Row(s) produced. Time Elapsed: 0.256s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from test_stream;
+----+-----------------+-------------------+-----------------+
| NO | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+-----------------+-------------------+-----------------|
+----+-----------------+-------------------+-----------------+
0 Row(s) produced. Time Elapsed: 0.361s
DELETEでも消えることを確認しました
tony#COMPUTE_WH@DEMO_DB.PUBLIC>DELETE from SECOND_TABLE where not EXISTS ( SELECT 1 from test_stream UNION all SELECT 1);
+------------------------+
| number of rows deleted |
|------------------------|
|                      0 |
+------------------------+
0 Row(s) produced. Time Elapsed: 0.590s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from test_stream;
+----+-----------------+-------------------+-----------------+
| NO | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+-----------------+-------------------+-----------------|
+----+-----------------+-------------------+-----------------+
0 Row(s) produced. Time Elapsed: 0.255s

Discussion