🐥

バッチETLにさよなら:RisingWave×Snowflakeで実現するリアルタイム分析パイプライン

に公開

Fresh Data, Faster: Stream Directly from RisingWave to Snowflake.png

Snowflakeウェアハウスにフレッシュで分析可能なデータを届けるのは、時として時間との戦いのように感じられます。信頼性の高い従来型のバッチETLプロセスは、処理に遅延が生じる可能性があり、その結果、インサイトは常に現実より数時間(あるいはそれ以上)遅れてしまいます。もしこのレイテンシを大幅に縮小できたら、しかもすでに変換・整形済みのデータをSnowflakeに直接届けられたとしたら、どうでしょう?

それこそが、RisingWaveの新しいSnowflakeシンクコネクタで実現されることです。

初めての方のために説明すると、RisingWaveは統合型リアルタイムデータプラットフォームであり、イベントストリームデータを到着と同時に取り込み、処理し、変換するよう設計されています。RisingWaveはデータの「行き先」だけでなく、「出どころ」にも非常に柔軟です。Apache KafkaやApache Pulsarといった人気のストリーミングプラットフォームからのデータ取り込みはもちろん、PostgreSQLやMySQLといったトランザクションデータベースからのCDC(変更データキャプチャ)ストリームにも対応。あるいは、アプリケーションやマイクロサービス間のインタラクションから生成されるイベントストリームの取り込みも可能です。RisingWaveは、こうした多様なデータに対して、結合、集約、ウィンドウ処理といったSQLベースの複雑な演算をリアルタイムで行える強力なエンジンなのです。

そして今、Snowflakeシンクコネクタによって、RisingWaveからSnowflakeテーブルへ直接、継続的なデータブリッジを構築することが可能になりました。
つまり、Snowflakeに入るデータと、それをどのように扱うかに、どんな変化が起きるのでしょうか?

  • より早くデータを受け取り、迅速に行動を起こす:
    最も即効性のある効果は、データの鮮度です。バッチジョブで一定間隔ごとにデータを収集・変換・ロードするのを待つ代わりに、RisingWaveは処理済みのデータを継続的にストリーミングします。これにより、Snowflakeに届くデータは常に最新で、ダッシュボードやレポートもリアルタイムの状況を反映します。

  • データ前処理の簡素化:
    多くの重たい変換処理は、データがSnowflakeに到達するにRisingWave内で実行されます。RisingWaveは、複雑なデータ変換、エンリッチメント、集約処理をストリーム上で実施するのが得意です。複数のストリームの結合、ランニングトータルの計算、複雑なロジックによるイベントフィルタリングなどがすべてRisingWave内で処理され、Snowflakeに届くデータはすでに整形され、即時利用可能な状態です。これにより、Snowflake内での変換ロジックが大幅に簡素化され、計算コストの削減にもつながる可能性があります。

内部ではどのように機能するのか?

その仕組みはとてもスムーズです。RisingWaveは指定されたデータソースからデータを取り込み、定義されたリアルタイム変換処理を実行し、その処理済みデータをJSON形式でユーザー管理のS3バケットへ書き出します。そこから、SnowflakeのSnowpipeサービスが自動的かつ継続的にそのデータをターゲットのSnowflakeテーブルへロードします。S3に対応したSnowpipeの既存の自動化機能とシームレスに連携するよう設計されています。

Snowpipeを使うなら、なぜRisingWaveも必要なのか?

この疑問はもっともです。最終的にデータはS3に書き出され、SnowpipeがSnowflakeへのロードを担うのであれば、なぜ最初から生データをS3に送ってSnowpipeだけで済ませないのか?

たしかに、SnowpipeはS3などのステージからSnowflakeテーブルへのファイルロードを効率的かつ自動的に行う優れた仕組みです。生データや準構造化データをデータウェアハウスに取り込むには最適な手段です。

しかし、RisingWaveの導入によって得られる決定的な違いは、データがS3に到達するに何が起きるかという点にあります:

  1. リアルタイムかつ複雑な変換処理:
    RisingWaveはデータをストリーム中に処理・変換します。これにより、たとえばKafkaからのユーザークリックストリームとCDCストリームからのユーザー属性データをSQLで結合する、複雑な集約処理を行う、詳細なフィルタリングロジックを適用する、データをリッチ化する──といった高度な操作を、すべて出力前に実現可能です。一方、Snowpipe単体では単なるファイルロードしかできず、変換はSnowflake内で行う必要があり、その分レイテンシが増し、別途計算リソースも必要になります。

  2. ステートフル処理とマテリアライズドビュー:
    RisingWaveは、到着した新しいデータに応じてインクリメンタルに更新されるマテリアライズドビューを維持することができます。これにより、セッション分割やリアルタイムのリーダーボードといった複雑な分析ビューを事前に計算可能です。Snowflakeに取り込まれるのは生データではなく、こうした整形済みかつ即時クエリ可能なビューになります。

  3. ソースとの直接統合と初期処理:
    RisingWaveはKafka、Pulsar、CDCストリームなどの多様なソースと直接接続可能で、取り込みプロトコルの処理やデシリアライズ、初期フィルタリングやスキーマ検証といった処理も複雑な変換の前に実行可能です。Snowpipeをこれらのソースと直接連携させようとすると、別のシステムやカスタムコードで一旦S3ファイルに落とす必要があります。

  4. Snowflake内のデータ整備・レイテンシ削減:
    RisingWaveからSnowflakeへはすでに変換・集約され、分析準備が整った状態でデータが届くため、Snowflake内での追加処理は大幅に簡素化または不要になります。これにより、インサイトを得るまでの時間が短縮され、Snowflake上のコンピュートコストも削減できる可能性があります。

はじめに:導入の概要

導入は、いくつかの重要なステップから成ります。まずはS3バケットの詳細情報を用意し、SnowpipeがそのS3ロケーションを監視するように設定する必要があります(Snowflake公式のAmazon S3向けSnowpipe自動化ガイドが参考になります)。

次に、RisingWave内でシンクを作成します。たとえば、すでにストリーミングデータを処理しているマテリアライズドビュー ss_mv がある場合、以下のように設定します:

CREATE SINK snowflake_sink
FROM ss_mv -- マテリアライズドビューまたはソース
WITH (
    connector = 'snowflake',
    type = 'append-only', -- アップサート対応も可能!後述参照
    s3.bucket_name = 'your-s3-bucket-name',
    s3.credentials.access = 'your-aws-access-key',
    s3.credentials.secret = 'your-aws-secret-key',
    s3.region_name = 'your-s3-bucket-region',
    s3.path = 'path/to/data/', -- オプション:バケット内の特定パス
    force_append_only = 'true' -- 追記のみが必要な場合
);

更新や削除(アップサート)はどうするのか?

RisingWaveで AS CHANGELOG を使ってマテリアライズドビューを定義すると、RisingWaveは新規行だけでなく、更新や削除も追跡するようになります。このとき、__op(操作タイプ:insert, delete, update_before, update_after)や __row_id(変更の順序付け用)といった特殊な列が付与されます。

CREATE SINK snowflake_sink as WITH sub AS changelog FROM user_behaviors
SELECT
user_id,
target_id,
event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' as event_timestamp,
changelog_op AS __op,
_changelog_row_id::bigint AS __row_id
FROM
sub WITH (
connector = 'snowflake',
type = 'append-only',
s3.bucket_name = 'EXAMPLE_S3_BUCKET',
s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
s3.region_name = 'EXAMPLE_REGION',
s3.path = 'EXAMPLE_S3_PATH',
);

そしてSnowflake側では、以下のようにDynamic Tableを定義することで、常に最新状態を保つビューを自動的に構築できます:

-- Snowflake内で実行
CREATE OR REPLACE DYNAMIC TABLE current_user_behaviors
TARGET_LAG = '1 minute' -- 鮮度の指定(例:1分以内)
WAREHOUSE = your_snowflake_warehouse
AS
SELECT *
FROM (
    SELECT *,
    ROW_NUMBER() OVER (PARTITION BY primary_key_column ORDER BY __row_id DESC) as rn
    FROM your_staging_table_fed_by_snowpipe
)
WHERE rn = 1 AND (__op = 1 OR __op = 3); -- __op=1(Insert)、__op=3(Update_after)

提供状況

Snowflakeシンクコネクタは、RisingWaveのPremium Edition機能のひとつです。RisingWave Cloudでは追加費用なしで利用可能です。セルフホスト環境でも、4コア以下のデプロイメントであれば無料でPremium Editionのすべての機能を使用できます。4コアを超える場合は、ライセンスキーが必要となります。

データからインサイトまでのパイプラインを加速させませんか?

もはや古くなったデータを待つ必要はありません。RisingWaveと新しいSnowflakeシンクコネクタを組み合わせれば、常に最新で継続的に更新され、あらかじめ整形されたデータによって、Snowflakeでの分析がパワーアップします。

私たちは、あなたがSnowflake上でリアルタイムデータをどのように活用し、新たな可能性を切り開いていくかを楽しみにしています!

Discussion