"""Signal quality indexes based on R peak detection"""
import numpy as np
from scipy import signal
import heartpy as hp
from heartpy.datautils import rolling_mean
from hrvanalysis import get_time_domain_features,get_frequency_domain_features,\
get_nn_intervals,get_csi_cvi_features,get_geometrical_features
from hrvanalysis.preprocessing import remove_outliers,remove_ectopic_beats,interpolate_nan_values
from heartpy.analysis import calc_ts_measures, calc_rr, calc_fd_measures,\
clean_rr_intervals,calc_poincare,calc_breathing
from heartpy.peakdetection import check_peaks, detect_peaks
from statsmodels.tsa.stattools import acf
from vital_sqi.common.rpeak_detection import PeakDetector
[docs]def get_all_features_hrva(data_sample,sample_rate=100,rpeak_method=0):
"""
:param data_sample:
:param sample_rate:
:param rpeak_method:
:return:
"""
if rpeak_method in [1,2,3,4]:
detector = PeakDetector()
peak_list = detector.ppg_detector(data_sample,rpeak_method)[0]
else:
rol_mean = rolling_mean(data_sample, windowsize=0.75, sample_rate=100.0)
peaks_wd = detect_peaks(data_sample,rol_mean,ma_perc = 20, sample_rate = 100.0)
peak_list = peaks_wd["peaklist"]
rr_list = np.diff(peak_list) * (1000/sample_rate) #1000 milisecond
nn_list = get_nn_intervals(rr_list)
nn_list_non_na = np.copy(nn_list)
nn_list_non_na[np.where(np.isnan(nn_list_non_na))[0]] = -1
time_domain_features = get_time_domain_features(rr_list)
frequency_domain_features = get_frequency_domain_features(rr_list)
geometrical_features = get_geometrical_features(rr_list)
csi_cvi_features = get_csi_cvi_features(rr_list)
return time_domain_features,frequency_domain_features,\
geometrical_features,csi_cvi_features
[docs]def get_all_features_heartpy(data_sample,sample_rate=100,rpeak_detector = 0):
# time domain features
td_features = ["bpm", "ibi", "sdnn", "sdsd", "rmssd",
"pnn20", "pnn50", "hr_mad", "sd1", "sd2",
"s", "sd1/sd2", "breathingrate"]
# frequency domain features
fd_features = ["lf", "hf", "lf/hf"]
try:
wd, m = hp.process(data_sample, sample_rate,calc_freq = True)
except Exception as e:
try:
wd, m = hp.process(data_sample, sample_rate)
except:
time_domain_features = {k: np.nan for k in td_features}
frequency_domain_features = {k: np.nan for k in fd_features}
return time_domain_features,frequency_domain_features
if rpeak_detector in [1,2,3,4]:
detector = PeakDetector(wave_type='ecg')
peak_list = detector.ppg_detector(data_sample,rpeak_detector,preprocess=False)[0]
wd["peaklist"] = peak_list
wd = calc_rr(peak_list,sample_rate,working_data=wd)
wd = check_peaks(wd['RR_list'], wd['peaklist'], wd['ybeat'],
reject_segmentwise=False, working_data=wd)
wd = clean_rr_intervals(working_data=wd)
rr_diff = wd['RR_list']
rr_sqdiff = np.power(rr_diff, 2)
wd, m = calc_ts_measures(wd['RR_list'], rr_diff, rr_sqdiff,working_data=wd)
m = calc_poincare(wd['RR_list'], wd['RR_masklist'], measures=m,
working_data=wd)
try:
measures, working_data = calc_breathing(wd['RR_list_cor'], data_sample, sample_rate,
measures=m,
working_data=wd)
except:
measures['breathingrate'] = np.nan
wd, m = calc_fd_measures(measures=measures,working_data=working_data)
time_domain_features = {k:m[k] for k in td_features}
frequency_domain_features = {}
for k in fd_features:
if k in m.keys():
frequency_domain_features[k] = m[k]
else:
frequency_domain_features[k] = np.nan
# frequency_domain_features = {k:m[k] for k in fd_features if k in m.keys}
# frequency_domain_features = {k:np.na for k in fd_features if k not in m.keys}
return time_domain_features,frequency_domain_features
[docs]def get_peak_error_features(data_sample,sample_rate=100,rpeak_detector = 0,low_rri=300,
high_rri=2000):
rules = ["malik", "karlsson", "kamath", "acar"]
try:
wd, m = hp.process(data_sample, sample_rate, calc_freq=True)
except:
try:
wd, m = hp.process(data_sample, sample_rate)
except:
error_dict = {rule+"_error":np.nan for rule in rules}
error_dict["outlier_error"] = np.nan
return error_dict
if rpeak_detector in [1, 2, 3, 4]:
detector = PeakDetector(wave_type='ecg')
peak_list = detector.ppg_detector(data_sample, rpeak_detector, preprocess=False)[0]
wd["peaklist"] = peak_list
wd = calc_rr(peak_list, sample_rate, working_data=wd)
wd = check_peaks(wd['RR_list'], wd['peaklist'], wd['ybeat'],
reject_segmentwise=False, working_data=wd)
wd = clean_rr_intervals(working_data=wd)
rr_intervals = wd["RR_list"]
rr_intervals_cleaned = remove_outliers(rr_intervals, low_rri=low_rri, high_rri=high_rri)
number_outliers = len(np.where(np.isnan(rr_intervals_cleaned))[0])
outlier_ratio = number_outliers/(len(rr_intervals_cleaned)-number_outliers)
error_sqi = {}
error_sqi['outlier_error'] = outlier_ratio
interpolated_rr_intervals = interpolate_nan_values(rr_intervals_cleaned)
for rule in rules:
nn_intervals = remove_ectopic_beats(interpolated_rr_intervals,
method=rule)
number_ectopics = len(np.where(np.isnan(nn_intervals))[0])
ectopic_ratio = number_ectopics/(len(nn_intervals)-number_ectopics)
error_sqi[rule+"_error"] = ectopic_ratio
return error_sqi
[docs]def correlogram_sqi(data_sample,sample_rate=100,time_lag=3,n_selection = 3):
"""
The method is based on the paper 'Classification of the Quality of Wristband-based
Photoplethysmography Signals'
Parameters
----------
data_sample
sample_rate
time_lag
Returns
-------
"""
nlags = time_lag*sample_rate
corr = acf(data_sample,nlags=nlags)
corr_peaks_idx = signal.find_peaks(corr)[0]
corr_peaks_value = corr[corr_peaks_idx]
if n_selection > len(corr_peaks_value):
n_selection = len(corr_peaks_value)
corr_sqi = [i for i in corr_peaks_idx[:n_selection]]+\
[i for i in corr_peaks_value[:n_selection]]
return corr_sqi