#!/usr/bin/python
# coding: UTF-8
#
# Author: Dawid Laszuk
# Contact: https://github.com/laszukdawid/PyEMD/issues
#
# Edited: 07/07/2017
#
# Feel free to contact for any information.
import logging
import numpy as np
try:
from scipy.interpolate import SmoothBivariateSpline as SBS
from scipy.ndimage.filters import maximum_filter
from scipy.ndimage.morphology import binary_erosion, generate_binary_structure
except ImportError:
raise ImportError(
"EMD2D and BEMD are not supported. Feel free to play around and improve them. "
+ "Required depdenecies are in `requriements-extra`."
)
[docs]class EMD2D:
"""
**Empirical Mode Decomposition** on images.
**Important** This is an experimental module.
Experiments performed using this module didn't provide acceptable results,
either in actual output nor in computation performance. The author is not
an expert in image processing so it's very likely that the code could
have been improved. Take your best shot.
Method decomposes images into 2D representations of loose Intrinsic Mode
Functions (IMFs).
The current version of the algorithm detects local extrema, separately
minima and maxima, and then connects them to create envelopes. These
are then used to create a mean trend and subtracted from the input.
Threshold values that control goodness of the decomposition:
* `mse_thr` --- proto-IMF check whether small mean square error.
* `mean_thr` --- proto-IMF chekc whether small mean value.
"""
logger = logging.getLogger(__name__)
[docs] def __init__(self, **config):
# ProtoIMF related
self.mse_thr = 0.01
self.mean_thr = 0.01
self.FIXE = 0
self.FIXE_H = 0
self.MAX_ITERATION = 1000
# Update based on options
for key in config.keys():
if key in self.__dict__.keys():
self.__dict__[key] = config[key]
[docs] def __call__(self, image, max_imf=-1):
return self.emd(image, max_imf=max_imf)
[docs] @classmethod
def prepare_image(cls, image):
"""Prepares image for edge extrapolation.
Method bloats image by mirroring it along all axes. This turns
extrapolation on edges into interpolation within bigger image.
Parameters
----------
image : numpy 2D array
Image for which interpolation is required,
Returns
-------
image : numpy 2D array
Big image based on the input. Grid 3x3 where the center block is input and
neighbouring panels are respective mirror images.
"""
# TODO: This is nasty. Instead of bloating whole image and then trying to
# find all extrema, it's better to deal directly with indices.
shape = image.shape
big_image = np.zeros((shape[0] * 3, shape[1] * 3))
image_lr = np.fliplr(image)
image_ud = np.flipud(image)
image_ud_lr = np.flipud(image_lr)
image_lr_ud = np.fliplr(image_ud)
# Fill center with default image
big_image[shape[0] : 2 * shape[0], shape[1] : 2 * shape[1]] = image
# Fill left center
big_image[shape[0] : 2 * shape[0], : shape[1]] = image_lr
# Fill right center
big_image[shape[0] : 2 * shape[0], 2 * shape[1] :] = image_lr
# Fill center top
big_image[: shape[0], shape[1] : shape[1] * 2] = image_ud
# Fill center bottom
big_image[2 * shape[0] :, shape[1] : 2 * shape[1]] = image_ud
# Fill left top
big_image[: shape[0], : shape[1]] = image_ud_lr
# Fill left bottom
big_image[2 * shape[0] :, : shape[1]] = image_ud_lr
# Fill right top
big_image[: shape[0], 2 * shape[1] :] = image_lr_ud
# Fill right bottom
big_image[2 * shape[0] :, 2 * shape[1] :] = image_lr_ud
return big_image
[docs] @classmethod
def spline_points(cls, X, Y, Z, xi, yi):
"""Interpolates for given set of points"""
# SBS requires at least m=(kx+1)*(ky+1) points,
# where kx=ky=3 (default) is the degree of bivariate spline.
# Thus, if less than 16=(3+1)*(3+1) points, adjust kx & ky.
spline = SBS(X, Y, Z)
return spline(xi, yi)
[docs] @classmethod
def find_extrema(cls, image):
"""
Finds extrema, both mininma and maxima, based on local maximum filter.
Returns extrema in form of two rows, where the first and second are
positions of x and y, respectively.
Parameters
----------
image : numpy 2D array
Monochromatic image or any 2D array.
Returns
-------
min_peaks : numpy array
Minima positions.
max_peaks : numpy array
Maxima positions.
"""
# define an 3x3 neighborhood
neighborhood = generate_binary_structure(2, 2)
# apply the local maximum filter; all pixel of maximal value
# in their neighborhood are set to 1
local_min = maximum_filter(-image, footprint=neighborhood) == -image
local_max = maximum_filter(image, footprint=neighborhood) == image
# can't distinguish between background zero and filter zero
background = image == 0
# appear along the bg border (artifact of the local max filter)
eroded_background = binary_erosion(background, structure=neighborhood, border_value=1)
# we obtain the final mask, containing only peaks,
# by removing the background from the local_max mask (xor operation)
min_peaks = local_min ^ eroded_background
max_peaks = local_max ^ eroded_background
min_peaks = local_min
max_peaks = local_max
min_peaks[[0, -1], :] = False
min_peaks[:, [0, -1]] = False
max_peaks[[0, -1], :] = False
max_peaks[:, [0, -1]] = False
min_peaks = np.nonzero(min_peaks)
max_peaks = np.nonzero(max_peaks)
return min_peaks, max_peaks
[docs] @classmethod
def end_condition(cls, image, IMFs):
"""Determins whether decomposition should be stopped.
Parameters
----------
image : numpy 2D array
Input image which is decomposed.
IMFs : numpy 3D array
Array for which first dimensions relates to respective IMF,
i.e. (numIMFs, imageX, imageY).
"""
rec = np.sum(IMFs, axis=0)
# If reconstruction is perfect, no need for more tests
if np.allclose(image, rec):
return True
return False
[docs] def check_proto_imf(self, proto_imf, proto_imf_prev, mean_env):
"""Check whether passed (proto) IMF is actual IMF.
Current condition is solely based on checking whether the mean is below threshold.
Parameters
----------
proto_imf : numpy 2D array
Current iteration of proto IMF.
proto_imf_prev : numpy 2D array
Previous iteration of proto IMF.
mean_env : numpy 2D array
Local mean computed from top and bottom envelopes.
Returns
-------
boolean
Whether current proto IMF is actual IMF.
"""
# TODO: Sifiting is very sensitive and subtracting const val can often flip
# maxima with minima in decompoisition and thus repeating above/below
# behaviour. For now, mean_env is checked whether close to zero excluding
# its offset.
if np.all(np.abs(mean_env - mean_env.mean()) < self.mean_thr):
# if np.all(np.abs(mean_env)<self.mean_thr):
return True
# If very little change with sifting
if np.allclose(proto_imf, proto_imf_prev):
return True
# If IMF mean close to zero (below threshold)
if np.mean(np.abs(proto_imf)) < self.mean_thr:
return True
# Everything relatively close to 0
mse_proto_imf = np.mean(proto_imf * proto_imf)
if mse_proto_imf < self.mse_thr:
return True
return False
[docs] def emd(self, image, max_imf=-1):
"""Performs EMD on input image with specified parameters.
Parameters
----------
image : numpy 2D array,
Image which will be decomposed.
max_imf : int, (default: -1)
IMF number to which decomposition should be performed.
Negative value means *all*.
Returns
-------
IMFs : numpy 3D array
Set of IMFs in form of numpy array where the first dimension
relates to IMF's ordinary number.
"""
image_min, image_max = np.min(image), np.max(image)
offset = image_min
scale = image_max - image_min
image_s = (image - offset) / scale
imf = np.zeros(image.shape)
imf_old = imf.copy()
imfNo = 0
IMF = np.empty((imfNo,) + image.shape)
notFinished = True
while notFinished:
self.logger.debug("IMF -- " + str(imfNo))
res = image_s - np.sum(IMF[:imfNo], axis=0)
imf = res.copy()
mean_env = np.zeros(image.shape)
stop_sifting = False
# Counters
n = 0 # All iterations for current imf.
n_h = 0 # counts when mean(proto_imf) < threshold
while not stop_sifting and n < self.MAX_ITERATION:
n += 1
self.logger.debug("Iteration: " + str(n))
min_peaks, max_peaks = self.find_extrema(imf)
self.logger.debug("min_peaks = %i | max_peaks = %i", len(min_peaks[0]), len(max_peaks[0]))
if len(min_peaks[0]) > 4 and len(max_peaks[0]) > 4:
imf_old = imf.copy()
imf = imf - mean_env
min_env, max_env = self.extract_max_min_spline(imf)
mean_env = 0.5 * (min_env + max_env)
imf_old = imf.copy()
imf = imf - mean_env
# Fix number of iterations
if self.FIXE:
if n >= self.FIXE + 1:
stop_sifting = True
# Fix number of iterations after number of zero-crossings
# and extrema differ at most by one.
elif self.FIXE_H:
if n == 1:
continue
if self.check_proto_imf(imf, imf_old, mean_env):
n_h += 1
else:
n_h = 0
# STOP if enough n_h
if n_h >= self.FIXE_H:
stop_sifting = True
# Stops after default stopping criteria are met
else:
if self.check_proto_imf(imf, imf_old, mean_env):
stop_sifting = True
else:
notFinished = False
stop_sifting = True
IMF = np.vstack((IMF, imf.copy()[None, :]))
imfNo += 1
if self.end_condition(image, IMF) or (max_imf > 0 and imfNo >= max_imf):
notFinished = False
break
res = image_s - np.sum(IMF[:imfNo], axis=0)
if not np.allclose(res, 0):
IMF = np.vstack((IMF, res[None, :]))
imfNo += 1
IMF = IMF * scale
IMF[-1] += offset
return IMF
########################################
if __name__ == "__main__":
print("Running example on EMD2D")
PLOT = True
logging.basicConfig(level=logging.DEBUG)
# Generate image
print("Generating image... ", end="")
rows, cols = 1024, 1024
row_scale, col_scale = 256, 256
x = np.arange(rows) / float(row_scale)
y = np.arange(cols).reshape((-1, 1)) / float(col_scale)
pi2 = 2 * np.pi
img = np.zeros((rows, cols))
img = img + np.sin(2 * pi2 * x) * np.cos(y * 4 * pi2 + 4 * x * pi2)
img = img + 3 * np.sin(2 * pi2 * x) + 2
img = img + 5 * x * y + 2 * (y - 0.2) * y
print("Done")
# Perform decomposition
print("Performing decomposition... ", end="")
emd2d = EMD2D()
# emd2d.FIXE_H = 5
IMFs = emd2d.emd(img, max_imf=4)
imfNo = IMFs.shape[0]
print("Done")
if PLOT:
print("Plotting results... ", end="")
import pylab as plt
# Save image for preview
plt.figure(figsize=(4, 4 * (imfNo + 1)))
plt.subplot(imfNo + 1, 1, 1)
plt.imshow(img)
plt.colorbar()
plt.title("Input image")
# Save reconstruction
for n, imf in enumerate(IMFs):
plt.subplot(imfNo + 1, 1, n + 2)
plt.imshow(imf)
plt.colorbar()
plt.title("IMF %i" % (n + 1))
plt.savefig("image_decomp")
print("Done")