❄️

Frosty Friday Week2 Intermediate Streams

こんにちは! がく@ちゅらデータエンジニアです。

https://qiita.com/advent-calendar/2024/frostyfriday

こちらの6日目の記事になります。

Frosty Friday Week2 Intermediate Streams

https://frostyfriday.org/blog/2022/07/15/week-2-intermediate/

こちらのチャレンジになります。

Week2のチャレンジの内容

人事部の担当者は、変更管理をしていきたいと考えています。
しかし、作成したSTREAMが、関係ないものも拾ってしまっていると考えていました。

Parquet形式のデータをロードし、テーブルに変換します。
DEPTとJOB_TITLEのカラムへの変更のみを表示するSTREAMを作成してください

Parquetのデータは、
https://frostyfridaychallenges.s3.eu-west-1.amazonaws.com/challenge_2/employees.parquet
にあります。

データの取り込みが済んだら、次のコマンドを実行します

UPDATE <table_name> SET COUNTRY = 'Japan' WHERE EMPLOYEE_ID = 8;
UPDATE <table_name> SET LAST_NAME = 'Forester' WHERE EMPLOYEE_ID = 22;
UPDATE <table_name> SET DEPT = 'Marketing' WHERE EMPLOYEE_ID = 25;
UPDATE <table_name> SET TITLE = 'Ms' WHERE EMPLOYEE_ID = 32;
UPDATE <table_name> SET JOB_TITLE = 'Senior Financial Analyst' WHERE EMPLOYEE_ID = 68;

結果は次のようになります

DEPTとJOB_TITLEが変更されたUPDATEのみSTREAMに記録されています。
こちらを実現してみましょう!

環境を設定する

use role sysadmin;
use warehouse gaku_wh;
use database frosty_friday;
create schema week002;

データベース「FROSTY_FRIDAY」はすでに作ってあったので、スキーマ「WEEK002」を作ります。

解法① INFER_SCHEMAを使う方法

ファイルを読み込むFILE FORMATを作成

今回の対象ファイルは
https://frostyfridaychallenges.s3.eu-west-1.amazonaws.com/challenge_2/employees.parquet
ですので、ファイル形式は、parquetです。

create or replace file format week2_parquet type = 'parquet';

大変シンプルですが、一旦 type = 'parquet' のみです。

TYPE=PARQUETのオプションは

  • SNAPPY_COMPRESSION = TRUE | FALSE
  • BINARY_AS_TEXT = TRUE | FALSE
  • TRIM_SPACE = TRUE | FALSE
  • USE_LOGICAL_TYPE = TRUE | FALSE
  • REPLACE_INVALID_CHARACTERS = TRUE | FALSE
  • NULL_IF = ( 'string1' [ , 'string2' , ... ] )

TYPE=CSVより少ないですね。

※CSVはほんと耐エラー性に乏しいというかなんというか・・・・
※でもSnowflakeの読み込みがParquetよりcsvのほうが早いというのは、Snowflake頑張り過ぎというか変態というか(ぇ

ステージの作成

また、こちらはhttpsのパスですが、Stageで読み込む際は、
s3://frostyfridaychallenges/challenge_2/employees.parquet
を使うことになります。

 -- STAGEの作成(↑で作成したFILE  FORMATを使って)
 -- https://frostyfridaychallenges.s3.eu-west-1.amazonaws.com/challenge_2/employees.parquet
 create or replace stage week2_ext_stage 
  url = 's3://frostyfridaychallenges/challenge_2/employees.parquet'
  file_format = (FORMAT_NAME = 'week2_parquet')
;

-- STAGE内を確認
list @week2_ext_stage;


読み込むファイルは一つだけですね

ファイルの中身を確認してみましょう

select 
    $1
    , $2
    , $3
    , metadata$filename 
    , metadata$file_row_number
from 
    @week2_ext_stage
;

こちらを実行するとエラーになります

$2, $3を設定したら「PARQUET ファイル形式は、型バリアント、オブジェクトまたは配列の列を1つだけ生成できます。the MATCH_BY_COLUMN_NAME copy option or コピーと変換を使用して、データを個別の列にロードします。」

ってな感じのエラーです

select 
    $1
    , metadata$filename 
    , metadata$file_row_number
from 
    @week2_ext_stage
;


こんな感じで中身が見れます

metadatafilename**, **metadatafile_row_numberはメタデータです。
これは必ず取り込みましょう!とりあえず、取り込んでおきましょう!取り込んでおくと色々必ず役に立ちます!

parquetファイルからカラム抽出を行います。
そこで使うのが、INFER_SCHEMA

https://docs.snowflake.com/ja/sql-reference/functions/infer_schema?utm_source=snowscope&utm_medium=serp&utm_term=INFER_SCHEMA

SELECT *
FROM TABLE(
    INFER_SCHEMA(
      LOCATION=>'@week2_ext_stage'
      , FILE_FORMAT=>'week2_parquet'    
    )
);

を行うと

(一部抜粋)

INFER_SCHEMAとCREATE TABLE USING TEMPLATE を組み合わせてテーブルを生成します

CREATE OR REPLACE TABLE week2_table
  USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
    WITHIN GROUP (ORDER BY order_id)
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@week2_ext_stage',
          FILE_FORMAT=>'week2_parquet'
        )
      ));

desc table week2_table;


こんな感じで生成できます。

ちなみに
OBJECT_CONSTRUCT(*)は、{*}でもかけるようになったので、

CREATE OR REPLACE TABLE week2_table
  USING TEMPLATE (
    SELECT ARRAY_AGG({*})
    WITHIN GROUP (ORDER BY order_id)
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@week2_ext_stage',
          FILE_FORMAT=>'week2_parquet'
        )
      ));

でも全く同じことが実現できます。

データの投入(COPY INTO)

copy into week2_table from @week2_ext_stage ;

ってやると、SQL compilation errorになります

ってことのようです。

なので、

copy into week2_table from @week2_ext_stage MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE';

こちらでデータを投入します。

select
    "employee_id"
    , "dept"
    , "job_title"
from
    week2_table;

このまま、week2_tableにたいして、STREAMを作ると、すべてのカラムの変更を検出してしまします。
今回のチャレンジでは「deptjob_title」だけを変更検出したいので、ビューを作成します。
※今回の肝のテクニック
※これは、自分もわからなかったので、他の方の解法を頂きましたw

create view week2_view as
select
    "employee_id"
    , "dept"
    , "job_title"
from
    week2_table;

STREAMを作る

CREATE OR REPLACE STREAM week2_stream
    ON VIEW week2_view
    ;

streamの中身を確認(この時点では空)します。

select * from week2_stream;

空ですね

テーブルに変更を加える

UPDATE week2_table SET "country" = 'Japan' WHERE "employee_id" = 8;
select * from week2_stream; -- streamにはいらない

UPDATE week2_table SET "last_name" = 'Forester' WHERE "employee_id" = 22;
select * from week2_stream; -- streamにはいらない

UPDATE week2_table SET "dept" = 'Marketing' WHERE "employee_id" = 25;
select * from week2_stream; -- streamにはいる


最後のUPDATEだけが反映されます。

UPDATE week2_table SET "title" = 'Ms' WHERE "employee_id" = 32;
select * from week2_stream; -- streamにはいらない

UPDATE week2_table SET "job_title" = 'Senior Financial Analyst' WHERE "employee_id" = 68;
select * from week2_stream;

STREAMのデータを消す=消費する方法

STREAMのデータを消費するためには、STREAMの中身を使ってテーブルに書き込んだりすると消えます。
ですので

create temporary table week2_tbl_tmp as select * from week2_stream;

このように temporaryテーブルで作ると、Sessionが切れたら消えるのでいい感じで消せます。
他の方法としては

create table target_table.....;
INSERT INTO target_table
SELECT *
FROM week2_stream;
drop table target_table;

とかする方法がありますが、ちょびっとめんどくさい(CREATE TABLEするところなど)

補足 INFER_SCHEMAのオプション「IGNORE_CASE=>TRUE」を使おう

ちなみに、INFER_SCHEMAに IGNORE_CASE = TRUEを入れると、小文字カラムにならない=「"で囲む必要がなくなる」になります

CREATE OR REPLACE TABLE week2_2_table
  USING TEMPLATE (
    SELECT ARRAY_AGG({*})
    WITHIN GROUP (ORDER BY order_id)
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@week2_ext_stage',
          FILE_FORMAT=>'week2_parquet',
          IGNORE_CASE => TRUE
        )
      ));

Snowflake的には、IGNORE_CASE => TRUE をつけたほうがいいかなーと個人的には思います。

解法② INFER_SCHEMAを使わない?方法 いや USING TEMPLATEを使わない方法

とベタ書きをする方法があります。
ただ、parquetファイルのスキーマ?を調べるには、INFER_SCHEMAを使うのが楽・・・ですよね・・・

-- parquet形式を読み込むFILE FORMATの作成
create or replace file format week2_parquet_2 type = 'parquet';

-- 解法①との違いは、URLとderectory
create or replace stage week2_ext_stage_2
  url = 's3://frostyfridaychallenges/challenge_2/'
  file_format = (FORMAT_NAME = 'week2_parquet_2')
  	directory = ( enable = true );
;

list @week2_ext_stage_2;
-- Snowsight でStageをみると directory = ( enable = true ); の違いがよく分かります

SELECT *
FROM TABLE(
    INFER_SCHEMA(
      location =>'@week2_ext_stage_2'
      , file_format => 'week2_parquet_2'
      , files => 'employees.parquet'
      , ignore_case => true
    )
);

あとは、ベタ書き・・・・・結構辛いですね!!!

create or replace table week2_table_2 
(   employee_id number , 
    first_name varchar , 
    last_name varchar , 
    email varchar , 
    street_num number , 
    street_name varchar , 
    city varchar , 
    postcode varchar , 
    country varchar , 
    country_code varchar , 
    time_zone varchar , 
    payroll_iban varchar , 
    dept varchar , 
    job_title varchar , 
    education varchar , 
    title varchar , 
    suffix varchar 
);

copy into week2_table_2
from 
(   select $1:employee_id::number, 
           $1:first_name::varchar, 
           $1:last_name::varchar, 
           $1:email::varchar, 
           $1:street_num::number, 
           $1:street_name::varchar, 
           $1:city::varchar, 
           $1:postcode::varchar, 
           $1:country::varchar, 
           $1:country_code::varchar, 
           $1:time_zone::varchar, 
           $1:payroll_iban::varchar, 
           $1:dept::varchar, 
           $1:job_title::varchar, 
           $1:education::varchar, 
           $1:title::varchar, 
           $1:suffix::varchar
	from '@week2_ext_stage_2/employees.parquet') 
FILE_FORMAT = week2_parquet;

STREAMについて

STREAMとは、Snowflakeにおいて、データの変更(追加・更新・削除)を追跡するためのメカニズムです。CDC(Change Data Capture)の仕組みです。

あるテーブルにデータが変更されたら、変更されたデータに対して何らかの処理をしたい、TASKと組み合わせて、STREAMにデータが有れば処理を動かす・・・みたいなことができます。

主なユースケースとしては

  • リアルタイムデータ同期は、pub/sub的なストリーミング的に同期ができるのでニアリアルで連携できそう。
  • 差分(ETL)処理は、自分では一番使いそうなユースケース
  • 監査ログの作成にも使えそう

どのケースも、TASKと併用することで真価を発揮しそう。

dbtだと、incrementalを使ったりして、増分データについて処理をしたりしますが、そちらより使いやすいなーとは思っています(私見)

STREAMは、SnowflakeだけでCDCを実現して、増分データだけに処理ができてとてもいいのですが、dbtなどDAGでデータパイプラインを管理するようになると使わなくなりましたね。

まとめ

STREAMはSnowflakeにおけるCDCのメカニズムで、とても便利な機能です。

こちらのコードは、

https://github.com/gakut12/Frosty-Friday/blob/main/week002_intermediate_streams/week2.sql

にて公開しています。

ちゅらデータ株式会社

Discussion