Snowpro Advance:Architectの振り返り
2023年12月中旬にSnowpro Advance:Architectを受験し、何とか合格しました。
資格の概要や、どうやって勉強したとか、どんな教材を使ったとか等は、他の合格された先輩方が記事として残されているので私は資格勉強中に触れたことのない機能があったので復習したいと思いました。
試してみた機能
- CHANGES
- Snowflake REST API
- INFER_SCHEMA
1. CHANGES
CHANGES句を使用すると、明示的なトランザクションオフセットでテーブルストリームを作成しなくても、指定された時間間隔でテーブルまたはビューの変更追跡メタデータをクエリできます。
複数のクエリにより、異なるトランザクションの開始と終了の間で変更追跡メタデータを取得できます。
資格勉強でCHANGES句を知るまで、Snowflakeでテーブルの変更追跡データを取得する方法Streamしか知らなかったので、試験終わったら絶対使ってみたいと思っていました。
1-2. 使用上の注意
公式の注意事項
- ディレクトリテーブル または 外部テーブル の変更(変更追跡メタデータを使用して解決)をクエリする場合、 CHANGES句はサポートされません。
- 現在、テーブルの変更追跡メタデータが記録される前に、次の 少なくとも1つがtrueである必要があります。
- テーブルで変更追跡が有効になります( ALTER TABLE ... CHANGE_TRACKING = TRUE を使用)。
- テーブルのストリームが作成されます( CREATE STREAM を使用)
- AT | BEFORE 句は必須であり、変更追跡メタデータの現在のオフセットを設定します。
- オプションの END 句は、変更間隔の終了タイムスタンプを設定します。 END 値が指定されていない場合、現在のタイムスタンプが変更間隔の終了として使用されます。
END 句は、変更追跡メタデータをクエリする目的で CHANGES 句と組み合わせる場合にのみ有効であることに注意してください(つまり、他のオブジェクトの履歴データをクエリするためにTime Travelを使用する場合、この句を AT|BEFORE と組み合わせることはできない)。 - TIMESTAMP または OFFSET の値は定数式でなければなりません。
- TIMESTAMP の最小時間解像度はミリ秒です。
- 要求されたデータがTime Travel保持期間(デフォルトは1日)を超えている場合、ステートメントは失敗します。
さらに、要求されたデータがTime Travel保持期間内にあるが、履歴データが利用できない場合(保持期間が延長された場合など)、ステートメントは失敗します。
1-3. ユースケース
取り込んだ差分データの件数や妥当性、データ自体の正しさ・異常値の有無についてチェックをする際に有用みたいです。
確かに差分更新した時に、動作確認として期待するデータだけ追加される事を確認したい時があったな〜って今思いました。
その時これ使ってたらめっちゃ楽だったわ。。。
1-4. いざ検証!
まずはSnowflakeの公式のチュートリアルに沿って、検証してみます。
-- テーブルの作成
> CREATE OR REPLACE TABLE t1 (id number(8) NOT NULL,c1 varchar(255) default NULL);
-- テーブルにCHANGE_TRACKINGを有効にする
> ALTER TABLE t1 SET CHANGE_TRACKING = TRUE;
-- 変数ts1に現在の時刻を代入する
> SET ts1 = (SELECT CURRENT_TIMESTAMP());
> SELECT $ts1;
+-------------------------------+
| $TS1 |
|-------------------------------|
| 2023-12-23 17:06:49.065 -0800 |
+-------------------------------+
1 Row(s) produced. Time Elapsed: 0.310s
-- テーブルにレコードを追加(Insert)する
> INSERT INTO t1 (id,c1)
VALUES
(1,'red'),
(2,'blue'),
(3,'green');
-- idが1のレコードを削除
> DELETE FROM t1 WHERE id = 1;
-- idが2のc1カラムのレコードを'purple'に更新(Update)する
> UPDATE t1 SET c1 = 'purple' WHERE id = 2;
-- $ts1から現在時刻までの間のテーブルの変更追跡メタデータを問い合わせて変更差分を返す。
> SELECT * FROM t1 CHANGES(INFORMATION => DEFAULT) AT(TIMESTAMP => $ts1);
+----+--------+-----------------+-------------------+------------------------------------------+
| ID | C1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----------------+-------------------+------------------------------------------|
| 2 | purple | INSERT | False | 577a4eef912afe59dba11f04ac336bc846777dde |
| 3 | green | INSERT | False | 03fe43ecb02a2910bf0924b7d13734d398d43cb2 |
+----+--------+-----------------+-------------------+------------------------------------------+
-- APPEND_ONLYにすると追加(Insert)された行のみ返します。
>SELECT * FROM t1 CHANGES(INFORMATION => APPEND_ONLY) AT(TIMESTAMP => $ts1);
+----+-------+-----------------+-------------------+------------------------------------------+
| ID | C1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+-------+-----------------+-------------------+------------------------------------------|
| 1 | red | INSERT | False | 4dfaad080624f9650aeb456f61bd1a25ca47cb7e |
| 2 | blue | INSERT | False | 577a4eef912afe59dba11f04ac336bc846777dde |
| 3 | green | INSERT | False | 03fe43ecb02a2910bf0924b7d13734d398d43cb2 |
+----+-------+-----------------+-------------------+------------------------------------------+
1-5. ポイント
どういうことかと言うと、続けて以下のクエリを実行します。
-- さらにカラムを追加
> INSERT INTO t1 (id,c1) VALUES (4,'yellow');
> SELECT * FROM t1 CHANGES(INFORMATION => DEFAULT) AT(TIMESTAMP => $ts1);
+----+--------+-----------------+-------------------+------------------------------------------+
| ID | C1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----------------+-------------------+------------------------------------------|
| 2 | purple | INSERT | False | 577a4eef912afe59dba11f04ac336bc846777dde |
| 3 | green | INSERT | False | 03fe43ecb02a2910bf0924b7d13734d398d43cb2 |
| 4 | yellow | INSERT | False | 7fe78761a0ca47b907a447a442758e514c733e8c |
+----+--------+-----------------+-------------------+------------------------------------------+
-- yellowを削除
> DELETE FROM t1 WHERE id = 4;
-- DEFAULTにしているのに、DELETEが追跡されない。
> SELECT * FROM t1 CHANGES(INFORMATION => DEFAULT) AT(TIMESTAMP => $ts1);
+----+--------+-----------------+-------------------+------------------------------------------+
| ID | C1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----------------+-------------------+------------------------------------------|
| 2 | purple | INSERT | False | 577a4eef912afe59dba11f04ac336bc846777dde |
| 3 | green | INSERT | False | 03fe43ecb02a2910bf0924b7d13734d398d43cb2 |
+----+--------+-----------------+-------------------+------------------------------------------+
最後のSELECT句でid4を削除した変更が表示されませんでした。
この動作が実質的な効果として、テーブル内の2つのトランザクションポイントの間に挿入されてから削除された行は、デルタで削除されます(つまり、クエリ結果内に返されない)。
にあたります。
最初の検証時に何故DELETE変更が追跡されなかった理由は、CHANGE_TRACKINGを有効にした後に3つのレコードをINSERTしていました。
つまり、CHANGE_TRACKING有効時はなんのレコードも無いため、トラッキング有効後にINSERTしたレコードを削除しても、トラッキング有効時と比較しても差分がないため追跡されないと言うことだと思います。
では、どうすればDELETEやUPDATEも追跡されるのか?
やっていきましょう!
-- 新しくt2テーブルを作成する
> CREATE OR REPLACE TABLE t2 (id number(8) NOT NULL,c1 varchar(255) default NULL);
-- レコードを追加する
> INSERT INTO t2 (id,c1) VALUES (1,'red'),(2,'blue'),(3,'green');
-- レコードを追加したタイミングでCHANGE_TRACKINGを有効にします
> ALTER TABLE t2 SET CHANGE_TRACKING = TRUE;
-- yellowを追加
> INSERT INTO t2 (id,c1) VALUES (4,'yellow');
-- INSERTの変更が追跡されています。
> SELECT * FROM t2 CHANGES(INFORMATION => DEFAULT) AT(TIMESTAMP => '2023-12-23 17:43:49'::timestamp_ltz);
+----+--------+-----------------+-------------------+------------------------------------------+
| ID | C1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----------------+-------------------+------------------------------------------|
| 4 | yellow | INSERT | False | e8648e277869fb3521bdf20fa7b4b763047d3ec5 |
+----+--------+-----------------+-------------------+------------------------------------------+
-- ts2のid1を削除します。
> DELETE FROM t2 WHERE id = 1;
-- DELETEの変更が追跡されています。
> SELECT * FROM t2 CHANGES(INFORMATION => DEFAULT) AT(TIMESTAMP => '2023-12-23 17:43:49'::timestamp_ltz);
+----+--------+-----------------+-------------------+------------------------------------------+
| ID | C1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----------------+-------------------+------------------------------------------|
| 4 | yellow | INSERT | False | e8648e277869fb3521bdf20fa7b4b763047d3ec5 |
| 1 | red | DELETE | False | e9c0f12374f8391a58927cc4dc65c29e712bade2 |
+----+--------+-----------------+-------------------+------------------------------------------+
-- 続けてUPDATEの動作を見てみます。
> UPDATE t2 SET c1 = 'purple' WHERE id = 2;
-- UPDATEの追跡はid2のblueがDELETEされ、purpleがINSERTした追跡結果となりました。
> SELECT * FROM t2 CHANGES(INFORMATION => DEFAULT) AT(TIMESTAMP => '2023-12-23 17:43:49'::timestamp_ltz);
+----+--------+-----------------+-------------------+------------------------------------------+
| ID | C1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----------------+-------------------+------------------------------------------|
| 2 | purple | INSERT | True | a8a1ca2357e3f4468f8772ca0053147f0f65605f |
| 4 | yellow | INSERT | False | e8648e277869fb3521bdf20fa7b4b763047d3ec5 |
| 2 | blue | DELETE | True | a8a1ca2357e3f4468f8772ca0053147f0f65605f |
| 1 | red | DELETE | False | e9c0f12374f8391a58927cc4dc65c29e712bade2 |
+----+--------+-----------------+-------------------+------------------------------------------+
テーブルのレコードも追加した状態でトラッキングを有効にすることで、その後のDELETEやUPDATEもしっかり追跡してくれました。
1-6. その他のオプション
TIMESTAMP
タイムスタンプ型を指定することができます。
ts1みたいに、変数に時刻を設定しなくても、時間指定することが出来ます。
> SELECT * FROM t2 CHANGES(INFORMATION => DEFAULT) AT(TIMESTAMP => '2023-12-23 17:43:49'::timestamp_ltz);
OFFSET
> SELECT * FROM t2 CHANGES(INFORMATION => DEFAULT) AT(OFFSET => -60*20);
STATEMENT
> SELECT * FROM t2 CHANGES(INFORMATION => DEFAULT) AT(STATEMENT => '01b13147-0001-3053-0000-00014ebe4195');
STREAM
STREAM設定してなかったのでクエリは割愛
2. Snowflake REST API
今までREST APIを触ったことがなかったので、この機会にチャレンジしてみたいと思います。
特に以下の制限などは模擬テストにも出ていました。
試験対策としては、それぞれのエンドポイントの利用方法は最低限押さえておくと良いと思います。
- insertFiles
- insertReport
- loadHistoryScan
参考にしたSnowflakeのドキュメント
参考にさせていただいたブログ様
2-1. Snowpipe REST API プロセスフロー
プロセスフローとしては、内部もしくは外部ステージに配置したデータをクライアントアプリケーションから取り込むデータやPipeを使って対象のテーブルにロードさせます。
2-2. 事前準備
今回クライアントはPythonを利用します。
pip3 install snowflake-ingest
Snowflakeのユーザに公開鍵を設定
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
alter user kyamisama set RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG9*****************2iAfKhMoV4wIDAQAB';
ステージを作成する(必要な場合)
サポートするステージタイプ
- 名前付き内部(Snowflake)または外部(つまり、Amazon S3、Google Cloud Storage、またはMicrosoft Azure)ステージ
- テーブルステージ
適当に内部ステージを作成する
CREATE STAGE my_int_stage file_format = (type = csv FIELD_delimiter = ',' skip_header = 1);
適当なテーブルを作成
CREATE TABLE region LIKE snowflake_sample_data.tpch_sf1.region;
パイプを作成
CREATE PIPE rest_api_pipe2 AS
COPY INTO region
FROM @my_int_stage
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1);
適当なCSVファイルの用意
R_REGIONKEY,R_NAME,R_COMMENT
0,AFRICA,This is the AFRICA
1,AMERICA,This is the AMERICA
2,ASIA,This is the ASIA
3,EUROPE,This is the EUROPE
4,MIDDLE EAST,This is the MIDDLE EAST
内部ステージにCSVをPUT
kyamisama#COMPUTE_WH@DEMO_DB.PUBLIC>PUT file://./region.csv @my_int_stage;
+------------+---------------+-------------+-------------+--------------------+--------------------+----------+---------+
| source | target | source_size | target_size | source_compression | target_compression | status | message |
|------------+---------------+-------------+-------------+--------------------+--------------------+----------+---------|
| region.csv | region.csv.gz | 177 | 144 | NONE | GZIP | UPLOADED | |
+------------+---------------+-------------+-------------+--------------------+--------------------+----------+---------+
1 Row(s) produced. Time Elapsed: 1.810s
Python SDK のサンプルプログラム
Python SDK のサンプルプログラム
from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging
logging.basicConfig(
filename='/tmp/ingest.log',
level=logging.DEBUG)
logger = getLogger(__name__)
# If you generated an encrypted private key, implement this method to return
# the passphrase for decrypting your private key.
def get_private_key_passphrase():
return 'XXXXXX'
with open("/home/new_key/rsa_key.p8", 'rb') as pem_in:
pemlines = pem_in.read()
private_key_obj = load_pem_private_key(pemlines,
get_private_key_passphrase().encode(),
default_backend())
private_key_text = private_key_obj.private_bytes(
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format
# List of files in the stage specified in the pipe definition
file_list=['region.csv.gz']
ingest_manager = SimpleIngestManager(account='mt67XXX',
host='mt67XXX.ap-northeast-1.aws.snowflakecomputing.com',
user='XXX',
pipe='demo_db.public.rest_api_pipe2',
private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = []
for file_name in file_list:
staged_file_list.append(StagedFile(file_name, None))
try:
resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
# HTTP error, may need to retry
logger.error(e)
exit(1)
# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')
# Needs to wait for a while to get result in history
while True:
history_resp = ingest_manager.get_history()
if len(history_resp['files']) > 0:
print('Ingest Report:\n')
print(history_resp)
break
else:
# wait for 20 seconds
time.sleep(20)
hour = timedelta(hours=1)
date = datetime.datetime.utcnow() - hour
history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')
print('\nHistory scan report: \n')
print(history_range_resp)
プログラムの実行
root@5de415f4ec39:/home# python3 snowflake-rest.py
History scan report:
{'files': [{'path': 'region.csv.gz', 'stageLocation': 'stages/3b12a2a8-ab53-4af7-b8fe-14e43ea508b3/', 'fileSize': 144, 'timeReceived': '2024-01-20T08:54:55.402Z', 'lastInsertTime': '2024-01-20T08:55:14.472Z', 'rowsInserted': 5, 'rowsParsed': 5, 'errorsSeen': 0, 'errorLimit': 1, 'complete': True, 'status': 'LOADED'}], 'startTimeInclusive': '2024-01-20T07:55:16.209Z', 'endTimeExclusive': '2024-01-20T08:55:17.170Z', 'rangeStartTime': '2024-01-20T08:55:14.472Z', 'rangeEndTime': '2024-01-20T08:55:14.472Z', 'pipe': 'demo_db.public.rest_api_pipe2', 'completeResult': 'true'}
------------
0
Ingest Report:
{'pipe': 'DEMO_DB.PUBLIC.REST_API_PIPE2', 'completeResult': True, 'nextBeginMark': '1_0', 'files': [{'path': 'region.csv.gz', 'stageLocation': 'stages/3b12a2a8-ab53-4af7-b8fe-14e43ea508b3/', 'fileSize': 144, 'timeReceived': '2024-01-20T08:54:55.402Z', 'lastInsertTime': '2024-01-20T08:55:14.792Z', 'rowsInserted': 5, 'rowsParsed': 5, 'errorsSeen': 0, 'errorLimit': 1, 'complete': True, 'status': 'LOADED'}], 'statistics': {'activeFilesCount': 0}}
データがロードされたことの確認
無事データが取り込まれたことを確認出来ました。
3. INFER_SCHEMA
※REST APIの環境を流用して簡単に試してみます。
INFER_SCHEMAは、半構造化データを含む一連のステージングデータファイル内のファイルメタデータスキーマを自動的に検出し、列定義を取得します。
さらにINFER_SCHEMAの結果を基にテーブルを作成することもできます。
試験対策としては、INFER_SCHEMAを使ったテーブル作成のクエリのオプションなどを押さえておくと良いと思います。
3-1. FILE_FORMATの作成
ポイント
PARSE_HEADER=Trueを設定することで、カラム名を引き継いでくれる。
これが無いと、以下のように「C」から始まる連番が設定されちゃいます。
ちなみにこの設定をする場合SKIP_HEADERオプションが使えないです。
+----+----+----+
| c1 | c2 | c3 |
|----+----+----|
+----+----+----+
CREATE OR REPLACE FILE FORMAT my_csv_format
TYPE = csv
FIELD_DELIMITER = ',' PARSE_HEADER=True;
3-2. INFRE_SCHEMAを使ってステージのデータの列定義を検出
SELECT *
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@my_int_stage/region.csv.gz'
, FILE_FORMAT=>'my_csv_format'
)
);
+-------------+--------------+----------+------------------+---------------+----------+
| COLUMN_NAME | TYPE | NULLABLE | EXPRESSION | FILENAMES | ORDER_ID |
|-------------+--------------+----------+------------------+---------------+----------|
| R_REGIONKEY | NUMBER(1, 0) | True | $1::NUMBER(1, 0) | region.csv.gz | 0 |
| R_NAME | TEXT | True | $2::TEXT | region.csv.gz | 1 |
| R_COMMENT | TEXT | True | $3::TEXT | region.csv.gz | 2 |
+-------------+--------------+----------+------------------+---------------+----------+
3 Row(s) produced. Time Elapsed: 0.913s
3-3. INFRE_SCHEMAの結果をテーブルとして作成する
create or replace table region_inf
using template (
select array_agg(object_construct(*))
from TABLE(
INFER_SCHEMA(
LOCATION=>'@my_int_stage/region.csv.gz'
, FILE_FORMAT=>'my_csv_format'
)
)
);
作成したテーブルを確認してみると
kyamisama#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * FROM REGION_INF;
+-------------+--------+-----------+
| R_REGIONKEY | R_NAME | R_COMMENT |
|-------------+--------+-----------|
+-------------+--------+-----------+
0 Row(s) produced. Time Elapsed: 0.756s
3-4. 内部テーブルのデータをREGION_INFテーブルにロードする
kyamisama#COMPUTE_WH@DEMO_DB.PUBLIC>copy into region_inf from @my_int_stage/region.csv.gz;
+----------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
| file | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name |
|----------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------|
| my_int_stage/region.csv.gz | LOADED | 5 | 5 | 1 | 0 | NULL | NULL | NULL | NULL |
+----------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
1 Row(s) produced. Time Elapsed: 1.394s
kyamisama#COMPUTE_WH@DEMO_DB.PUBLIC>
REGION_INFテーブルの確認
kyamisama#COMPUTE_WH@DEMO_DB.PUBLIC>SELECT * FROM REGION_INF;
+-------------+-------------+-------------------------+
| R_REGIONKEY | R_NAME | R_COMMENT |
|-------------+-------------+-------------------------|
| 0 | AFRICA | This is the AFRICA |
| 1 | AMERICA | This is the AMERICA |
| 2 | ASIA | This is the ASIA |
| 3 | EUROPE | This is the EUROPE |
| 4 | MIDDLE EAST | This is the MIDDLE EAST |
+-------------+-------------+-------------------------+
5 Row(s) produced. Time Elapsed: 0.300s
kyamisama#COMPUTE_WH@DEMO_DB.PUBLIC>
Discussion