Snowflake CHANGES関数を使った簡易変更チェック
こんにちは! スターフェスティバルの@koonagiです。
突然ですが、皆さんはSnowflakeへのデータロードの確認をどのように行っていますか?
弊社では、Snowflakeの機能を使ってデータロードのチェックを行っています。今回はその方法について記事にしてみました。
要約
- CHANGES関数を使ってテーブルの更新がされているか定期チェック
- 実行はストアド&タスク
背景
弊社ではSnowflakeをデータウェアハウスとして利用しており、Snowflakeに各種データソースから取り込みを実施しています。
今回問題になっていたのは、赤線で囲ったKafka経由でデータの取り込み処理をしている箇所です。
プロデューサー側のDebeziumがうまく動作せず、Kafkaは正常に稼働している(プロデューサー側のエラーもLagも無い)のにデータが連携されないという現象に度々遭遇していました。
処理としては正常に動作しているように見えるのでアラートも上がらず気が付けない、データが更新されていないのでユーザー側から連絡をもらうという状況でした。(申し訳ない
そこで、データが更新されていることを簡単にチェックする仕組みがないか調べたところ、SnowflakeのCHANGES関数を見つけましたので、それを利用することにしました。
CHANGES関数
CHANGES関数では指定した期間のテーブル及びViewの変更を参照することができます。
CHANGES 句を使用すると、明示的なトランザクションオフセットでテーブルストリームを作成しなくても、指定された時間間隔でテーブルまたはビューの変更追跡メタデータをクエリできます。複数のクエリにより、異なるトランザクションの開始と終了の間で変更追跡メタデータを取得できます。
以下が公式ドキュメントのサンプルになるのですが、CHANGES関数が含まれている最後のSQLで更新があったカラムを参照することができます。データの追加や更新がないとレコード数が0件になるので、データが挿入されているかどうかのチェックに使いました。
CREATE OR REPLACE TABLE t1 (
id number(8) NOT NULL,
c1 varchar(255) default NULL
);
-- Enable change tracking on the table.
ALTER TABLE t1 SET CHANGE_TRACKING = TRUE;
-- Initialize a session variable for the current timestamp.
SET ts1 = (SELECT CURRENT_TIMESTAMP());
INSERT INTO t1 (id,c1)
VALUES
(1,'red'),
(2,'blue'),
(3,'green');
DELETE FROM t1 WHERE id = 1;
UPDATE t1 SET c1 = 'purple' WHERE id = 2;
-- Query the change tracking metadata in the table during the interval from $ts1 to the current time.
-- Return the full delta of the changes.
SELECT *
FROM t1
CHANGES(INFORMATION => DEFAULT)
AT(TIMESTAMP => $ts1);
+----+--------+-----------------+-------------------+------------------------------------------+
| ID | C1 | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----------------+-------------------+------------------------------------------|
| 2 | purple | INSERT | False | 1614e92e93f86af6348f15af01a85c4229b42907 |
| 3 | green | INSERT | False | 86df000054a4d1dc64d5d74a44c3131c4c046a1f |
+----+--------+-----------------+-------------------+------------------------------------------+
ちなみに注意点としては、CHANGESを利用するテーブルの CHANGE_TRACKING オプションを有効にしておく必要があります。テーブル作成後でも変更可能です。
実装
ストアドを作成して、定期的にタスク実行することでデータ更新のチェックを行いました。
参考までに、実際の設定について下記に残しておきます。
ストアド
- テーブル名と期間をパラメータ化することで、汎用的に利用できるように
- CHANGES関数を実行して、行数が0件の場合に例外処理
CREATE OR REPLACE XXX.XXX.MONITOR_TABLE_CHANGES("TABLE_NAME" VARCHAR(16777216), "OFFSET_HOURS" NUMBER(38,0))
RETURNS VARCHAR(16777216)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run'
COMMENT='各テーブルのDMLを監視するストアドプロシージャ'
EXECUTE AS OWNER
AS 'from snowflake.snowpark.files import SnowflakeFile
def run(session, table_name, offset_hours):
# オフセットを秒に変換
offset_seconds = offset_hours * 3600
# クエリを実行して行数を取得
sql_query = f"""
SELECT COUNT(*) AS row_count
FROM {table_name}
CHANGES(INFORMATION => DEFAULT)
AT(OFFSET => -{offset_seconds})
"""
result = session.sql(sql_query).collect()
row_count = result[0][''ROW_COUNT'']
# 行数が0件の場合にエラーを発生させる
if row_count == 0:
raise Exception(f''アラート: {table_name} に変更がありません'')
else:
return f''{table_name} に変更があります: {row_count} 行''
';****
タスク
CALL MONITOR_TABLE_CHANGESの引数でチェック対象のテーブル名と、タスク実行から何時間前までの間に更新の有無のチェックをするか指定します。
以下の例だと毎日7時にタスクを実行し、7時間前(0時)からタスク実行時間(7時)までに更新の有無をチェックします。
create or replace task XXX.XXX.MONITOR_TABLE_CHANGES_TASK_TABLE
warehouse= WH
schedule='USING CRON 0 7 * * * Asia/Tokyo'
COMMENT='0時~7時の間にXXXテーブルに変更がなかった場合にアラートを上げる'
error_integration=SNS_ERROR_NOTIFICATION
as CALL MONITOR_TABLE_CHANGES('<テーブル名>', 7);
最後に
SnowflakeのCHANGES関数を利用することで、データの更新状況を簡単に監視することができるようになりました。今回調査するまでCHANGES関数自体を知らなかったのですが、Snowflakeが提供している関数には他にも便利なものがありそうですね。
もし皆さんも同様の課題を抱えている場合は、ぜひこの方法を試してみてください。
ご質問やご意見があれば、ぜひコメントで教えてください!
Discussion