🔖

Snowpipe を Terraform 移行しただけでコスト20%削減した話

に公開

背景

Snowflake には COPY INTO / Snowpipe / Snowpipe Streaming など、
複数のデータ取り込み方法が用意されています。しかし、シロクでは長らく
「Task + COPY INTO」 を 1 時間に 2 回実行する方式 だけを採用していました。
この方式はバッチ処理としては十分ですが、高頻度・イベント駆動・低レイテンシを求めるデータ には適していません。

そこで本記事では、これら Snowpipe に適したデータ を Snowpipe へ移行した取り組み を紹介します。

その結果、コストは約 20% 削減 され、リアルタイム性も大きく改善しました。


▼ 比較表

観点 Snowpipe Snowpipe Streaming COPY INTO + Task
トリガ ストレージイベント(S3/SNS・SQS、GCS Pub/Sub、Azure Event Grid) アプリからSDKで直接挿入(行/マイクロバッチ) タスクでスケジュール/依存実行
データ経路 外部/内部ステージ → COPY INTO SDK → テーブル(ステージ不要) 外部/内部ステージ → COPY INTO
典型レイテンシ 秒〜分 さらに低遅延(リアルタイム向け) タスク間隔次第(分〜時)
課金 サーバーレス(Snowpipe) Snowpipe Streaming課金 Warehouse課金
代表ユースケース オブジェクトストレージに継続着信するファイル IoT/イベント流/高頻度少量の即時反映 毎時/毎日などのバッチ、複雑な依存管理
監視/運用要点 SYSTEM$PIPE_STATUS / DESC PIPE で状態や通知チャネル確認 送信側のバッファ/再送/順序制御 TASK_HISTORY や DAG 管理・Warehouse 最適化

今回 Snowpipe に移行したデータは、Kinesis から取得しているページのインプレッションなどの高頻度イベントデータ です。

これらのデータは、到着頻度が非常に高く、1 件あたりは小さいものの総量は大きく、できるだけ遅延なく処理したいという特徴があります。

Snowflake が提供する COPY INTO / Snowpipe / Snowpipe Streaming それぞれの方式の特性を比較したうえで、今回のワークロードに最も適していたのが Snowpipe でした。

▼ ディレクトリ構成と構築フロー

まず最初に全体像が分かるよう、Terraform のディレクトリ構成を明示します。

infra/
  ├─ modules/
  │    ├─ snowflake/
  │    │     ├─ storage_integration.tf
  │    │     ├─ stage.tf
  │    │     ├─ pipe.tf
  │    │     └─ variables.tf
  │    └─ aws/
  │          ├─ s3.tf
  │          ├─ iam.tf
  │          └─ notification.tf
  ├─ envs/
  │    └─ production/
  │          ├─ main.tf
  │          └─ variables.tf
  └─ README.md

▼ Snowpipe の導入フロー(S3 → Snowflake 自動取り込み)

以下では、Amazon S3 にアップロードされたファイルを Snowpipe により自動的に Snowflake に取り込むための基本フローをまとめます。
構成は大きく IAMロール → ストレージ統合 → ステージ作成 → Snowpipe 作成 → S3 イベント連携 の流れとなります。

1. IAMロールの作成(S3 バケットへのアクセス権付与)

Snowflake が S3 上のファイルへアクセスできるように、
S3 にアクセス権限を持つ IAM ロールを作成します。

■ 必要となるポリシー例

{
 "Version": "2012-10-17",
 "Statement": [
   {
     "Effect": "Allow",
     "Action": [
       "s3:GetObject",
       "s3:ListBucket"
     ],
     "Resource": [
       "arn:aws:s3:::your-bucket-name",
       "arn:aws:s3:::your-bucket-name/*"
     ]
   }
 ]
}

2. ストレージ統合(Storage Integration)の作成

ストレージ統合は、Snowflake が AWS IAM ロールを用いて
S3 バケットへ安全にアクセスするための Snowflake 側のオブジェクトです。

CREATE STORAGE INTEGRATION <integration_name>
 TYPE = EXTERNAL_STAGE
 STORAGE_PROVIDER = 'S3'
 ENABLED = TRUE
 STORAGE_AWS_ROLE_ARN = '<iam_role>'
 STORAGE_ALLOWED_LOCATIONS = ('<protocol>://<bucket>/<path>/', '<protocol>://<bucket>/<path>/')
 [ STORAGE_BLOCKED_LOCATIONS = ('<protocol>://<bucket>/<path>/', '<protocol>://<bucket>/<path>/') ]

ここで

 STORAGE_AWS_ROLE_ARN = '<iam_role>'

が先作成したroleのawsのarnです。 STORAGE_ALLOWED_LOCATIONSがs3 pathです。

3. IAM ロール側の Trust Policy 設定

DESC INTEGRATION <integration_name>;

STORAGE_AWS_IAM_USER_ARN と STORAGE_AWS_EXTERNAL_ID が返されます。

これらを IAM ロールの「Trust relationships」→「Edit trust policy」 に追加します。

{
 "Version": "2012-10-17",
 "Statement": [
   {
     "Sid": "",
     "Effect": "Allow",
     "Principal": {
       "AWS": "<STORAGE_AWS_IAM_USER_ARN>"
     },
     "Action": "sts:AssumeRole",
     "Condition": {
       "StringEquals": {
         "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
       }
     }
   }
 ]
}

4. STAGE の作成(S3 の実体を Snowflake へ接続)

CREATE OR REPLACE STAGE your_stage_name
 URL = 's3://your-bucket/path/'
 STORAGE_INTEGRATION = your_integration_name;

Snowflake から S3 の対象ディレクトリへアクセスできるようになります。

5. Terraform で Snowpipe を作成

Snowpipe を Terraform で作成し、自動取り込み(auto_ingest)を有効にします。

resource "snowflake_pipe" "main" {
  database    = var.snowflake_database.name
  schema      = var.target_snowflake_schema.name
  name        = upper("snowflake_pipe_s3_to_table_${var.table_name}")
  auto_ingest = true

  copy_statement = <<-EOT
    COPY INTO ${var.snowflake_database.name}.${var.target_snowflake_schema.name}.${var.table_name}
    FROM @${var.snowflake_database.name}.${var.common_snowflake_schema.name}.${var.snowflake_stage.name}${var.s3_directory}
    ${var.copy_option}
  EOT
}

できた実際の画面はこちらです。

6. Snowpipe の notification_channel を S3 に登録

Snowpipe が作成されると、内部的に SQS の notification_channel が生成されます。
まずは Snowflake からそれを取得します。

   DESC PIPE <pipe_name>;

返される notification_channel を S3 のイベント通知へ設定します。

※ 本プロジェクトでは Terraform で一括設定しました。

resource "aws_s3_bucket_notification" "kinesis_pipe_notifications" {
bucket = aws_s3_bucket.main.id

dynamic "queue" {
 for_each = var.kinesis_pipe_notifications
 content {
   id            = "kinesis-${queue.value.table_name}-notification"
   queue_arn     = queue.value.notification_channel
   events        = ["s3:ObjectCreated:*"]
   filter_prefix = queue.value.s3_prefix
 }
}

queue {
 id            = "kuroko-inventory-synchronization"
 queue_arn     = var.kuroko_inventory_sqs_queue_arn
 events        = ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]
 filter_prefix = "snowflake/mart_kuroko_inventory/"
}

depends_on = [
 aws_s3_bucket_policy.data_platform_prd_policy,
 aws_iam_role_policy.main
]
}

7. 動作イメージ

ファイルが S3 にアップロード

S3 → SQS(Snowpipe Notification Channel)へ通知

Snowpipe が通知を受け取り、該当ファイルをステージから読み取る

Snowflake テーブルへ自動で COPY INTO が実行される

これで S3 → Snowflake の完全自動取り込みパイプラインが完成する。

▼ 監視・運用の最小コマンド

-- パイプの状態確認
SELECT SYSTEM$PIPE_STATUS('DB.SCHEMA.MY_PIPE');

-- Snowpipe の詳細情報
DESC PIPE DB.SCHEMA.MY_PIPE;

-- ステージ上の未処理ファイルを強制取り込み
ALTER PIPE DB.SCHEMA.MY_PIPE REFRESH;

▼ 実際のコスト削減

kinesisなどの膨大なデータのcopy intoから snowpipeにして、コストが約20%くらい下がったことが確認できました。

▼ 参考リンク(公式ドキュメント)

Snowpipe(S3 自動取り込み・イベント連携)
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-s3

Terraform Provider: snowflake_pipe リソース
https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/resources/pipe

Snowpipe のエラーハンドリング
https://docs.snowflake.com/en/user-guide/data-load-snowpipe-errors

Snowpipe の課金モデル
https://docs.snowflake.com/en/user-guide/data-load-snowpipe-billing

▼ おわりに

ファイル到着後すぐ取り込みたい → Snowpipe

行レベルで超低遅延が必要 → Snowpipe Streaming

スケジュールや依存制御が大事 → COPY INTO + Task

まずは Snowpipe(S3×SNS) を最小構成で立てて、SYSTEM$PIPE_STATUS を見ながら運用に載せるのが手堅いです。

シロク エンジニアブログ

Discussion