バッチETLにさよなら:RisingWave×Snowflakeで実現するリアルタイム分析パイプライン
Snowflakeウェアハウスにフレッシュで分析可能なデータを届けるのは、時として時間との戦いのように感じられます。信頼性の高い従来型のバッチETLプロセスは、処理に遅延が生じる可能性があり、その結果、インサイトは常に現実より数時間(あるいはそれ以上)遅れてしまいます。もしこのレイテンシを大幅に縮小できたら、しかもすでに変換・整形済みのデータをSnowflakeに直接届けられたとしたら、どうでしょう?
それこそが、RisingWaveの新しいSnowflakeシンクコネクタで実現されることです。
初めての方のために説明すると、RisingWaveは統合型リアルタイムデータプラットフォームであり、イベントストリームデータを到着と同時に取り込み、処理し、変換するよう設計されています。RisingWaveはデータの「行き先」だけでなく、「出どころ」にも非常に柔軟です。Apache KafkaやApache Pulsarといった人気のストリーミングプラットフォームからのデータ取り込みはもちろん、PostgreSQLやMySQLといったトランザクションデータベースからのCDC(変更データキャプチャ)ストリームにも対応。あるいは、アプリケーションやマイクロサービス間のインタラクションから生成されるイベントストリームの取り込みも可能です。RisingWaveは、こうした多様なデータに対して、結合、集約、ウィンドウ処理といったSQLベースの複雑な演算をリアルタイムで行える強力なエンジンなのです。
そして今、Snowflakeシンクコネクタによって、RisingWaveからSnowflakeテーブルへ直接、継続的なデータブリッジを構築することが可能になりました。
つまり、Snowflakeに入るデータと、それをどのように扱うかに、どんな変化が起きるのでしょうか?
-
より早くデータを受け取り、迅速に行動を起こす:
最も即効性のある効果は、データの鮮度です。バッチジョブで一定間隔ごとにデータを収集・変換・ロードするのを待つ代わりに、RisingWaveは処理済みのデータを継続的にストリーミングします。これにより、Snowflakeに届くデータは常に最新で、ダッシュボードやレポートもリアルタイムの状況を反映します。 -
データ前処理の簡素化:
多くの重たい変換処理は、データがSnowflakeに到達する前にRisingWave内で実行されます。RisingWaveは、複雑なデータ変換、エンリッチメント、集約処理をストリーム上で実施するのが得意です。複数のストリームの結合、ランニングトータルの計算、複雑なロジックによるイベントフィルタリングなどがすべてRisingWave内で処理され、Snowflakeに届くデータはすでに整形され、即時利用可能な状態です。これにより、Snowflake内での変換ロジックが大幅に簡素化され、計算コストの削減にもつながる可能性があります。
内部ではどのように機能するのか?
その仕組みはとてもスムーズです。RisingWaveは指定されたデータソースからデータを取り込み、定義されたリアルタイム変換処理を実行し、その処理済みデータをJSON形式でユーザー管理のS3バケットへ書き出します。そこから、SnowflakeのSnowpipeサービスが自動的かつ継続的にそのデータをターゲットのSnowflakeテーブルへロードします。S3に対応したSnowpipeの既存の自動化機能とシームレスに連携するよう設計されています。
Snowpipeを使うなら、なぜRisingWaveも必要なのか?
この疑問はもっともです。最終的にデータはS3に書き出され、SnowpipeがSnowflakeへのロードを担うのであれば、なぜ最初から生データをS3に送ってSnowpipeだけで済ませないのか?
たしかに、SnowpipeはS3などのステージからSnowflakeテーブルへのファイルロードを効率的かつ自動的に行う優れた仕組みです。生データや準構造化データをデータウェアハウスに取り込むには最適な手段です。
しかし、RisingWaveの導入によって得られる決定的な違いは、データがS3に到達する前に何が起きるかという点にあります:
-
リアルタイムかつ複雑な変換処理:
RisingWaveはデータをストリーム中に処理・変換します。これにより、たとえばKafkaからのユーザークリックストリームとCDCストリームからのユーザー属性データをSQLで結合する、複雑な集約処理を行う、詳細なフィルタリングロジックを適用する、データをリッチ化する──といった高度な操作を、すべて出力前に実現可能です。一方、Snowpipe単体では単なるファイルロードしかできず、変換はSnowflake内で行う必要があり、その分レイテンシが増し、別途計算リソースも必要になります。 -
ステートフル処理とマテリアライズドビュー:
RisingWaveは、到着した新しいデータに応じてインクリメンタルに更新されるマテリアライズドビューを維持することができます。これにより、セッション分割やリアルタイムのリーダーボードといった複雑な分析ビューを事前に計算可能です。Snowflakeに取り込まれるのは生データではなく、こうした整形済みかつ即時クエリ可能なビューになります。 -
ソースとの直接統合と初期処理:
RisingWaveはKafka、Pulsar、CDCストリームなどの多様なソースと直接接続可能で、取り込みプロトコルの処理やデシリアライズ、初期フィルタリングやスキーマ検証といった処理も複雑な変換の前に実行可能です。Snowpipeをこれらのソースと直接連携させようとすると、別のシステムやカスタムコードで一旦S3ファイルに落とす必要があります。 -
Snowflake内のデータ整備・レイテンシ削減:
RisingWaveからSnowflakeへはすでに変換・集約され、分析準備が整った状態でデータが届くため、Snowflake内での追加処理は大幅に簡素化または不要になります。これにより、インサイトを得るまでの時間が短縮され、Snowflake上のコンピュートコストも削減できる可能性があります。
はじめに:導入の概要
導入は、いくつかの重要なステップから成ります。まずはS3バケットの詳細情報を用意し、SnowpipeがそのS3ロケーションを監視するように設定する必要があります(Snowflake公式のAmazon S3向けSnowpipe自動化ガイドが参考になります)。
次に、RisingWave内でシンクを作成します。たとえば、すでにストリーミングデータを処理しているマテリアライズドビュー ss_mv
がある場合、以下のように設定します:
CREATE SINK snowflake_sink
FROM ss_mv -- マテリアライズドビューまたはソース
WITH (
connector = 'snowflake',
type = 'append-only', -- アップサート対応も可能!後述参照
s3.bucket_name = 'your-s3-bucket-name',
s3.credentials.access = 'your-aws-access-key',
s3.credentials.secret = 'your-aws-secret-key',
s3.region_name = 'your-s3-bucket-region',
s3.path = 'path/to/data/', -- オプション:バケット内の特定パス
force_append_only = 'true' -- 追記のみが必要な場合
);
更新や削除(アップサート)はどうするのか?
RisingWaveで AS CHANGELOG
を使ってマテリアライズドビューを定義すると、RisingWaveは新規行だけでなく、更新や削除も追跡するようになります。このとき、__op
(操作タイプ:insert, delete, update_before, update_after)や __row_id
(変更の順序付け用)といった特殊な列が付与されます。
CREATE SINK snowflake_sink as WITH sub AS changelog FROM user_behaviors
SELECT
user_id,
target_id,
event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' as event_timestamp,
changelog_op AS __op,
_changelog_row_id::bigint AS __row_id
FROM
sub WITH (
connector = 'snowflake',
type = 'append-only',
s3.bucket_name = 'EXAMPLE_S3_BUCKET',
s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
s3.region_name = 'EXAMPLE_REGION',
s3.path = 'EXAMPLE_S3_PATH',
);
そしてSnowflake側では、以下のようにDynamic Tableを定義することで、常に最新状態を保つビューを自動的に構築できます:
-- Snowflake内で実行
CREATE OR REPLACE DYNAMIC TABLE current_user_behaviors
TARGET_LAG = '1 minute' -- 鮮度の指定(例:1分以内)
WAREHOUSE = your_snowflake_warehouse
AS
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY primary_key_column ORDER BY __row_id DESC) as rn
FROM your_staging_table_fed_by_snowpipe
)
WHERE rn = 1 AND (__op = 1 OR __op = 3); -- __op=1(Insert)、__op=3(Update_after)
提供状況
Snowflakeシンクコネクタは、RisingWaveのPremium Edition機能のひとつです。RisingWave Cloudでは追加費用なしで利用可能です。セルフホスト環境でも、4コア以下のデプロイメントであれば無料でPremium Editionのすべての機能を使用できます。4コアを超える場合は、ライセンスキーが必要となります。
データからインサイトまでのパイプラインを加速させませんか?
もはや古くなったデータを待つ必要はありません。RisingWaveと新しいSnowflakeシンクコネクタを組み合わせれば、常に最新で継続的に更新され、あらかじめ整形されたデータによって、Snowflakeでの分析がパワーアップします。
-
今すぐRisingWaveを試そう:
-
エキスパートに相談する:
-
コミュニティに参加しよう:
私たちは、あなたがSnowflake上でリアルタイムデータをどのように活用し、新たな可能性を切り開いていくかを楽しみにしています!
Discussion