Snowflake Stream機能の振り返り
Snowflakeストリーム機能について
過去の記事より
Snowflakeのストリーム機能について動作を確認したいと思います。
ストリームオブジェクトは、挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録し、変更されたデータを使用してアクションを実行できるようにします。
このプロセスは、変更データキャプチャ(CDC)と呼ばれます。
個々のテーブルストリームは、 ソーステーブルの行に加えられた変更を追跡します。
テーブルストリーム(単に「ストリーム」とも呼ばれます)は、テーブル内の2つのトランザクションポイント間で行レベルで変更された内容の「変更テーブル」を利用可能にします。
これにより、トランザクション形式で一連の変更記録をクエリおよび使用できます。
ドキュメントからではあまり意味が分からないのが正直なところですね。。。。
とりあえずハンズオンでイメージをつかんでみます。
環境準備
DBの作成
CREATE OR REPLACE TRANSIENT DATABASE STREAMS_DB;
記録用テーブルの作成
このテーブルは挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録する用のテーブルです。
create or replace table sales_raw_staging(
id varchar,
product varchar,
price varchar,
amount varchar,
store_id varchar);
sales_raw_stagingにサンプルデータを入れ込みます
insert into sales_raw_staging
values
(1,'Banana',1.99,1,1),
(2,'Lemon',0.99,1,1),
(3,'Apple',1.79,1,2),
(4,'Orange Juice',1.89,1,2),
(5,'Cereals',5.98,2,1);
ストア用テーブルの作成
記録用テーブル内の'store_id'に関連した情報を持つテーブルとしてstore_tableを作成します。
create or replace table store_table(
store_id number,
location varchar,
employees number);
ストア用テーブルにサンプルデータを入れ込みます
INSERT INTO STORE_TABLE VALUES(1,'Chicago',33);
INSERT INTO STORE_TABLE VALUES(2,'London',12);
記録用テーブルとストア用テーブルを結合させたようなテーブルを作成
ターゲットテーブルとしての役割です。
create or replace table sales_final_table(
id int,
product varchar,
price number,
amount int,
store_id int,
location varchar,
employees int);
sales_final_tableにサンプルデータを入れ込みます
INSERT INTO sales_final_table
SELECT
SA.id,
SA.product,
SA.price,
SA.amount,
ST.STORE_ID,
ST.LOCATION,
ST.EMPLOYEES
FROM SALES_RAW_STAGING SA
JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
ストリーム機能の動作確認
ストリームオブジェクトの作成
create or replace stream {ストリーム名} on table {対象テーブル名};
対象テーブルで起きた変更全てをキャプチャします。
create or replace stream sales_stream on table sales_raw_staging;
ストリーム情報を取得できます。
SHOW STREAMS;
created_on | name | database_name | schema_name | owner | comment | table_name | source_type | base_tables | type | stale | mode | stale_after | invalid_reason |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2022-10-28 06:07:43.371 -0700 | SALES_STREAM | STREAMS_DB | PUBLIC | ACCOUNTADMIN | STREAMS_DB.PUBLIC.SALES_RAW_STAGING | Table | STREAMS_DB.PUBLIC.SALES_RAW_STAGING | DELTA | FALSE | DEFAULT | 02:43.4 | N/A |
ストリームオブジェクトに含まれる変更データを確認する
select * from sales_stream;
現在は変更データはありません。
sales_raw_stagingのデータを確認する
select * from sales_raw_staging;
現状5つのデータが存在します。
sales_raw_stagingにデータをINSERTした時の動作
INSERT句でデータを追加してみる
insert into sales_raw_staging
values
(6,'Mango',1.99,1,2),
(7,'Garlic',0.99,1,1);
もちろんsales_raw_stagingステージは7レコードになります。
select * from sales_raw_staging;
再度ストリームオブジェクトを確認してみる
select * from sales_stream;
先ほどsales_raw_stagingステージに追加した2レコードの情報が格納されています。
つまり変更履歴のデータをキャプチャしています。
sales_final_tableを確認してみる
select * from sales_final_table;
このテーブルを更新したわけではないので変更なしで5レコードのままです。
sales_streamを使ってsales_final_tableに2レコードを追加する
INSERT INTO sales_final_table
SELECT
SA.id,
SA.product,
SA.price,
SA.amount,
ST.STORE_ID,
ST.LOCATION,
ST.EMPLOYEES
FROM SALES_STREAM SA
JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
sales_streamのキャプチャデータを消費したため空になります
select * from sales_stream;
以上がストリーム機能の動作になります。
もう一度INSERTからの動作を確認してみる
insert into sales_raw_staging
values
(8,'Paprika',4.99,1,2),
(9,'Tomato',3.99,1,2);
INSERT INTO sales_final_table
SELECT
SA.id,
SA.product,
SA.price,
SA.amount,
ST.STORE_ID,
ST.LOCATION,
ST.EMPLOYEES
FROM SALES_STREAM SA
JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
SELECT * FROM SALES_FINAL_TABLE;
sales_raw_stagingデータをUPDATEした時の動作
先ほどはINSERTの動作確認でしたが、今回はUPDATEの動作確認をしてみます。
現状確認
SELECT * FROM SALES_RAW_STAGING;
9レコードが格納されています。
SELECT * FROM SALES_STREAM;
ストリームオブジェクト上のキャプチャされたデータはありません。
UPDATE句でデータの変更をおこなう
変更箇所はPRODUCTカラムの'Banana'を'Potato'に変更します。
UPDATE SALES_RAW_STAGING
SET PRODUCT ='Potato' WHERE PRODUCT = 'Banana'
テーブルを確認してみると'Banana'から'Potato'に変わっています。
SELECT * FROM SALES_RAW_STAGING;
ストリームオブジェクトを確認してみると2の変更履歴がキャプチャされています。
SELECT * FROM SALES_STREAM;
さらにINSERTと違う点はUPDATE箇所が1点しかなかったが、変更履歴は2レコードあるということです。
これは'Banana'をDELETEし'Potato'をINSERTしたため2レコード(DELETEとINSERT)存在しています。
そして'METADATA$ISUPDATE'も'True'となっています。なので更新を意味しています。
SALES_FINAL_TABLEにUPDATEをマージする
merge into SALES_FINAL_TABLE F
table
using SALES_STREAM S
on f.id = s.id
when matched
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE ='TRUE'
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id;
SALES_FINAL_TABLEを確認する
SELECT * FROM SALES_FINAL_TABLE;
マージした通りの結果となっています。
sales_raw_stagingデータをDELETEした時の動作
現状確認
SALES_FINAL_TABLEもSALES_RAW_STAGINGも下記通り同じテーブル状態で、SALES_STREAMは空です。
DELETE句でデータを削除する
DELETE FROM SALES_RAW_STAGING WHERE PRODUCT = 'Lemon';
削除されたことを確認する
SELECT * FROM SALES_RAW_STAGING;
さらにストリームオブジェクトを確認する
DELETE FROM SALES_RAW_STAGING WHERE PRODUCT = 'Lemon';
SALES_FINAL_TABLEにDELETEをマージする
merge into SALES_FINAL_TABLE F
using SALES_STREAM S
on f.id = s.id
when matched
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete
SALES_FINAL_TABLEを確認する
SELECT * FROM SALES_FINAL_TABLE;
マージした通りの結果となっています。
最後に
これまでINSERT,UPDATE,DELETEとそれぞれ動作を見来ました。
次回はこれらの各動作を一緒に処理する方法を見ていきたいと思います。
Snowflake ストリーム機能スマートな処理について
過去の記事より
前回記事ではストリーム機能を使って、INSERT,UPDATE,DELETEの動作確認をそれぞれ確認しました。
今回はそれらの動作をスマートに実施する方法を見ていきます。
環境の確認
SALES_RAW_STAGINGの確認
前回記事で最後に実施した状態でID2が欠落している状態です。
SALES_FINAL_TABLEの確認
SALES_RAW_STAGINGと同じ状態なので結果は割愛します。
SELECT * FROM SALES_FINAL_TABLE;
SALES_STREAMの確認
空なので結果は割愛します。
SELECT * FROM SALES_STREAM;
INSERTしてデータを追加する
前回記事で削除したID2をここで追加します。
※特に深い意味はありません。
INSERT INTO SALES_RAW_STAGING VALUES (2,'Lemon',0.99,1,1);
ストリームオブジェクトの確認
INSERT履歴が格納されていることを確認
SELECT * FROM SALES_STREAM;
スマートにSALES_FINAL_TABLEにMERGEする
ちなみにですが下記SQLでINERT,UPDATE,DELETEすべてに対応できます。
merge into SALES_FINAL_TABLE F
USING ( SELECT STRE.*,ST.location,ST.employees
FROM SALES_STREAM STRE
JOIN STORE_TABLE ST
ON STRE.store_id = ST.store_id
) S
ON F.id=S.id
when matched
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete
when matched
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE = 'TRUE'
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id
when not matched
and S.METADATA$ACTION ='INSERT'
then insert
(id,product,price,store_id,amount,employees,location)
values
(s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location)
SALES_FINAL_TABLEのデータを確認してみると、追加したID2がマージされています。
SELECT * FROM SALES_FINAL_TABLE;
他の操作も見ていきます
UPDATEの動作
'Lemon'を'Lemonade'に更新します。
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Lemonade'
WHERE PRODUCT ='Lemon'
先ほど実行した複雑なMERGE句を実行します。
そしてSALES_FINAL_TABLEを確認してみる。
SELECT * FROM SALES_FINAL_TABLE;
DELETEの動作
'Lemonade'を削除します。
DELETE FROM SALES_RAW_STAGING WHERE PRODUCT = 'Lemonade';
再度先ほど実行した複雑なMERGE句を実行します。
そしてSALES_FINAL_TABLEを確認してみる。
'Lemonade'が消えマージされています。
SELECT * FROM SALES_FINAL_TABLE;
マージするSQLは複雑ですが、操作によってSQLを変更しなくてもよいことは素晴らしいです。
Snowflake ストリーム機能とタスク連携について
過去の記事より
前回前々回とストリーム機能について動作を見てきましたが、どうやらタスク機能と連携することで真のストリーム機能の発揮できるようです。
それでは実際にどのように連携するのか見ていきましょう。
タスク機能との連携
タスクのトリガーとしてSTREAM_HAS_DATA{対象のオブジェクト名}
を利用しています。
今回はSALES_STREAMにキャプチャデータあると実行されるようになっています。
そしてタスクのスケジュールは1分間隔で設定しています。
これで1分間隔でSALES_STREAMを参照しデータがあればMERGE句を実行します。
CREATE OR REPLACE TASK all_data_changes
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('SALES_STREAM')
AS
merge into SALES_FINAL_TABLE F
USING ( SELECT STRE.*,ST.location,ST.employees
FROM SALES_STREAM STRE
JOIN STORE_TABLE ST
ON STRE.store_id = ST.store_id
) S
ON F.id=S.id
when matched -- DELETE condition
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete
when matched -- UPDATE condition
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE = 'TRUE'
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id
when not matched
and S.METADATA$ACTION ='INSERT'
then insert
(id,product,price,store_id,amount,employees,location)
values
(s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location)
Streamのデータクリア処理について
Streamのデータクリア処理について
業務でStreamとTaskを使ったデータ更新処理を行っているのですが、Streamのデータをクリアする処理について動作を確認してみました。
tony#COMPUTE_WH@DEMO_DB.PUBLIC>CREATE table test_table as SELECT 1 as no;
+----------------------------------------+
| status |
|----------------------------------------|
| Table TEST_TABLE successfully created. |
+----------------------------------------+
1 Row(s) produced. Time Elapsed: 0.894s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from TEST_TABLE;
+----+
| NO |
|----|
| 1 |
+----+
1 Row(s) produced. Time Elapsed: 0.182s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>
tony#COMPUTE_WH@DEMO_DB.PUBLIC>CREATE STREAM sample_stream ON TABLE TEST_TABLE;
+--------------------------------------------+
| status |
|--------------------------------------------|
| Stream SAMPLE_STREAM successfully created. |
+--------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.338s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>
tony#COMPUTE_WH@DEMO_DB.PUBLIC>INSERT into TEST_TABLE SELECT 2 as no;
+-------------------------+
| number of rows inserted |
|-------------------------|
| 1 |
+-------------------------+
1 Row(s) produced. Time Elapsed: 0.483s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from sample_stream;
+----+-----------------+-------------------+------------------------------------------+
| NO | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+-----------------+-------------------+------------------------------------------|
| 2 | INSERT | False | 047d883040b6aebcbba56a63ca86318d29fa17b2 |
+----+-----------------+-------------------+------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.310s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>
以上より、SELECTではStreamでデータはクリアされません。
INSERTすれば消えるのか?
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from FIRST_TABLE;
+----+
| NO |
|----|
| 1 |
+----+
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from SECOND_TABLE;
+----+
| NO |
|----|
| 1 |
+----+
tony#COMPUTE_WH@DEMO_DB.PUBLIC>create or replace stream test_stream on table FIRST_TABLE;
+------------------------------------------+
| status |
|------------------------------------------|
| Stream TEST_STREAM successfully created. |
+------------------------------------------+
1 Row(s) produced. Time Elapsed: 1.846s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>INSERT into FIRST_TABLE SELECT 2 as no;
+-------------------------+
| number of rows inserted |
|-------------------------|
| 1 |
+-------------------------+
1 Row(s) produced. Time Elapsed: 0.422s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from FIRST_TABLE;
+----+
| NO |
|----|
| 1 |
| 2 |
+----+
2 Row(s) produced. Time Elapsed: 0.326s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from SECOND_TABLE;
+----+
| NO |
|----|
| 1 |
+----+
1 Row(s) produced. Time Elapsed: 0.141s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>INSERT INTO SECOND_TABLE SELECT no from test_stream;
+-------------------------+
| number of rows inserted |
|-------------------------|
| 1 |
+-------------------------+
1 Row(s) produced. Time Elapsed: 0.525s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from SECOND_TABLE;
+----+
| NO |
|----|
| 1 |
| 2 |
+----+
2 Row(s) produced. Time Elapsed: 0.256s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from test_stream;
+----+-----------------+-------------------+-----------------+
| NO | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+-----------------+-------------------+-----------------|
+----+-----------------+-------------------+-----------------+
0 Row(s) produced. Time Elapsed: 0.361s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>DELETE from SECOND_TABLE where not EXISTS ( SELECT 1 from test_stream UNION all SELECT 1);
+------------------------+
| number of rows deleted |
|------------------------|
| 0 |
+------------------------+
0 Row(s) produced. Time Elapsed: 0.590s
tony#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * from test_stream;
+----+-----------------+-------------------+-----------------+
| NO | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+-----------------+-------------------+-----------------|
+----+-----------------+-------------------+-----------------+
0 Row(s) produced. Time Elapsed: 0.255s
Discussion