🔄

Snowpipeで起きる問題検知からリカバリーまでの実践ガイド

2024/12/13に公開

Snowpipeは、Snowflakeが提供する強力なデータ取り込みツールです。
本記事では、Snowpipeの核となる機能であるデータを確実にSnowflakeに届けるという役目が全うできない状態に陥った時に、どのようにリカバリーするかを解説します。

導入前の設計段階での考慮事項については、以下の記事をご参照ください

https://zenn.dev/pei0804/articles/snowflake-snowpipe-production-ready

1. 全体像の理解

Snowpipeは、ファイルが準備でき次第、小規模なバッチでデータをロード(マイクロバッチロード)する仕組みを提供します。

この仕組みは非常に便利ですが、内部の動作は少々複雑です。
運用中に問題が発生した際、全体感を理解していることが、トラブルシューティングの大きな助けとなります。

シーケンス図で理解するSnowpipe

以下は、Amazon S3を使用した場合のSnowpipeの基本的な処理フローです。
※Snowpipe本体の内部実装の詳細は利用者からは見えないため、一部推測を含みます。

2. Snowpipeで発生する問題のパターン

Snowpipeで発生する問題は、以下の4つのカテゴリーに分類されます。これらすべてのパターンを完全に理解することは困難ですが、どういう問題が発生するかを大まかに把握しておくことが重要です。

  1. データ

    • データフォーマットやカラムの不整合
    • 必須項目の欠落
    • データ型の不一致
  2. パイプ定義

    • COPY INTOの誤り
    • ステージ設定の変更による問題
    • カラムマッピングの誤り
  3. インフラ/権限

    • AWSなどのインフラ障害
    • SNSトピックのサブスクリプション削除
    • ストレージ統合の認証問題
  4. メタデータ

問題に対する現実的なアプローチ

まず前提として、Snowpipeで起きる問題に対しては、完璧な解決を目指すのではなく、問題が起きたとしても耐えれるシステムの耐障害性を高める方が効果的です。

そして、Snowpipeに期待することを 「1回以上確実にデータが届けられる」 に限定することを推奨します。
加えて、できるだけCOPY INTOやSnowpipeのメタデータに依存しない(重複ロード制御)運用を推奨します。

3. 問題の検知

何よりもまず、問題を検知できる体制を整えることが重要です。
気付けない問題に対しては、どれだけ準備をしても意味がないためです。

エラー通知

Snowpipeのエラー通知は必須の設定です。この仕組みが最も早く問題を検知できます。

通知を受ける方法として簡単なのは、Amazon SNSのサブスクリプションに開発メンバーのメーリングリストを設定する方法です。

また、Slackを使用している場合は、メールアドレス機能で発行したメールアドレスをサブスクリプションに設定することで、チャンネル通知を簡単に実現できます。

データパイプラインの鮮度テスト

Snowpipeでエラーが発生していなくても、実際には問題が起きているケースがあります。
例えば、データソース側で何らかの問題が発生し、オブジェクト作成通知イベントがSnowpipeまで届いていないような場合です。

そこで重要となるのが、鮮度テストです。dbtを使用している場合は、Snowpipeによって作成されるすべてのsourceにfreshness testを設定することを推奨します。

なお、Snowpipeに限らず、基本的にすべてのデータソースに、鮮度テストを設定することをお勧めします。
これは、後段のデータ変換プロセスで問題が発生した際に、鮮度テストがない状態だと、まずデータが正しく到着しているかを確認する必要が出てきます。これによってトラブルシューティングが難しくなります。

※Snowflakeのみで同等の機能を実現したい場合は、FRESHNESS (system data metric function)が利用可能です。

鮮度テストで意識しておくべきこととして、設定された時間になるまで問題を検知できないため、鮮度テストの設定時間が長いほど、問題の発見が遅れる可能性があることに注意が必要です。

設定値の監視

特定の設定のSnowpipeを検知したい場合もあります。
例えば、AUTO_INGESTがfalseになっているSnowpipeの検出です。

Snowflakeのアラート機能 を使用すれば簡単に監視できます。すべてのSnowpipeが継続的にロードを行う状態を前提とする場合、AUTO_INGESTがtrueであることを監視するのも有効な方法です。

-- AUTO_INGESTがfalseなSnowpipeの一覧を取得
-- 遅延があることに注意
SELECT *
FROM SNOWFLAKE.ACCOUNT_USAGE.PIPES
WHERE IS_AUTOINGEST_ENABLED = 'NO'

Snowflake、AWSのステータス監視

SnowflakeのステータスページAmazon Health Dashboardからの通知を受け取る仕組みを整えることは重要です。

データもSnowpipeの設定も問題ないのに何かがおかしい場合、クラウド側の問題を疑いましょう。

実例として、STSで障害の時は、Snowpipeまでオブジェクト作成イベントが届かず、データをロードできない状況が発生しました。

この時もそうでしたが、ステータスページはリアルタイムに問題が通知されるわけではないため、過信は禁物です。

ステータスページには反映されていない問題が疑われる場合は、X(旧Twitter)等のSNSで状況を確認することも有効な手段となります。(結構有効です)

4. 問題に対する初動

問題の原因究明において、最初に確認すべき点は以下の2つです。

  1. データ基盤に変更が加えられていないか
  2. データに想定外の変更が発生していないか

データ基盤は、変更がなくても障害が発生することがあります。これは多くの場合、データが想定と異なる状態になっているためです。

どちらのパターンかによって、その後の対応が変わってくるため、まずここを確認します。

稀にどちらでもないケースがありますが、その場合は利用しているクラウドの障害が疑われます。

パターン: 基盤の変更

問題が発生する直前に変更があったことが確認できた場合、まずSnowpipeの状態を確認します。原因が判明したら、その箇所を修正していきます。

Snowflakeの環境をTerraform + gitで管理している場合、コードで変更前後の差分を確認することで、簡単に前後の違いを確認できるはずです。

以下、私がよく確認する項目を紹介します。

パイプの状態確認

SYSTEM$PIPE_STATUSを使用してSnowpipeの状態を確認します。

SELECT SYSTEM$PIPE_STATUS('<pipe_name>');

確認ポイント:

  • executionState
    • 現在の状態を確認します
  • error
    • エラー情報が記録されている場合、重大な問題が発生している可能性があります
  • lastForwardedFilePath
    • タイムスタンプが古い場合、Snowpipeまでイベントが到達していない可能性があります

ステージ設定の確認

DESCRIBE STAGEを使用して設定を確認します。

DESC STAGE my_stage;

主な確認ポイント:

  • STAGE_LOCATION
  • STAGE_FILE_FORMAT
  • STORAGE_INTEGRATION

ステージに問題がありそうな場合は、実際にステージを経由してクエリを実行してみることで問題が特定できることがあります。

SELECT * FROM @my_stage/clicks/2024/01/02/10/

パターン: データの破損

基盤に特に変更がないにもかかわらず問題が発生しているケースです。

経験上、大多数の問題はこのパターンに該当します。
これは、Snowpipeの設定は一度行うと頻繁に変更するものではないため、多くの場合、データ側に問題が発生しているためです。

COPY_HISTORYによる失敗の確認

データが破損しているパターンの場合、COPY_HISTORYに原因が記録されています。
問題の発生時間と場所を特定します。再現性がある場合は、データソースのロジックに問題があるため、データソース側の修正が必要になります。この場合、データオーナーへの早急な連絡が推奨されます。

前は確認が少し面倒でしたが、Snowsightから簡単に確認できるようになりました。

パターン: クラウドの障害

基本的にはクラウド側のリカバリーを待つしかありませんが、待つだけではなく、リカバリー後の対応を事前に検討しておくことが重要です。

参考として、STSの障害時には、S3からSNSへのメッセージが全く送信されない状態になりましたが、障害から復旧後にメッセージが一斉に送信されたので、自動的にリカバリーできました。

でも、実際どうなるかは分からないので、仮に全てのメッセージが消失するならどうする?を考えるのは重要です。

蛇足ですが、STS、SQS、SNSなどは、AWSの基幹システムとも言えるサービスのため、これらで障害が発生すると、影響範囲の予測が困難です。

5. データをリカバリーする

データのリカバリー方法は、以下の3つの方法があります。

  1. PIPE REFRESH
  2. COPY INTO
  3. 再アップロード

それぞれの方法には適用条件と制限があるため、状況に応じて適切な方法を選択する必要があります。

リカバリー方法の選択基準

以下のフローチャートに従って、最適なリカバリー方法を選択してください。
※厳密にはステージの状態に関する問いも用意すべきですが、そこまで考えると大変なのでこのくらいに留めるのが良いです。

REFRESHによるリカバリー

PIPE REFRESH

REFRESHはオペレーションの工程的に、最も簡潔なリカバリー方法ですが、以下の条件をすべて満たす必要があります。

  • 外部ステージにステージング(≒S3にアップロード)されて7日以内のファイル
  • パイプが再作成されていない
  • パイプが停止している場合、停止から14日以内
-- 基本的な実行方法
ALTER PIPE <pipe_name> REFRESH;

-- 特定のプレフィックスのみ処理する場合
ALTER PIPE <pipe_name> REFRESH PREFIX = '<path/to/files/>';

REFRESHの注意点

  • パイプの再作成後はREFRESH機能が有効に機能しない場合があります
    • REFRESHの動作には、Snowpipeごとに管理されているメタデータが依存しているため
      Snowpipeのメタデータの状態が不明確な場合は、REFRESH機能の使用を避けることを推奨します
  • パイプが14日以上停止している場合は、SYSTEM$PIPE_FORCE_RESUMEを実行する必要があります
    • 上記の方法で再開された場合、Snowpipeはこれらの古い通知をベストエフォートベースで処理します
      • 完全なリカバリーを期待することは避けるべきです

COPY INTOによるリカバリー

COPY INTO <テーブル>

REFRESH機能が使用できない、または正常に動作する確信が持てない場合は、COPY INTOを使用します。

以下の場合に特に有効です:

  • 外部ステージにステージング(≒S3にアップロード)されて7日以上経過したファイル
  • パイプが再作成された場合
  • 何らかの理由で、REFRESH機能が正常に動作しない可能性がある場合

COPY INTOの注意点

Snowpipeで定義されているCOPY INTO定義をそのまま使用することを推奨します。
不適切なCOPY INTOコマンドを実行すると、データの整合性に深刻な影響を及ぼす可能性があります。

-- `definition`カラムのCOPY INTOを使用します
DESC PIPE <pipe_name>;

複数回の実行が必要な場合は、FORCEオプションを使用する必要があります。

COPY INTO load1 FROM @%load1/data1/
    FILES=('test1.csv', 'test2.csv')
    FORCE=TRUE;

再アップロードによるリカバリー

データ自体に問題がある場合(不正なファイル形式、カラム不整合など)は、データを修正して再アップロードする必要があります。
このケースの場合、Snowpipeは一度ロードを試みた結果失敗していることが多いので、REFRESHだとロードしてくれません。

重要な注意点

  • Snowpipeは同名ファイルの再アップロードでは取り込まれないため、ファイル名を変更する必要があります
    • 問題管理用のサフィックスを付加することで、追跡が容易になります(例:-issue-1234-retry.gz
  • 元のパーティション構造を維持してアップロードすることが重要です
    • 例:s3://clicks/2024/11/24 のデータは、同一のs3://clicks/2024/11/24パーティションパスに再アップロードしましょう
      • ほとんどの場合、後段のデータパイプラインはパーテーションを軸に処理をしているはずです。

重複データを許容するデータパイプラインの設計

ここまでの説明から、リカバリー作業によってデータが重複してロードされる可能性があることがお分かりいただけたと思います。

実際の運用においては、完璧な重複排除を目指すよりも、まずはデータの確実な取り込みを優先することを推奨します。

問題が限定的な場合はいいですが、同時多発的に発生した場合、状況の把握が極めて困難になります。
また、問題発生から時間が経過するほど、状況は複雑化します。そのため、まずはSnowflakeへのデータ取り込みを最優先とするリカバリー戦略を採用することが賢明です。

そのためには、重複データの発生を許容する設計が必要となります。データのロード部分で重複を完全に防ごうとすると、リカバリー手順は著しく複雑化します。

では、重複したデータはどのように対処すべきでしょうか。
後段のデータパイプラインで重複排除処理を実装することで、この問題は解決可能です。

そもそも分散システムにおいて、重複イベントの発生は避けられないものです。これを前提としたシステム設計を行うことで、データのリカバリー戦略も簡素化することができます。

6. リカバリー確認

リカバリー作業実施後は、データが正しく取り込まれたことを確認する必要があります。

SnowpipeからSnowflakeのテーブルにデータを取り込む際、metadata$filenameをテーブルのカラムに含めることで、オブジェクトが想定通り取り込まれているか確認することができます。

確認方法は、ステージのファイル名一覧とロード先テーブルに格納されているファイル名一覧を照合することで、すべてのファイルが正しくロードされたか確認できます。

ただし、データ量が相当数ある場合は、絞り込みが必要となります。

WITH source_files AS (
   SELECT
    metadata$filename AS filename
  FROM @my_stage
),
loaded_files AS (
  SELECT 
    _metadata_filename AS filename
  FROM my_table
)
SELECT 
  s.filename,
  'not loaded' AS status
FROM source_files s
LEFT JOIN loaded_files l 
  ON s.filename = l.filename
WHERE l.filename IS NULL;

7. 後段処理のリカバリー

データのリカバリーは、問題解決の第一段階に過ぎません。後段には、そのデータに紐づいた業務処理が存在するはずです。それらもリカバリーしていく必要があります。

データの問題は、リカバリーに時間がかかればかかるほど、どんどん問題が複雑化するので、早くこの段階にたどり着くことが重要です。

まとめ

本記事では、Snowpipeの運用において発生する問題への対処方法について解説してきました。
重要なポイントは以下の通りです。

  • Snowpipe本来の仕事であるデータを届けるだけに注力させる
  • 問題が起きた時に、多角的な視点で気付けるようにしておく

Snowpipeとうまく付き合うことで、データのロードを圧倒的に簡易になります。
ですが、実際に運用が始まってから起きる問題から、どのようにリカバリーするのかは深く解説されていなかったので、特大ボリュームでまとめてみました。

参考になれば幸いです。

Snowflake Data Heroes

Discussion