❄️

Snowflake Snowpipeを本番導入する前に読むやつ

2023/04/06に公開
2

Snowpipeの導入検討。既に使ってるけど、これでいいんだっけ?を確認したい人向けの記事です。
Snowpipeとは、なんぞや?という人は、公式ドキュメントのAmazon S3用Snowpipeの自動化 を御覧ください。

実運用に入ってから、どのように問題に対処していけば良いか知りたい方は、以下の記事を御覧ください。

https://zenn.dev/dataheroes/articles/snowpipe-data-recovery-guide

概要

筆者は、CARTA HOLDINGSのZucksで構築しているアドテクプラットフォームから生成されるログを、効率的にデータ可視化できるデータ基盤を作っています。
その中で、データロードで大活躍しているのが、Snowpipe(以下pipeとも書きます)です。
Snowpipeは、Snowflakeが提供している機能で、例えば、S3にPutされたログを検知して、Snowflakeにロードすることができる最適なソリューションです。設定し、ロードが始まると、ほとんど手間かからずに動作します。

当たり前ですが、いざ導入検討すると、これどうするん?ってことはいくつかありました。今回は試行錯誤の結果、これでなんとかなるやろっていうプラクティスが見えたので、社内向けドキュメント兼記事にしました。

テーブル定義、パイプ定義

筆者の環境では、ログは基本的にJSON Lines (以下JSONL)形式を採用しています。JSONLの採用理由については、別の機会に紹介できればと思います。
試行錯誤の結果、以下のようなテーブル定義とパイプ定義に落ち着きました。

テーブル定義
create table sample_table (
  raw_data variant,
  _metadata_filename varchar(16777216),
  _metadata_file_row_number number(38,0),
  _load_at timestamp_ntz(9),
  _s3_partition timestamp_ntz(9),
  _partition_daily date,
  _partition_hourly timestamp_ntz(9)
);
カラム 説明 値の例
raw_data ログの内容が丸ごと入っている {"hoge": "fuga"}
_metadata_filename ログが格納されているAmazon S3のパスが入ってくる hoge/2023/03/09/16/hoge.gz
_metadata_file_row_number ログファイルの何行目のレコードかを示す数値が入ってくる 3
_load_at ロードされた時間が入ってくる 2023-03-09T16:29:02.531458322Z
_s3_partition S3のパーティションを示す時間が入ってくる 2023-03-09T16:00:00Z
_partition_daily 日毎のパーティション 2023-03-09
_partition_hourly 時間毎のパーティション 2023-03-09T16:00:00Z
パイプ定義
create pipe sample_table_pipe auto_ingest = true
aws_sns_topic = 'arn:aws:sns:ap-northeast-1:xxx:hoge' as
copy into sample_table
from (
  select
  $1::variant as raw_data
  , metadata$filename::varchar as _metadata_filename
  , metadata$file_row_number::bigint as _metadata_file_row_number
  , metadata$start_scan_time::timestamp_ntz as _load_at
  , to_timestamp(split_part(metadata$filename, '/', 2)
  || '/' || split_part(metadata$filename, '/', 3)
  || '/' || split_part(metadata$filename, '/', 4)
  || '/' || split_part(metadata$filename, '/', 5), 'YYYY/MM/DD/HH24')::timestamp_ntz as _s3_partition
  , time_slice(_s3_partition, 1, 'day')::date as _partition_daily
  , time_slice(_s3_partition, 1, 'hour')::timestamp_ntz as _partition_hourly
  from @sample_table_stage
) file_format = (type = 'json')

RAWデータをそのまま格納する利点

取り込み漏れが起きない

COPY INTOの時点で、JSONをパースして、ロード時点で、カラム化することは、当然できます。
最初の作成時点で、これが問題になることは少ないでしょう。しかし、ログのフィールドは当然ですが、増えたり減ったりするものです。
例えば、フィールドが増えた場合に、よくありがちなのが、ログのフォーマットは変わったけど、pipeはそれを考慮してないので、取り漏らしてましたという問題。また、そもそも、ロジックに考慮漏れがあって正しく入らないこともあるでしょう。
もちろん再度入れ直せばいいわけですが、正直オペレーションがかなり面倒です。とりあえず、全てのデータはテーブルに入ってるので、変換レイヤーに使っているdbt-core(以下dbt)でどうにかすれば良いに倒しています。
経験的に、ETLの失敗は、非常にストレスフルですが、ELTの失敗は、わりと簡単に取り戻しやすいというのもあり、ETL層では極限まで、シンプルに倒すようにしています。

まんま格納したデータを、その後どうしてるかについて、軽く触れておきます。
さすがに、RAW_DATAのまんまだと、後段の変換レイヤーで扱うには、非常に不便です。なので、RAW_DATAをパースするだけのViewが用意しています。

{{
  config(
    materialized='view',
  )
}}

with source as (
  select * from sample_table
),

renamed as (

  select
    convert_timezone('UTC', source.raw_data['request_time']::timestamp) as sample_at,
    source.raw_data['sample_id']::string as sample_id,
    ...

    _metadata_filename as _metadata_filename,
    _metadata_file_row_number as _metadata_file_row_number,
    _s3_partition as _s3_partition,
    _partition_hourly as _partition_hourly,
    _partition_daily as _partition_daily,
    _load_at as _load_at

  from source

)

select * from renamed

pipeの変更機会が減る

ログのフィールドが変わる度に、pipeも変更する運用の場合、pipeは作り直しになります。
個人的にpipeの作り直しは、難しい手順が発生するため、あまりやりたくない仕事です。

参考: Snowpipeの管理: パイプの再作成

プルーニング効かせたいものはカラム化する

ここまで、ログをそのまま格納することによるメリットを書いてきましたが、逆にカラム化しているパーティションカラムについて、触れようと思います。
筆者の環境では、時間毎、日毎で、格納されたdbtを使って加工しています。その際に、処理範囲を決定するのに、_partition_daily_partition_hourlyを使っています。
仮にViewで、クエリするタイミングにパーティションを確定していると、プルーニングが効かず、非常に効率が悪くなります。

例えば、以下のようなクエリで、処理範囲を特定してるとします。

select * from sample
where _partition_hourly = '2023-04-04T10:00'

クエリプロファイルを見ると、プルーニングが効かずに、すべてのパーティションを見てしまってることが分かります。
内部的に起きていることは、Viewで_partition_houly をクエリしたタイミングで、確定しています。

フルスキャンの例

これをSnowpipeでのロード時点で、パーティションカラムを確定させると、プルーニングがしっかり効いてくれます。
※テーブルを作り直したりしたので、データ量が全然違うのですが、ご容赦願います。

プルーニングが効いてる例

参考: dbtとデータパーティショニングで、大量データを扱う

トラブルシューティングのしやすさ

  • _metadata_filename_metadata_file_row_number
    • S3にPutされているログを特定できます。稀にログファイルどうなってるか知りたくなるのですが、そういう時に役立ちます。
    • stageの状態とcopy historyとの比較で、ロード漏れが起きていることも見つけようと思えば見つけることができます。
  • _load_at
    • ログが重複した際に、後勝ちにしたいとかのニーズがあれば、このカラムを使ってdedupすることができます。
    • dbtを使ったtestで、全量テストせずに、直近1時間だけnot_null testする時に使ったりもできます。
  • _s3_partition
    • S3のパスからtimestampへの変換が意図通りに、変換ができてるかどうかの確認に使えます。

参考: Stack overflow: Replaying outstanding snowpipe notifications/messages in Snowflake

Snowpipeに期待するサービスレベル

実際にSnowpipeを使っていくぞ!となると、Snowpipeにどれくらい期待していいんだっけ?を決めておく必要があります。
結論から書くと、「S3にPutされたログが、At least onceで、Snowflakeにロードされる」 です。

そんなの分散システムだったら、当たり前では?と思う人も居るかもしれませんが、大事なことなので、明文化しています。
これはどういうことを許容するかというと、全く同じログが、何度も、例えば100回くらいロードされる可能性を許容するということです。
仮に、ログを作り出すシステムがものすごくて、絶対に重複したログを発生させないとします。それでも、At least onceの期待値で運用するのが、おすすめです。

Snowpipeには、過去どういったデータをロードしたか保持する機能があります。これにより、何度もロードされてしまうことを防止してくれます。
ですが、これに頼った運用するのは、あまりおすすめできません。なぜなら、pipeを作り直したり、何もせずとも一定期間が経過すると消えるからです。
履歴が消えた状態で、alter pipe refreshを使うと、過去は考慮されずにデータが取り込まれます。そのため、重複が発生します。他にも間違いをリカバリするために、refreshを使いたいケースは発生するでしょう。そういった時に、Snowpipeからやってきたデータは、重複している前提の運用ができていれば、履歴があるなしは特に問題ではなくなります。

では、どうやって重複を始めとする品質担保をするか?それは、後段の変換レイヤーで担保することにしています。そもそも、Snowpipeが使うAmazon SNSから、同じイベントが複数回通知が来たりしますし、ログをPutするサーバーのなんらかしらで、複数回Putされることは、どうやっても起きます。なので、Snowpipeに限った話ではなくAt least onceでで運用するのがおすすめです。

参考:
Snowpipe: 履歴データのロード
ALTER PIPE: パイプの更新

監視

Amazon SNS

Snowpipeが使用するAmazon SNS(以下、SNS)を監視したい場合は、以下のメトリクスが役立ちます。GCPやAzureを使用している場合は、対応するメトリクスに変換して読んでください。

メトリクス名 説明
NumberOfMessagesPublished このメトリクスは、特定の期間にSNSトピックにPublishされたメッセージの数を示します。例えば、全くPublishされていない場合、ログがPutされていないか、設定が間違っている可能性があります。
NumberOfNotificationsFailed このメトリクスは、特定の期間にSNSトピックからの通知の送信が失敗した回数を示します。例えば、Snowpipe関連の設定ミスに気づくことができます。

参考:CloudWatchを使用したAmazon SNSのモニタリング

NOTIFICATION INTEGRATION

エラーが検出された場合、Snowpipeが特定のSNSに対してエラーメッセージをPublishしてくれるもので、素早くエラーに気づくことができます。
マルチアカウントの対応ももちろん可能で、一箇所にエラーを集約することもできます。

ちなみに、COPY_HISTORYビューを使えば、丸っとエラーを検知することがありますが、2時間程度のラグがあるので注意してください。

参考:
Amazon SNSのSnowpipeエラー通知の有効化
COPY_HISTORY ビュー

その他

そもそも全くログが取り込めてないことに気づきたい場合は、dbt fresshnessを使うとか、TABLES ビュー の結果を定期的に監視することで、実現可能です。

他にも、Pipeのステータス を監視することで、意図せず止まってることを検知することができます。

結論

テーブル定義や、パイプ定義、監視に関しては、ワークロードによって、多少の違いが出てくると思いますが、基本的なスタンスとしては、分散システムなので、当たり前ですが「At least once」であることを前提に付き合うのがおすすめです。(ここまで、書いて当たり前なことを書いてしまった!w)
もし、他にも良いプラクティスあれば、是非教えて下さい。

Discussion

Yohei OnishiYohei Onishi

監視について言えば、エラーが過去に起きた事実だけでなく、今現在、pipeのステータスは何かが重要なので、うちは定期的にAirflowでステータスを監視しています。意図せずに止まっているやつないかとか。

SYSTEM$PIPE_STATUS( '<pipe_name>' ) でステータスは取れます。

https://docs.snowflake.com/en/sql-reference/functions/system_pipe_status

が、実際には走っているのに、この関数が止まっていると言うケースがあるので、その時は PIPE_EXECUTION_PAUSED パラメタも見て、内部ステータスが本当に停止状態かも見てます。

https://docs.snowflake.com/en/sql-reference/parameters#pipe-execution-paused

この2つのアプローチはおそらく内部的には別々のものを監視しているようで、たまに別の結果を返すことがあり、ややこしいですが、将来的にはこの問題は解消されるようです。

ぺい(pei0804)ぺい(pei0804)

サイレントに止まってるのは、かなり怖いですね。これも確かに監視をしておくと安心そうです。