🫠

個別株リアルタイムデータ取得

2024/12/20に公開

株式市場のリアルタイム注文板監視システムの実装

はじめに

本記事では、株式市場の注文板データをリアルタイムで監視し、変更を検知して保存するPythonシステムの実装について解説します。

取得データの一例

データ取得中のエクセル画面です

一定間隔でデータを読み込み以下の形式にして追記していきます。

XXXX_order_book.json
{"timestamp": "2024-11-21 09:00:10.819", "stock_code": "XXXX", "c_price": 877.9, "c_price_time": "09:00:10", "c_price_f": " ", "c_price_tick": "↑", "volume": 347200.0, "trading_value": 304309.0, "vwap": 876.4653, "best_ask_p": 877.9, "best_ask_q": 200.0, "best_ask_time": "09:00:10", "best_bid_p": 877.5, "best_bid_q": 400.0, "best_bid_time": "09:00:10", "ask_price_1": 877.9, "ask_quantity_1": 200.0, "ask_price_2": 878.0, "ask_quantity_2": 300.0, "ask_price_3": 878.1, "ask_quantity_3": 500.0, "ask_price_4": 878.2, "ask_quantity_4": 600.0, "ask_price_5": 878.3, "ask_quantity_5": 2500.0, "ask_price_6": 878.4, "ask_quantity_6": 2700.0, "ask_price_7": 878.6, "ask_quantity_7": 3100.0, "ask_price_8": 878.8, "ask_quantity_8": 1900.0, "ask_price_9": 878.9, "ask_quantity_9": 1400.0, "ask_price_10": 879.0, "ask_quantity_10": 3000.0, "bid_price_1": 877.5, "bid_quantity_1": 400.0, "bid_price_2": 877.4, "bid_quantity_2": 600.0, "bid_price_3": 877.3, "bid_quantity_3": 400.0, "bid_price_4": 877.2, "bid_quantity_4": 400.0, "bid_price_5": 877.1, "bid_quantity_5": 2000.0, "bid_price_6": 877.0, "bid_quantity_6": 2600.0, "bid_price_7": 876.9, "bid_quantity_7": 2600.0, "bid_price_8": 876.8, "bid_quantity_8": 1700.0, "bid_price_9": 876.7, "bid_quantity_9": 2900.0, "bid_price_10": 876.6, "bid_quantity_10": 1500.0, "sell_market_order_q": 0.0, "buy_market_order_q": 0.0, "over_q": 4524900.0, "under_q": 3015100.0, "sp_sell_flag": " ", "sp_buy_flag": " ", "f1": 877.8, "f2": 877.9, "f3": 878.0, "f4": 878.3, "f1_time": "09:00:10", "f2_time": "09:00:10", "f3_time": "09:00:10", "f4_time": "09:00:09"}
{"timestamp": "2024-11-21 09:00:11.428", "stock_code": "XXXX", "c_price": 878.4, "c_price_time": "09:00:11", "c_price_f": " ", "c_price_tick": "↑", "volume": 353200.0, "trading_value": 309578.0, "vwap": 876.4954, "best_ask_p": 878.6, "best_ask_q": 200.0, "best_ask_time": "09:00:11", "best_bid_p": 878.2, "best_bid_q": 400.0, "best_bid_time": "09:00:11", "ask_price_1": 878.6, "ask_quantity_1": 200.0, "ask_price_2": 878.7, "ask_quantity_2": 100.0, "ask_price_3": 878.8, "ask_quantity_3": 100.0, "ask_price_4": 878.9, "ask_quantity_4": 700.0, "ask_price_5": 879.0, "ask_quantity_5": 3000.0, "ask_price_6": 879.2, "ask_quantity_6": 2000.0, "ask_price_7": 879.3, "ask_quantity_7": 5500.0, "ask_price_8": 879.4, "ask_quantity_8": 1900.0, "ask_price_9": 879.6, "ask_quantity_9": 3200.0, "ask_price_10": 879.7, "ask_quantity_10": 300.0, "bid_price_1": 878.2, "bid_quantity_1": 400.0, "bid_price_2": 878.1, "bid_quantity_2": 300.0, "bid_price_3": 878.0, "bid_quantity_3": 400.0, "bid_price_4": 877.9, "bid_quantity_4": 500.0, "bid_price_5": 877.8, "bid_quantity_5": 300.0, "bid_price_6": 877.7, "bid_quantity_6": 900.0, "bid_price_7": 877.6, "bid_quantity_7": 1500.0, "bid_price_8": 877.5, "bid_quantity_8": 1900.0, "bid_price_9": 877.4, "bid_quantity_9": 1600.0, "bid_price_10": 877.3, "bid_quantity_10": 2100.0, "sell_market_order_q": 0.0, "buy_market_order_q": 0.0, "over_q": 4531000.0, "under_q": 3060100.0, "sp_sell_flag": " ", "sp_buy_flag": " ", "f1": 878.3, "f2": 878.2, "f3": 877.8, "f4": 878.1, "f1_time": "09:00:11", "f2_time": "09:00:11", "f3_time": "09:00:11", "f4_time": "09:00:11"}
{"timestamp": "2024-11-21 09:00:12.031", "stock_code": "XXXX", "c_price": 878.8, "c_price_time": "09:00:12", "c_price_f": " ", "c_price_tick": "↑", "volume": 353700.0, "trading_value": 310017.0, "vwap": 876.4983, "best_ask_p": 878.9, "best_ask_q": 300.0, "best_ask_time": "09:00:12", "best_bid_p": 878.7, "best_bid_q": 300.0, "best_bid_time": "09:00:12", "ask_price_1": 878.9, "ask_quantity_1": 300.0, "ask_price_2": 879.0, "ask_quantity_2": 1300.0, "ask_price_3": 879.2, "ask_quantity_3": 800.0, "ask_price_4": 879.3, "ask_quantity_4": 5500.0, "ask_price_5": 879.4, "ask_quantity_5": 1700.0, "ask_price_6": 879.6, "ask_quantity_6": 4100.0, "ask_price_7": 879.7, "ask_quantity_7": 300.0, "ask_price_8": 879.8, "ask_quantity_8": 1700.0, "ask_price_9": 879.9, "ask_quantity_9": 300.0, "ask_price_10": 880.0, "ask_quantity_10": 6900.0, "bid_price_1": 878.7, "bid_quantity_1": 300.0, "bid_price_2": 878.6, "bid_quantity_2": 300.0, "bid_price_3": 878.5, "bid_quantity_3": 300.0, "bid_price_4": 878.4, "bid_quantity_4": 400.0, "bid_price_5": 878.3, "bid_quantity_5": 400.0, "bid_price_6": 878.2, "bid_quantity_6": 800.0, "bid_price_7": 878.1, "bid_quantity_7": 700.0, "bid_price_8": 878.0, "bid_quantity_8": 1900.0, "bid_price_9": 877.9, "bid_quantity_9": 2500.0, "bid_price_10": 877.8, "bid_quantity_10": 1800.0, "sell_market_order_q": 0.0, "buy_market_order_q": 0.0, "over_q": 4517200.0, "under_q": 3068100.0, "sp_sell_flag": " ", "sp_buy_flag": " ", "f1": 878.6, "f2": 878.3, "f3": 878.4, "f4": 878.3, "f1_time": "09:00:12", "f2_time": "09:00:12", "f3_time": "09:00:11", "f4_time": "09:00:11"}

上記のデータ取得時は、350銘柄を一括で0.2秒毎にエクセルシートを読み込みました。データの更新まで最短でも0.4秒はかかっていました。銘柄数によるか分かりませんが楽天RSSのデータ更新は大体0.3~0.5秒くらいの間隔がありそうです。

事前準備

価格配信の更新間隔の設定

Market Speed IIを起動
右下の通信設定から

価格配信方法をリアルフィードに変更

また、EXCEL上で楽天RSSのアドインを使用できるようにします。恐縮ですがここは省略します。
マーケトスピードIIのタブの各種設定から、更新間隔を調整します。1msに設定していますが、そこまで早く更新できていない印象があります。

システムアーキテクチャ

システムは以下の主要なコンポーネントで構成されています:

  1. OrderBookMonitor クラス: システム全体を制御する中心的なクラス
  2. データ取得プロセス: Excelからデータを読み取るプロセス
  3. データ処理スレッド: 取得したデータを処理し保存するスレッド
  4. キューベースの通信: プロセス間のデータ受け渡しを管理

クラス構成図

OrderBookMonitor
├── 初期化処理
│   ├── Excel設定
│   └── データ構造初期化
├── データ取得プロセス
│   ├── インターバル制御
│   └── Excel通信
├── データ処理スレッド
│   ├── 差分検知
│   └── データ保存
└── 監視制御処理

システムアーキテクチャ

システムは以下の主要なコンポーネントで構成されています:

  1. OrderBookMonitor クラス: システム全体を制御する中心的なクラス
  2. データ取得プロセス: Excelからデータを読み取るプロセス
  3. データ処理スレッド: 取得したデータを処理し保存するスレッド
  4. キューベースの通信: プロセス間のデータ受け渡しを管理

クラス構成図

OrderBookMonitor
├── 初期化処理
│   ├── Excel設定
│   └── データ構造初期化
├── データ取得プロセス
│   ├── インターバル制御
│   └── Excel通信
├── データ処理スレッド
│   ├── 差分検知
│   └── データ保存
└── 監視制御処理

Excelシートの初期設定と起動

1. シートの作成と設定

市場データを取得するためのExcelシートを以下のように設定します:

def create_excel(self):
    wb = openpyxl.Workbook()
    ws = wb.active
    ws["A1"].value = "=RssMarketHeader(1)"  # ヘッダー行の設定

    # 市場データを取得するセルの設定
    for row in range(2, 502):
        for col in range(2, 150):
            cell = ws.cell(row=row, column=col)
            cell.value = f"=RssMarket($A{row}, {openpyxl.utils.get_column_letter(col)}$1)"
    
    # 銘柄コードの設定
    for i, code in enumerate(self.code_list):
        ws[f"A{i+2}"].value = str(code)
    
    wb.save(self.excel_path)

このコードは以下の処理を実行します:

  • RssMarketHeader関数でヘッダー情報を設定
  • 各セルにRssMarket関数を設定して市場データを取得
  • 監視対象の銘柄コードを設定

2. 特殊なExcel起動設定

楽天RSSのアドインを有効にした状態でExcelを起動するために、特別なコマンドライン引数を使用します:

def add_xl_app(self) -> xw.App:
    try:
        # エクセル実行プログラム'C:.*EXCEL\.EXE'を探して返す
        excel_path = self.get_path_to_xl2()
        # /x オプションを使用してアドインを有効化
        command = f'"{str(excel_path)}" /x "{self.excel_path}"'

        proc = subprocess.Popen(command)
        time.sleep(1)  # 初期化待機

        # プロセスの確認と接続
        for _ in range(10):
            try:
                xl_app = xw.apps[proc.pid]
                print("PID:", proc.pid, "アプリケーションが正常に起動しました。")
                return xl_app, proc.pid
            except KeyError:
                print("PID確認中...")
                time.sleep(1)

重要なポイント:

  • /x オプション:楽天RSSアドインを有効にした状態でExcelを起動
  • subprocess.Popen:非同期でExcelプロセスを起動
  • PID管理:起動したExcelプロセスを追跡し、後続のxlwingsの操作で使用

3. Excelからの市場データ取得処理の実装解説

市場データ監視システムの中核となるデータ取得処理について、実装の詳細と技術的な考慮点を解説します。この処理は、Excelシートから市場データを読み取り、処理キューに追加する重要な役割を担っています。

def fetch_data_and_put_queue(self):
    try:
        sht = xw.apps[self.pid].books.active.sheets[0]
        data = sht.range(self.data_range).value
        timestamp = self.get_current_jst_timestamp()
        if isinstance(data, list) and isinstance(data[0], list):
            for row_data in data:
                self.data_queue.put((dict(zip(self.headers, row_data)), timestamp))
        else:
            self.data_queue.put((dict(zip(self.headers, data)), timestamp))
    except Exception as e:
        print(f"データ取得に失敗しました: {e}")
  • 保存されたプロセスID(pid)を使用して特定のExcelインスタンスにアクセス

  • self.data_rangeで指定された範囲のデータを一括取得

  • 例:"B2:ES10"のような範囲指定

  • パフォーマンス最適化のため、セル単位ではなく範囲で一括取得

  • 変換されたデータとタイムスタンプをタプルとしてキューに追加

  • マルチプロセス対応のキュー(MPQueue)を使用

  • データ処理スレッドがキューからデータを取り出して処理

4. マルチプロセス/スレッド処理

システムは以下の並行処理を実装:

  1. データ取得プロセス:
def data_reader(self):
    while not self.stop_flag:
        interval = self.get_interval_by_time(datetime.now().time())
        self.fetch_data_and_put_queue()
        time.sleep(interval)
  1. データ処理スレッド:
def data_processor(self):
    while not self.stop_flag or not self.data_queue.empty():
        data_dict, timestamp = self.data_queue.get(timeout=1)
        structured_data = self.structure_data(data_dict, timestamp)
        if self.has_data_changed(structured_data):
            self.append_to_json(folder_path, stock_code, structured_data)

この設計により:

  • データ取得と処理の分離
  • システムの応答性向上
  • リソースの効率的な利用

データ保存形式

データは日付ごとのフォルダに、銘柄別のJSONファイルとして保存されます:

rakuten_rss_data/
└── stock_20240119/
    ├── 1234_order_book.json
    ├── 5678_order_book.json
    └── ...

各JSONレコードは以下の構造を持ちます:

  • タイムスタンプ
  • 銘柄情報
  • 価格情報
  • 注文数量
  • その他の市場データ

エラーハンドリングと復旧機能

システムは様々な異常状態に対応する機能を実装しています:

  1. データ受信タイムアウト検知:
if current_time - last_data_time > 5.1:
    print("データ受信が5秒以上ありません。プロセスを再起動します。")
    self.stop_data_reader()
    self.start_data_reader()
  1. Excel接続エラーのハンドリング
  2. プロセス終了時の適切なリソース解放

システムの利用方法

システムの基本的な起動方法:

# 監視対象の銘柄コードリストを準備
INIT_CODE = get_watchlist_codes()

# モニターインスタンスの作成と開始
monitor = OrderBookMonitor(EXCEL_PATH, INIT_CODE, JSON_BASE_PATH)
monitor.monitor()

まとめ

この実装は以下の特徴を持つ堅牢な市場データ監視システムを提供します:

  • リアルタイムデータ取得
  • 効率的なリソース利用
  • 信頼性の高いデータ保存
  • 柔軟な設定オプション

本システムは、市場分析や取引戦略の研究に必要な高品質な注文板データの収集を可能にします。

これらの機能追加により、さらに高度な市場分析や取引支援が可能になると考えられます。

import time
import json
import xlwings as xw
from pathlib import Path
from datetime import datetime, timezone, timedelta
import hashlib
import subprocess
import re
import openpyxl
from queue import Empty
from threading import Thread
from multiprocessing import Process, Queue as MPQueue
import pandas as pd
from datetime import time as datetime_time
import polars as pl

class OrderBookMonitor:
    def __init__(self, excel_path, code_list, json_base_path):
        """
        初期化メソッド

        :param excel_path: Excelファイルのパス
        :param code_list: 銘柄コードのリスト
        :param json_base_path: JSONファイルを保存するベースのフォルダパス
        """
        self.excel_path = excel_path
        self.code_list = code_list
        self.json_base_path = Path(json_base_path)
        self.pid = None # Excelを起動するprocess id
        self.previous_hashes = {}
        self.headers = []
        self.data_queue = MPQueue()  # Multiprocessing Queue
        self.stop_flag = False  
        self.reader_process = None  # データ取得プロセスの参照

        # `data_range` を code_list の範囲から動的に設定
        start_row = 2
        end_row = start_row + len(code_list) - 1
        self.data_range = f"B{start_row}:ES{end_row}"
    
    def create_excel(self):
        wb = openpyxl.Workbook()
        ws = wb.active
        ws["A1"].value = "=RssMarketHeader(1)"

        for row in range(2, 502):
            for col in range(2, 150):
                cell = ws.cell(row=row, column=col)
                cell.value = f"=RssMarket($A{row}, {openpyxl.utils.get_column_letter(col)}$1)"
        
        for i, code in enumerate(self.code_list):
            ws[f"A{i+2}"].value = str(code)
        
        wb.save(self.excel_path)

    def get_path_to_xl2(self) -> Path:
        try:
            subprocess_rtn = subprocess.run(['assoc', '.xlsx'], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            assoc_to = re.search(r'Excel\.Sheet\.\d+', subprocess_rtn.stdout.decode("utf-8")).group()
            
            subprocess_rtn = subprocess.run(['ftype', assoc_to], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            xl_path = re.search(r'C:.*EXCEL\.EXE', subprocess_rtn.stdout.decode('utf-8')).group()

            print("Excelのパス:", xl_path)  # デバッグ用出力
            return Path(xl_path)
        except (AttributeError, subprocess.CalledProcessError) as e:
            raise FileNotFoundError("Excelのパスが取得できませんでした。Excelがインストールされているか確認してください。") from e

    def add_xl_app(self) -> xw.App:
        try:
            excel_path = self.get_path_to_xl2()
            command = f'"{str(excel_path)}" /x "{self.excel_path}"'
            print("実行コマンド:", command)  # デバッグ用出力
            proc = subprocess.Popen(command)
            time.sleep(1)  # 初期待機

            for _ in range(10):
                try:
                    xl_app = xw.apps[proc.pid]
                    print("PID:", proc.pid, xw.apps.keys(), "アプリケーションが正常に起動しました。")
                    return xl_app, proc.pid
                except KeyError:
                    print("PID確認中...")  # デバッグ用出力
                    time.sleep(0.5)
            
            proc.terminate()
            raise RuntimeError("Excelアプリケーションが正常に起動しませんでした。")
        except Exception as e:
            print("エラー内容:", e)  # エラーメッセージの出力
            raise RuntimeError("Excelの起動に失敗しました。") from e

    def initialize_excel(self):
        """Excelアプリケーションの初期化とデータの取得"""
        app, self.pid = self.add_xl_app()
        time.sleep(2)
        wb = app.books.active
        sht = wb.sheets[0]
        
        # ヘッダーの取得
        self.headers = sht.range("B1:ES1").value
        print(f"監視する銘柄コード: {self.code_list}")
        print("Excelシートの初期化が完了しました。")

    def get_current_date_folder(self):
        """現在の日付に基づいたフォルダパスを取得"""
        current_date = datetime.today().strftime('%Y%m%d')
        folder_name = f"stock_{current_date}"
        folder_path = self.json_base_path / folder_name
        folder_path.mkdir(parents=True, exist_ok=True)
        return folder_path

    def extract_order_levels_flat(self, data_dict, price_prefix, quantity_prefix, levels=10, field_prefix=''):
        orders = {}
        for i in range(1, levels + 1):
            price_key = f"{price_prefix}{i}"
            quantity_key = f"{quantity_prefix}{i}"
            price = data_dict.get(price_key)
            quantity = data_dict.get(quantity_key)
            if price is not None and quantity is not None:
                orders[f"{field_prefix}price_{i}"] = price
                orders[f"{field_prefix}quantity_{i}"] = quantity
        return orders

    def structure_data(self, data_dict, timestamp):
        """データ辞書を平坦な構造に変換"""
        stock_code = str(data_dict.get("銘柄コード"))
        if '.' in stock_code:
            stock_code = stock_code.rstrip('0').rstrip('.')
        
        structured_data = {
            "timestamp": timestamp,
            "stock_code": stock_code,
            "c_price": data_dict.get("現在値"),
            "c_price_time": data_dict.get("現在値詳細時刻"),
            "c_price_f": data_dict.get("現在値フラグ"),
            "c_price_tick": data_dict.get("現在値ティック"),
            "volume": data_dict.get("出来高"),
            "trading_value": data_dict.get("売買代金"),
            "vwap": data_dict.get("出来高加重平均"),
            
            # best_sell フィールドをフラット化
            "best_ask_p": data_dict.get("最良売気配値"),
            "best_ask_q": data_dict.get("最良売気配数量"),
            "best_ask_time": data_dict.get("最良売気配詳細時刻"),
            
            # best_buy フィールドをフラット化
            "best_bid_p": data_dict.get("最良買気配値"),
            "best_bid_q": data_dict.get("最良買気配数量"),
            "best_bid_time": data_dict.get("最良買気配詳細時刻"),
        }

        # ask レベルをフラットに追加
        ask_levels = self.extract_order_levels_flat(
            data_dict,
            price_prefix="最良売気配値",
            quantity_prefix="最良売気配数量",
            levels=10,
            field_prefix="ask_"
        )
        structured_data.update(ask_levels)

        # bid レベルをフラットに追加
        bid_levels = self.extract_order_levels_flat(
            data_dict,
            price_prefix="最良買気配値",
            quantity_prefix="最良買気配数量",
            levels=10,
            field_prefix="bid_"
        )
        structured_data.update(bid_levels)

        # 追加項目をフラットに追加
        additional_fields = {
            "sell_market_order_q": data_dict.get("売成行数量"),
            "buy_market_order_q": data_dict.get("買成行数量"),
            "over_q": data_dict.get("OVER気配数量"),
            "under_q": data_dict.get("UNDER気配数量"),
            "sp_sell_flag": data_dict.get("特別売気配フラグ"),
            "sp_buy_flag": data_dict.get("特別買気配フラグ"),
            "f1": data_dict.get("歩み1"),
            "f2": data_dict.get("歩み2"),
            "f3": data_dict.get("歩み3"),
            "f4": data_dict.get("歩み4"),
            "f1_time": data_dict.get("歩み1詳細時刻"),
            "f2_time": data_dict.get("歩み2詳細時刻"),
            "f3_time": data_dict.get("歩み3詳細時刻"),
            "f4_time": data_dict.get("歩み4詳細時刻"),
        }
        structured_data.update(additional_fields)

        return structured_data

    def get_current_jst_timestamp(self):
        """日本標準時 (JST) の現在時刻をミリ秒精度のタイムスタンプで取得"""
        jst = timezone(timedelta(hours=9))  # JSTタイムゾーン (UTC+9)
        return datetime.now(jst).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    
    def calculate_hash(self, data):
        """データのハッシュ値を計算(タイムスタンプを除外)"""
        # タイムスタンプを除外したデータを生成
        data_without_timestamp = {k: v for k, v in data.items() if k != "timestamp"}
        data_string = json.dumps(data_without_timestamp, sort_keys=True).encode('utf-8')
        return hashlib.md5(data_string).hexdigest()

    def has_data_changed(self, current_data):
        """データが変更されたかを判定"""
        stock_code = current_data["stock_code"]
        current_hash = self.calculate_hash(current_data)
        if self.previous_hashes.get(stock_code) != current_hash:
            self.previous_hashes[stock_code] = current_hash
            return True
        return False

    def append_to_json(self, folder_path, stock_code, data):
        """指定されたフォルダと銘柄コードに基づいてJSONファイルにデータを追記"""
        json_file = folder_path / f"{stock_code}_order_book.json"
        with open(json_file, 'a', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False)
            f.write('\n')  # NDJSON形式では各オブジェクトが新しい行にある

    def close_excel(self):
        """Excelアプリケーションを閉じる"""
        try:
            wb = xw.apps[self.pid].books.active
            wb.close()
        except Exception as e:
            print(f"Excelブックのクローズに失敗しました: {e}")
        try:
            xw.apps[self.pid].quit()
        except Exception as e:
            print(f"Excelアプリケーションのクローズに失敗しました: {e}")

    def start_data_reader(self):
        """data_readerプロセスを開始するメソッド"""
        if self.reader_process is not None and self.reader_process.is_alive():
            print("既にdata_readerプロセスが動作しています。")
            return
        self.reader_process = Process(target=self.data_reader)
        self.reader_process.start()
        print(f"data_readerプロセスを開始しました。PID: {self.reader_process.pid}")

    def stop_data_reader(self):
        """data_readerプロセスを停止するメソッド"""
        if self.reader_process is not None:
            if self.reader_process.is_alive():
                print(f"data_readerプロセスを終了します。PID: {self.reader_process.pid}")
                self.reader_process.terminate()
                self.reader_process.join()
                print("data_readerプロセスを終了しました。")
            self.reader_process = None

    def get_interval_by_time(self, current_time):
        def time_in_range(start, end, current):
            """時刻が指定範囲内かチェック"""
            if start <= end:
                return start <= current <= end
            else:  # 範囲が深夜をまたぐ場合
                return start <= current or current <= end

        # 時間帯の定義
        intervals = [
            (datetime_time(8, 0), datetime_time(8, 55), 5.0),      # 8:00-8:55 -> 5秒
            (datetime_time(8, 55), datetime_time(11, 30), 0.5),     # 8:55-11:30 -> 0.5秒
            (datetime_time(11, 30), datetime_time(12, 25), 5.0),   # 11:30-12:25 -> 5秒
            (datetime_time(12, 25), datetime_time(15, 30), 0.5),    # 12:25-15:30 -> 0.5秒
        ]

        # 現在時刻に対応する間隔を返す
        for start, end, interval in intervals:
            if time_in_range(start, end, current_time):
                return interval
        
        return 0.5  # デフォルト値(該当する時間帯がない場合)

    def data_reader(self):
        """定期的にデータを取得し、キューに追加"""
        while not self.stop_flag:
            start_time = time.time()
            current_time = datetime.now().time()
            interval = self.get_interval_by_time(current_time)

            self.fetch_data_and_put_queue()

            # 経過時間を計算し、必要な待機時間を調整
            elapsed_time = time.time() - start_time
            if elapsed_time < interval:
                time.sleep(interval - elapsed_time)

    def fetch_data_and_put_queue(self):
        """データを取得してキューに追加する補助関数"""
        try:
            wb = xw.apps[self.pid].books.active
            sht = wb.sheets[0]
            data = sht.range(self.data_range).value
            timestamp = self.get_current_jst_timestamp()
            if isinstance(data, list) and isinstance(data[0], list):
                for row_data in data:
                    self.data_queue.put((dict(zip(self.headers, row_data)), timestamp))
            else:
                self.data_queue.put((dict(zip(self.headers, data)), timestamp))
        except Exception as e:
            print(f"データ取得に失敗しました: {e}")

    def data_processor(self):
        """キューからデータを受け取り、差分を確認し、変更があれば記録"""
        folder_path = self.get_current_date_folder()
        last_data_time = time.time()
        while not self.stop_flag or not self.data_queue.empty():
            try:
                data_dict, timestamp = self.data_queue.get(timeout=1)  # 1秒待機
                last_data_time = time.time()  # データ受信時に更新
                # 差分判定と記録の処理
                structured_data = self.structure_data(data_dict, timestamp)
                if self.has_data_changed(structured_data):
                    stock_code = structured_data["stock_code"]
                    self.append_to_json(folder_path, stock_code, structured_data)
                    #print(f"銘柄コード {stock_code} のデータが更新されました: {structured_data['timestamp']}")
            except Empty:
                current_time = time.time()
                if current_time - last_data_time > 5.1:  # 5秒以上データが来なかった場合
                    print("データ受信が5秒以上ありません。data_readerプロセスを再起動します。")
                    self.stop_data_reader()
                    time.sleep(1)  # 再起動前に1秒待機
                    self.start_data_reader()
                    last_data_time = time.time()  # リセット
                continue  # キューが空の場合はスキップ
            except Exception as e:
                print("データ処理に失敗しました:", e)  # 例外内容の出力
                continue  # エラー発生時もループを続行

    def monitor(self):
        """監視プロセスの開始"""
        try:
            self.create_excel()
            self.initialize_excel()
            # データ取得プロセスとデータ処理スレッドの分割
            self.start_data_reader()  # データ取得プロセスを開始
            processor_thread = Thread(target=self.data_processor, daemon=True)
            
            processor_thread.start()
            
            # 停止時刻の設定
            stop_time = datetime.combine(datetime.today(), datetime_time(15, 31))  # 15:31を設定
            
            # プロセスとスレッドの実行を監視
            while processor_thread.is_alive():
                # 現在時刻を確認して停止フラグを立てる
                if datetime.now() >= stop_time:
                    print("停止時刻に達しました。停止処理を開始します...")
                    self.stop_flag = True  # 停止フラグを設定してループを終了させる
                    break
                
                processor_thread.join(timeout=1)

        except KeyboardInterrupt:
            print("停止処理を開始しています...")
            self.stop_flag = True  # 停止フラグを設定してループを終了させる
        finally:
            # すべてのリソースを解放する
            self.stop_data_reader()  # data_readerプロセスを停止
            self.close_excel()  # 終了時にExcelを閉じる

            print("監視を停止しました。")


if __name__ == "__main__":
    # パラメータの設定
    EXCEL_PATH = "rakuten_rss.xlsx"  # Excelファイルのパス
    INIT_CODE = ["2936", "4499", "190A"]  
    #INIT_CODE = get_watchlist_codes()
    JSON_BASE_PATH = "./rakuten_rss_data"  # JSONファイルを保存するベースのフォルダ

    target_time = datetime_time(8, 0)  # 8:00 AM

    while True:
        current_time = datetime.now().time()
        if current_time >= target_time and current_time:
            print(f"現在時刻: {current_time}. 処理を開始します。")
            # モニターのインスタンス作成
            monitor = OrderBookMonitor(EXCEL_PATH, INIT_CODE, JSON_BASE_PATH)
            monitor.monitor()
            break  # メイン処理が終了したらループを抜ける
        else:
            print(f"現在時刻: {current_time}. 開始時刻 {target_time} まで待機中...")
            time.sleep(60) 

Discussion