💓

Pan–Tompkins アルゴリズム — QRS/Rピーク検出

に公開

Pan–Tompkins アルゴリズム — QRS / Rピーク検出

概要

Pan–Tompkins アルゴリズムは 1985 年に Pan と Tompkins によって提案された、リアルタイム向けの QRS 検出手法です。フィルタリング、微分、二乗、移動平均(積分器)、および適応閾値によって R 波を強調し、ピーク位置を決定します。シンプルで計算コストが低く、多くの実装や派生手法の基礎となっています。

処理の流れ(要点)

  1. バンドパスフィルタ
    • 目的: ベースラインドリフトと高周波ノイズの除去。原論文では 5–15 Hz 程度が推奨されることが多い。
  2. 微分(Derivative)
    • 目的: QRS の急峻な変化(立ち上がり)を強調する。
  3. 二乗(Squaring)
    • 目的: 信号を正化して高振幅成分を強調する。
  4. 移動平均(積分器)
    • 目的: 窓内のエネルギーを平滑化し、QRS の持続時間に対応するピーク形状を作る。
  5. 閾値処理とピーク選択
    • 目的: 候補ピークから実際の R ピークを選ぶ。閾値は適応的に更新することで、信号振幅の変動に対応する。

実装例(1.バンドパスフィルタ)

import numpy as np
from scipy.signal import butter, filtfilt
import pandas as pd
import wfdb
import matplotlib.pyplot as plt

record = wfdb.rdrecord('100', pn_dir='mitdb')
ecg_signal = record.p_signal[:,0]  # 1チャンネル目を取得
fs = record.fs  # サンプリング周波数

# 0.元波形
plt.plot(ecg_signal[:2000])  # 最初の2000サンプルを表示
plt.title('Original ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()


# 1.バンドパスフィルタの設計
def bandpass_filter(signal, fs, lowcut=5.0, highcut=15.0, order=2):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, signal)

filtered_bandpass_ecg = bandpass_filter(ecg_signal, fs)

#filtered_bandpass_ecgをplotする
plt.plot(filtered_bandpass_ecg[:2000])  # 最初の2000サンプルを表示
plt.title('Bandpass Filtered ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()

生波形 バンドパスフィルタ
ECG origin ECG banpass

実装例(2.微分(Derivative))

def derivative_filter(signal):
    # Pan-Tompkins original derivative approximation (5-point) 論参照
    # coefficients [1, 2, 0, -2, -1] / 8
    coeffs = np.array([1.0, 2.0, 0.0, -2.0, -1.0]) / 8.0
    return np.convolve(signal, coeffs, mode='same')

filtered_derivative_ecg = derivative_filter(filtered_bandpass_ecg)

#plot filtered_derivative_ecg
plt.plot(filtered_derivative_ecg[:2000])  # 最初の2000サンプルを表示
plt.title('Derivative Filtered ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()

ECG derivative

実装例(3.二乗(Squaring))

#3.二乗し、ピーク強調
squared_ecg = filtered_derivative_ecg ** 2
#plot squared_ecg
plt.plot(squared_ecg[:2000])  # 最初の2000サンプ
plt.title('Squared ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()

ECG squared

実装例(4.移動平均(積分器))

def moving_window_integration(signal, window_size=30):
    window = np.ones(window_size) / window_size
    return np.convolve(signal, window, mode='same')

# 窓幅をサンプリング周波数に基づいて設定 (例: 150ms 論文抜粋)
window_size = max(1, int((150 / 1000.0) * fs))
integrated_ecg = moving_window_integration(squared_ecg, window_size=window_size)
plt.title('Integrated ECG Signal')
plt.plot(integrated_ecg[:2000])  # 最初の2000サンプルを表示
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.show()

ECG integrated

実装例(5.閾値処理とピーク選択)


# detection = integrated_ecg
min_peak_distance = int(0.3 * fs)
min_missed_distance = int(0.25 * fs)

peaks, _ = scipy.signal.find_peaks(integrated_ecg, plateau_size=(1, 1))

SPKI = 0.0
NPKI = 0.0

threshold_I1_history = np.zeros(len(integrated_ecg))
threshold_I2_history = np.zeros(len(integrated_ecg))

signal_peaks = []
last_peak = 0
last_index = -1

for index, peak in enumerate(peaks):
    peak_value = integrated_ecg[peak]
    threshold_I1 = NPKI + 0.25 * (SPKI - NPKI)
    threshold_I1_history[peak] = threshold_I1
    threshold_I2 = 0.5 * threshold_I1
    threshold_I2_history[peak] = threshold_I2

    if peak_value > threshold_I1 and peak > last_peak + min_peak_distance:
        signal_peaks.append(peak)

        # RR_missed
        if len(signal_peaks) > 9:
            RR_ave = (signal_peaks[-2] - signal_peaks[-10]) // 8
            RR_missed = int(1.66 * RR_ave)
            if peak - last_peak > RR_missed:
                missed_peaks = peaks[last_index + 1:index]
                missed_peaks = missed_peaks[(missed_peaks > last_peak + min_missed_distance) & (missed_peaks < peak - min_missed_distance)]
                missed_peaks = missed_peaks[integrated_ecg[missed_peaks] > threshold_I2]
                if len(missed_peaks) > 0:
                    signal_peaks[-1] = missed_peaks[np.argmax(integrated_ecg[missed_peaks])]
                    signal_peaks.append(peak)

        last_peak = peak
        last_index = index

        SPKI = 0.125 * peak_value + 0.875 * SPKI
    else:
        NPKI = 0.125 * peak_value + 0.875 * NPKI

# Plot integrated signal and thresholds
plt.figure(figsize=(12, 4))
plt.plot(integrated_ecg[:2000], label='Integrated')
plt.scatter(peaks[peaks<2000], integrated_ecg[peaks[peaks<2000]], color='gray', s=10, label='candidate peaks')
plt.scatter([p for p in signal_peaks if p<2000], integrated_ecg[[p for p in signal_peaks if p<2000]], color='red', s=20, label='signal peaks')
plt.legend()
plt.title('Integrated signal with adaptive thresholds (I1 and I2)')
plt.show()

#可視化
plt.figure(figsize=(12, 4))
plt.plot(ecg_signal[:2000], label='Original ECG')
#棒でピークを表示
for index,peak in enumerate(signal_peaks):
    if index < 6:
        plt.axvline(x=peak, color='red', linestyle='--', alpha=0.7)
plt.title('Detected R-peaks on Original ECG Signal')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.legend()
plt.show()
ピーク候補 最終結果
ECG origin ECG banpass

ピーク候補がずれているのは、積分器の影響でピークが広がるためです。実際の R ピーク位置は元の ECG 信号上で確認する必要があります。
Pan–Tompkinsの論文を真似るのが良いですが、実際の実装ではピーク位置を元の信号に戻して最も近いピークを選ぶなどの工夫が必要です。

注意点

  • 上記実装は閾値を固定パーセンタイルで決めていますが、実装では信号ごとに閾値を適応的に更新するのが一般的です(原論文参照)。
  • 高ノイズや心電図アーチファクトでは偽検出や見逃しが発生しやすく、事前のノイズ除去やポスト処理(波形確認・アラインメント)が重要です。
  • ライブラリ実装(NeuroKit2 など)を使うとパラメータ調整や互換性の点で便利です。

参考


Discussion