Databento × RisingWaveでSQLだけでリアルタイムスプーフィング検知システムを構築してみた

に公開

はじめに

金融市場は瞬く間に動き、多額の資金が毎秒取引されています。このようなダイナミックな環境では、価格を人為的に操作したり他者を欺くような「マーケット・マニピュレーション(市場操作)」が大きなリスクとなります。市場の健全性を守るためには、リアルタイムの取引監視が不可欠であり、不正行為を素早く検知し、被害の拡大を防ぐ必要があります。

以前の投稿では、Databento を用いてライブの市場データを取り込み、RisingWave によってVWAPのようなリアルタイム指標を算出する方法を紹介しました。本記事ではそれを基に、簡易的なリアルタイム取引監視システムの構築方法を解説します。具体的には、Databentoが提供する高品質なデータと、RisingWaveのリアルタイム処理能力を組み合わせ、**スプーフィング(spoofing)**という市場操作手法を検知する方法に焦点を当てます。

リアルタイム監視の重要性

現在の高速な金融市場では、従来の「終日取引終了後に行う監視」では不十分です。リアルタイム監視には以下のような重要なメリットがあります:

  • 早期検知: 不正なパターンを即座に特定し、迅速な対応が可能になります。
  • 積極的なコンプライアンス対応: 規制要件への適合姿勢を明確に示すことができます。
  • リスクの低減: 金銭的損失や企業の評判に対するダメージを最小限に抑えることができます。
  • 市場の健全性向上: より透明で信頼性の高い取引環境の構築に貢献します。

使用ツール:DatabentoとRisingWave

Databento:高品質なマーケットデータソース

複数の金融取引所を横断する高品質な市場データを入手するのは容易ではありません。Databento は、ライブおよび過去の市場データを包括的にカバーしながらも、軽量で強力なソリューションを提供します。以下の特徴があります:

  • 使いやすさ: シンプルなAPI仕様と簡潔なプロトコルにより、数分で利用開始可能です。
  • 高速処理: 正規化遅延は6.1マイクロ秒、FPGAベースのキャプチャによりデータの欠落もほぼゼロ。
  • 柔軟性: 複数の資産クラスおよび取引所に対応した統一メッセージ形式。すべてのオーダーブックメッセージや数十万のシンボルを同時に処理可能です。

RisingWave:リアルタイムデータ処理・管理プラットフォーム

RisingWave は、ストリーミングデータをSQLで処理・照会できる、統合型のリアルタイムデータ処理・管理プラットフォームです。本記事では特に、RisingWaveの**マテリアライズドビュー(Materialized Views)** 機能を活用します。これは新たなデータが到着するたびに継続的かつ増分的に結果を計算することで、再計算なしでリアルタイムな分析を実現します。

また、Python SDK により、Pythonベースのアプリケーションとの統合も容易です。

簡易的な監視システムの構築

このセクションでは、スプーフィング(spoofing) を検知するためのシステムを構築します。今回はデモ用に、Pythonスクリプトを使用して履歴データのバッチ取り込みを行います。ただし、RisingWave内部のコアな検知ロジック(マテリアライズドビューを使用)は連続処理(ストリーミング) を前提としたものです。実際の運用では、データ取り込みをライブストリームに適応させることで、RisingWaveが継続的にスプーフィングアラートを生成し続けることができます。

アーキテクチャ概要

以下の図は、本システムにおける高レベルなデータフローを示しています:

Databentoから取得したデータがPythonスクリプト経由でRisingWaveに送られ、マテリアライズドビューで処理されるフロー

この図では、Pythonスクリプトが Databento から過去の Market By Order(MBO)および Market By Price(MBP-1)データを取得し、それを RisingWave のベーステーブルに挿入しています。RisingWave 内部では、spoofing_alerts というマテリアライズドビューがこれらのテーブルから継続的にデータを処理してパターンを検出します。さらに、オプションでアラート購読アプリケーションを接続することで、リアルタイムに結果を受け取りアクションを取ることが可能です。

前提条件

以下の準備が必要です:

  1. 有効な Databento アカウント と API キー
  2. 稼働中の RisingWave インスタンス(例:Docker ローカルで localhost:4566 にアクセス可能な状態)
  3. Python 3.9 以上と uv パッケージマネージャー(インストールガイドはこちら

Python環境のセットアップ:

uv venv -p 3.9  # または任意の Python 3.9+ バージョン
source .venv/bin/activate  # Windows の場合は .venv\\Scripts\\activate
uv pip install databento risingwave-py pandas

ステップ1:RisingWaveにベーステーブルを作成する

最初に、市場データを格納するためのテーブルを作成します。ここでは、個々の注文イベント(取引を含む)に対応する Market by Order(MBO) データと、現在の Best Bid and Offer(BBO) を取得するための Market by Price(MBP-1) データの2種類を使用します。

これらは異なる目的に使用されます:MBO はスプーフィング検知に必要な粒度の高い注文アクションを提供し、MBP-1 は効率よく現在の BBO を参照するために使われます。

psql や他の SQL クライアントで RisingWave に接続し、以下のテーブルを作成します:

-- MBO(Market by Order)データ用テーブルの作成
CREATE TABLE IF NOT EXISTS market_data (
    ts_event TIMESTAMP,
    symbol VARCHAR,
    exchange VARCHAR,
    side VARCHAR,
    price DOUBLE PRECISION,
    size INTEGER,
    event_type VARCHAR,
    order_id VARCHAR
);

-- BBO(Best Bid and Offer)データ用テーブルの作成
CREATE TABLE IF NOT EXISTS bbo (
    ts_event TIMESTAMP,
    symbol VARCHAR,
    exchange VARCHAR,
    bid_px_00 DOUBLE PRECISION,
    ask_px_00 DOUBLE PRECISION
);

ステップ2:RisingWaveで継続的なスプーフィング検知ロジックを構築する

スプーフィングとは、本気で約定させる意図のない大口注文 をBBO付近に置くことで市場の印象を操作し、その直後に素早くキャンセルすることで行われる市場操作です。

ここでは、spoofing_alerts という名前の マテリアライズドビュー(Materialized View を定義します。このビューでは、market_data(MBO)と bbo(BBO)を結合し、検知ロジックを適用します。RisingWaveはこのビューを自動かつ増分的に更新し、新しいデータが到着するたびに結果を計算します。

パラメータの理解

スプーフィング検知ロジックには、実運用での精度向上のためにチューニングが必要なパラメータがいくつか含まれています:

  1. サイズ閾値(size > 1):

    • デモ値: この例ではテストデータであることを考慮し、結果が得られやすいように 1 を使用しています。
    • 注意: これは流動性の高い市場(例:ES.FUT)においては極めて低すぎる設定です。実際のスプーフィングは遥かに大きな注文が対象となるため、現実的な検出には大きな値に調整する必要があります
  2. BBOへの価格近接度(ティック距離)— 例:10 × 0.25:

    • ロジック: 注文価格がBBOに近いかどうかを確認します。例えば ES.n.0 のティックサイズ(0.25)を使って10ティック以内という意味になります。
    • チューニング: ティック数(例:5)は、対象銘柄のスプレッドや検出感度に応じて調整が必要です。

💡 注意: 現時点ではマテリアライズドビューは存在していても、データが投入されていないため空です。次のステップでデータを投入することで、RisingWaveが spoofing_alerts ビューの結果を自動計算し、更新を開始します。

ステップ3:Databentoから履歴市場データを取り込む

ここでは、Databento から指定した時間帯の MBO および MBP-1 データをバッチで取得し、market_data および bbo テーブルに挿入する Python スクリプトを実行します。この一括読み込みにより、spoofing_alerts マテリアライズドビューが初期結果を計算できるようになります。

以下のコードを ingest_data.py という名前で保存してください:

import databento as db
from risingwave import RisingWave, RisingWaveConnOptions
import logging
import os

# --- ログ設定 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Databento クライアントの初期化 ---
api_key = os.environ.get("DATABENTO_API_KEY")
if api_key is None:
    logger.error("DATABENTO_API_KEY 環境変数を設定してください。")
    exit(1)
hist = db.Historical()

# --- RisingWave 接続設定 ---
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="", database="dev"
    )
)

# --- MBO イベント用のアクションマッピング ---
action_map = {
    "A": "add",
    "M": "modify",
    "C": "cancel",
    "R": "clear",
    "T": "trade",
    "F": "fill",
    "N": "none",
}

def process_market_data(dataset, symbol, start_time):
    """Databentoから市場データを取得して処理する"""
    logger.info(f"{symbol} のデータを {start_time} から取得中...")

    # MBO データを取得
    logger.info("MBO データを取得中...")
    mbo_data = hist.timeseries.get_range(
        dataset=dataset,
        schema="mbo",
        stype_in="continuous",
        symbols=[symbol],
        start=start_time,
        limit=10000,
    )

    # BBO データを取得
    logger.info("BBO データを取得中...")
    tbbo_data = hist.timeseries.get_range(
        dataset=dataset,
        schema="mbp-1",
        stype_in="continuous",
        symbols=[symbol],
        start=start_time,
        limit=10000,
    )

    # データをRisingWaveに挿入
    with rw.getconn() as conn:
        # MBO レコードを一件ずつ処理
        mbo_count = 0
        for record in mbo_data:
            try:
                params = {
                    "ts_event": record.pretty_ts_event,
                    "symbol": symbol,
                    "exchange": "GLBX",
                    "side": "bid" if record.side == "B" else "ask",
                    "price": record.pretty_price,
                    "size": record.size,
                    "event_type": action_map[record.action],
                    "order_id": str(record.order_id),
                }
                conn.execute(
                    """
                    INSERT INTO market_data
                    (ts_event, symbol, exchange, side, price, size, event_type, order_id)
                    VALUES 
                    (:ts_event, :symbol, :exchange, :side, :price, :size, :event_type, :order_id)
                    """,
                    params,
                )
                mbo_count += 1
                if mbo_count % 1000 == 0:
                    logger.info(f"{mbo_count} 件の MBO レコードを処理しました")
            except Exception as e:
                logger.error(f"MBO レコード処理中のエラー: {e}")
                continue

        # BBO レコードを一件ずつ処理
        bbo_count = 0
        for record in tbbo_data:
            try:
                params = {
                    "ts_event": record.pretty_ts_event,
                    "symbol": symbol,
                    "exchange": "GLBX",
                    "bid_px_00": record.levels[0].bid_px,
                    "ask_px_00": record.levels[0].ask_px
                }
                conn.execute(
                    """
                    INSERT INTO bbo 
                    (ts_event, symbol, exchange, bid_px_00, ask_px_00)
                    VALUES 
                    (:ts_event, :symbol, :exchange, :bid_px_00, :ask_px_00)
                    """,
                    params,
                )
                bbo_count += 1
                if bbo_count % 1000 == 0:
                    logger.info(f"{bbo_count} 件の BBO レコードを処理しました")
            except Exception as e:
                logger.error(f"BBO レコード処理中のエラー: {e}")
                continue

    logger.info(f"データ取り込み完了。MBO: {mbo_count} 件、BBO: {bbo_count} 件")

def main():
    dataset = "GLBX.MDP3"
    symbol = "ES.n.0"
    start_time = "2024-03-19"

    logger.info(f"{start_time} からデータ処理を開始します")
    process_market_data(dataset, symbol, start_time)
    logger.info("完了!")

if __name__ == "__main__":
    main()

このスクリプトは python ingest_data.py で実行できます。Databento からデータを取得し、RisingWave テーブルへ1件ずつデータを挿入します。

オプション:アラートの購読

データの取り込みが完了すると、spoofing_alerts マテリアライズドビューは履歴データに基づいた検出結果を持つようになります。リアルタイムシステムにおいては、通常は別アプリケーションがこのビューの変更を継続的に購読します。

以下のスクリプト(subscribe_alerts.py)は、RisingWave に接続し、spoofing_alerts ビューに新たなレコードが追加されるたびに出力する例です。リアルタイムでスプーフィングが検知され次第通知されるよう、このスクリプトはデータ取り込みの前に実行することが推奨されます

from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import pandas as pd
import signal
import sys
import threading
import logging
import os

# --- ログ設定 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- RisingWave 接続設定 ---
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="", database="dev"
    )
)

def signal_handler(sig, frame):
    """Ctrl+Cによる正常終了処理"""
    logger.info('シャットダウンしています...')
    sys.exit(0)

def handle_spoofing_alerts(event_df: pd.DataFrame) -> None:
    """スプーフィングアラートを処理し、ログに記録"""
    try:
        event_df = event_df[event_df["op"].isin(["Insert", "UpdateInsert"])]
        if event_df.empty:
            return
        event_df = event_df.drop(["op", "rw_timestamp"], axis=1)
        logger.info("\\nスプーフィングアラート:")
        logger.info(event_df)

    except Exception as e:
        logger.error(f"アラート処理中のエラー: {e}", exc_info=True)
        # 一時的なエラーに備えてリトライ処理を検討する

def main():
    """アラート購読処理のメイン関数"""
    signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C による終了をサポート

    try:
        with rw.getconn() as conn:
            result = conn.execute("""
                SELECT count(*) FROM pg_matviews 
                WHERE matviewname = 'spoofing_alerts';
            """)
            if result and result[0][0] == 0:
                logger.error("マテリアライズドビュー 'spoofing_alerts' が存在しません。先に `spoofing_alerts.sql` を実行してください。")
                sys.exit(1)
    except Exception as e:
        logger.error(f"ビュー存在確認中のエラー: {e}")
        sys.exit(1)

    threading.Thread(
        target=lambda: rw.on_change(
            subscribe_from="spoofing_alerts",
            handler=handle_spoofing_alerts,
            output_format=OutputFormat.DATAFRAME,
        ),
        daemon=True
    ).start()

    logger.info("spoofing_alerts を購読中... イベントを待機します")
    signal.pause()
    
if __name__ == "__main__":
    main()

このスクリプトを python subscribe_alerts.py で実行すれば、spoofing_alerts ビューからのアラートをコンソール上でリアルタイムに確認できます。

出力例

spoofing_alerts ビューが履歴データと設定パラメータに基づいてイベントを検出すると、subscribe_alerts.py は以下のようなログを出力する可能性があります:

スプーフィングアラート出力例:

Spoofing Alert:
INFO:__main__:

window_start                 symbol     exchange     num_cancellations
0 2024-03-19 00:00:40+00:00  ES.n.0     GLBX                625
1 2024-03-19 00:00:45+00:00  ES.n.0     GLBX                 40

注: この出力は一例です。実際の結果は、取り込んだ市場データや検知SQLのチューニングパラメータに大きく依存します。デモのためにしきい値が低く設定されているため、実際のスプーフィングと異なり多数のアラートが発生する可能性があります。

まとめ

本記事では、Databento の高品質な市場データと RisingWave のリアルタイム SQL 処理能力を活用して、スプーフィング検知を目的とした基本的な取引監視システムの構築方法を紹介しました。履歴データを取り込み、RisingWave のマテリアライズドビューを用いて、注文サイズや BBO への近接度に基づくキャンセルパターンを継続的に特定しました。

今回はバッチ取り込み方式を使用しましたが、RisingWave の真の強みは、ライブストリーム上での複雑な分析処理を増分的に実行できる点にあります。データ取り込み部分をライブストリームに変更し、検知ロジックを適切にチューニングすることで、堅牢で信頼性の高いリアルタイム監視システムを構築できます。

さらに発展させるには、より高度な検知ルールの追加や、ユーザー定義関数(UDF)による機械学習モデルの統合、可視化ダッシュボードの構築なども検討してください。

詳細な情報については Databentoドキュメント および RisingWaveドキュメント をご参照ください。精度と信頼性を高めるには、現実的なデータでの徹底したテストと、パラメータの慎重な調整が不可欠です。

Discussion