"""R peak detection approaches for PPG and ECG"""
import numpy as np
from sklearn.cluster import KMeans
from scipy import signal
from vital_sqi.preprocess.band_filter import BandpassFilter
from vital_sqi.common.generate_template import ecg_dynamic_template
import warnings
from ecgdetectors import Detectors, panPeakDetect
ADAPTIVE_THRESHOLD = 1
COUNT_ORIG_METHOD = 2
CLUSTERER_METHOD = 3
SLOPE_SUM_METHOD = 4
MOVING_AVERAGE_METHOD = 5
DEFAULT_SCIPY = 6
ADAPTIVE_THRESHOLD = 1
COUNT_ORIG_METHOD = 2
CLUSTERER_METHOD = 3
SLOPE_SUM_METHOD = 4
MOVING_AVERAGE_METHOD = 5
DEFAULT_SCIPY = 6
[docs]class PeakDetector:
"""Various peak detection approaches getting from the paper
Systolic Peak Detection in Acceleration Photoplethysmograms Measured
from Emergency Responders in Tropical Conditions
Parameters
----------
Returns
-------
"""
def __init__(self, wave_type='ppg', fs=100):
self.clusters = 2
self.wave_type = wave_type
self.fs = fs
[docs] def ecg_detector(self, s, detector_type="pan_tompkins"):
"""
Expose
ECG peak detector from the github
https://github.com/berndporr/py-ecg-detectors
Parameters
----------
s :
Input signal
fs:
The signal frequency. Default is '256 Hz'
detector_type:
'hamilton': Open Source ECG Analysis Software Documentation,
E.P.Limited, 2002.
'christov':Real time electrocardiogram QRS detection using combined
adaptive threshold
'engzee': A single scan algorithm for QRS detection and
feature extraction
'swt': Real-time QRS detector using Stationary Wavelet Transform
for Automated ECG Analysis.
Uses the Pan and Tompkins thresolding.
'mva': Frequency Bands Effects on QRS Detection.
'mtemp':
'pan_tompkins': A Real-Time QRS Detection Algorithm
Default = 'pan_tompkins'
Returns
-------
type
an array of 1-D numpy array represent the peak list
"""
if self.wave_type == 'ppg':
warnings.warn("A ECG detectors is using on PPG waveform. "
"Output may produce incorrect result")
detector = Detectors(self.fs)
if detector_type == 'hamilton':
res = detector.hamilton_detector(s)
elif detector_type == 'christov':
res = detector.christov_detector(s)
elif detector_type == 'engzee':
res = detector.engzee_detector(s)
elif detector_type == 'swt':
res = detector.swt_detector(s)
elif detector_type == 'mva':
res = detector.two_average_detector(s)
elif detector_type == 'mtemp':
res = self.matched_filter_detector(s)
else:
res = detector.pan_tompkins_detector(s)
return np.array(res)
[docs] def ppg_detector(self, s, detector_type=ADAPTIVE_THRESHOLD,
clusterer="kmean", preprocess=True, cubing=False):
"""
Expose
PPG peak detector from the paper
Systolic Peak Detection in Acceleration Photoplethysmograms Measured
from Emergency Responders in Tropical Conditions
:param s: the input signal
:param detector_type:
:param clusterer:
:return:
"""
if preprocess:
filter = BandpassFilter()
s = filter.signal_highpass_filter(s, cutoff=1, order=2)
s = filter.signal_lowpass_filter(s, cutoff=12, order=2)
if cubing:
s = s ** 3
if self.wave_type != "ppg":
warnings.warn("A PPG detectors is using on unrecognized "
"PPG waveform. Output may produce incorrect result")
try:
if detector_type == CLUSTERER_METHOD:
peak_finalist, trough_finalist = \
self.detect_peak_trough_clusterer(s)
elif detector_type == SLOPE_SUM_METHOD:
peak_finalist, trough_finalist = \
self.detect_peak_trough_slope_sum(s)
elif detector_type == MOVING_AVERAGE_METHOD:
peak_finalist, trough_finalist = \
self.detect_peak_trough_moving_average_threshold(s)
elif detector_type == COUNT_ORIG_METHOD:
peak_finalist, trough_finalist = \
self.detect_peak_trough_count_orig(s)
elif detector_type == DEFAULT_SCIPY:
peak_finalist, trough_finalist = \
self.detect_peak_trough_default_scipy(s)
else:
peak_finalist, trough_finalist = \
self.detect_peak_trough_adaptive_threshold(s)
except Exception as err:
print(err)
return signal.find_peaks(s)[0], []
return peak_finalist, trough_finalist
[docs] def matched_filter_detector(self, unfiltered_ecg):
"""
handy
FIR matched filter using template of QRS complex.
Template provided in generate_template file
"""
template = ecg_dynamic_template(self.fs)
f0 = 0.1 / self.fs
f1 = 48 / self.fs
b, a = signal.butter(4, [f0 * 2, f1 * 2], btype='bandpass')
prefiltered_ecg = signal.lfilter(b, a, unfiltered_ecg)
matched_coeffs = template[::-1] # time reversing template
# matched filter FIR filtering
detection = signal.lfilter(matched_coeffs, 1, prefiltered_ecg)
# squaring matched filter output
squared = detection * detection
squared[:len(template)] = 0
squared_peaks = panPeakDetect(squared, self.fs)
return squared_peaks
[docs] def compute_feature(self, s, local_extrema):
"""
handy
Parameters
----------
s :
local_extrema :
Returns
-------
"""
amplitude = s[local_extrema]
diff = np.diff(amplitude)
diff = np.hstack((diff[0], diff, diff[-1]))
mean_diff = np.mean(np.vstack((diff[1:], diff[:-1])), axis=0)
amplitude = amplitude.reshape(-1, 1)
mean_diff = mean_diff.reshape(-1, 1)
return np.hstack((amplitude, mean_diff))
[docs] def detect_peak_trough_clusterer(self, s, clusterer='kmean', **kwargs):
"""
handy
Method 1: using clustering technique
Parameters
----------
s :
The input signals
method :
param kwargs:
**kwargs :
Returns
-------
type
tuple of 1-D numpy array
the first array is the peak list
and the second array is the troughs list
"""
# squarring doesnot work
# s = np.array(s) ** 2
local_maxima = signal.argrelmax(s)[0]
local_minima = signal.argrelmin(s)[0]
# local_minima_amplitude = s[local_minima]
# local_maxima_amplitude = s[local_maxima]
# lower, middle, upper = np.quantile(s, [0.25, 0.5, 0.75])
# clusterer = KMeans(kwargs)
clusterer = KMeans(n_clusters=2, init='k-means++', n_init=10,
max_iter=300, tol=0.0001,
precompute_distances='deprecated',
verbose=0, random_state=None, copy_x=True,
n_jobs='deprecated', algorithm='auto')
convert_maxima = self.compute_feature(s, local_maxima)
clusterer.fit(convert_maxima)
systolic_group = clusterer.predict(
convert_maxima[np.argmax(s[local_maxima])].reshape(1, -1))
labels = clusterer.predict(convert_maxima)
systolic_peaks_idx = local_maxima[np.where(labels == systolic_group)]
# ========================================================
# The same with troughs
convert_minima = self.compute_feature(s, local_minima)
clusterer.fit(convert_minima)
trough_group = clusterer.predict(
convert_minima[np.argmin(s[local_minima])].reshape(1, -1))
labels = clusterer.predict(convert_minima)
trough_idx = local_minima[np.where(labels == trough_group)]
return systolic_peaks_idx, trough_idx
[docs] def get_ROI(self, s, mva):
start_pos = []
end_pos = []
for idx in range(len(s) - 1):
if mva[idx] > s[idx] and mva[idx + 1] < s[idx + 1]:
start_pos.append(idx)
elif mva[idx] < s[idx] and mva[idx + 1] > s[idx + 1] \
and len(start_pos) > len(end_pos):
end_pos.append(idx)
if len(start_pos) > len(end_pos):
end_pos.append(len(s) - 1)
return start_pos, end_pos
[docs] def detect_peak_trough_adaptive_threshold(self, s,
adaptive_size=0.75,
overlap=0, sliding=1):
"""
:param s:
:param adaptive_size:
:param overlap: overlapping ratio
:return:
"""
# number of instances in the adaptive window
adaptive_window = adaptive_size * self.fs
adaptive_threshold = self.get_moving_average(
s, int(adaptive_window * 2 + 1))
start_ROIs, end_ROIs = self.get_ROI(s, adaptive_threshold)
peak_finalist = []
for start_ROI, end_ROI in zip(start_ROIs, end_ROIs):
region = s[start_ROI:end_ROI + 1]
peak_finalist.append(np.argmax(region) + start_ROI)
trough_finalist = []
for idx in range(len(peak_finalist) - 1):
region = s[peak_finalist[idx]:peak_finalist[idx + 1]]
trough_finalist.append(np.argmin(region) + peak_finalist[idx])
return peak_finalist, trough_finalist
[docs] def detect_peak_trough_default_scipy(self, s):
peak_finalist = signal.find_peaks(s)[0]
trough_finalist = []
for idx in range(len(peak_finalist) - 1):
region = s[peak_finalist[idx]:peak_finalist[idx + 1]]
trough_finalist.append(np.argmin(region) + peak_finalist[idx])
return peak_finalist, trough_finalist
[docs] def detect_peak_trough_count_orig(self, s):
"""
handy
Method 2: using local extreme technique with threshold
Parameters
----------
s :
Input signal
Returns
-------
type
tuple of 1-D numpy array
the first array is the peak list
and the second array is the troughs list
"""
# squaring decrease the efficiency
# s = np.array(s)**2
local_maxima = signal.argrelmax(s)[0]
local_minima = signal.argrelmin(s)[0]
peak_threshold = np.quantile(s[local_maxima], 0.75) * 0.2
trough_threshold = np.quantile(s[local_minima], 0.25) * 0.2
peak_shortlist = np.array([optima for optima in local_maxima
if s[optima] >= peak_threshold])
trough_shortlist = np.array([optima for optima in local_minima
if s[optima] <= trough_threshold])
peak_finalist = []
through_finalist = []
left_trough = trough_shortlist[0]
for i in range(1, len(trough_shortlist)):
right_trough = trough_shortlist[i]
peaks = [peak for peak in peak_shortlist
if peak < right_trough and peak > left_trough]
if len(peaks) == 0:
left_trough = np.array([left_trough, right_trough])
[np.argmin([s[left_trough], s[right_trough]])]
else:
peak = peaks[np.argmax(s[peaks])]
peak_finalist.append(peak)
through_finalist.append(left_trough)
left_trough = right_trough
through_finalist.append(right_trough)
return peak_finalist, through_finalist
[docs] def detect_peak_trough_slope_sum(self, s):
"""
handy
Method 3: analyze the slope sum to get local extreme
Parameters
----------
s :
return:
Returns
-------
"""
peak_finalist = []
trough_finalist = []
onset_list = []
"""
Here w is the length of the analysing window, which Zong et al. [25]
approximate to be equal to 128 ms or 47 samples
for the sampling frequency (fs) of 367 Hz
"""
w = 12
N = len(s)
Z = []
n_range = np.arange(w + 1, N)
for n in n_range:
Zk = 0
for k in range((n - w), n):
delta_y_k = s[k] - s[k - 1]
Zk = Zk + max(0, delta_y_k)
Z.append(Zk)
Z = np.array(Z)
fs = 100
Z_threshold = 3 * np.mean(Z[:10 * fs])
threshold_base = Z_threshold
for n in range(len(Z)):
actual_threshold = threshold_base * 0.6
if Z[n] > actual_threshold:
left = n - 15
right = n + 15
if left < 0:
left = 0
if right > len(Z):
right = len(Z)
local_min = np.min(Z[left:right])
local_max = np.max(Z[left:right])
if (local_max - local_min) > local_min * 2:
# Accept the pulse
threshold_crossing_point = n
onset = self.search_for_onset(
Z, threshold_crossing_point, local_max)
onset_list.append(onset)
# peak_finalist.append(threshold_crossing_point+np.argmax())
# maximum Z[n] value for each pulse detected
threshold_base = local_max # ?????
onset_list = np.array(list(set(onset_list)))
onset_list.sort()
for trough_idx in range(1, len(onset_list)):
left = onset_list[trough_idx - 1]
right = onset_list[trough_idx]
try:
peak_finalist.append(np.argmax(s[left:right]) + left)
trough_finalist.append(np.argmin(s[left:right]) + left)
except Exception as e:
print(e)
return peak_finalist, onset_list
[docs] def search_for_onset(Z, idx, local_max):
"""
handy
Parameters
----------
Z :
idx :
local_max :
Returns
-------
"""
# while Z[idx] > 0.01*local_max:
while Z[idx] > 0:
idx = idx - 1
if idx <= 0:
return idx
return idx + 1
[docs] def detect_peak_trough_moving_average_threshold(self, s):
"""
handy
Method 4 (examine second derivative)
Parameters
----------
s :
return:
Returns
-------
"""
peak_finalist = []
through_finalist = []
# Bandpass filter
filter = BandpassFilter()
S = filter.signal_highpass_filter(s, 0.5, fs=100)
# S = butter_lowpass_filter(S,8,fs=100)
# S = s
# Clipping the output by keeping the signal
# above zero will produce signal Z
Z = np.array([np.max([0, z]) for z in S])
# Squaring suppressing the small differences
# arising from the diastolic wave and noise
y = Z ** 2
w1 = 12 # 111ms
w2 = 67 # 678ms
# MA_peak
ma_peak = self.get_moving_average(y, w1)
# MA_beat
ma_beat = self.get_moving_average(y, w2)
# Thresholding
z_mean = np.mean(y)
beta = 0.02
alpha = beta * z_mean + ma_beat
thr1 = ma_beat + alpha
block_of_interest = np.zeros(len(thr1))
for i in range(len(ma_peak)):
if ma_peak[i] > thr1[i]:
block_of_interest[i] = 0.1
# Accept and Reject block of interest
# If a block is wider than or equal to THR2,
# it is classified as a systolic peak
thr2 = w1
# index of the signal having boi == 0.1
BOI_idx = np.where(block_of_interest > 0)[0]
# the different of consequence BOI
BOI_diff = np.diff(BOI_idx)
# index where the block move to other block
BOI_width_idx = np.where(BOI_diff > 1)[0]
for i in range(len(BOI_width_idx)):
if i == 0:
BOI_width = BOI_width_idx[i]
else:
BOI_width = BOI_width_idx[i] - BOI_width_idx[i - 1]
if BOI_width >= thr2:
if i == 0:
left_idx = 0
else:
left_idx = BOI_width_idx[i - 1]
# left_idx = BOI_width_idx[np.max([0,i-1])]
right_idx = BOI_width_idx[i]
region = y[BOI_idx[left_idx]:BOI_idx[right_idx] + 1]
peak_finalist.append(BOI_idx[left_idx] + np.argmax(region))
return peak_finalist, through_finalist
[docs] def get_moving_average(self, q, w):
"""
handy
Parameters
----------
q :
w :
Returns
-------
"""
# shifting = np.ceil(w-w/2)-1
# remaining = w-1-shifting
q_padded = np.pad(q, (w // 2, w - 1 - w // 2), mode='edge')
convole = np.convolve(q_padded, np.ones(w) / w, 'valid')
return convole