#!/usr/bin/python
# coding: UTF-8
#
# Author: Dawid Laszuk
# Contact: https://github.com/laszukdawid/PyEMD/issues
#
# Feel free to contact for any information.
import logging
from typing import Optional, Tuple
import numpy as np
from scipy.interpolate import interp1d
from PyEMD.splines import akima, cubic_spline_3pts
from PyEMD.utils import get_timeline
FindExtremaOutput = Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]
class EMD:
"""
.. _EMD:
**Empirical Mode Decomposition**
Method of decomposing a signal into Intrinsic Mode Functions (IMFs)
based on the algorithm presented in Huang et al. [Huang1998]_.
The algorithm was validated with Rilling et al.'s [Rilling2003]_ Matlab version from 3.2007.
Thresholds which control the goodness of the decomposition:
* `std_thr` --- Test for the proto-IMF how variance changes between siftings.
* `svar_thr` --- Test for the proto-IMF how energy changes between siftings.
* `total_power_thr` --- Test for the whole decomposition whether any energy remains in the residue.
* `range_thr` --- Test for the whole decomposition whether the residue's amplitude range is negligible.
References
----------
.. [Huang1998] N. E. Huang et al., "The empirical mode decomposition and the
Hilbert spectrum for non-linear and non-stationary time series
analysis", Proc. Royal Soc. London A, Vol. 454, pp. 903-995, 1998
.. [Rilling2003] G. Rilling, P. Flandrin and P. Goncalves, "On Empirical Mode
Decomposition and its algorithms", IEEE-EURASIP Workshop on
Nonlinear Signal and Image Processing NSIP-03, Grado (I), June 2003
Examples
--------
>>> import numpy as np
>>> T = np.linspace(0, 1, 100)
>>> S = np.sin(2*2*np.pi*T)
>>> emd = EMD(extrema_detection='parabol')
>>> IMFs = emd.emd(S)
>>> IMFs.shape
(1, 100)
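The most recent decomposition can also be retrieved from the instance afterwards (illustrative):
>>> imfs, residue = emd.get_imfs_and_residue()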
"""
logger = logging.getLogger(__name__)
def __init__(self, spline_kind: str = "cubic", nbsym: int = 2, **kwargs):
"""Initiate *EMD* instance.
Configuration, such as threshold values, can be passed as kwargs (keyword arguments).
Parameters
----------
FIXE : int (default: 0)
Number of sifting iterations to perform for each IMF, regardless of other stopping criteria. Value 0 disables it.
FIXE_H : int (default: 0)
Number of consecutive sifting iterations for which the numbers of extrema and zero-crossings may differ at most by one before stopping. Value 0 disables it.
MAX_ITERATION : int (default 1000)
Maximum number of iterations per single sifting in EMD.
energy_ratio_thr : float (default: 0.2)
Threshold value on energy ratio per IMF check.
std_thr : float (default: 0.2)
Threshold value on standard deviation per IMF check.
svar_thr : float (default: 0.001)
Threshold value on scaled variance per IMF check.
total_power_thr : float (default 0.005)
Threshold value on total power per EMD decomposition.
range_thr : float (default 0.001)
Threshold for amplitude range (after scaling) per EMD decomposition.
extrema_detection : str (default 'simple')
Method used to find extrema. One of: 'simple', 'parabol'.
DTYPE : np.dtype (default np.float64)
Data type used.
Examples
--------
>>> emd = EMD(std_thr=0.01, range_thr=0.05)
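Spline type and the fixed-iteration options can be configured the same way (illustrative values):
>>> emd = EMD(spline_kind='akima', nbsym=2, FIXE_H=5)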
"""
# Declare constants
self.energy_ratio_thr = float(kwargs.get("energy_ratio_thr", 0.2))
self.std_thr = float(kwargs.get("std_thr", 0.2))
self.svar_thr = float(kwargs.get("svar_thr", 0.001))
self.total_power_thr = float(kwargs.get("total_power_thr", 0.005))
self.range_thr = float(kwargs.get("range_thr", 0.001))
self.nbsym = int(kwargs.get("nbsym", nbsym))
self.scale_factor = float(kwargs.get("scale_factor", 1.0))
self.spline_kind = spline_kind
self.extrema_detection = kwargs.get("extrema_detection", "simple") # simple, parabol
assert self.extrema_detection in (
"simple",
"parabol",
), "Only 'simple' and 'parabol' values supported"
self.DTYPE = kwargs.get("DTYPE", np.float64)
self.FIXE = int(kwargs.get("FIXE", 0))
self.FIXE_H = int(kwargs.get("FIXE_H", 0))
self.MAX_ITERATION = int(kwargs.get("MAX_ITERATION", 1000))
# Instance global declaration
self.imfs = None # Optional[np.ndarray]
self.residue = None # Optional[np.ndarray]
def __call__(self, S: np.ndarray, T: Optional[np.ndarray] = None, max_imf: int = -1) -> np.ndarray:
return self.emd(S, T=T, max_imf=max_imf)
def prepare_points(
self,
T: np.ndarray,
S: np.ndarray,
max_pos: np.ndarray,
max_val: np.ndarray,
min_pos: np.ndarray,
min_val: np.ndarray,
):
"""
Performs extrapolation on edges by adding extra extrema, also known
as mirroring the signal. The number of added points depends on the
*nbsym* variable.
Parameters
----------
T : numpy array
Position or time array.
S : numpy array
Input signal.
max_pos : iterable
Sorted time positions of maxima.
max_val : iterable
Signal values at max_pos positions.
min_pos : iterable
Sorted time positions of minima.
min_val : iterable
Signal values at min_pos positions.
Returns
-------
max_extrema : numpy array (2 rows)
Position (1st row) and values (2nd row) of maxima.
min_extrema : numpy array (2 rows)
Position (1st row) and values (2nd row) of minima.
"""
if self.extrema_detection == "parabol":
return self._prepare_points_parabol(T, S, max_pos, max_val, min_pos, min_val)
elif self.extrema_detection == "simple":
return self._prepare_points_simple(T, S, max_pos, max_val, min_pos, min_val)
else:
msg = "Incorrect extrema detection type. Please try: 'simple' or 'parabol'."
raise ValueError(msg)
def _prepare_points_parabol(self, T, S, max_pos, max_val, min_pos, min_val) -> Tuple[np.ndarray, np.ndarray]:
"""
Performs mirroring on a signal whose extrema do not necessarily
lie on the position array.
See :meth:`EMD.prepare_points`.
"""
# Need at least two extrema to perform mirroring
max_extrema = np.zeros((2, len(max_pos)), dtype=self.DTYPE)
min_extrema = np.zeros((2, len(min_pos)), dtype=self.DTYPE)
max_extrema[0], min_extrema[0] = max_pos, min_pos
max_extrema[1], min_extrema[1] = max_val, min_val
# Local variables
nbsym = self.nbsym
end_min, end_max = len(min_pos), len(max_pos)
####################################
# Left bound
d_pos = max_pos[0] - min_pos[0]
left_ext_max_type = d_pos < 0 # True -> max, else min
# Left extremum is maximum
if left_ext_max_type:
if (S[0] > min_val[0]) and (np.abs(d_pos) > (max_pos[0] - T[0])):
# mirror signal to first extrema
expand_left_max_pos = 2 * max_pos[0] - max_pos[1 : nbsym + 1]
expand_left_min_pos = 2 * max_pos[0] - min_pos[0:nbsym]
expand_left_max_val = max_val[1 : nbsym + 1]
expand_left_min_val = min_val[0:nbsym]
else:
# mirror signal to beginning
expand_left_max_pos = 2 * T[0] - max_pos[0:nbsym]
expand_left_min_pos = 2 * T[0] - np.append(T[0], min_pos[0 : nbsym - 1])
expand_left_max_val = max_val[0:nbsym]
expand_left_min_val = np.append(S[0], min_val[0 : nbsym - 1])
# Left extremum is minimum
else:
if (S[0] < max_val[0]) and (np.abs(d_pos) > (min_pos[0] - T[0])):
# mirror signal to first extrema
expand_left_max_pos = 2 * min_pos[0] - max_pos[0:nbsym]
expand_left_min_pos = 2 * min_pos[0] - min_pos[1 : nbsym + 1]
expand_left_max_val = max_val[0:nbsym]
expand_left_min_val = min_val[1 : nbsym + 1]
else:
# mirror signal to beginning
expand_left_max_pos = 2 * T[0] - np.append(T[0], max_pos[0 : nbsym - 1])
expand_left_min_pos = 2 * T[0] - min_pos[0:nbsym]
expand_left_max_val = np.append(S[0], max_val[0 : nbsym - 1])
expand_left_min_val = min_val[0:nbsym]
if not expand_left_min_pos.shape:
expand_left_min_pos, expand_left_min_val = min_pos, min_val
if not expand_left_max_pos.shape:
expand_left_max_pos, expand_left_max_val = max_pos, max_val
expand_left_min = np.vstack((expand_left_min_pos[::-1], expand_left_min_val[::-1]))
expand_left_max = np.vstack((expand_left_max_pos[::-1], expand_left_max_val[::-1]))
####################################
# Right bound
d_pos = max_pos[-1] - min_pos[-1]
right_ext_max_type = d_pos > 0
# Right extremum is maximum
if not right_ext_max_type:
if (S[-1] < max_val[-1]) and (np.abs(d_pos) > (T[-1] - min_pos[-1])):
# mirror signal to last extrema
idx_max = max(0, end_max - nbsym)
idx_min = max(0, end_min - nbsym - 1)
expand_right_max_pos = 2 * min_pos[-1] - max_pos[idx_max:]
expand_right_min_pos = 2 * min_pos[-1] - min_pos[idx_min:-1]
expand_right_max_val = max_val[idx_max:]
expand_right_min_val = min_val[idx_min:-1]
else:
# mirror signal to end
idx_max = max(0, end_max - nbsym + 1)
idx_min = max(0, end_min - nbsym)
expand_right_max_pos = 2 * T[-1] - np.append(max_pos[idx_max:], T[-1])
expand_right_min_pos = 2 * T[-1] - min_pos[idx_min:]
expand_right_max_val = np.append(max_val[idx_max:], S[-1])
expand_right_min_val = min_val[idx_min:]
# Right extremum is minimum
else:
if (S[-1] > min_val[-1]) and len(max_pos) > 1 and (np.abs(d_pos) > (T[-1] - max_pos[-1])):
# mirror signal to last extremum
idx_max = max(0, end_max - nbsym - 1)
idx_min = max(0, end_min - nbsym)
expand_right_max_pos = 2 * max_pos[-1] - max_pos[idx_max:-1]
expand_right_min_pos = 2 * max_pos[-1] - min_pos[idx_min:]
expand_right_max_val = max_val[idx_max:-1]
expand_right_min_val = min_val[idx_min:]
else:
# mirror signal to end
idx_max = max(0, end_max - nbsym)
idx_min = max(0, end_min - nbsym + 1)
expand_right_max_pos = 2 * T[-1] - max_pos[idx_max:]
expand_right_min_pos = 2 * T[-1] - np.append(min_pos[idx_min:], T[-1])
expand_right_max_val = max_val[idx_max:]
expand_right_min_val = np.append(min_val[idx_min:], S[-1])
if not expand_right_min_pos.shape:
expand_right_min_pos, expand_right_min_val = min_pos, min_val
if not expand_right_max_pos.shape:
expand_right_max_pos, expand_right_max_val = max_pos, max_val
expand_right_min = np.vstack((expand_right_min_pos[::-1], expand_right_min_val[::-1]))
expand_right_max = np.vstack((expand_right_max_pos[::-1], expand_right_max_val[::-1]))
max_extrema = np.hstack((expand_left_max, max_extrema, expand_right_max))
min_extrema = np.hstack((expand_left_min, min_extrema, expand_right_min))
return max_extrema, min_extrema
def _prepare_points_simple(
self,
T: np.ndarray,
S: np.ndarray,
max_pos: np.ndarray,
max_val: Optional[np.ndarray],
min_pos: np.ndarray,
min_val: Optional[np.ndarray],
) -> Tuple[np.ndarray, np.ndarray]:
"""
Performs mirroring on a signal whose extrema can be indexed on
the position array.
See :meth:`EMD.prepare_points`.
"""
# For the 'simple' detection the extrema positions double as integer indices into S
ind_min = min_pos.astype(int)
ind_max = max_pos.astype(int)
# Local variables
nbsym = self.nbsym
end_min, end_max = len(min_pos), len(max_pos)
####################################
# Left bound - mirror nbsym points to the left
if ind_max[0] < ind_min[0]:
if S[0] > S[ind_min[0]]:
lmax = ind_max[1 : min(end_max, nbsym + 1)][::-1]
lmin = ind_min[0 : min(end_min, nbsym + 0)][::-1]
lsym = ind_max[0]
else:
lmax = ind_max[0 : min(end_max, nbsym)][::-1]
lmin = np.append(ind_min[0 : min(end_min, nbsym - 1)][::-1], 0)
lsym = 0
else:
if S[0] < S[ind_max[0]]:
lmax = ind_max[0 : min(end_max, nbsym + 0)][::-1]
lmin = ind_min[1 : min(end_min, nbsym + 1)][::-1]
lsym = ind_min[0]
else:
lmax = np.append(ind_max[0 : min(end_max, nbsym - 1)][::-1], 0)
lmin = ind_min[0 : min(end_min, nbsym)][::-1]
lsym = 0
####################################
# Right bound - mirror nbsym points to the right
if ind_max[-1] < ind_min[-1]:
if S[-1] < S[ind_max[-1]]:
rmax = ind_max[max(end_max - nbsym, 0) :][::-1]
rmin = ind_min[max(end_min - nbsym - 1, 0) : -1][::-1]
rsym = ind_min[-1]
else:
rmax = np.append(ind_max[max(end_max - nbsym + 1, 0) :], len(S) - 1)[::-1]
rmin = ind_min[max(end_min - nbsym, 0) :][::-1]
rsym = len(S) - 1
else:
if S[-1] > S[ind_min[-1]]:
rmax = ind_max[max(end_max - nbsym - 1, 0) : -1][::-1]
rmin = ind_min[max(end_min - nbsym, 0) :][::-1]
rsym = ind_max[-1]
else:
rmax = ind_max[max(end_max - nbsym, 0) :][::-1]
rmin = np.append(ind_min[max(end_min - nbsym + 1, 0) :], len(S) - 1)[::-1]
rsym = len(S) - 1
# In case any array missing
if not lmin.size:
lmin = ind_min
if not rmin.size:
rmin = ind_min
if not lmax.size:
lmax = ind_max
if not rmax.size:
rmax = ind_max
# Mirror points
tlmin = 2 * T[lsym] - T[lmin]
tlmax = 2 * T[lsym] - T[lmax]
trmin = 2 * T[rsym] - T[rmin]
trmax = 2 * T[rsym] - T[rmax]
# If mirrored points fall inside the passed time range, mirror about the signal edge instead.
if tlmin[0] > T[0] or tlmax[0] > T[0]:
if lsym == ind_max[0]:
lmax = ind_max[0 : min(end_max, nbsym)][::-1]
else:
lmin = ind_min[0 : min(end_min, nbsym)][::-1]
if lsym == 0:
raise Exception("Left edge BUG")
lsym = 0
tlmin = 2 * T[lsym] - T[lmin]
tlmax = 2 * T[lsym] - T[lmax]
if trmin[-1] < T[-1] or trmax[-1] < T[-1]:
if rsym == ind_max[-1]:
rmax = ind_max[max(end_max - nbsym, 0) :][::-1]
else:
rmin = ind_min[max(end_min - nbsym, 0) :][::-1]
if rsym == len(S) - 1:
raise Exception("Right edge BUG")
rsym = len(S) - 1
trmin = 2 * T[rsym] - T[rmin]
trmax = 2 * T[rsym] - T[rmax]
zlmax = S[lmax]
zlmin = S[lmin]
zrmax = S[rmax]
zrmin = S[rmin]
tmin = np.append(tlmin, np.append(T[ind_min], trmin))
tmax = np.append(tlmax, np.append(T[ind_max], trmax))
zmin = np.append(zlmin, np.append(S[ind_min], zrmin))
zmax = np.append(zlmax, np.append(S[ind_max], zrmax))
max_extrema = np.array([tmax, zmax])
min_extrema = np.array([tmin, zmin])
# Make doubly sure that each extremum position is unique (drop duplicated positions)
max_dup_idx = np.where(max_extrema[0, 1:] == max_extrema[0, :-1])
max_extrema = np.delete(max_extrema, max_dup_idx, axis=1)
min_dup_idx = np.where(min_extrema[0, 1:] == min_extrema[0, :-1])
min_extrema = np.delete(min_extrema, min_dup_idx, axis=1)
return max_extrema, min_extrema
def spline_points(self, T: np.ndarray, extrema: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Constructs spline over given points.
Parameters
----------
T : numpy array
Position or time array.
extrema : numpy array
Position (1st row) and values (2nd row) of points.
Returns
-------
T : numpy array
Position array, possibly restricted to the span of the given extrema.
spline : numpy array
Spline array over given positions T.
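Examples
--------
A small illustrative case with assumed toy extrema (positions in the first row, values in the second):
>>> import numpy as np
>>> emd = EMD(spline_kind='cubic')
>>> T = np.arange(10, dtype=float)
>>> extrema = np.array([[0., 3., 6., 9.], [0., 1., -1., 0.]])
>>> t, spline = emd.spline_points(T, extrema)
>>> t.size == spline.size == 10
True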
"""
kind = self.spline_kind.lower()
t = T[np.r_[T >= extrema[0, 0]] & np.r_[T <= extrema[0, -1]]]
if kind == "akima":
return t, akima(extrema[0], extrema[1], t)
elif kind == "cubic":
if extrema.shape[1] > 3:
return t, interp1d(extrema[0], extrema[1], kind=kind)(t)
else:
return cubic_spline_3pts(extrema[0], extrema[1], t)
elif kind in ["slinear", "quadratic", "linear"]:
return T, interp1d(extrema[0], extrema[1], kind=kind)(t).astype(self.DTYPE)
else:
raise ValueError("No such interpolation method!")
@staticmethod
def _not_duplicate(S: np.ndarray) -> np.ndarray:
"""
Returns indices of samples that differ from at least one of their neighbours; interior points of flat (repeated) runs are dropped, while the first and last samples are always kept.
Example
-------
>>> S = np.array([0, 1, 1, 1, 2, 3])
>>> EMD._not_duplicate(S).tolist()
[0, 1, 3, 4, 5]
"""
dup = np.r_[S[1:-1] == S[0:-2]] & np.r_[S[1:-1] == S[2:]]
not_dup_idx = np.arange(1, len(S) - 1)[~dup]
idx = np.empty(len(not_dup_idx) + 2, dtype=np.int64)
idx[0] = 0
idx[-1] = len(S) - 1
idx[1:-1] = not_dup_idx
return idx
def find_extrema(self, T: np.ndarray, S: np.ndarray) -> FindExtremaOutput:
"""
Returns extrema (minima and maxima) for given signal S.
Detection and definition of the extrema depends on
``extrema_detection`` variable, set on initiation of EMD.
Parameters
----------
T : numpy array
Position or time array.
S : numpy array
Input data S(T).
Returns
-------
local_max_pos : numpy array
Position of local maxima.
local_max_val : numpy array
Values of local maxima.
local_min_pos : numpy array
Position of local minima.
local_min_val : numpy array
Values of local minima.
indzer : numpy array
Indices of zero-crossings.
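Examples
--------
A small illustrative case with the 'simple' detection and a toy signal whose positions coincide with sample indices:
>>> import numpy as np
>>> emd = EMD(extrema_detection='simple')
>>> T = np.arange(6, dtype=float)
>>> S = np.array([0., 1., 0., -1., 0., 1.])
>>> max_pos, max_val, min_pos, min_val, indzer = emd.find_extrema(T, S)
>>> max_pos.tolist(), min_pos.tolist()
([1.0], [3.0])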
"""
if self.extrema_detection == "parabol":
return self._find_extrema_parabol(T, S)
elif self.extrema_detection == "simple":
return self._find_extrema_simple(T, S)
else:
raise ValueError("Incorrect extrema detection type. Please try: 'simple' or 'parabol'.")
def _find_extrema_parabol(self, T: np.ndarray, S: np.ndarray) -> FindExtremaOutput:
"""
Performs parabolic estimation of extrema, i.e. an extremum is the
vertex of a parabola fitted through 3 consecutive points, with the
middle point lying closest to the extremum.
See :meth:`EMD.find_extrema()`.
"""
# Finds indexes of zero-crossings
S1, S2 = S[:-1], S[1:]
indzer = np.nonzero(S1 * S2 < 0)[0]
if np.any(S == 0):
indz = np.nonzero(S == 0)[0]
if np.any(np.diff(indz) == 1):
zer = S == 0
dz = np.diff(np.append(np.append(0, zer), 0))
debz = np.nonzero(dz == 1)[0]
finz = np.nonzero(dz == -1)[0] - 1
indz = np.round((debz + finz) / 2.0)
indzer = np.sort(np.append(indzer, indz))
dt = float(T[1] - T[0])
scale = 2.0 * dt * dt
idx = self._not_duplicate(S)
T = T[idx]
S = S[idx]
# p - previous
# 0 - current
# n - next
Tp, T0, Tn = T[:-2], T[1:-1], T[2:]
Sp, S0, Sn = S[:-2], S[1:-1], S[2:]
# a = Sn + Sp - 2*S0
# b = 2*(Tn+Tp)*S0 - ((Tn+T0)*Sp+(T0+Tp)*Sn)
# c = Sp*T0*Tn -2*Tp*S0*Tn + Tp*T0*Sn
TnTp, T0Tn, TpT0 = Tn - Tp, T0 - Tn, Tp - T0
scale = Tp * Tn * Tn + Tp * Tp * T0 + T0 * T0 * Tn - Tp * Tp * Tn - Tp * T0 * T0 - T0 * Tn * Tn
a = T0Tn * Sp + TnTp * S0 + TpT0 * Sn
b = (S0 - Sn) * Tp ** 2 + (Sn - Sp) * T0 ** 2 + (Sp - S0) * Tn ** 2
c = T0 * Tn * T0Tn * Sp + Tn * Tp * TnTp * S0 + Tp * T0 * TpT0 * Sn
a = a / scale
b = b / scale
c = c / scale
a[a == 0] = 1e-14
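# The fitted parabola y = a*t**2 + b*t + c has its vertex at t = -b/(2*a); the epsilon above guards against division by zero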
tVertex = -0.5 * b / a
idx = np.r_[tVertex < T0 + 0.5 * (Tn - T0)] & np.r_[tVertex >= T0 - 0.5 * (T0 - Tp)]
a, b, c = a[idx], b[idx], c[idx]
tVertex = tVertex[idx]
sVertex = a * tVertex * tVertex + b * tVertex + c
local_max_pos, local_max_val = tVertex[a < 0], sVertex[a < 0]
local_min_pos, local_min_val = tVertex[a > 0], sVertex[a > 0]
return local_max_pos, local_max_val, local_min_pos, local_min_val, indzer
@staticmethod
def _find_extrema_simple(T: np.ndarray, S: np.ndarray) -> FindExtremaOutput:
"""
Performs extrema detection, where an extremum is defined as a point
that is above/below its neighbours.
See :meth:`EMD.find_extrema`.
"""
# Finds indexes of zero-crossings
S1, S2 = S[:-1], S[1:]
indzer = np.nonzero(S1 * S2 < 0)[0]
if np.any(S == 0):
indz = np.nonzero(S == 0)[0]
if np.any(np.diff(indz) == 1):
zer = S == 0
dz = np.diff(np.append(np.append(0, zer), 0))
debz = np.nonzero(dz == 1)[0]
finz = np.nonzero(dz == -1)[0] - 1
indz = np.round((debz + finz) / 2.0)
indzer = np.sort(np.append(indzer, indz))
# Finds local extrema
d = np.diff(S)
d1, d2 = d[:-1], d[1:]
indmin = np.nonzero(np.r_[d1 * d2 < 0] & np.r_[d1 < 0])[0] + 1
indmax = np.nonzero(np.r_[d1 * d2 < 0] & np.r_[d1 > 0])[0] + 1
# When two or more points have the same value
if np.any(d == 0):
imax, imin = [], []
bad = d == 0
dd = np.diff(np.append(np.append(0, bad), 0))
debs = np.nonzero(dd == 1)[0]
fins = np.nonzero(dd == -1)[0]
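# debs/fins bracket each plateau (run of equal consecutive samples); plateaus touching the
# signal boundary are dropped, and an interior plateau counts as an extremum only when the
# slope changes sign across it (its midpoint index is recorded)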
if debs[0] == 1:
if len(debs) > 1:
debs, fins = debs[1:], fins[1:]
else:
debs, fins = [], []
if len(debs) > 0:
if fins[-1] == len(S) - 1:
if len(debs) > 1:
debs, fins = debs[:-1], fins[:-1]
else:
debs, fins = [], []
lc = len(debs)
if lc > 0:
for k in range(lc):
if d[debs[k] - 1] > 0:
if d[fins[k]] < 0:
imax.append(np.round((fins[k] + debs[k]) / 2.0))
else:
if d[fins[k]] > 0:
imin.append(np.round((fins[k] + debs[k]) / 2.0))
if len(imax) > 0:
indmax = indmax.tolist()
for x in imax:
indmax.append(int(x))
indmax.sort()
if len(imin) > 0:
indmin = indmin.tolist()
for x in imin:
indmin.append(int(x))
indmin.sort()
local_max_pos = T[indmax]
local_max_val = S[indmax]
local_min_pos = T[indmin]
local_min_val = S[indmin]
return local_max_pos, local_max_val, local_min_pos, local_min_val, indzer
def end_condition(self, S: np.ndarray, IMF: np.ndarray) -> bool:
"""Tests for end condition of whole EMD. The procedure will stop if:
* Absolute amplitude (max - min) of the residue is below the *range_thr* threshold, or
* L1 norm (sum of absolute values) of the residue is below the *total_power_thr* threshold.
Parameters
----------
S : numpy array
Original signal on which EMD was performed.
IMF : numpy 2D array
Set of IMFs where each row is IMF. Their order is not important.
Returns
-------
end : bool
Whether sifting is finished.
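Examples
--------
A minimal illustrative check; when the IMFs already account for the whole signal the residue is zero and the procedure stops:
>>> import numpy as np
>>> emd = EMD()
>>> S = np.sin(np.linspace(0, 2*np.pi, 100))
>>> emd.end_condition(S, np.array([S]))
True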
"""
# When to stop EMD
tmp = S - np.sum(IMF, axis=0)
if np.max(tmp) - np.min(tmp) < self.range_thr:
self.logger.debug("FINISHED -- RANGE")
return True
if np.sum(np.abs(tmp)) < self.total_power_thr:
self.logger.debug("FINISHED -- SUM POWER")
return True
return False
def check_imf(
self,
imf_new: np.ndarray,
imf_old: np.ndarray,
eMax: np.ndarray,
eMin: np.ndarray,
) -> bool:
"""
Huang's criteria for an **IMF** (similar to the Cauchy convergence test).
A signal is an IMF if consecutive siftings do not affect it
in a significant manner.
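A minimal illustrative check with assumed single-extremum envelope arrays; identical consecutive siftings pass the test:
>>> import numpy as np
>>> emd = EMD()
>>> imf = np.sin(np.linspace(0, 2*np.pi, 100))
>>> eMax, eMin = np.array([[1.57], [1.0]]), np.array([[4.71], [-1.0]])
>>> emd.check_imf(imf, imf.copy(), eMax, eMin)
True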
"""
# local max are >0 and local min are <0
if np.any(eMax[1] < 0) or np.any(eMin[1] > 0):
return False
# Convergence
if np.sum(imf_new ** 2) < 1e-10:
return False
# Precompute values
imf_diff = imf_new - imf_old
imf_diff_sqrd_sum = np.sum(imf_diff * imf_diff)
# Scaled variance test
svar = imf_diff_sqrd_sum / (max(imf_old) - min(imf_old))
if svar < self.svar_thr:
self.logger.debug("Scaled variance -- PASSED")
return True
# Standard deviation test
std = np.sum((imf_diff / imf_new) ** 2)
if std < self.std_thr:
self.logger.debug("Standard deviation -- PASSED")
return True
energy_ratio = imf_diff_sqrd_sum / np.sum(imf_old * imf_old)
if energy_ratio < self.energy_ratio_thr:
self.logger.debug("Energy ratio -- PASSED")
return True
return False
@staticmethod
def _common_dtype(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Casts inputs (x, y) into a common numpy DTYPE."""
dtype = np.find_common_type([x.dtype, y.dtype], [])
if x.dtype != dtype:
x = x.astype(dtype)
if y.dtype != dtype:
y = y.astype(dtype)
return x, y
@staticmethod
def _normalize_time(t: np.ndarray) -> np.ndarray:
"""
Normalize the time array so that it doesn't explode on tiny values.
The returned array starts at 0 and its smallest increment is 1.
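For example:
>>> import numpy as np
>>> EMD._normalize_time(np.array([10., 11., 13.])).tolist()
[0.0, 1.0, 3.0]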
"""
d = np.diff(t)
assert np.all(d != 0), "All time domain values need to be unique"
return (t - t[0]) / np.min(d)
def emd(self, S: np.ndarray, T: Optional[np.ndarray] = None, max_imf: int = -1) -> np.ndarray:
"""
Performs Empirical Mode Decomposition on signal S.
The decomposition is limited to *max_imf* imfs.
Returns IMF functions and residue in a single numpy array format.
Parameters
----------
S : numpy array
Input signal.
T : numpy array (default: None)
Position or time array. If None is passed or self.extrema_detection == "simple",
then a default numpy range is created.
max_imf : int (default: -1)
IMF number to which decomposition should be performed.
Negative value means *all*.
Returns
-------
IMFs and residue : numpy array
A numpy array which contains both the IMFs and the residue, if any, appended as
the last row.
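Examples
--------
A short illustrative run on an assumed toy signal; the number of rows depends on the signal, while the number of columns always matches the input length:
>>> import numpy as np
>>> T = np.linspace(0, 1, 200)
>>> S = np.sin(2*np.pi*5*T) + 0.4*T
>>> imfs = EMD().emd(S, T)
>>> imfs.shape[1] == S.size
True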
"""
if T is not None and len(S) != len(T):
raise ValueError("Time series have different sizes: len(S) -> {} != {} <- len(T)".format(len(S), len(T)))
if T is None or self.extrema_detection == "simple":
T = get_timeline(len(S), S.dtype)
# Normalize T so that it doesn't explode
T = self._normalize_time(T)
# Make sure same types are dealt
S, T = self._common_dtype(S, T)
self.DTYPE = S.dtype
N = len(S)
residue = S.astype(self.DTYPE)
imf = np.zeros(len(S), dtype=self.DTYPE)
imf_old = np.nan
if S.shape != T.shape:
raise ValueError("Position or time array should be the same size as signal.")
# Create arrays
imfNo = 0
extNo = -1
IMF = np.empty((imfNo, N)) # Numpy container for IMF
finished = False
while not finished:
self.logger.debug("IMF -- %s", imfNo)
residue[:] = S - np.sum(IMF[:imfNo], axis=0)
imf = residue.copy()
mean = np.zeros(len(S), dtype=self.DTYPE)
# Counters
n = 0 # All iterations for current imf.
n_h = 0 # counts when |#zero - #ext| <=1
while True:
n += 1
if n >= self.MAX_ITERATION:
self.logger.info("Max iterations reached for IMF. Continuing with another IMF.")
break
ext_res = self.find_extrema(T, imf)
max_pos, min_pos, indzer = ext_res[0], ext_res[2], ext_res[4]
extNo = len(min_pos) + len(max_pos)
nzm = len(indzer)
if extNo > 2:
max_env, min_env, eMax, eMin = self.extract_max_min_spline(T, imf)
mean[:] = 0.5 * (max_env + min_env)
imf_old = imf.copy()
imf[:] = imf - mean
# Fix number of iterations
if self.FIXE:
if n >= self.FIXE:
break
# Fix number of iterations after number of zero-crossings
# and extrema differ at most by one.
elif self.FIXE_H:
tmp_residue = self.find_extrema(T, imf)
max_pos, min_pos, ind_zer = (
tmp_residue[0],
tmp_residue[2],
tmp_residue[4],
)
extNo = len(max_pos) + len(min_pos)
nzm = len(ind_zer)
if n == 1:
continue
# If proto-IMF add one, or reset counter otherwise
n_h = n_h + 1 if abs(extNo - nzm) < 2 else 0
# STOP
if n_h >= self.FIXE_H:
break
# Stops after default stopping criteria are met
else:
ext_res = self.find_extrema(T, imf)
max_pos, _, min_pos, _, ind_zer = ext_res
extNo = len(max_pos) + len(min_pos)
nzm = len(ind_zer)
if imf_old is np.nan:
continue
f1 = self.check_imf(imf, imf_old, eMax, eMin)
f2 = abs(extNo - nzm) < 2
# STOP
if f1 and f2:
break
else: # Less than 2 ext, i.e. trend
finished = True
break
# END OF IMF SIFTING
IMF = np.vstack((IMF, imf.copy()))
imfNo += 1
if self.end_condition(S, IMF) or imfNo == max_imf:
finished = True
break
# If the last sifting had 2 or fewer extrema then it is a trend (residue)
if extNo <= 2:
IMF = IMF[:-1]
# Saving imfs and residue for external references
self.imfs = IMF.copy()
self.residue = S - np.sum(self.imfs, axis=0)
# If residue isn't 0 then add it to the output
if not np.allclose(self.residue, 0):
IMF = np.vstack((IMF, self.residue))
return IMF
def get_imfs_and_residue(self) -> Tuple[np.ndarray, np.ndarray]:
"""
Provides access to separated IMFs and residue from the most recently analysed signal.
Returns
-------
imfs : np.ndarray
Obtained IMFs
residue : np.ndarray
Residue.
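Examples
--------
Illustrative usage after a prior decomposition:
>>> import numpy as np
>>> emd = EMD()
>>> _ = emd(np.sin(np.linspace(0, 4*np.pi, 100)))
>>> imfs, residue = emd.get_imfs_and_residue()
>>> residue.size
100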
"""
if self.imfs is None or self.residue is None:
raise ValueError("No IMF found. Please, run EMD method or its variant first.")
return self.imfs, self.residue
def get_imfs_and_trend(self) -> Tuple[np.ndarray, np.ndarray]:
"""
Provides access to separated IMFs and trend from the most recently analysed signal.
Note that this may differ from `get_imfs_and_residue` since the trend isn't
necessarily the residue. The residue is the point-wise difference between the input signal
and all obtained components, whereas the trend is the slowest component (which can be zero).
Returns
-------
imfs : np.ndarray
Obtained IMFs
trend : np.ndarray
The main trend.
"""
if self.imfs is None or self.residue is None:
raise ValueError("No IMF found. Please, run EMD method or its variant first.")
imfs, residue = self.get_imfs_and_residue()
if np.allclose(residue, 0):
return imfs[:-1].copy(), imfs[-1].copy()
else:
return imfs, residue
###################################################
if __name__ == "__main__":
import matplotlib.pyplot as plt
# Logging options
logging.basicConfig(level=logging.DEBUG)
# EMD options
max_imf = -1
DTYPE = np.float64
# Signal options
N = 400
tMin, tMax = 0, 2 * np.pi
T = np.linspace(tMin, tMax, N, dtype=DTYPE)
S = np.sin(20 * T * (1 + 0.2 * T)) + T ** 2 + np.sin(13 * T)
S = S.astype(DTYPE)
print("Input S.dtype: " + str(S.dtype))
# Prepare and run EMD
emd = EMD()
emd.FIXE_H = 5
emd.nbsym = 2
emd.spline_kind = "cubic"
emd.DTYPE = DTYPE
imfs = emd.emd(S, T, max_imf)
imfNo = imfs.shape[0]
# Plot results
c = 1
r = int(np.ceil((imfNo + 1) / c))
plt.ioff()
plt.subplot(r, c, 1)
plt.plot(T, S, "r")
plt.xlim((tMin, tMax))
plt.title("Original signal")
for num in range(imfNo):
plt.subplot(r, c, num + 2)
plt.plot(T, imfs[num], "g")
plt.xlim((tMin, tMax))
plt.ylabel("Imf " + str(num + 1))
plt.tight_layout()
plt.show()