🐳
バイタルサイン可視化システムを構築した話
はじめに
非接触のミリ波センサーを使って呼吸と心拍をリアルタイムで検出・解析するシステムを開発しました。
本記事では、シミュレーション版として実装したPythonコードについて説明します。
必要な環境とライブラリ
Pythonバージョン
Python 3.8以上(推奨:3.9+)
必要なライブラリ
システムの動作には以下のライブラリが必要です:
必須ライブラリ
# 数値計算・信号処理
numpy>=1.21.0
scipy>=1.7.0
# GUI・可視化
PyQt5>=5.15.0 # または PyQt6>=6.2.0
pyqtgraph>=0.12.0
# 標準ライブラリ(追加インストール不要)
sys, time, collections, traceback, warnings
標準的なpipでのインストール方法
# 基本パッケージ
pip install numpy scipy
# GUI関連(PyQt5の場合)
pip install PyQt5 pyqtgraph
# または PyQt6を使用する場合
pip install PyQt6 pyqtgraph
システム構成とステップ解説
STEP 1: GUI ライブラリの自動判定
try:
from PyQt5 import QtWidgets, QtCore
import pyqtgraph as pg
QT_FLAVOR = 'PyQt5'
QT_IS_PYQT6 = False
except ImportError:
try:
from PyQt6 import QtWidgets, QtCore
import pyqtgraph as pg
QT_FLAVOR = 'PyQt6'
QT_IS_PYQT6 = True
except ImportError:
print("エラー: PyQt5またはPyQt6がインストールされていません")
sys.exit(1)
PyQt5とPyQt6の両方に対応するため、自動検出機能を実装しています。どちらかがインストールされていれば動作します。
STEP 2: シミュレーション用センサーデータ生成
class SimulatedSensorData:
def __init__(self, frame_rate=20):
self.frame_rate = frame_rate
self.breathing_frequency = 0.25 # 15 bpm
self.heartbeat_frequency = 1.2 # 72 bpm
実際のミリ波センサーの動作を模擬するクラスです。以下の要素を含む現実的なデータを生成します:
- 呼吸による位相変動: 0.25Hz(15 bpm)の正弦波
- 心拍による微細変動: 1.2Hz(72 bpm)の小振幅変調
- 距離減衰: 実際のレーダーと同様の距離特性
- ノイズ: 現実的なレベルのランダムノイズ
生成されるIQデータの特徴
def generate_frame(self):
# バイタルサインによる位相変調
breathing_phase = self.breathing_amplitude * np.sin(2 * np.pi * self.breathing_frequency * current_time)
heartbeat_phase = self.heartbeat_amplitude * np.sin(2 * np.pi * self.heartbeat_frequency * current_time)
# 複素IQデータ生成: I + jQ = amplitude * exp(j * phase)
iq_frame[range_bin] = amplitude * np.exp(1j * phase)
STEP 3: リアルタイム解析エンジン
3-1: デジタルフィルタ設計
def _initialize_filters(self):
# DC成分除去用ローパスフィルタ(0.1Hz)
self.lpf_b, self.lpf_a = butter(2, lpf_cutoff, btype='low')
# 呼吸帯域バンドパスフィルタ(0.1-1.0Hz)
self.bpf_b, self.bpf_a = butter(4, [low_freq, high_freq], btype='band')
バイタルサイン抽出のための2段階フィルタリング:
- ローパスフィルタ: 静的なDC成分を除去
- バンドパスフィルタ: 呼吸周波数帯域(0.1-1.0Hz)を抽出
3-2: スライディングウィンドウ解析
def process_new_frame(self, iq_data):
# バッファに新フレーム追加
self.iq_buffer.append(target_iq)
# 十分なデータが溜まったら解析実行
if len(self.iq_buffer) >= self.window_size:
results = self._perform_vital_analysis()
効率的なリアルタイム処理のために、スライディングウィンドウ方式を採用:
- 短期バッファ: 50フレーム(2.5秒)の解析用
- 長期バッファ: 1000フレーム(50秒)の表示用
- 更新頻度制御: 20フレームに1回のGUI更新
3-3: 信号処理チェーン
def _perform_vital_analysis(self):
# ステップ1: DC成分除去
lpfout = signal.lfilter(self.lpf_b, self.lpf_a, iq_array)
sig_dcoff = iq_array - lpfout
# ステップ2: 位相抽出とアンラップ
sig_angle_raw = np.angle(sig_dcoff)
sig_angle = np.unwrap(sig_angle_raw)
# ステップ3: バンドパスフィルタリング
bpfout = signal.lfilter(self.bpf_b, self.bpf_a, sig_angle)
STEP 4: 4画面同時可視化システム
class RealTimeVisualizationWindow(QtWidgets.QMainWindow):
def __init__(self):
# 2x2グリッドレイアウト
layout = QtWidgets.QGridLayout(central_widget)
# プロット (A): I波形表示(左上)
# プロット (B): 距離スペクトラム(右上)
# プロット (C): 位相信号(左下)
# プロット (D): 周波数スペクトラム(右下)
リアルタイムで更新される4つのプロット:
| プロット | 表示内容 | 用途 |
|---|---|---|
| (A) I波形 | 実部データの時系列 | 生信号の確認 |
| (B) 距離スペクトラム | 距離毎の反射強度 | 解析位置の設定 |
| (C) 位相信号 | BPF後の呼吸変動 | バイタルサイン波形 |
| (D) 周波数解析 | FFTスペクトラム | 呼吸数の定量化 |
STEP 5: メインアプリケーション統合
class MainApplication(QtWidgets.QApplication):
def run_simulation_loop(self):
target_fps = 20
while True:
# センサーデータ生成
iq_frame_data = self.sensor_simulator.generate_frame()
# リアルタイム解析実行
analysis_results = self.analyzer.process_new_frame(iq_frame_data)
# フレームレート制御
time.sleep(frame_interval)
実機データへの移行方法
実機センサーとの接続
シミュレーション版から実機版への移行は、SimulatedSensorDataクラスを実際のセンサーインターフェースに置き換えるだけです。
# シミュレーター部分を置換
# self.sensor_simulator = SimulatedSensorData(frame_rate=20)
class Interface:
def __init__(self):
self.client = Client.open() # USB接続
# センサー設定
self.sensor_config = SensorConfig(
start_point=80, # 開始距離ポイント
num_points=160, # 測定ポイント数
step_length=24, # ステップ長
frame_rate=20.0, # フレームレート
sweep_rate=2000, # スイープレート
sweeps_per_frame=16 # フレーム毎スイープ数
)
self.session_config = SessionConfig(
[self.sensor_config], update_rate=20
)
# セッション開始
self.client.setup_session(self.session_config)
self.client.start_session()
def get_next_frame(self):
"""実機から1フレーム取得"""
result = self.client.get_next()
return result.frame # IQデータを返す
def close(self):
"""セッション終了"""
self.client.close()
メインループの変更
# シミュレーション版
iq_frame_data = self.sensor_simulator.generate_frame()
# 実機版に変更
iq_frame_data = self.acconeer_sensor.get_next_frame()
キャリブレーション調整
実機使用時には以下のパラメータ調整が必要です:
# 距離校正パラメータ
self.base_step_length_m = 0.0025 # センサー仕様に合わせて調整
self.start_point = 80 # 測定開始位置
# 信号処理パラメータ
self.target_position = 30 # 解析対象距離ポイント
self.breathing_frequency_range = (0.1, 1.0) # Hz
self.noise_level = 0.05 # 実測ノイズレベル
# フィルタ設計パラメータ
self.window_size = 50 # 解析ウィンドウサイズ
self.frame_freq = 20 # 実際のフレームレート
実機でのトラブルシューティング
よくある問題と対策
# データ品質チェック機能
def validate_frame_data(self, iq_data):
# NaN/Inf値のチェック
if np.any(np.isnan(iq_data)) or np.any(np.isinf(iq_data)):
return False
# 信号レベルチェック(過小/過大チェック)
signal_power = np.mean(np.abs(iq_data)**2)
if signal_power < 1e-10 or signal_power > 1e10:
return False
return True
# エラーハンドリング強化
def process_new_frame(self, iq_data):
try:
if not self.validate_frame_data(iq_data):
print("警告: 異常なフレームデータを検出")
return None
# 通常の解析処理
results = self._perform_vital_analysis()
return results
except Exception as e:
print(f"フレーム処理エラー: {e}")
return None
まとめ
本記事では、ミリ波センサーを用いたリアルタイムバイタルサイン解析システムの実装について詳しく解説しました。
システムの特徴
- 非接触測定: レーダーによる呼吸・心拍の遠隔検出
- リアルタイム処理: 20Hzでのライブ解析と表示
- 4画面同時表示: 多角的なデータ可視化
- 実機対応: シミュレーションから実センサーへの容易な移行
ソースコード
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
=============================================================================
リアルタイムバイタルサイン解析システム(シミュレーションモード)
=============================================================================
ミリ波センサーによるバイタルサイン(呼吸・心拍)の
リアルタイム解析システムのシミュレーション版
主な機能:
- シミュレーションされたミリ波レーダーデータ生成
- スライディングウィンドウによるリアルタイム解析
- 位相信号から呼吸周波数を抽出
- 4画面同時表示(I波形、距離スペクトラム、位相信号、周波数解析)
Python Version: 3.8+
Dependencies: numpy, scipy, PyQt5/PyQt6, pyqtgraph
"""
import numpy as np
import sys
import time
from collections import deque
import traceback
# =============================================================================
# STEP 1: GUI ライブラリの自動判定とインポート
# =============================================================================
try:
from PyQt5 import QtWidgets, QtCore
import pyqtgraph as pg
QT_FLAVOR = 'PyQt5'
QT_IS_PYQT6 = False
except ImportError:
try:
from PyQt6 import QtWidgets, QtCore
import pyqtgraph as pg
QT_FLAVOR = 'PyQt6'
QT_IS_PYQT6 = True
except ImportError:
print("エラー: PyQt5またはPyQt6がインストールされていません")
sys.exit(1)
from scipy.signal import butter, filtfilt
from scipy import signal
import warnings
warnings.filterwarnings('ignore')
class SimulatedSensorData:
"""
==========================================================================
STEP 2: シミュレーション用センサーデータ生成クラス
==========================================================================
現実的なバイタルサインデータを生成
"""
def __init__(self, frame_rate=20):
"""
シミュレーターの初期化
Args:
frame_rate (int): データ生成レート [Hz]
"""
self.frame_rate = frame_rate
self.frame_count = 0
self.start_time = time.time()
# === シミュレーション設定パラメータ ===
self.num_range_bins = 160 # 距離ビン数(実機と同じ)
self.breathing_frequency = 0.25 # 呼吸周波数 [Hz] = 15 bpm
self.breathing_amplitude = 0.1 # 呼吸による位相変動の振幅
self.heartbeat_frequency = 1.2 # 心拍周波数 [Hz] = 72 bpm
self.heartbeat_amplitude = 0.02 # 心拍による位相変動の振幅
self.noise_level = 0.05 # ノイズレベル
def _generate_background_profile(self, current_time=0):
"""
現実的な距離プロファイルの背景信号を生成
Args:
current_time (float): 現在のシミュレーション時間
Returns:
numpy.ndarray: 距離ビン毎の背景振幅
"""
range_bins = np.arange(self.num_range_bins)
# === 距離による減衰を模擬 ===
distance_factor = np.exp(-0.01 * range_bins)
# === 反射体の配置設定 ===
chest_position = 30 # 人体胸部の位置(距離ビン30付近)
wall_position = 120 # 後方の壁の位置
# === 時間変化する振幅変調 ===
# 呼吸による胸部反射強度の変化(±10%の変動)
breathing_modulation = 1.0 + 0.1 * np.sin(2 * np.pi * self.breathing_frequency * current_time)
# 心拍による微細な変動(±2%の変動)
heartbeat_modulation = 1.0 + 0.02 * np.sin(2 * np.pi * self.heartbeat_frequency * current_time)
# === ガウシアン分布で各反射体の強度を模擬 ===
# 胸部からの反射(バイタルサインによる動的変化あり)
chest_reflection = (50 * breathing_modulation * heartbeat_modulation *
np.exp(-0.5 * ((range_bins - chest_position) / 5)**2))
# 壁からの反射(静的な反射体 + 微小ノイズ)
wall_reflection = (30 * (1.0 + 0.01 * np.sin(2 * np.pi * 0.05 * current_time)) *
np.exp(-0.5 * ((range_bins - wall_position) / 10)**2))
# === 時間変化する背景ノイズ ===
# 基本ノイズレベル + 時間変化ノイズ
background_noise = (10 + 2 * np.sin(2 * np.pi * 0.1 * current_time) +
3 * np.random.randn(self.num_range_bins))
# === 全体の振幅プロファイル ===
total_amplitude = chest_reflection + wall_reflection + background_noise
return total_amplitude * distance_factor
def generate_frame(self):
"""
1フレーム分のIQデータを生成
Returns:
numpy.ndarray: 複素数IQデータ(距離ビン×1フレーム)
"""
# === 現在のシミュレーション時間計算 ===
current_time = (self.frame_count / self.frame_rate)
# === 動的な背景信号を生成 ===
background_amplitude = self._generate_background_profile(current_time)
# === バイタルサインによる位相変調の計算 ===
# 呼吸による位相変調(主成分)
breathing_phase = (self.breathing_amplitude *
np.sin(2 * np.pi * self.breathing_frequency * current_time))
# 心拍による位相変調(呼吸に重畳される微細変動)
heartbeat_phase = (self.heartbeat_amplitude *
np.sin(2 * np.pi * self.heartbeat_frequency * current_time))
# 総合位相変調
total_phase_modulation = breathing_phase + heartbeat_phase
# === 距離ビン毎のIQデータ生成 ===
iq_frame = np.zeros(self.num_range_bins, dtype=complex)
for range_bin in range(self.num_range_bins):
# 動的背景振幅の取得
amplitude = background_amplitude[range_bin]
# 胸部位置でのみバイタルサインを重畳
if 25 <= range_bin <= 35: # 胸部領域(距離ビン25-35)
# バイタルサインによる位相変調を適用
phase = total_phase_modulation + np.random.normal(0, self.noise_level)
else:
# その他の位置は静的な位相にノイズのみ
phase = np.random.normal(0, self.noise_level * 0.1)
# === 複素IQデータの生成 ===
# I + jQ = amplitude * exp(j * phase)
iq_frame[range_bin] = amplitude * np.exp(1j * phase)
# フレームカウンタの更新
self.frame_count += 1
return iq_frame
def get_frame_info(self):
"""
現在のフレーム情報を取得
Returns:
dict: フレーム統計情報
"""
current_time = time.time() - self.start_time
return {
'frame_count': self.frame_count,
'elapsed_time': current_time,
'breathing_bpm': self.breathing_frequency * 60,
'heartbeat_bpm': self.heartbeat_frequency * 60
}
class RealTimeVitalAnalyzer(QtCore.QObject):
"""
==========================================================================
STEP 3: リアルタイムバイタルサイン解析クラス
==========================================================================
スライディングウィンドウ方式でバイタルサインをリアルタイム解析
"""
# === PyQtシグナル定義(データ更新通知用) ===
data_updated = QtCore.pyqtSignal(dict)
def __init__(self, frame_freq=20, window_size=50, target_position=30):
"""
解析システムの初期化
Args:
frame_freq (int): フレーム周波数 [Hz]
window_size (int): スライディングウィンドウサイズ [フレーム数]
target_position (int): 解析対象の距離ポイント
"""
super().__init__()
self.frame_freq = frame_freq
self.window_size = window_size
self.target_position = target_position
# === データバッファの初期化 ===
# 短期解析用バッファ(スライディングウィンドウ)
self.iq_buffer = deque(maxlen=window_size)
# 長期表示用バッファ(50秒分のデータ保持)
display_buffer_size = int(frame_freq * 50) # 50秒 × フレームレート
self.display_buffer = deque(maxlen=display_buffer_size)
# I波形の時系列データ保存用
i_wave_buffer_size = int(frame_freq * 50) # 50秒分
self.i_wave_buffer = deque(maxlen=i_wave_buffer_size)
self.i_wave_timestamps = deque(maxlen=i_wave_buffer_size)
self.frame_count = 0
self.start_time = time.time()
# ===ミリ波センサの距離計算パラメータ ===
self.base_step_length_m = 0.0025 # 1距離ポイント = 2.5mm
self.start_point = 80 # 開始距離ポイント
# === 描画制御パラメータ ===
self.draw_interval = 20 # 20フレームに1回描画更新
# 解析結果保存用バッファ
self.results_buffer = deque(maxlen=1000)
# === デジタルフィルタの初期化 ===
self._initialize_filters()
def _initialize_filters(self):
"""
=======================================================================
STEP 3-1: デジタルフィルタの設計と初期化
=======================================================================
バイタルサイン抽出用のバンドパスフィルタを設計
"""
nyquist_freq = self.frame_freq / 2 # ナイキスト周波数
# === DC成分除去用ローパスフィルタ(0.1Hz) ===
lpf_cutoff = 0.1 / nyquist_freq
if lpf_cutoff >= 1.0:
lpf_cutoff = 0.95 # 安定性確保
# 2次バターワースフィルタ
self.lpf_b, self.lpf_a = butter(2, lpf_cutoff, btype='low')
# === 呼吸帯域バンドパスフィルタ(0.1-1.0Hz) ===
max_freq = min(1.0, nyquist_freq * 0.9) # 最大周波数制限
low_freq = 0.1 / nyquist_freq # 下限周波数
high_freq = max_freq / nyquist_freq # 上限周波数
# 周波数範囲の調整
if high_freq >= 1.0:
high_freq = 0.95
if low_freq >= high_freq:
low_freq = high_freq * 0.1
# 4次バターワースバンドパスフィルタ
self.bpf_b, self.bpf_a = butter(4, [low_freq, high_freq], btype='band')
self.max_freq = max_freq
def calculate_amplitude(self, iq_data):
"""
IQデータから振幅スペクトラムを計算
Args:
iq_data (numpy.ndarray): 複素数IQデータ
Returns:
numpy.ndarray: dB単位の振幅データ
"""
try:
# === 振幅計算: |I + jQ| ===
amplitude_linear = np.abs(iq_data)
# === dB変換: 20*log10(amplitude) ===
# 0除算を防ぐため小さな値を追加
amplitude_db = 20 * np.log10(amplitude_linear + 1e-12)
return amplitude_db
except Exception as e:
print(f"振幅計算エラー: {e}")
return None
def convert_range_bins_to_distance(self, range_bins):
"""
距離ビン番号を実際の距離[m]に変換
Args:
range_bins (numpy.ndarray): 距離ビン番号配列
Returns:
numpy.ndarray: 実距離[m]配列
"""
try:
# === ミリ波センサの距離計算式 ===
# distance [m] = (start_point + range_bin) * base_step_length_m
distances_m = (self.start_point + range_bins) * self.base_step_length_m
return distances_m
except Exception as e:
print(f"距離変換エラー: {e}")
return range_bins * 0.0025 # フォールバック値
def _perform_phase_fft_analysis(self, bpf_signal, time_axis):
"""
=======================================================================
STEP 3-2: 位相信号のFFT周波数解析
=======================================================================
BPF処理後の位相信号から呼吸周波数を抽出
Args:
bpf_signal (numpy.ndarray): バンドパス後の位相信号
time_axis (numpy.ndarray): 時間軸データ
Returns:
dict: FFT解析結果(周波数、パワー、ピーク情報)
"""
try:
if len(bpf_signal) < 32: # 最小データ長チェック
return None
# === FFT実行 ===
fft_result = np.fft.rfft(bpf_signal) # 実数FFT
fft_magnitude = np.abs(fft_result) # 振幅スペクトラム
# === 周波数軸の生成 ===
dt = time_axis[1] - time_axis[0] if len(time_axis) > 1 else 1.0 / self.frame_freq
frequencies = np.fft.rfftfreq(len(bpf_signal), d=dt)
# === パワースペクトラム計算 ===
power_linear = fft_magnitude ** 2
power_db = 10 * np.log10(power_linear + 1e-12) # dB変換
# === 呼吸周波数帯域でのピーク検出 ===
breathing_range = (frequencies >= 0.1) & (frequencies <= 1.0)
if np.any(breathing_range):
valid_frequencies = frequencies[breathing_range]
valid_power = power_db[breathing_range]
# 最大パワーの周波数を検出
peak_idx = np.argmax(valid_power)
peak_frequency = valid_frequencies[peak_idx]
peak_power = valid_power[peak_idx]
# === BPM計算 ===
bpm = peak_frequency * 60.0 # Hz → bpm変換
else:
peak_frequency = 0.0
peak_power = 0.0
bpm = 0.0
return {
'frequencies': frequencies,
'power_db': power_db,
'peak_frequency': peak_frequency,
'peak_power': peak_power,
'bpm': bpm
}
except Exception as e:
print(f"FFT解析エラー: {e}")
return None
def process_new_frame(self, iq_data):
"""
=======================================================================
STEP 3-3: 新フレームのスライディングウィンドウ解析
=======================================================================
新しいフレームデータを解析し、結果をGUIに送信
Args:
iq_data (numpy.ndarray): 新しいIQデータフレーム
Returns:
dict: 解析結果(初期化期間中はNone)
"""
try:
# === 複数スイープの平均化処理 ===
if iq_data.ndim == 2 and iq_data.shape[0] > 1:
averaged_sweep = np.mean(iq_data, axis=0)
elif iq_data.ndim == 2:
averaged_sweep = iq_data[0]
else:
averaged_sweep = iq_data
# === I波形データ抽出(実部) ===
i_wave_data = np.real(averaged_sweep)
# === 時間管理 ===
current_time = time.time() - self.start_time
# I波形データの時系列保存
self.i_wave_buffer.append(i_wave_data)
self.i_wave_timestamps.append(current_time)
# === 距離スペクトラム計算 ===
amplitude_data = self.calculate_amplitude(averaged_sweep)
range_bins = np.arange(len(averaged_sweep))
distances_m = self.convert_range_bins_to_distance(range_bins)
# === 対象距離ポイントのIQデータ抽出 ===
if self.target_position >= len(averaged_sweep):
return None
target_iq = averaged_sweep[self.target_position]
# === バッファへのデータ追加 ===
self.iq_buffer.append(target_iq) # 解析用(短期)
self.display_buffer.append(target_iq) # 表示用(長期)
self.frame_count += 1
# === 描画更新制御 ===
should_update_display = (self.frame_count % self.draw_interval == 0)
# === GUI更新: I波形データ送信 ===
if should_update_display and len(self.i_wave_buffer) > 0:
# 全I波形データの時系列結合
all_i_wave_data = []
all_timestamps = []
for frame_data, timestamp in zip(self.i_wave_buffer, self.i_wave_timestamps):
frame_length = len(frame_data)
sample_duration = 1.0 / self.frame_freq
sample_dt = sample_duration / frame_length
for sample_idx, sample_value in enumerate(frame_data):
sample_time = timestamp + (sample_idx * sample_dt)
all_i_wave_data.append(sample_value)
all_timestamps.append(sample_time)
# I波形データをGUIに送信
i_data = {
'type': 'i_wave',
'i_wave_data': np.array(all_i_wave_data),
'time_axis': np.array(all_timestamps),
'current_time': current_time
}
self.data_updated.emit(i_data)
# === GUI更新: 距離スペクトラムデータ送信 ===
if should_update_display:
target_distance_m = self.convert_range_bins_to_distance(
np.array([self.target_position]))[0]
range_data = {
'type': 'range_sweep',
'distances_m': distances_m,
'amplitude_data': amplitude_data,
'target_distance_m': target_distance_m
}
self.data_updated.emit(range_data)
# === 初期化待機期間の処理 ===
if len(self.iq_buffer) < self.window_size:
return None
# === メイン解析実行 ===
results = self._perform_vital_analysis()
if results:
# 表示用長期解析
display_results = self._perform_display_analysis()
# 結果に追加情報を付加
results['amplitude_data'] = amplitude_data
results['distances_m'] = distances_m
results['i_wave_data'] = i_wave_data
results['current_time'] = current_time
# === GUI更新: 位相解析とFFT結果送信 ===
if should_update_display and display_results:
# 位相信号データ送信
phase_data = {
'type': 'phase_analysis',
'time_axis': display_results['time_axis'],
'bpfout': display_results['bpfout']
}
self.data_updated.emit(phase_data)
# FFT解析結果送信
fft_results = self._perform_phase_fft_analysis(
display_results['bpfout'], display_results['time_axis'])
if fft_results:
fft_data = {
'type': 'phase_fft',
'frequencies': fft_results['frequencies'],
'power_db': fft_results['power_db'],
'peak_frequency': fft_results['peak_frequency'],
'peak_power': fft_results['peak_power'],
'bpm': fft_results['bpm']
}
self.data_updated.emit(fft_data)
# 結果をバッファに保存
results['frame_number'] = self.frame_count
results['timestamp'] = time.time()
self.results_buffer.append(results)
return results
except Exception as e:
print(f"フレーム処理エラー: {e}")
return None
def _perform_display_analysis(self):
"""
表示用長期解析(50秒分のデータ使用)
Returns:
dict: 長期解析結果
"""
try:
if len(self.display_buffer) < self.window_size:
return None
# === データバッファを配列に変換 ===
iq_array = np.array(list(self.display_buffer))
# データ品質チェック
if np.any(np.isnan(iq_array)) or np.any(np.isinf(iq_array)):
return None
# === デジタル信号処理チェーン ===
# ステップ1: DC成分抽出
lpfout = signal.lfilter(self.lpf_b, self.lpf_a, iq_array)
# ステップ2: 静的成分除去
sig_dcoff = iq_array - lpfout
# ステップ3: 位相抽出とアンラップ
sig_angle_raw = np.angle(sig_dcoff)
sig_angle = np.unwrap(sig_angle_raw)
# ステップ4: 呼吸帯域バンドパスフィルタリング
bpfout = signal.lfilter(self.bpf_b, self.bpf_a, sig_angle)
# === 実時間ベースの時間軸生成 ===
data_length = len(iq_array)
current_real_time = time.time() - self.start_time # 実際の経過時間
# データの実際の時間間隔(実測FPSベース)
actual_dt = current_real_time / self.frame_count if self.frame_count > 0 else (1.0 / self.frame_freq)
# 実時間ベースの時間軸(右ずれなし)
time_end = current_real_time
time_start = time_end - (data_length * actual_dt)
time_axis = np.linspace(time_start, time_end, data_length)
return {
'time_axis': time_axis,
'sig_angle_raw': sig_angle_raw,
'sig_angle': sig_angle,
'bpfout': bpfout,
'iq_data': iq_array
}
except Exception as e:
print(f"長期解析エラー: {e}")
return None
def _perform_vital_analysis(self):
"""
短期バッファでのバイタルサイン解析
Returns:
dict: 短期解析結果
"""
try:
# === バッファデータの配列変換 ===
iq_array = np.array(list(self.iq_buffer))
# データ品質チェック
if np.any(np.isnan(iq_array)) or np.any(np.isinf(iq_array)):
return None
# === 同じ信号処理チェーン ===
lpfout = signal.lfilter(self.lpf_b, self.lpf_a, iq_array)
sig_dcoff = iq_array - lpfout
sig_angle_raw = np.angle(sig_dcoff)
sig_angle = np.unwrap(sig_angle_raw)
bpfout = signal.lfilter(self.bpf_b, self.bpf_a, sig_angle)
# 時間軸作成(短期解析用)
time_axis = np.arange(len(iq_array)) / self.frame_freq
return {
'time_axis': time_axis,
'sig_angle_raw': sig_angle_raw,
'sig_angle': sig_angle,
'bpfout': bpfout,
'iq_data': iq_array
}
except Exception as e:
print(f"短期解析エラー: {e}")
return None
def get_latest_results(self):
"""最新の解析結果を取得"""
return list(self.results_buffer)[-10:] if self.results_buffer else []
class RealTimeVisualizationWindow(QtWidgets.QMainWindow):
"""
==========================================================================
STEP 4: リアルタイム可視化ウィンドウクラス
==========================================================================
2x2グリッドレイアウトで4つのプロットを同時表示
"""
def __init__(self):
"""可視化ウィンドウの初期化"""
super().__init__()
# === pyqtgraph設定 ===
pg.setConfigOptions(antialias=False) # 高速描画モード
# === ウィンドウ基本設定 ===
self.setWindowTitle(f'リアルタイムバイタルサイン解析システム - {QT_FLAVOR}')
self.setGeometry(100, 100, 1400, 900)
# === 中央ウィジェットと2x2グリッドレイアウト ===
central_widget = QtWidgets.QWidget()
self.setCentralWidget(central_widget)
layout = QtWidgets.QGridLayout(central_widget)
# === プロット (A): I波形表示(左上) ===
self.plot_i = pg.PlotWidget(title='(A) I波形 - スイープ平均化後の実部データ')
self.plot_i.setLabel('left', '振幅', units='')
self.plot_i.setLabel('bottom', '時間', units='s')
self.plot_i.showGrid(x=True, y=True, alpha=0.3)
self.curve_i = self.plot_i.plot(pen=pg.mkPen('#00aaff', width=2))
# === プロット (B): 距離スペクトラム(右上) ===
self.plot_range = pg.PlotWidget(title='(B) 距離スペクトラム - 振幅 vs 実距離')
self.plot_range.setLabel('left', '振幅', units='dB')
self.plot_range.setLabel('bottom', '距離', units='m')
self.plot_range.showGrid(x=True, y=True, alpha=0.3)
self.curve_range = self.plot_range.plot(pen=pg.mkPen('#ffaa00', width=2))
self.plot_range.setYRange(0, 100, padding=0) # 0-100dB表示
self.target_line = None # 解析対象位置マーカー
# === プロット (C): 位相信号(左下) ===
self.plot_phase = pg.PlotWidget(title='(C) 位相信号 - BPF(0.1-1.0Hz)後の呼吸変動')
self.plot_phase.setLabel('left', '位相', units='rad')
self.plot_phase.setLabel('bottom', '時間', units='s')
self.plot_phase.showGrid(x=True, y=True, alpha=0.3)
self.curve_phase = self.plot_phase.plot(pen=pg.mkPen('#55cc55', width=2))
self.plot_phase.setYRange(-10, 10, padding=0) # 縦軸を-10から10に固定
# === プロット (D): 周波数スペクトラム(右下) ===
self.plot_phase_fft = pg.PlotWidget(title='(D) 周波数スペクトラム - 呼吸周波数解析')
self.plot_phase_fft.setLabel('left', 'パワー', units='dB')
self.plot_phase_fft.setLabel('bottom', '周波数', units='Hz')
self.plot_phase_fft.showGrid(x=True, y=True, alpha=0.3)
self.curve_phase_fft = self.plot_phase_fft.plot(pen=pg.mkPen('#cc55aa', width=2))
self.plot_phase_fft.setXRange(0.0, 1.5, padding=0) # 0-1.5Hz表示
self.plot_phase_fft.setYRange(0.0, 100, padding=0)
# === 呼吸周波数ピークマーカー ===
self.phase_peak_line = pg.InfiniteLine(
angle=90, movable=False,
pen=pg.mkPen('#44aa44', width=2)
)
self.plot_phase_fft.addItem(self.phase_peak_line)
self.phase_peak_marker = self.plot_phase_fft.plot(
[0], [0], pen=None, symbol='o', symbolSize=8,
symbolBrush=pg.mkBrush('#44aa44')
)
# === BPM数値表示用テキスト ===
self.peak_frequency_text = pg.TextItem(
text="", color=(255, 255, 255),
fill=pg.mkBrush(68, 170, 68, 180),
border=pg.mkPen(68, 170, 68, 255),
anchor=(0.5, 1.0)
)
self.plot_phase_fft.addItem(self.peak_frequency_text)
# === グリッドレイアウトへの配置 ===
layout.addWidget(self.plot_i, 0, 0) # 左上
layout.addWidget(self.plot_range, 0, 1) # 右上
layout.addWidget(self.plot_phase, 1, 0) # 左下
layout.addWidget(self.plot_phase_fft, 1, 1) # 右下
def update_i_wave_data(self, time_axis, i_wave_data, current_time):
"""
I波形データの更新
"""
try:
self.curve_i.setData(time_axis, i_wave_data)
if len(time_axis) > 0:
data_start_time = time_axis[0]
data_end_time = time_axis[-1]
if data_end_time <= 50:
display_min = max(0, data_start_time - 2)
display_max = data_end_time + 5
self.plot_i.setXRange(display_min, display_max, padding=0)
else:
self.plot_i.setXRange(data_end_time - 50, data_end_time + 2, padding=0)
except Exception as e:
print(f"I波形更新エラー: {e}")
def update_range_data(self, distances_m, amplitude_data, target_distance_m):
"""
距離スペクトラムデータの更新
"""
try:
if amplitude_data is None:
return
self.curve_range.setData(distances_m, amplitude_data)
self.plot_range.setXRange(distances_m[0], distances_m[-1], padding=0.05)
if self.target_line is not None:
self.plot_range.removeItem(self.target_line)
self.target_line = pg.InfiniteLine(
pos=target_distance_m, angle=90,
pen=pg.mkPen('r', width=2,
style=QtCore.Qt.PenStyle.DashLine if QT_IS_PYQT6 else QtCore.Qt.DashLine),
label=f'解析位置 ({target_distance_m:.3f}m)'
)
self.plot_range.addItem(self.target_line)
except Exception as e:
print(f"距離スペクトラム更新エラー: {e}")
def update_phase_data(self, time_axis, bpf_signal):
"""
位相信号データの更新
"""
try:
self.curve_phase.setData(time_axis, bpf_signal)
if len(time_axis) > 0:
data_start_time = time_axis[0]
data_end_time = time_axis[-1]
if data_end_time <= 50:
display_min = max(0, data_start_time - 2)
display_max = data_end_time + 5
self.plot_phase.setXRange(display_min, display_max, padding=0)
else:
self.plot_phase.setXRange(data_end_time - 50, data_end_time + 2, padding=0)
except Exception as e:
print(f"位相信号更新エラー: {e}")
def update_phase_fft_data(self, frequencies, power_db, peak_frequency, peak_power, bpm):
"""
周波数スペクトラムデータの更新
"""
try:
self.curve_phase_fft.setData(frequencies, power_db)
if peak_frequency > 0:
self.phase_peak_line.setPos(peak_frequency)
self.phase_peak_marker.setData([peak_frequency], [peak_power])
text_content = f"{bpm:.1f} bpm\n({peak_frequency:.3f} Hz)"
self.peak_frequency_text.setText(text_content)
text_y_position = peak_power + 5 # 5dBオフセット
self.peak_frequency_text.setPos(peak_frequency, text_y_position)
self.peak_frequency_text.show()
else:
self.peak_frequency_text.hide()
self.plot_phase_fft.setTitle(f'(D) 周波数スペクトラム - 呼吸数: {bpm:.1f} bpm')
except Exception as e:
print(f"周波数スペクトラム更新エラー: {e}")
class MainApplication(QtWidgets.QApplication):
"""
==========================================================================
STEP 5: メインアプリケーションクラス
==========================================================================
全システムの統合とシミュレーションループの実行
"""
def __init__(self, argv):
"""
アプリケーションの初期化
"""
super().__init__(argv)
print(f"リアルタイムバイタルサイン解析システム - {QT_FLAVOR}")
# === システムコンポーネントの初期化 ===
self.analyzer = RealTimeVitalAnalyzer(
frame_freq=20, # 20Hz サンプリング
window_size=50, # 50フレーム = 2.5秒のウィンドウ
target_position=30 # 距離ポイント30で解析
)
self.main_window = RealTimeVisualizationWindow()
self.analyzer.data_updated.connect(self.on_data_updated)
self.sensor_simulator = SimulatedSensorData(frame_rate=20)
self.main_window.show()
print("シミュレーションモード開始 (Ctrl-C で終了)")
def on_data_updated(self, data):
"""
データ更新イベントハンドラー
"""
try:
data_type = data.get('type')
if data_type == 'i_wave':
self.main_window.update_i_wave_data(
data['time_axis'],
data['i_wave_data'],
data['current_time']
)
elif data_type == 'range_sweep':
self.main_window.update_range_data(
data['distances_m'],
data['amplitude_data'],
data['target_distance_m']
)
elif data_type == 'phase_analysis':
self.main_window.update_phase_data(
data['time_axis'],
data['bpfout']
)
elif data_type == 'phase_fft':
self.main_window.update_phase_fft_data(
data['frequencies'],
data['power_db'],
data['peak_frequency'],
data['peak_power'],
data['bpm']
)
except Exception as e:
print(f"GUI更新エラー: {e}")
def run_simulation_loop(self):
"""
メインシミュレーションループ
"""
try:
start_time = time.time()
frame_count = 0
target_fps = 20
frame_interval = 1.0 / target_fps
while True:
try:
loop_start = time.time()
iq_frame_data = self.sensor_simulator.generate_frame()
if iq_frame_data is not None:
analysis_results = self.analyzer.process_new_frame(iq_frame_data)
frame_count += 1
# 30秒毎に統計表示
if frame_count % 600 == 0: # 30秒毎(20Hz × 30秒)
elapsed_time = time.time() - start_time
fps = frame_count / elapsed_time
print(f"[{elapsed_time:.0f}秒経過] FPS: {fps:.1f}")
self.processEvents()
# フレームレート制御
loop_end = time.time()
loop_duration = loop_end - loop_start
sleep_time = max(0, frame_interval - loop_duration)
if sleep_time > 0:
time.sleep(sleep_time)
except KeyboardInterrupt:
print("\nユーザー中断")
break
except Exception as e:
print(f"ループエラー: {e}")
break
except Exception as e:
print(f"シミュレーションエラー: {e}")
finally:
total_time = time.time() - start_time
print(f"実行終了: {frame_count:,}フレーム, {total_time:.1f}秒, 平均FPS: {frame_count/total_time:.1f}")
def main():
"""
メイン実行関数
"""
try:
app = MainApplication(sys.argv)
app.run_simulation_loop()
except Exception as e:
print(f"起動エラー: {e}")
return 1
return 0
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)
Discussion