Frosty Friday Week2 Intermediate Streams
こんにちは! がく@ちゅらデータエンジニアです。
こちらの6日目の記事になります。
Frosty Friday Week2 Intermediate Streams
こちらのチャレンジになります。
Week2のチャレンジの内容
人事部の担当者は、変更管理をしていきたいと考えています。
しかし、作成したSTREAMが、関係ないものも拾ってしまっていると考えていました。
Parquet形式のデータをロードし、テーブルに変換します。
DEPTとJOB_TITLEのカラムへの変更のみを表示するSTREAMを作成してください
Parquetのデータは、
https://frostyfridaychallenges.s3.eu-west-1.amazonaws.com/challenge_2/employees.parquet
にあります。
データの取り込みが済んだら、次のコマンドを実行します
UPDATE <table_name> SET COUNTRY = 'Japan' WHERE EMPLOYEE_ID = 8;
UPDATE <table_name> SET LAST_NAME = 'Forester' WHERE EMPLOYEE_ID = 22;
UPDATE <table_name> SET DEPT = 'Marketing' WHERE EMPLOYEE_ID = 25;
UPDATE <table_name> SET TITLE = 'Ms' WHERE EMPLOYEE_ID = 32;
UPDATE <table_name> SET JOB_TITLE = 'Senior Financial Analyst' WHERE EMPLOYEE_ID = 68;
結果は次のようになります
DEPTとJOB_TITLEが変更されたUPDATEのみSTREAMに記録されています。
こちらを実現してみましょう!
環境を設定する
use role sysadmin;
use warehouse gaku_wh;
use database frosty_friday;
create schema week002;
データベース「FROSTY_FRIDAY」はすでに作ってあったので、スキーマ「WEEK002」を作ります。
解法① INFER_SCHEMAを使う方法
ファイルを読み込むFILE FORMATを作成
今回の対象ファイルは
ですので、ファイル形式は、parquetです。create or replace file format week2_parquet type = 'parquet';
大変シンプルですが、一旦 type = 'parquet'
のみです。
TYPE=PARQUETのオプションは
- SNAPPY_COMPRESSION = TRUE | FALSE
- BINARY_AS_TEXT = TRUE | FALSE
- TRIM_SPACE = TRUE | FALSE
- USE_LOGICAL_TYPE = TRUE | FALSE
- REPLACE_INVALID_CHARACTERS = TRUE | FALSE
- NULL_IF = ( 'string1' [ , 'string2' , ... ] )
TYPE=CSVより少ないですね。
※CSVはほんと耐エラー性に乏しいというかなんというか・・・・
※でもSnowflakeの読み込みがParquetよりcsvのほうが早いというのは、Snowflake頑張り過ぎというか変態というか(ぇ
ステージの作成
また、こちらはhttpsのパスですが、Stageで読み込む際は、
s3://frostyfridaychallenges/challenge_2/employees.parquet
を使うことになります。
-- STAGEの作成(↑で作成したFILE FORMATを使って)
-- https://frostyfridaychallenges.s3.eu-west-1.amazonaws.com/challenge_2/employees.parquet
create or replace stage week2_ext_stage
url = 's3://frostyfridaychallenges/challenge_2/employees.parquet'
file_format = (FORMAT_NAME = 'week2_parquet')
;
-- STAGE内を確認
list @week2_ext_stage;
読み込むファイルは一つだけですね
ファイルの中身を確認してみましょう
select
$1
, $2
, $3
, metadata$filename
, metadata$file_row_number
from
@week2_ext_stage
;
こちらを実行するとエラーになります
$2, $3を設定したら「PARQUET ファイル形式は、型バリアント、オブジェクトまたは配列の列を1つだけ生成できます。the MATCH_BY_COLUMN_NAME copy option or コピーと変換を使用して、データを個別の列にロードします。」
ってな感じのエラーです
select
$1
, metadata$filename
, metadata$file_row_number
from
@week2_ext_stage
;
こんな感じで中身が見れます
metadata
これは必ず取り込みましょう!とりあえず、取り込んでおきましょう!取り込んでおくと色々必ず役に立ちます!
parquetファイルからカラム抽出を行います。
そこで使うのが、INFER_SCHEMA
SELECT *
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@week2_ext_stage'
, FILE_FORMAT=>'week2_parquet'
)
);
を行うと
(一部抜粋)
INFER_SCHEMAとCREATE TABLE USING TEMPLATE を組み合わせてテーブルを生成します
CREATE OR REPLACE TABLE week2_table
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
WITHIN GROUP (ORDER BY order_id)
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@week2_ext_stage',
FILE_FORMAT=>'week2_parquet'
)
));
desc table week2_table;
こんな感じで生成できます。
ちなみに
OBJECT_CONSTRUCT(*)は、{*}
でもかけるようになったので、
CREATE OR REPLACE TABLE week2_table
USING TEMPLATE (
SELECT ARRAY_AGG({*})
WITHIN GROUP (ORDER BY order_id)
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@week2_ext_stage',
FILE_FORMAT=>'week2_parquet'
)
));
でも全く同じことが実現できます。
データの投入(COPY INTO)
copy into week2_table from @week2_ext_stage ;
ってやると、SQL compilation errorになります
ってことのようです。
なので、
copy into week2_table from @week2_ext_stage MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE';
こちらでデータを投入します。
select
"employee_id"
, "dept"
, "job_title"
from
week2_table;
このまま、week2_table
にたいして、STREAMを作ると、すべてのカラムの変更を検出してしまします。
今回のチャレンジでは「deptとjob_title」だけを変更検出したいので、ビューを作成します。
※今回の肝のテクニック
※これは、自分もわからなかったので、他の方の解法を頂きましたw
create view week2_view as
select
"employee_id"
, "dept"
, "job_title"
from
week2_table;
STREAMを作る
CREATE OR REPLACE STREAM week2_stream
ON VIEW week2_view
;
streamの中身を確認(この時点では空)します。
select * from week2_stream;
空ですね
テーブルに変更を加える
UPDATE week2_table SET "country" = 'Japan' WHERE "employee_id" = 8;
select * from week2_stream; -- streamにはいらない
UPDATE week2_table SET "last_name" = 'Forester' WHERE "employee_id" = 22;
select * from week2_stream; -- streamにはいらない
UPDATE week2_table SET "dept" = 'Marketing' WHERE "employee_id" = 25;
select * from week2_stream; -- streamにはいる
最後のUPDATEだけが反映されます。
UPDATE week2_table SET "title" = 'Ms' WHERE "employee_id" = 32;
select * from week2_stream; -- streamにはいらない
UPDATE week2_table SET "job_title" = 'Senior Financial Analyst' WHERE "employee_id" = 68;
select * from week2_stream;
STREAMのデータを消す=消費する方法
STREAMのデータを消費するためには、STREAMの中身を使ってテーブルに書き込んだりすると消えます。
ですので
create temporary table week2_tbl_tmp as select * from week2_stream;
このように temporaryテーブルで作ると、Sessionが切れたら消えるのでいい感じで消せます。
他の方法としては
create table target_table.....;
INSERT INTO target_table
SELECT *
FROM week2_stream;
drop table target_table;
とかする方法がありますが、ちょびっとめんどくさい(CREATE TABLEするところなど)
補足 INFER_SCHEMAのオプション「IGNORE_CASE=>TRUE」を使おう
ちなみに、INFER_SCHEMAに IGNORE_CASE = TRUE
を入れると、小文字カラムにならない=「"で囲む必要がなくなる」になります
CREATE OR REPLACE TABLE week2_2_table
USING TEMPLATE (
SELECT ARRAY_AGG({*})
WITHIN GROUP (ORDER BY order_id)
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@week2_ext_stage',
FILE_FORMAT=>'week2_parquet',
IGNORE_CASE => TRUE
)
));
Snowflake的には、IGNORE_CASE => TRUE
をつけたほうがいいかなーと個人的には思います。
解法② INFER_SCHEMAを使わない?方法 いや USING TEMPLATEを使わない方法
とベタ書きをする方法があります。
ただ、parquetファイルのスキーマ?を調べるには、INFER_SCHEMAを使うのが楽・・・ですよね・・・
-- parquet形式を読み込むFILE FORMATの作成
create or replace file format week2_parquet_2 type = 'parquet';
-- 解法①との違いは、URLとderectory
create or replace stage week2_ext_stage_2
url = 's3://frostyfridaychallenges/challenge_2/'
file_format = (FORMAT_NAME = 'week2_parquet_2')
directory = ( enable = true );
;
list @week2_ext_stage_2;
-- Snowsight でStageをみると directory = ( enable = true ); の違いがよく分かります
SELECT *
FROM TABLE(
INFER_SCHEMA(
location =>'@week2_ext_stage_2'
, file_format => 'week2_parquet_2'
, files => 'employees.parquet'
, ignore_case => true
)
);
あとは、ベタ書き・・・・・結構辛いですね!!!
create or replace table week2_table_2
( employee_id number ,
first_name varchar ,
last_name varchar ,
email varchar ,
street_num number ,
street_name varchar ,
city varchar ,
postcode varchar ,
country varchar ,
country_code varchar ,
time_zone varchar ,
payroll_iban varchar ,
dept varchar ,
job_title varchar ,
education varchar ,
title varchar ,
suffix varchar
);
copy into week2_table_2
from
( select $1:employee_id::number,
$1:first_name::varchar,
$1:last_name::varchar,
$1:email::varchar,
$1:street_num::number,
$1:street_name::varchar,
$1:city::varchar,
$1:postcode::varchar,
$1:country::varchar,
$1:country_code::varchar,
$1:time_zone::varchar,
$1:payroll_iban::varchar,
$1:dept::varchar,
$1:job_title::varchar,
$1:education::varchar,
$1:title::varchar,
$1:suffix::varchar
from '@week2_ext_stage_2/employees.parquet')
FILE_FORMAT = week2_parquet;
STREAMについて
STREAMとは、Snowflakeにおいて、データの変更(追加・更新・削除)を追跡するためのメカニズムです。CDC(Change Data Capture)の仕組みです。
あるテーブルにデータが変更されたら、変更されたデータに対して何らかの処理をしたい、TASKと組み合わせて、STREAMにデータが有れば処理を動かす・・・みたいなことができます。
主なユースケースとしては
- リアルタイムデータ同期は、pub/sub的なストリーミング的に同期ができるのでニアリアルで連携できそう。
- 差分(ETL)処理は、自分では一番使いそうなユースケース
- 監査ログの作成にも使えそう
どのケースも、TASKと併用することで真価を発揮しそう。
dbtだと、incrementalを使ったりして、増分データについて処理をしたりしますが、そちらより使いやすいなーとは思っています(私見)
STREAMは、SnowflakeだけでCDCを実現して、増分データだけに処理ができてとてもいいのですが、dbtなどDAGでデータパイプラインを管理するようになると使わなくなりましたね。
まとめ
STREAMはSnowflakeにおけるCDCのメカニズムで、とても便利な機能です。
こちらのコードは、
にて公開しています。
Discussion