Skip to content

Commit

Permalink
Add Peak-Prominence Delineator
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasEmrich committed Nov 27, 2024
1 parent 21e3cf7 commit 6c648d8
Showing 1 changed file with 209 additions and 10 deletions.
219 changes: 209 additions & 10 deletions neurokit2/ecg/ecg_delineate.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def ecg_delineate(
sampling_rate : int
The sampling frequency of ``ecg_signal`` (in Hz, i.e., samples/second). Defaults to 1000.
method : str
Can be one of ``"peak"`` for a peak-based method, ``"cwt"`` for continuous wavelet transform
or ``"dwt"`` (default) for discrete wavelet transform.
Can be one of ``"peak"`` for a peak-based method, ``"prominence"`` for a peak-prominence-based method,
``"cwt"`` for continuous wavelet transform or ``"dwt"`` (default) for discrete wavelet transform.
show : bool
If ``True``, will return a plot to visualizing the delineated waves information.
show_type: str
Expand All @@ -60,7 +60,16 @@ def ecg_delineate(
Defaults to ``False``. If ``True``, replaces the delineated features with ``np.nan`` if its
standardized distance from R-peaks is more than 3.
**kwargs
Other optional arguments.
Other optional arguments:
If using the ``"prominence"`` method, additional parameters (in milliseconds) can be passed to set
individual physiological limits for the search boundaries:
- ``max_qrs_interval``: The maximum allowable QRS complex interval. Defaults to 180 ms.
- ``max_pr_interval``: The maximum PR interval duration. Defaults to 300 ms.
- ``max_r_rise_time``: Maximum duration for the R-wave rise. Defaults to 120 ms.
- ``typical_st_segment``: Typical duration of the ST segment. Defaults to 150 ms.
- ``max_p_basepoint_interval``: The maximum interval between P-wave on- and offset. Defaults to 100 ms.
- ``max_r_basepoint_interval``: The maximum interval between R-wave on- and offset. Defaults to 100 ms.
- ``max_t_basepoint_interval``: The maximum interval between T-wave on- and offset. Defaults to 200 ms.
Returns
-------
Expand All @@ -71,7 +80,7 @@ def ecg_delineate(
``"ECG_Q_Peaks"``, ``"ECG_S_Peaks"``, ``"ECG_T_Peaks"``, ``"ECG_P_Onsets"``,
``"ECG_T_Offsets"``, respectively.
For wavelet methods, in addition to the above information, the dictionary contains the
For the wavelet and prominence methods, in addition to the above information, the dictionary contains the
samples at which QRS-onsets and QRS-offsets occur, accessible with the key
``"ECG_P_Peaks"``, ``"ECG_T_Peaks"``, ``"ECG_P_Onsets"``, ``"ECG_P_Offsets"``,
``"ECG_Q_Peaks"``, ``"ECG_S_Peaks"``, ``"ECG_T_Onsets"``, ``"ECG_T_Offsets"``,
Expand Down Expand Up @@ -114,6 +123,8 @@ def ecg_delineate(
- Martínez, J. P., Almeida, R., Olmos, S., Rocha, A. P., & Laguna, P. (2004). A wavelet-based
ECG delineator: evaluation on standard databases. IEEE Transactions on biomedical engineering,
51(4), 570-581.
- Emrich, J., Gargano, A., Koka, T., & Muma, M. (2024). Physiology-Informed ECG Delineation Based
on Peak Prominence. 32nd European Signal Processing Conference (EUSIPCO), 1402-1406.
"""
# Sanitize input for ecg_cleaned
Expand Down Expand Up @@ -162,10 +173,12 @@ def ecg_delineate(
)
elif method in ["dwt", "discrete wavelet transform"]:
waves = _dwt_ecg_delineator(ecg_cleaned, rpeaks, sampling_rate=sampling_rate)
elif method in ["prominence", "peak-prominence", "emrich", "emrich2024"]:
waves = _prominence_ecg_delineator(ecg_cleaned, rpeaks=rpeaks, sampling_rate=sampling_rate, **kwargs)

else:
raise ValueError(
"NeuroKit error: ecg_delineate(): 'method' should be one of 'peak',"
"NeuroKit error: ecg_delineate(): 'method' should be one of 'peak', 'prominence',"
"'cwt' or 'dwt'."
)

Expand Down Expand Up @@ -739,6 +752,195 @@ def _ecg_delineator_cwt(ecg, rpeaks=None, sampling_rate=1000):
}


# =============================================================================
# PROMINENCE METHOD
# =============================================================================
def _prominence_ecg_delineator(ecg, rpeaks=None, sampling_rate=1000, **kwargs):
# pysiology-informed boundaries in milliseconds, adapt if needed
max_qrs_interval = int(kwargs.get("max_qrs_interval", 180) * sampling_rate / 1000)
max_pr_interval = int(kwargs.get("max_pr_interval", 300) * sampling_rate / 1000)
max_r_rise_time = int(kwargs.get("max_r_rise_time", 120) * sampling_rate / 1000)
typical_st_segment = int(kwargs.get("typical_st_segment", 150) * sampling_rate / 1000)
# max basepoint intervals
max_p_basepoint_interval = int(kwargs.get("max_p_basepoint_interval", 100) * sampling_rate / 1000)
max_r_basepoint_interval = int(kwargs.get("max_r_basepoint_interval", 100) * sampling_rate / 1000)
max_t_basepoint_interval = int(kwargs.get("max_t_basepoint_interval", 200) * sampling_rate / 1000)

waves = {
"ECG_P_Onsets": [],
"ECG_P_Peaks": [],
"ECG_P_Offsets": [],
"ECG_Q_Peaks": [],
"ECG_R_Onsets": [],
"ECG_R_Offsets": [],
"ECG_S_Peaks": [],
"ECG_T_Onsets": [],
"ECG_T_Peaks": [],
"ECG_T_Offsets": [],
}

# calculate RR intervals
rr = np.diff(rpeaks)
rr = np.insert(rr, 0, min(rr[0], 2 * rpeaks[0]))
rr = np.insert(rr, -1, min(rr[-1], 2 * rpeaks[-1]))

# iterate over all beats
left = 0
for i in range(len(rpeaks)):
# 1. split signal into segments
rpeak_pos = min(rpeaks[i], rr[i] // 2)
left = rpeaks[i] - rpeak_pos
right = rpeaks[i] + rr[i + 1] // 2
ecg_seg = ecg[left:right]

current_wave = {
"ECG_R_Peaks": rpeak_pos,
}

# 2. find local extrema in signal
local_maxima = scipy.signal.find_peaks(ecg_seg)[0]
local_minima = scipy.signal.find_peaks(-ecg_seg)[0]
local_extrema = np.concatenate((local_maxima, local_minima))

# 3. compute prominence weight
weight_maxima = _calc_prominence(local_maxima, ecg_seg, current_wave["ECG_R_Peaks"])
weight_minima = _calc_prominence(local_minima, ecg_seg, current_wave["ECG_R_Peaks"], minima=True)

if local_extrema.any():
# find waves
_prominence_find_q_wave(weight_minima, current_wave, max_r_rise_time)
_prominence_find_s_wave(ecg_seg, weight_minima, current_wave, max_qrs_interval)
_prominence_find_p_wave(local_maxima, weight_maxima, current_wave, max_pr_interval)
_prominence_find_t_wave(local_extrema, (weight_minima + weight_maxima), current_wave, typical_st_segment)
_prominence_find_on_offsets(
ecg_seg,
sampling_rate,
local_minima,
current_wave,
max_p_basepoint_interval,
max_r_basepoint_interval,
max_t_basepoint_interval,
)

# append waves for current beat / complex
for key in waves:
if key == "ECG_R_Peaks":
waves[key].append(int(rpeaks[i]))
elif key in current_wave:
waves[key].append(int(current_wave[key] + left))
else:
waves[key].append(np.nan)

return waves


def _calc_prominence(peaks, sig, Rpeak=None, minima=False):
"""Returns an array of the same length as sig with prominences computed for the provided peaks.
Prominence of the R-peak is excluded if the R-peak position is given.
"""
w = np.zeros_like(sig)

if len(peaks) < 1:
return w
# get prominence
_sig = -sig if minima else sig
w[peaks] = scipy.signal.peak_prominences(_sig, peaks)[0]
# optional: set rpeak prominence to zero to emphasize other peaks
if Rpeak is not None:
w[Rpeak] = 0
return w


def _prominence_find_q_wave(weight_minima, current_wave, max_r_rise_time):
if "ECG_R_Peaks" not in current_wave:
return
q_bound = max(current_wave["ECG_R_Peaks"] - max_r_rise_time, 0)

current_wave["ECG_Q_Peaks"] = np.argmax(weight_minima[q_bound : current_wave["ECG_R_Peaks"]]) + q_bound


def _prominence_find_s_wave(sig, weight_minima, current_wave, max_qrs_interval):
if "ECG_Q_Peaks" not in current_wave:
return
s_bound = current_wave["ECG_Q_Peaks"] + max_qrs_interval
s_wave = np.argmax(weight_minima[current_wave["ECG_R_Peaks"] : s_bound] > 0) + current_wave["ECG_R_Peaks"]
current_wave["ECG_S_Peaks"] = (
np.argmin(sig[current_wave["ECG_R_Peaks"] : s_bound]) + current_wave["ECG_R_Peaks"]
if s_wave == current_wave["ECG_R_Peaks"]
else s_wave
)


def _prominence_find_p_wave(local_maxima, weight_maxima, current_wave, max_pr_interval):
if "ECG_Q_Peaks" not in current_wave:
return
p_candidates = local_maxima[
(current_wave["ECG_Q_Peaks"] - max_pr_interval <= local_maxima) & (local_maxima <= current_wave["ECG_Q_Peaks"])
]
if p_candidates.any():
current_wave["ECG_P_Peaks"] = p_candidates[np.argmax(weight_maxima[p_candidates])]


def _prominence_find_t_wave(local_extrema, weight_extrema, current_wave, typical_st_segment):
if "ECG_S_Peaks" not in current_wave:
return
t_candidates = local_extrema[(current_wave["ECG_S_Peaks"] + typical_st_segment <= local_extrema)]
if t_candidates.any():
current_wave["ECG_T_Peaks"] = t_candidates[np.argmax(weight_extrema[t_candidates])]


def _prominence_find_on_offsets(
sig,
sampling_rate,
local_minima,
current_wave,
max_p_basepoint_interval,
max_r_basepoint_interval,
max_t_basepoint_interval,
):
if "ECG_P_Peaks" in current_wave:
_, p_on, p_off = scipy.signal.peak_prominences(
sig, [current_wave["ECG_P_Peaks"]], wlen=max_p_basepoint_interval
)
if not np.isnan(p_on):
current_wave["ECG_P_Onsets"] = p_on[0]
if not np.isnan(p_off):
current_wave["ECG_P_Offsets"] = p_off[0]

if "ECG_T_Peaks" in current_wave:
p = -1 if np.isin(current_wave["ECG_T_Peaks"], local_minima) else 1

_, t_on, t_off = scipy.signal.peak_prominences(
p * sig, [current_wave["ECG_T_Peaks"]], wlen=max_t_basepoint_interval
)
if not np.isnan(t_on):
current_wave["ECG_T_Onsets"] = t_on[0]
if not np.isnan(t_off):
current_wave["ECG_T_Offsets"] = t_off[0]

# correct R-peak position towards local maxima (otherwise prominence will be falsely computed)
r_pos = _correct_peak(sig, sampling_rate, current_wave["ECG_R_Peaks"])
_, r_on, r_off = scipy.signal.peak_prominences(sig, [r_pos], wlen=max_r_basepoint_interval)
if not np.isnan(r_on):
current_wave["ECG_R_Onsets"] = r_on[0]

if not np.isnan(r_off):
current_wave["ECG_R_Offsets"] = r_off[0]


def _correct_peak(sig, fs, peak, window=0.02):
"""Correct peak towards local maxima within provided window."""

left = peak - int(window * fs)
right = peak + int(window * fs)
if len(sig[left:right]) > 0:
return np.argmax(sig[left:right]) + left
else:
return peak


# Internals
# ---------------------

Expand Down Expand Up @@ -798,11 +1000,7 @@ def _onset_offset_delineator(ecg, peaks, peak_type="rpeaks", sampling_rate=1000)
epsilon_onset = 0.25 * wt_peaks_data["peak_heights"][-1]
leftbase = wt_peaks_data["left_bases"][-1] + index_peak - half_wave_width
if peak_type == "rpeaks":
candidate_onsets = (
np.where(cwtmatr[2, nfirst - 100 : nfirst] < epsilon_onset)[0]
+ nfirst
- 100
)
candidate_onsets = np.where(cwtmatr[2, nfirst - 100 : nfirst] < epsilon_onset)[0] + nfirst - 100
elif peak_type in ["tpeaks", "ppeaks"]:
candidate_onsets = (
np.where(-cwtmatr[4, nfirst - 100 : nfirst] < epsilon_onset)[0]
Expand Down Expand Up @@ -1123,6 +1321,7 @@ def _ecg_delineate_plot(
sampling_rate=1000,
window_start=-0.35,
window_end=0.55,
**kwargs
):
"""
import neurokit2 as nk
Expand Down

0 comments on commit 6c648d8

Please sign in to comment.