#!/usr/bin/python
# coding: UTF-8
#
# Author: Dawid Laszuk
# Contact: https://github.com/laszukdawid/PyEMD/issues
#
# Feel free to contact for any information.
"""
.. currentmodule:: EEMD
"""
import logging
from collections import defaultdict
from multiprocessing import Pool
from typing import Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
from tqdm import tqdm
from PyEMD.utils import get_timeline
[docs]class EEMD:
"""
**Ensemble Empirical Mode Decomposition**
Ensemble empirical mode decomposition (EEMD) [Wu2009]_
is noise-assisted technique, which is meant to be more robust
than simple Empirical Mode Decomposition (EMD). The robustness is
checked by performing many decompositions on signals slightly
perturbed from their initial position. In the grand average over
all IMF results the noise will cancel each other out and the result
is pure decomposition.
Parameters
----------
trials : int (default: 100)
Number of trials or EMD performance with added noise.
noise_width : float (default: 0.05)
Standard deviation of Gaussian noise (:math:`\hat\sigma`).
It's relative to absolute amplitude of the signal, i.e.
:math:`\hat\sigma = \sigma\cdot|\max(S)-\min(S)|`, where
:math:`\sigma` is noise_width.
ext_EMD : EMD (default: None)
One can pass EMD object defined outside, which will be
used to compute IMF decompositions in each trial. If none
is passed then EMD with default options is used.
parallel : bool (default: False)
Flag whether to use multiprocessing in EEMD execution.
Since each EMD(s+noise) is independent this should improve execution
speed considerably.
*Note* that it's disabled by default because it's the most common
problem when EEMD takes too long time to finish.
If you set the flag to True, make also sure to set `processes` to
some reasonable value.
processes : int or None (optional)
Number of processes harness when executing in parallel mode.
The value should be between 1 and max that depends on your hardware.
separate_trends : bool (default: False)
Flag whether to isolate trends from each EMD decomposition into a separate component.
If `true`, the resulting EEMD will contain ensemble only from IMFs and
the mean residue will be stacked as the last element.
References
----------
.. [Wu2009] Z. Wu and N. E. Huang, "Ensemble empirical mode decomposition:
A noise-assisted data analysis method", Advances in Adaptive
Data Analysis, Vol. 1, No. 1 (2009) 1-41.
"""
logger = logging.getLogger(__name__)
noise_kinds_all = ["normal", "uniform"]
def __init__(self, trials: int = 100, noise_width: float = 0.05, ext_EMD=None, parallel: bool = False, **kwargs):
# Ensemble constants
self.trials = trials
self.noise_width = noise_width
self.separate_trends = bool(kwargs.get("separate_trends", False))
self.random = np.random.RandomState()
self.noise_kind = kwargs.get("noise_kind", "normal")
self.parallel = parallel
self.processes = kwargs.get("processes") # Optional[int]
if self.processes is not None and not self.parallel:
self.logger.warning("Passed value for process has no effect when `parallel` is False.")
if ext_EMD is None:
from PyEMD import EMD
self.EMD = EMD(**kwargs)
else:
self.EMD = ext_EMD
self.E_IMF = None # Optional[np.ndarray]
self.residue = None # Optional[np.ndarray]
self._all_imfs = {}
def __call__(
self, S: np.ndarray, T: Optional[np.ndarray] = None, max_imf: int = -1, progress: bool = False
) -> np.ndarray:
return self.eemd(S, T=T, max_imf=max_imf, progress=progress)
def __getstate__(self) -> Dict:
self_dict = self.__dict__.copy()
return self_dict
[docs] def generate_noise(self, scale: float, size: Union[int, Sequence[int]]) -> np.ndarray:
"""
Generate noise with specified parameters.
Currently supported distributions are:
* *normal* with std equal scale.
* *uniform* with range [-scale/2, scale/2].
Parameters
----------
scale : float
Width for the distribution.
size : int
Number of generated samples.
Returns
-------
noise : numpy array
Noise sampled from selected distribution.
"""
if self.noise_kind == "normal":
noise = self.random.normal(loc=0, scale=scale, size=size)
elif self.noise_kind == "uniform":
noise = self.random.uniform(low=-scale / 2, high=scale / 2, size=size)
else:
raise ValueError(
"Unsupported noise kind. Please assigned `noise_kind` to be one of these: {0}".format(
str(self.noise_kinds_all)
)
)
return noise
[docs] def noise_seed(self, seed: int) -> None:
"""Set seed for noise generation."""
self.random.seed(seed)
[docs] def eemd(
self, S: np.ndarray, T: Optional[np.ndarray] = None, max_imf: int = -1, progress: bool = False
) -> np.ndarray:
"""
Performs EEMD on provided signal.
For a large number of iterations defined by `trials` attr
the method performs :py:meth:`emd` on a signal with added white noise.
Parameters
----------
S : numpy array,
Input signal on which EEMD is performed.
T : numpy array or None, (default: None)
If none passed samples are numerated.
max_imf : int, (default: -1)
Defines up to how many IMFs each decomposition should
be performed. By default (negative value) it decomposes
all IMFs.
Returns
-------
eIMF : numpy array
Set of ensemble IMFs produced from input signal. In general,
these do not have to be, and most likely will not be, same as IMFs
produced using EMD.
"""
if T is None:
T = get_timeline(len(S), S.dtype)
scale = self.noise_width * np.abs(np.max(S) - np.min(S))
self._S = S
self._T = T
self._N = len(S)
self._scale = scale
self.max_imf = max_imf
# For trial number of iterations perform EMD on a signal
# with added white noise
if self.parallel:
pool = Pool(processes=self.processes)
map_pool = pool.map
else:
map_pool = map
all_IMFs = map_pool(self._trial_update, range(self.trials))
if self.parallel:
pool.close()
self._all_imfs = defaultdict(list)
it = iter if not progress else lambda x: tqdm(x, desc="EEMD", total=self.trials)
for imfs, trend in it(all_IMFs):
# A bit of explanation here.
# If the `trend` is not None, that means it was intentionally separated in the decomp process.
# This might due to `separate_trends` flag which means that trends are summed up separately
# and treated as the last component. Since `proto_eimfs` is a dict, that `-1` is treated literally
# and **not** as the *last position*. We can then use that `-1` to always add it as the last pos
# in the actual eIMF, which indicates the trend.
if trend is not None:
self._all_imfs[-1].append(trend)
for imf_num, imf in enumerate(imfs):
self._all_imfs[imf_num].append(imf)
# Convert defaultdict back to dict and explicitly rename `-1` position to be {the last value} for consistency.
self._all_imfs = dict(self._all_imfs)
if -1 in self._all_imfs:
self._all_imfs[len(self._all_imfs)] = self._all_imfs.pop(-1)
for imf_num in self._all_imfs.keys():
self._all_imfs[imf_num] = np.array(self._all_imfs[imf_num])
self.E_IMF = self.ensemble_mean()
self.residue = S - np.sum(self.E_IMF, axis=0)
return self.E_IMF
def _trial_update(self, trial) -> Tuple[np.ndarray, Optional[np.ndarray]]:
"""A single trial evaluation, i.e. EMD(signal + noise).
*Note*: Although `trial` argument isn't used it's needed for the (multiprocessing) map method.
"""
noise = self.generate_noise(self._scale, self._N)
imfs = self.emd(self._S + noise, self._T, self.max_imf)
trend = None
if self.separate_trends:
imfs, trend = self.EMD.get_imfs_and_trend()
return (imfs, trend)
[docs] def emd(self, S: np.ndarray, T: np.ndarray, max_imf: int = -1) -> np.ndarray:
"""Vanilla EMD method.
Provides emd evaluation from provided EMD class.
For reference please see :class:`PyEMD.EMD`.
"""
return self.EMD.emd(S, T, max_imf=max_imf)
[docs] def get_imfs_and_residue(self) -> Tuple[np.ndarray, np.ndarray]:
"""
Provides access to separated imfs and residue from recently analysed signal.
Returns
-------
(imfs, residue) : (np.ndarray, np.ndarray)
Tuple that contains all imfs and a residue (if any).
"""
if self.E_IMF is None or self.residue is None:
raise ValueError("No IMF found. Please, run EMD method or its variant first.")
return self.E_IMF, self.residue
@property
def all_imfs(self):
"""A dictionary with all computed imfs per given order."""
return self._all_imfs
[docs] def ensemble_count(self) -> List[int]:
"""Count of imfs observed for given order, e.g. 1st proto-imf, in the whole ensemble."""
return [len(imfs) for imfs in self._all_imfs.values()]
[docs] def ensemble_mean(self) -> np.ndarray:
"""Pointwise mean over computed ensemble. Same as the output of `eemd()` method."""
return np.array([imfs.mean(axis=0) for imfs in self._all_imfs.values()])
[docs] def ensemble_std(self) -> np.ndarray:
"""Pointwise standard deviation over computed ensemble."""
return np.array([imfs.std(axis=0) for imfs in self._all_imfs.values()])
###################################################
# Beginning of program
if __name__ == "__main__":
import pylab as plt
E_imfNo = np.zeros(50, dtype=np.int)
# Logging options
logging.basicConfig(level=logging.INFO)
# EEMD options
max_imf = -1
# Signal options
N = 500
tMin, tMax = 0, 2 * np.pi
T = np.linspace(tMin, tMax, N)
S = 3 * np.sin(4 * T) + 4 * np.cos(9 * T) + np.sin(8.11 * T + 1.2)
# Prepare and run EEMD
eemd = EEMD(trials=50)
eemd.noise_seed(12345)
E_IMFs = eemd.eemd(S, T, max_imf)
imfNo = E_IMFs.shape[0]
# Plot results in a grid
c = np.floor(np.sqrt(imfNo + 1))
r = np.ceil((imfNo + 1) / c)
plt.ioff()
plt.subplot(r, c, 1)
plt.plot(T, S, "r")
plt.xlim((tMin, tMax))
plt.title("Original signal")
for num in range(imfNo):
plt.subplot(r, c, num + 2)
plt.plot(T, E_IMFs[num], "g")
plt.xlim((tMin, tMax))
plt.title("Imf " + str(num + 1))
plt.show()