💓
Pan–Tompkins アルゴリズム — QRS/Rピーク検出
Pan–Tompkins アルゴリズム — QRS / Rピーク検出
概要
Pan–Tompkins アルゴリズムは 1985 年に Pan と Tompkins によって提案された、リアルタイム向けの QRS 検出手法です。フィルタリング、微分、二乗、移動平均(積分器)、および適応閾値によって R 波を強調し、ピーク位置を決定します。シンプルで計算コストが低く、多くの実装や派生手法の基礎となっています。
処理の流れ(要点)
- バンドパスフィルタ
- 目的: ベースラインドリフトと高周波ノイズの除去。原論文では 5–15 Hz 程度が推奨されることが多い。
- 微分(Derivative)
- 目的: QRS の急峻な変化(立ち上がり)を強調する。
- 二乗(Squaring)
- 目的: 信号を正化して高振幅成分を強調する。
- 移動平均(積分器)
- 目的: 窓内のエネルギーを平滑化し、QRS の持続時間に対応するピーク形状を作る。
- 閾値処理とピーク選択
- 目的: 候補ピークから実際の R ピークを選ぶ。閾値は適応的に更新することで、信号振幅の変動に対応する。
実装例(1.バンドパスフィルタ)
import numpy as np
from scipy.signal import butter, filtfilt
import pandas as pd
import wfdb
import matplotlib.pyplot as plt
record = wfdb.rdrecord('100', pn_dir='mitdb')
ecg_signal = record.p_signal[:,0] # 1チャンネル目を取得
fs = record.fs # サンプリング周波数
# 0.元波形
plt.plot(ecg_signal[:2000]) # 最初の2000サンプルを表示
plt.title('Original ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()
# 1.バンドパスフィルタの設計
def bandpass_filter(signal, fs, lowcut=5.0, highcut=15.0, order=2):
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
b, a = butter(order, [low, high], btype='band')
return filtfilt(b, a, signal)
filtered_bandpass_ecg = bandpass_filter(ecg_signal, fs)
#filtered_bandpass_ecgをplotする
plt.plot(filtered_bandpass_ecg[:2000]) # 最初の2000サンプルを表示
plt.title('Bandpass Filtered ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()
生波形 | バンドパスフィルタ |
---|---|
![]() |
![]() |
実装例(2.微分(Derivative))
def derivative_filter(signal):
# Pan-Tompkins original derivative approximation (5-point) 論参照
# coefficients [1, 2, 0, -2, -1] / 8
coeffs = np.array([1.0, 2.0, 0.0, -2.0, -1.0]) / 8.0
return np.convolve(signal, coeffs, mode='same')
filtered_derivative_ecg = derivative_filter(filtered_bandpass_ecg)
#plot filtered_derivative_ecg
plt.plot(filtered_derivative_ecg[:2000]) # 最初の2000サンプルを表示
plt.title('Derivative Filtered ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()
実装例(3.二乗(Squaring))
#3.二乗し、ピーク強調
squared_ecg = filtered_derivative_ecg ** 2
#plot squared_ecg
plt.plot(squared_ecg[:2000]) # 最初の2000サンプ
plt.title('Squared ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()
実装例(4.移動平均(積分器))
def moving_window_integration(signal, window_size=30):
window = np.ones(window_size) / window_size
return np.convolve(signal, window, mode='same')
# 窓幅をサンプリング周波数に基づいて設定 (例: 150ms 論文抜粋)
window_size = max(1, int((150 / 1000.0) * fs))
integrated_ecg = moving_window_integration(squared_ecg, window_size=window_size)
plt.title('Integrated ECG Signal')
plt.plot(integrated_ecg[:2000]) # 最初の2000サンプルを表示
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()
実装例(5.閾値処理とピーク選択)
# detection = integrated_ecg
min_peak_distance = int(0.3 * fs)
min_missed_distance = int(0.25 * fs)
peaks, _ = scipy.signal.find_peaks(integrated_ecg, plateau_size=(1, 1))
SPKI = 0.0
NPKI = 0.0
threshold_I1_history = np.zeros(len(integrated_ecg))
threshold_I2_history = np.zeros(len(integrated_ecg))
signal_peaks = []
last_peak = 0
last_index = -1
for index, peak in enumerate(peaks):
peak_value = integrated_ecg[peak]
threshold_I1 = NPKI + 0.25 * (SPKI - NPKI)
threshold_I1_history[peak] = threshold_I1
threshold_I2 = 0.5 * threshold_I1
threshold_I2_history[peak] = threshold_I2
if peak_value > threshold_I1 and peak > last_peak + min_peak_distance:
signal_peaks.append(peak)
# RR_missed
if len(signal_peaks) > 9:
RR_ave = (signal_peaks[-2] - signal_peaks[-10]) // 8
RR_missed = int(1.66 * RR_ave)
if peak - last_peak > RR_missed:
missed_peaks = peaks[last_index + 1:index]
missed_peaks = missed_peaks[(missed_peaks > last_peak + min_missed_distance) & (missed_peaks < peak - min_missed_distance)]
missed_peaks = missed_peaks[integrated_ecg[missed_peaks] > threshold_I2]
if len(missed_peaks) > 0:
signal_peaks[-1] = missed_peaks[np.argmax(integrated_ecg[missed_peaks])]
signal_peaks.append(peak)
last_peak = peak
last_index = index
SPKI = 0.125 * peak_value + 0.875 * SPKI
else:
NPKI = 0.125 * peak_value + 0.875 * NPKI
# Plot integrated signal and thresholds
plt.figure(figsize=(12, 4))
plt.plot(integrated_ecg[:2000], label='Integrated')
plt.scatter(peaks[peaks<2000], integrated_ecg[peaks[peaks<2000]], color='gray', s=10, label='candidate peaks')
plt.scatter([p for p in signal_peaks if p<2000], integrated_ecg[[p for p in signal_peaks if p<2000]], color='red', s=20, label='signal peaks')
plt.legend()
plt.title('Integrated signal with adaptive thresholds (I1 and I2)')
plt.show()
#可視化
plt.figure(figsize=(12, 4))
plt.plot(ecg_signal[:2000], label='Original ECG')
#棒でピークを表示
for index,peak in enumerate(signal_peaks):
if index < 6:
plt.axvline(x=peak, color='red', linestyle='--', alpha=0.7)
plt.title('Detected R-peaks on Original ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.legend()
plt.show()
ピーク候補 | 最終結果 |
---|---|
![]() |
![]() |
ピーク候補がずれているのは、積分器の影響でピークが広がるためです。実際の R ピーク位置は元の ECG 信号上で確認する必要があります。
Pan–Tompkinsの論文を真似るのが良いですが、実際の実装ではピーク位置を元の信号に戻して最も近いピークを選ぶなどの工夫が必要です。
注意点
- 上記実装は閾値を固定パーセンタイルで決めていますが、実装では信号ごとに閾値を適応的に更新するのが一般的です(原論文参照)。
- 高ノイズや心電図アーチファクトでは偽検出や見逃しが発生しやすく、事前のノイズ除去やポスト処理(波形確認・アラインメント)が重要です。
- ライブラリ実装(NeuroKit2 など)を使うとパラメータ調整や互換性の点で便利です。
参考
- Pan, J. & Tompkins, W. J., "A Real‑Time QRS Detection Algorithm", IEEE Trans. Biomed. Eng., 1985.
- NeuroKit2: https://neurokit2.readthedocs.io
- SciPy find_peaks: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.find_peaks.html
Discussion