🌟

リアルタイムCDCで失われたトランザクション整合性を取り戻す ― RisingWaveのアプローチ

に公開

現代のリアルタイムデータシステムは、下流のストリーム処理を支えるために Change Data Capture(CDC)に大きく依存しています。CDC はデータベースの INSERTUPDATEDELETE 操作を継続的に取得し、下流システムがこれらのイベントストリームを購読できるようにします。これにより、以下が可能になります。

  • リアルタイムレポートやモニタリングダッシュボードの構築
  • リスク制御やアラートロジックの駆動
  • データをデータウェアハウス、検索エンジン、キャッシュへ同期
  • リアルタイム分析、レコメンデーション、トレードマッチングなどのアプリケーションを支援

しかし、しばしば見落とされがちな重要な論点があります。それが トランザクション境界(transactional boundaries) です。

データベースへの書き込みはトランザクションとして実行されます。しかし、これらの変更が個々の CDC イベントに分解されると、下流システムでは 1 つのトランザクションが分割され、「処理途中の」データが露出する可能性があります。このような一時的不整合は、レポートの数値変動、誤検知アラート、さらにはデータウェアハウスへの不完全なデータ取り込みといった問題を引き起こします。

本記事では、CDC ストリームを処理する際に生じるトランザクション境界の一般的な問題を説明します。そのうえで、どのようにトランザクション境界を認識し、尊重することでアトミック性と一貫性を保証し、マテリアライズドビューや結合クエリが常にデータベースの真の完全な状態を反映するようにするかを解説します。

2つの例

典型的な支払いシナリオを考えてみましょう。ユーザーが送金を完了すると、システムは次の操作を 1 つのトランザクションとして実行します。

  • ある口座からの引き落とし
  • 別の口座への入金
  • 監査テーブルへのレコード挿入

UPDATEINSERT イベントを受信するたびに即座に下流へ送信してしまうと、マテリアライズドビュー側では「引き落としだけ先に見える」「監査ログだけ先に届く」といった事態が起こり得ます。リアルタイムのリスク制御やモニタリングでは、これは次のような問題を引き起こします。

  • 誤検知アラート:引き落とし後・入金前の一時的な残高異常を「資金損失」と誤認する。
  • 誤った分析:リアルタイムレポートが資産総額の減少を示し、運用やトレーダーが誤判断する。
  • データ破損:トランザクション途中でデータウェアハウスに書き込むと、半分だけ記録され、一貫性や外部キー制約が壊れる。

別の結合シナリオを見てみましょう。リアルタイムジョブが orders テーブルと payments テーブルを結合して GMV(Gross Merchandise Volume:流通総額)を算出しているとします。

SELECT o.order_id, o.user_id, p.amount
FROM orders o
JOIN payments p ON o.order_id = p.order_id;

もし 1 つのトランザクションが「注文の挿入」→「支払いの挿入」という順で行われても、CDC ストリームがバリア(Barrier)で分割され、注文があるエポック(Epoch)に、支払いが次のエポックに入ると、結合の中間状態では一時的に結果が空になります。支払いがまだ到着していないため、ダッシュボードでは 一時的な GMV の急減 が発生し、不要なアラートや調査を招きます。

この問題は、金融・決済・EC・物流といった領域で特にクリティカルです。

  • トランザクションモニタリング:取引照合、決済処理、リスク検証は、部分的データではなく完全なトランザクションに基づく必要があります。
  • リアルタイムレポート:GMV、在庫レベル、倉庫移動などの指標は、トランザクション整合性がなければ数値が揺らぎ、運用を誤導します。
  • リスク制御・アラート:部分的トランザクションは誤警報を引き起こし、資金凍結や注文差し止めを招き、ユーザー体験を損ないます。

したがって、CDC ストリーム内でトランザクションのアトミック性と順序を保持することは、リアルタイム計算と分析の信頼性を確保するうえで不可欠です。

バリアとエポック:ストリーム処理の「コミットポイント」

トランザクション境界の問題を理解するには、ストリーム処理システムがどのようにデータを整理し、一貫性を確保するかを把握する必要があります。バッチ処理と異なり、ストリーム処理は無限のイベントストリームを扱うため、データベーストランザクションのように「すべてを一度にコミット」できません。そこで導入されるのが バリア(Barrier) です。

バリアとは、データストリーム内に挿入される特別な制御メッセージです。ビジネスデータは持たず、ストリーム全体のグラフを伝播します。オペレーターがバリアを受け取ると、まずバッファ内のデータを処理してから状態をスナップショットし、バリアを下流に渡します。トポロジ内の全オペレーターがバリアを受け取った時点で、「その時点までのデータはすべて処理済み」と見なされ、安全に可視化できます。同時に、その時点の状態を一貫したチェックポイントとして永続化できます。

2 つの連続するバリアの間にあるデータを エポック(Epoch) と呼びます。これはストリーム処理における「マイクロバッチ」と考えられます。

  • エポック内では、オペレーターの状態は継続的に更新されますが、出力はエポックの完了までバッファリングされます。
  • エポックの境界はバリアであり、データベースにおけるコミットポイントの役割を果たします。

この設計により、ストリーム処理システムは Exactly-Once(1回限りの処理) セマンティクスを実現し、障害時にも最新のチェックポイントから復旧できるため、データ損失や重複処理を避けられます。

ただし、バリアは通常 固定スケジュール(例:毎秒)や イベント数(例:N レコードごと)で注入されます。上流データベースでトランザクションがいつ開始・終了するかは認識していません。もしトランザクション更新がバリアをまたぐと、2 つのエポックに分割され、下流の状態やマテリアライズドビューに一時的不整合が生じます。これが「半分のトランザクション(half-a-transaction)」問題です。

技術的な課題:バリアとトランザクションの衝突

私たちはすでにバリアとエポックの役割を理解しました。これらは Exactly-Once と復旧性を保証しますが、上流のトランザクション境界は意識していません。ここに問題があります。バリアは定期的に挿入されますが、トランザクションのコミットは予測不能です。トランザクションが 2 つのバリアにまたがると、2 つのエポックに分割されてしまいます。

典型的な銀行送金では、まず引き落とし、次に入金、最後に監査ログの挿入が行われます。引き落としイベントがあるエポック、入金とログが次のエポックになると、下流のマテリアライズドビューや結合結果は一時的に誤ります。口座 A の残高は減り、口座 B は変わらず、総資産が不自然に縮む――これでは誤ったリスクアラートが出るだけでなく、ETL が不完全データを DWH に書き込むことにもつながります。

クロステーブルのトランザクションでは、CDC がテーブルごとに変更を別ストリームへ分割するため、さらに悪化します。統一的な調整がなければ、結合オペレーターはテーブル A の更新を先に見て、テーブル B の対応更新をまだ見ていない状況になります。結果として一部のウィンドウでは結合結果が空、あるいは部分一致になります。GMV 計算、注文‐支払い照合、在庫‐出荷整合性の分析では、このような一時的不整合は許容できません。

課題はこうです。
トランザクション内の全イベントを同一エポックに集約し、ストリーム処理のフォールトトレランスを損なわずにアトミックコミットを保証するにはどうするか。
さらに複数テーブルの変更を 1 つの論理ストリームに統合しなければ、下流の結合整合性は保てません。

RisingWave の解決策:トランザクション境界を「認識し、尊重する」

CDC が上流 DB のトランザクションセマンティクスを正しく反映するため、RisingWave は次の要点を設計に取り込みました。入力ストリームの順序保証、バリアによる分割の防止、クロステーブル対応、スキーマ進化、フォールトトレランスです。以下で解説します。

1. 厳密に順序付けされたトランザクションストリームの取得

まずは入力ストリームの順序を正しく保つことが重要です。上流 DB のチェンジログ自体には順序がありますが、Debezium Connector と Kafka のようなミドルウェアを経由すると、テーブルごとに別トピックに分割されがちです。トランザクションメタデータが別トピックだと、下流での再構成が必要になり、順序の乱れや遅延リスクが増えます。

RisingWave は Debezium Embedded Engine を組み込み、パースをコネクタノード内で直接実行します。これにより、上流 DB のコミット順と完全に一致する、厳密に線形なイベントストリームを得られます。BEGINCOMMITROLLBACK といったトランザクションマーカーも含まれるため、下流で余計な「再構成」ロジックは不要です。すべてのトランザクションイベントは自然に完全かつ連続的に届きます。

この段階で、マルチスレッド消費やマルチトピックマージの複雑さが解消され、トランザクション境界を正確に把握できます。これがすべての基盤です。

2. トランザクションを同一エポックに保つためのバリア一時停止

順序付けされたストリームを得たら、次はバリアによる分断の防止です。RisingWave の方針は明快で、トランザクション中はバリアの伝播を一時停止 します。

具体的には、Source Executor が BEGIN を受け取ると「トランザクションモード」に入ります。この間、受信イベントはメモリでバッファし、即時には下流へ流しません。到着したバリアはイベントを伴わずに下流へ通します。COMMIT を受け取った時点で、バッファした全イベントを一括フラッシュします。下流オペレーターはそれらをまとめて消費し、状態を更新し、新しいエポックスナップショットを生成します。

この設計の利点はシンプルで正確な点にあります。

  • アトミック性:トランザクション内のイベントは「見えない」か「一度に見える」かのどちらか。
  • 一貫性:マテリアライズドビューや結合は常に DB の真の状態を反映し、中間の「半端な」状態を作りません。
  • 復旧性:トランザクション完了後にのみバリアが進むため、チェックポイントは常にトランザクション境界に整合します。復旧時に取りこぼしや重複適用は発生しません。

もちろんトレードオフもあります。巨大なトランザクションではバリア伝播が遅れ、エポックが通常より長くなり、チェックポイントや可視化が遅延する可能性があります。しかし、数百ミリ秒〜数秒の遅延は、部分的トランザクションを露出させるよりもはるかに許容できます。

3. 複数テーブルの CDC ソースでクロステーブル整合性を保証

単一テーブルの整合だけでは不十分です。実務では、注文ステータス更新・支払いレコード挿入・在庫変更など、複数テーブルにまたがるトランザクションが一般的です。これらのテーブルを別ソースで取り込むと、イベントは別々のストリームに分散し、下流の結合整合性を保証できません。

この課題に対し、RisingWave は マルチテーブル CDC ソース を導入しました。1 つのソースに複数テーブルを指定すると、1 接続でチェンジログを購読します。こうして、トランザクション内のすべてのイベントが同一の物理ストリーム上に順序を保って現れます。

CREATE SOURCE my_cdc WITH (
  connector = 'postgres-cdc',
  database.name = 'mydb',
  table.name = 'public.orders, public.payments, public.inventory'
);

CREATE TABLE orders (...) FROM SOURCE my_cdc TABLE 'public.orders';
CREATE TABLE payments (...) FROM SOURCE my_cdc TABLE 'public.payments';
CREATE TABLE inventory (...) FROM SOURCE my_cdc TABLE 'public.inventory';

3 テーブルを同時に更新するトランザクションでも、変更は一括でバッファされ、同じエポックでフラッシュされます。下流の結合や集計は、アトミックで一貫した結果になります。

4. ストリーミンググラフの実装詳細

実装面では、RisingWave はストリーミンググラフを構築する際、ソースノードとそれに依存するすべてのマテビューを同一の「トランザクションドメイン」に配置します。バリアはこのドメインに入る時点でインターセプトされ、トランザクション完了後にのみ下流へ伝播します。これによりトランザクション整合とチェックポイント整合が保たれます。

RisingWave の Meta Service は、これら「トランザクションドメイン」のライフサイクルを管理します。DDL トランザクション中に同一 CDC ソースを参照する複数テーブルを作成すると、システムが検知して単一の物理ソースジョブとして実行し、接続とバリアストリームを共有します。

5. スキーマ進化とトランザクション境界

トランザクション認識はスキーマ変更の簡素化にもつながります。上流テーブルのスキーマが変わり(例:列追加)、RisingWave で ALTER TABLE を実行すると、次のトランザクション境界で実行グラフを再計画します。これにより、スキーマ変更とデータ処理の整合性が保たれます。トランザクション途中で「古いスキーマと新しいスキーマが混在する」中途半端な状態を防げます。

6. トレードオフ:大規模トランザクションとレイテンシ

バリア停止でアトミック性を保証する一方、巨大トランザクションではイベントの受信・バッファに時間がかかり、通常のバリア間隔を超える可能性があります。その結果、本来は複数エポックに分散するはずのイベントが 1 つの長いエポックにまとまり、エンドツーエンドのレイテンシが増加します。

設計段階では次を検討しました。

  • バリアを先に伝播:チェックポイント間隔は維持できるが、不完全トランザクションを下流に晒し、整合性を損なう。
  • 楽観実行+ロールバック:イベントを先に送って暫定計算し、コミットで可視化または取り消す。ただし、ほぼ全ステートフル演算子に多版管理とロールバック実装が必要で、極めて複雑。
  • 厳密バッファ+バリア遅延:単純かつ直接的。トランザクションとエポックを 1:1 に揃える代わりにレイテンシを犠牲にする。

RisingWave は 3 つ目を採用しました。整合性を確実にしつつ、状態管理の複雑さを増やさないためです。実運用では巨大・クロステーブルトランザクションは限定的で、大半は通常のバリア間隔内で完了します。したがって追加レイテンシは許容できます。ごく稀に極端に大きいトランザクションでも、整合性のためにレイテンシを受け入れるのは妥当です。

7. 復旧とフォールトトレランス

この設計は復旧ロジックも単純化します。障害時、RisingWave は最新バリアに対応するスナップショットから復旧します。このバリアは必ずトランザクション境界の後にあり、コミット済みトランザクションの再適用や部分トランザクションの取りこぼしは発生しません。逆にバリアがトランザクション途中なら、復旧時に送信済みイベントを飛ばす特殊処理が要り、実装が複雑でエラー誘発的になります。

結論:CDC を「信頼できるもの」に

CDC の正しい処理とは、単に個々の変更イベントを下流に流すことではありません。変更の 組み合わせ が DB の真の状態と一致していることが重要です。トランザクション境界を無視すると、現実には存在しない中間状態を観測し、レポート変動、誤警報、データ不整合、さらには業務インシデントを招きます。

RisingWave の設計思想はシンプルです。

  • Debezium Embedded Engine で、厳密に順序付けされたトランザクションストリームを得る。
  • トランザクション中はバリアを止め、トランザクションとエポックを整合させる。
  • マルチテーブル CDC ソースで、クロステーブルトランザクションを単一の論理ストリームとして扱う。

これにより、マテリアライズドビューや結合クエリは常に DB の真の状態を反映し、任意の時点で完全・アトミックな結果が得られます。データの信頼性が向上し、リアルタイム分析、リスク制御、取引照合、監視シナリオでの確実な動作が期待できます。

エンジニアにとって、これは RisingWave でリアルタイムレポート、リスク制御、取引判断を安心して行えることを意味します。もはや「半分のトランザクション」が流れ込み、ビジネスロジックを壊す心配はありません。言い換えれば、RisingWave はデータベースのアトミシティをストリーミングの世界にもたらしました。

Discussion