❄️

第4回Snowflake Rookies Camp ~FrostyFriday week38~

に公開

こんばんは!
Snowflake Rookies Camp幹事のhueです。
Snowflake Rookies Camp 第4回の勉強会ハンズオン予定のFrostyFriday Week38の回答例を確認できるように事前に準備しておきました。予習・復習にご利用ください!

FrostyFriday Week38の回答例

権限を変更

-- 権限
use role sysadmin;
use warehouse compute_wh;

データベースをまず定義し、その中にスキーマ・テーブルを作成する

まずはデータベース作成し、スキーマを作成します(空っぽの枠だけ)

-- データベース作成
create database db_week38;
use database db_week38;
create schema schema_week38;
use schema schema_week38;

テーブルを作成し、ダミーデータを入れます

1つ目 従業員情報

-- 1つ目のテーブル作成
create table employees (
    id int,
    name varchar(50),
    department varchar(50));

-- ダミーデータの挿入
insert into employees
(id, name, department)
values
(1, 'm.fujita', 'Sales'),
(2, 'hue', 'Marketing'),
(3, 'naoki_yokozawa', 'Marketing'),
(4, 'Kaori', 'Sales'),
(5, 'Hiroki', 'Sales'),
(6, 'jinya_tonai', 'Sales');

2つ目 売上情報

-- 2つ目のテーブル作成
create table sales (
    id int,
    employee_id int,
    sale_amount decimal(10, 2));

-- ダミーデータの挿入
insert into sales
(id, employee_id, sale_amount)
values
(1, 1, 100.00),
(2, 1, 200.00),
(3, 2, 150.00),
(4, 4, 250.00),
(5, 3, 150.00),
(6, 5, 100.00),
(7, 2, 150.00),
(8, 4, 100.00);

テーブルを結合します

-- テーブル結合
create view employee_sales as
select 
    e.id,
    e.name,
    e.department,
    s.sale_amount
from employees e
    join sales s 
    on e.id = s.employee_id;

table employee_sales;

Streamを作成します

今回のお題でもあるStreamを作成します。
Streamとはテーブルに加えられた変更(新規挿入、更新、削除)を追跡するオブジェクトです。変更をキャプチャし、その変更データをストリーム内の行として利用可能にします。

create or replace stream stream_week38 on view employee_sales;

どのようなデータが入っているか確認します。

show streams;
select * from stream_week38;

クエリで結果は生成されません。
Streamを作成してから変更した履歴がないためです。

データを削除してみます

delete from sales 
where id = 1;

-- Salesテーブルを表示しデータの削除を確認します
table sales;

Streamを確認する

select * from stream_week38;

結果はどうでしたか???

データをもう一度削除してみます

delete from sales 
where id = 3;

table sales;

select * from stream_week38;

今度の結果はどうでした?

削除データの確認テーブル作成

METADATAについてまとめている公式ドキュメントはこちら

create or replace table deleted_table(
    id int,
    name varchar,
    department varchar,
    sale_amount decimal(10, 2),
    metadata$action varchar,
    metadata$isupdate boolean,
    metadata$row_id varchar,
    deleted_at timestamp default current_timestamp());


insert into deleted_table(
    id,
    name,
    department,
    sale_amount,
    metadata$action,
    metadata$isupdate,
    metadata$row_id)
select
    id,
    name,
    department,
    sale_amount,
    metadata$action,
    metadata$isupdate,
    metadata$row_id
from stream_week38;

-- 表示して確認
table deleted_table;

タスクにて処理を自動化させます

Taskは、定期的にSQLコマンドを実行するためのスケジューリング機能です。
公式ドキュメントはこちら

タスクにて実行するための権限を付与します

use role accountadmin;
grant execute task on account to role sysadmin;
grant execute managed task on account to role sysadmin;

タスクを作成します

タスクとストリームを組み合わせて、テーブルに加えられた変更を自動的に別のテーブルに記録する処理を定義

use role sysadmin;
create or replace task deleted_data
target_completion_interval='1 minute'
when system$stream_has_data('stream_week38')
as
insert into deleted_table(
    id,
    name,
    department,
    sale_amount,
    metadata$action,
    metadata$isupdate,
    metadata$row_id)
select
    id,
    name,
    department,
    sale_amount,
    metadata$action,
    metadata$isupdate,
    metadata$row_id
from stream_week38;

show tasks;
  • 「deleted_data」という名称のタスクを作成
  • TARGET_COMPLETION_INTERVAL = '1 minute'
    タスクの実行をSnowflakeが1分ごとに試みるようにスケジューリングする設定です。
  • when system$stream_has_data('stream_week38')
    stream_week38という名前のストリームに新しいデータがある場合のみ、タスクを実行します。新しいデータがなければ、タスクはスキップされ、コンピューティングリソースとコストが節約されます。
  • as insert into deleted_table ... select ... from stream_week38;
    これは、タスクが実行するSQLコマンドです

定期実行させます

定期実行するために有効化させます

alter task deleted_data resume;
  • タスクは作成された時点ではSUSPENDED(一時停止)状態になっています。これは、タスクの準備が整うまで意図せず実行されないようにするためです。

alter task~~ コマンドを実行することで、タスクはSTARTED状態に変わり、設定されたスケジュール(この場合はtarget_completion_interval='1 minute')と条件(when system$stream_has_data(...))に基づいて、自動的に実行されるようになります。

再度データを消して確認します

-- 再度データを消して確認 
delete from sales 
where id = 4;

table deleted_table;

定期実行を停止させて終わりです

alter task deleted_data suspend;

コード全文

-- 権限
use role sysadmin;
use warehouse compute_wh;

-- データベース作成
create database db_week38;
use database db_week38;
create schema schema_week38;
use schema schema_week38;

-- 1つ目のテーブル作成
create table employees (
    id int,
    name varchar(50),
    department varchar(50));

-- ダミーデータの挿入
insert into employees
(id, name, department)
values
(1, 'm.fujita', 'Sales'),
(2, 'hue', 'Marketing'),
(3, 'naoki_yokozawa', 'Marketing'),
(4, 'Kaori', 'Sales'),
(5, 'Hiroki', 'Sales'),
(6, 'jinya_tonai', 'Sales');

-- 2つ目のテーブル作成
create table sales (
    id int,
    employee_id int,
    sale_amount decimal(10, 2));

-- ダミーデータの挿入
insert into sales
(id, employee_id, sale_amount)
values
(1, 1, 100.00),
(2, 1, 200.00),
(3, 2, 150.00),
(4, 4, 250.00),
(5, 3, 150.00),
(6, 5, 100.00),
(7, 2, 150.00),
(8, 4, 100.00);

-- テーブル結合
create view employee_sales as
select 
    e.id,
    e.name,
    e.department,
    s.sale_amount
from employees e
    join sales s 
    on e.id = s.employee_id;

table employee_sales;

-- Streamの作成
create or replace stream stream_week38 on view employee_sales;

show streams;
select * from stream_week38;

-- データを削除してみる
delete from sales 
where id = 1;

table sales;

select * from stream_week38;

-- データをもう一度削除
delete from sales 
where id = 3;

table sales;

select * from stream_week38;

-- 削除データの確認テーブル作成
-- https://docs.snowflake.com/ja/user-guide/streams-intro#stream-columns
create or replace table deleted_table(
    id int,
    name varchar,
    department varchar,
    sale_amount decimal(10, 2),
    metadata$action varchar,
    metadata$isupdate boolean,
    metadata$row_id varchar,
    deleted_at timestamp default current_timestamp());

insert into deleted_table(
    id,name,department,sale_amount,metadata$action,metadata$isupdate,metadata$row_id)
select 
    id,name,department,sale_amount,metadata$action,metadata$isupdate,metadata$row_id
from stream_week38;
    
table deleted_table;

-- 時間あればタスク化
-- タスク実行権限付与
use role accountadmin;
grant execute task on account to role sysadmin;
grant execute managed task on account to role sysadmin;

-- タスク作成 
use role sysadmin;
create or replace task deleted_data
target_completion_interval='1 minute'
when system$stream_has_data('stream_week38')
as
insert into deleted_table(
    id,name,department,sale_amount,metadata$action,metadata$isupdate,metadata$row_id)
select id,name,department,sale_amount,metadata$action,metadata$isupdate,metadata$row_id
from stream_week38;

show tasks;

-- 定期実行させる
alter task deleted_data resume;

-- 再度データを消して確認 
delete from sales 
where id = 4;

table deleted_table;

-- 定期実行を停止させる
alter task deleted_data suspend;

Discussion