Snowpipe を Terraform 移行しただけでコスト20%削減した話
背景
Snowflake には COPY INTO / Snowpipe / Snowpipe Streaming など、
複数のデータ取り込み方法が用意されています。しかし、シロクでは長らく
「Task + COPY INTO」 を 1 時間に 2 回実行する方式 だけを採用していました。
この方式はバッチ処理としては十分ですが、高頻度・イベント駆動・低レイテンシを求めるデータ には適していません。
そこで本記事では、これら Snowpipe に適したデータ を Snowpipe へ移行した取り組み を紹介します。
その結果、コストは約 20% 削減 され、リアルタイム性も大きく改善しました。
▼ 比較表
| 観点 | Snowpipe | Snowpipe Streaming | COPY INTO + Task |
|---|---|---|---|
| トリガ | ストレージイベント(S3/SNS・SQS、GCS Pub/Sub、Azure Event Grid) | アプリからSDKで直接挿入(行/マイクロバッチ) | タスクでスケジュール/依存実行 |
| データ経路 | 外部/内部ステージ → COPY INTO
|
SDK → テーブル(ステージ不要) | 外部/内部ステージ → COPY INTO
|
| 典型レイテンシ | 秒〜分 | さらに低遅延(リアルタイム向け) | タスク間隔次第(分〜時) |
| 課金 | サーバーレス(Snowpipe) | Snowpipe Streaming課金 | Warehouse課金 |
| 代表ユースケース | オブジェクトストレージに継続着信するファイル | IoT/イベント流/高頻度少量の即時反映 | 毎時/毎日などのバッチ、複雑な依存管理 |
| 監視/運用要点 |
SYSTEM$PIPE_STATUS / DESC PIPE で状態や通知チャネル確認 |
送信側のバッファ/再送/順序制御 |
TASK_HISTORY や DAG 管理・Warehouse 最適化 |
今回 Snowpipe に移行したデータは、Kinesis から取得しているページのインプレッションなどの高頻度イベントデータ です。
これらのデータは、到着頻度が非常に高く、1 件あたりは小さいものの総量は大きく、できるだけ遅延なく処理したいという特徴があります。
Snowflake が提供する COPY INTO / Snowpipe / Snowpipe Streaming それぞれの方式の特性を比較したうえで、今回のワークロードに最も適していたのが Snowpipe でした。
▼ ディレクトリ構成と構築フロー
まず最初に全体像が分かるよう、Terraform のディレクトリ構成を明示します。
infra/
├─ modules/
│ ├─ snowflake/
│ │ ├─ storage_integration.tf
│ │ ├─ stage.tf
│ │ ├─ pipe.tf
│ │ └─ variables.tf
│ └─ aws/
│ ├─ s3.tf
│ ├─ iam.tf
│ └─ notification.tf
├─ envs/
│ └─ production/
│ ├─ main.tf
│ └─ variables.tf
└─ README.md
▼ Snowpipe の導入フロー(S3 → Snowflake 自動取り込み)
以下では、Amazon S3 にアップロードされたファイルを Snowpipe により自動的に Snowflake に取り込むための基本フローをまとめます。
構成は大きく IAMロール → ストレージ統合 → ステージ作成 → Snowpipe 作成 → S3 イベント連携 の流れとなります。
1. IAMロールの作成(S3 バケットへのアクセス権付与)
Snowflake が S3 上のファイルへアクセスできるように、
S3 にアクセス権限を持つ IAM ロールを作成します。
■ 必要となるポリシー例
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::your-bucket-name",
"arn:aws:s3:::your-bucket-name/*"
]
}
]
}
2. ストレージ統合(Storage Integration)の作成
ストレージ統合は、Snowflake が AWS IAM ロールを用いて
S3 バケットへ安全にアクセスするための Snowflake 側のオブジェクトです。
CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<iam_role>'
STORAGE_ALLOWED_LOCATIONS = ('<protocol>://<bucket>/<path>/', '<protocol>://<bucket>/<path>/')
[ STORAGE_BLOCKED_LOCATIONS = ('<protocol>://<bucket>/<path>/', '<protocol>://<bucket>/<path>/') ]
ここで
STORAGE_AWS_ROLE_ARN = '<iam_role>'
が先作成したroleのawsのarnです。 STORAGE_ALLOWED_LOCATIONSがs3 pathです。
3. IAM ロール側の Trust Policy 設定
DESC INTEGRATION <integration_name>;
STORAGE_AWS_IAM_USER_ARN と STORAGE_AWS_EXTERNAL_ID が返されます。
これらを IAM ロールの「Trust relationships」→「Edit trust policy」 に追加します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<STORAGE_AWS_IAM_USER_ARN>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
}
}
}
]
}
4. STAGE の作成(S3 の実体を Snowflake へ接続)
CREATE OR REPLACE STAGE your_stage_name
URL = 's3://your-bucket/path/'
STORAGE_INTEGRATION = your_integration_name;
Snowflake から S3 の対象ディレクトリへアクセスできるようになります。
5. Terraform で Snowpipe を作成
Snowpipe を Terraform で作成し、自動取り込み(auto_ingest)を有効にします。
resource "snowflake_pipe" "main" {
database = var.snowflake_database.name
schema = var.target_snowflake_schema.name
name = upper("snowflake_pipe_s3_to_table_${var.table_name}")
auto_ingest = true
copy_statement = <<-EOT
COPY INTO ${var.snowflake_database.name}.${var.target_snowflake_schema.name}.${var.table_name}
FROM @${var.snowflake_database.name}.${var.common_snowflake_schema.name}.${var.snowflake_stage.name}${var.s3_directory}
${var.copy_option}
EOT
}
できた実際の画面はこちらです。

6. Snowpipe の notification_channel を S3 に登録
Snowpipe が作成されると、内部的に SQS の notification_channel が生成されます。
まずは Snowflake からそれを取得します。
DESC PIPE <pipe_name>;
返される notification_channel を S3 のイベント通知へ設定します。
※ 本プロジェクトでは Terraform で一括設定しました。
resource "aws_s3_bucket_notification" "kinesis_pipe_notifications" {
bucket = aws_s3_bucket.main.id
dynamic "queue" {
for_each = var.kinesis_pipe_notifications
content {
id = "kinesis-${queue.value.table_name}-notification"
queue_arn = queue.value.notification_channel
events = ["s3:ObjectCreated:*"]
filter_prefix = queue.value.s3_prefix
}
}
queue {
id = "kuroko-inventory-synchronization"
queue_arn = var.kuroko_inventory_sqs_queue_arn
events = ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]
filter_prefix = "snowflake/mart_kuroko_inventory/"
}
depends_on = [
aws_s3_bucket_policy.data_platform_prd_policy,
aws_iam_role_policy.main
]
}
7. 動作イメージ
ファイルが S3 にアップロード
S3 → SQS(Snowpipe Notification Channel)へ通知
Snowpipe が通知を受け取り、該当ファイルをステージから読み取る
Snowflake テーブルへ自動で COPY INTO が実行される
これで S3 → Snowflake の完全自動取り込みパイプラインが完成する。
▼ 監視・運用の最小コマンド
-- パイプの状態確認
SELECT SYSTEM$PIPE_STATUS('DB.SCHEMA.MY_PIPE');
-- Snowpipe の詳細情報
DESC PIPE DB.SCHEMA.MY_PIPE;
-- ステージ上の未処理ファイルを強制取り込み
ALTER PIPE DB.SCHEMA.MY_PIPE REFRESH;
▼ 実際のコスト削減
kinesisなどの膨大なデータのcopy intoから snowpipeにして、コストが約20%くらい下がったことが確認できました。

▼ 参考リンク(公式ドキュメント)
Snowpipe(S3 自動取り込み・イベント連携)
Terraform Provider: snowflake_pipe リソース
Snowpipe のエラーハンドリング
Snowpipe の課金モデル
▼ おわりに
ファイル到着後すぐ取り込みたい → Snowpipe
行レベルで超低遅延が必要 → Snowpipe Streaming
スケジュールや依存制御が大事 → COPY INTO + Task
まずは Snowpipe(S3×SNS) を最小構成で立てて、SYSTEM$PIPE_STATUS を見ながら運用に載せるのが手堅いです。
「N organic」、「FAS」等の化粧品ブランドを展開している株式会社シロクのエンジニアブログです。 ECサイトを中心とした自社サービスの開発・運用を行っています。 sirok.jp/norganic
Discussion