👋

Snowflake ストリーム機能とタスク連携について

2022/11/13に公開約1,200字2件のコメント

前回前々回とストリーム機能について動作を見てきましたが、どうやらタスク機能と連携することで真のストリーム機能の発揮できるようです。
それでは実際にどのように連携するのか見ていきましょう。

タスク機能との連携

タスクのトリガーとしてSTREAM_HAS_DATA{対象のオブジェクト名}を利用しています。
今回はSALES_STREAMにキャプチャデータあると実行されるようになっています。
そしてタスクのスケジュールは1分間隔で設定しています。
これで1分間隔でSALES_STREAMを参照しデータがあればMERGE句を実行します。

CREATE OR REPLACE TASK all_data_changes
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('SALES_STREAM') 
    AS 
merge into SALES_FINAL_TABLE F 
USING ( SELECT STRE.*,ST.location,ST.employees
        FROM SALES_STREAM STRE
        JOIN STORE_TABLE ST
        ON STRE.store_id = ST.store_id
       ) S
ON F.id=S.id
when matched                        -- DELETE condition
    and S.METADATA$ACTION ='DELETE' 
    and S.METADATA$ISUPDATE = 'FALSE'
    then delete                   
when matched                        -- UPDATE condition
    and S.METADATA$ACTION ='INSERT' 
    and S.METADATA$ISUPDATE  = 'TRUE'       
    then update 
    set f.product = s.product,
        f.price = s.price,
        f.amount= s.amount,
        f.store_id=s.store_id
when not matched 
    and S.METADATA$ACTION ='INSERT'
    then insert 
    (id,product,price,store_id,amount,employees,location)
    values
    (s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location)

Discussion

いつもありがとうございます!Snowflakeが予め用意しているコンピュートリソースを処理の重さ分だけ自動で使用するサーバレスタスクもありますので、こちらも使ってみて頂けると嬉しいです。
https://docs.snowflake.com/ja/user-guide/tasks-intro.html#serverless-tasks

コメントありがとうございます!
是非サーバレスタスクも触って記事にしますね!!

ログインするとコメントできます