"""Calculate the statistical Significance of IMFs."""
import logging
import math
import numpy as np
from scipy import stats
from scipy.signal import find_peaks
# helper function: Find mean period of an IMF
def mean_period(data):
"""Return mean-period of signal."""
peaks = len(find_peaks(data, height=0)[0])
return len(data) / peaks if peaks > 0 else len(data)
# helper function: find energy of signal/IMF
def energy(data):
"""Return energy of signal."""
return sum(pow(data, 2))
# helper function: find IMF significance in 'a priori' test
def significance_apriori(energy_density, T, N, alpha):
"""Check a priori significance and Return True if significant else False."""
k = abs(stats.norm.ppf((1 - alpha) / 2))
upper_limit = -T + (k * (math.sqrt(2 / N) * math.exp(T / 2)))
lower_limit = -T - (k * (math.sqrt(2 / N) * math.exp(T / 2)))
return not (lower_limit <= energy_density <= upper_limit)
# helper function: find significance in 'a posteriori' test
def significance_aposteriori(scaled_energy_density, T, N, alpha):
"""Check a posteriori significance and Return True if significant else False."""
k = abs(stats.norm.ppf((1 - alpha) / 2))
upper_limit = -T + (k * (math.sqrt(2 / N) * math.exp(T / 2)))
return not (scaled_energy_density <= upper_limit)
[docs]def whitenoise_check(IMFs: np.ndarray, test_name: str = "aposteriori", rescaling_imf: int = 1, alpha: float = 0.95):
"""Whitenoise statistical significance test.
Performs whitenoise test as described by Wu & Huang [Wu2004]_.
References
----------
.. [Wu2004] Zhaohua Wu, and Norden E. Huang. "A Study of the Characteristics of White Noise Using the
Empirical Mode Decomposition Method." Proceedings: Mathematical, Physical and Engineering
Sciences, vol. 460, no. 2046, The Royal Society, 2004, pp. 1597–611, http://www.jstor.org/stable/4143111.
Parameters
----------
IMFs: np.ndarray
(Required) numpy array containing IMFs computed from a normalized signal
test_name: str
(Optional) Test type. Supported values: 'apriori', 'aposteriori'. (default 'aposteriori')
rescaling_imf: int
(Optional) ith IMF of the signal used in rescaling for 'a posteriori' test. (default 1)
alpha: float
(Optional) The percentiles at which the test is to be performed; 0 < alpha < 1; (default 0.95)
Returns
-------
Optional dictionary
Returns dictionary with keys and values as IMFs' number and test result, respetively,
Test results can be either 0 (fail) or 1 (pass).
In case of problems with the input imfs, e.g. NaN values or no imfs, we return None.
Examples
--------
>>> import numpy as np
>>> from PyEMD import EMD
>>> from PyEMD.checks import whitenoise_check
>>> T = np.linspace(0, 1, 100)
>>> S = np.sin(2*2*np.pi*T)
>>> emd = EMD()
>>> imfs = emd(S)
>>> significant_imfs = whitenoise_check(imfs, test_name='apriori')
>>> significant_imfs
{1: 1, 2: 1}
>>> type(significant_imfs)
<class 'dict'>
"""
assert isinstance(IMFs, np.ndarray), "Invalid Data type, Pass a numpy.ndarray containing IMFs"
# check if IMFs are empty or not
if len(IMFs) == 0 or len(IMFs[0]) == 0:
logging.getLogger("PyEMD").warning("Detected empty input. Skipping check.")
return None
assert isinstance(alpha, float), "Invalid Data type for alpha, pass a float value between (0,1)"
assert 0 < alpha < 1, "alpha value should be in between (0,1)"
assert test_name in ("apriori", "aposteriori"), "Invalid test type"
assert isinstance(rescaling_imf, int), "Invalid data type for rescaling_imf, pass a int value"
assert 0 < rescaling_imf <= len(IMFs), "Invalid rescaling IMF"
if np.any(np.isnan(IMFs)):
# Return None if input has NaN
logging.getLogger("PyEMD").warning("Detected NaN values during whitenoise check. Skipping check.")
return None
N = len(IMFs[0])
output = {}
if test_name == "apriori":
for idx, imf in enumerate(IMFs):
log_T = math.log(mean_period(imf))
energy_density = math.log(energy(imf) / N)
sig_priori = significance_apriori(energy_density, log_T, N, alpha)
output[idx + 1] = int(sig_priori)
elif test_name == "aposteriori":
scaling_imf_mean_period = math.log(mean_period(IMFs[rescaling_imf - 1]))
scaling_imf_energy_density = math.log(energy(IMFs[rescaling_imf - 1]) / N)
k = abs(stats.norm.ppf((1 - alpha) / 2))
up_limit = -scaling_imf_mean_period + (k * math.sqrt(2 / N) * math.exp(scaling_imf_mean_period) / 2)
scaling_factor = math.exp(up_limit)
for idx, imf in enumerate(IMFs):
log_T = math.log(mean_period(imf))
if idx != rescaling_imf - 1:
scaled_energy_density = math.log((energy(imf) / N) / scaling_factor)
else:
scaled_energy_density = math.log(scaling_factor)
sig_aposteriori = significance_aposteriori(scaled_energy_density, log_T, N, alpha)
output[idx + 1] = int(sig_aposteriori)
else:
raise AssertionError("Only 'apriori' and 'aposteriori' are allowed")
return output