Source code for labtoolbox.signals.signals

import numpy as _np
import matplotlib.pyplot as _plt
import warnings
from typing import Union, Tuple, Optional, Callable, List
from numpy.typing import ArrayLike
from .._helper import DEFAULT_FIGSIZE, GenericError, apply_inward_ticks

# __all__ = ["fft", "ifft", "dfs", "fourier_series", "decompose", "harmonic", "envelope"]

[docs] def fft( data: _np.ndarray, t: Optional[Union[_np.ndarray, Tuple[_np.ndarray, _np.ndarray]]] = None, dt: Optional[Union[float, Tuple[float, float]]] = None, oversample: int = 2 ) -> Union[_np.ndarray, Tuple[_np.ndarray, _np.ndarray], Tuple[_np.ndarray, _np.ndarray, _np.ndarray]]: """ Compute the Fast Fourier Transform (FFT) of a signal. For 1D signals, supports both uniformly and non-uniformly sampled data using FFT or Non-Uniform FFT (NUFFT). For 2D signals, supports only uniformly sampled data using 2D FFT applied along rows and columns. Parameters ---------- data : ndarray Input signal as a 1D or 2D NumPy array (real or complex). For 1D, shape `(N,)`. For 2D, shape `(N, M)`. t : ndarray or tuple of ndarray, optional Time samples: - For 1D: 1D array of shape (N,), monotonically increasing. - For 2D: Tuple `(t1, t2)` where `t1` and `t2` are 1D arrays, monotonically increasing. If `None`, only `X` is returned unless `dt` is provided. Defaults to `None`. dt : float or tuple of float, optional Time interval(s) for uniform sampling: - For 1D: Single float `dt_interval`. - For 2D: Tuple `(dt1, dt2)` for intervals along axes. Ignored if `t` is provided. Used to calculate frequency bins if `t` is `None`. Defaults to `None`. oversample : int, optional Oversampling factor for NUFFT interpolation in non-uniform 1D case. Must be >= 1. Defaults to `2`. Ignored for 2D data. Returns ------- X : ndarray The FFT or NUFFT of the input signal. For 1D, shape `(N,)`. For 2D, shape `(N, M)`. Complex-valued. f : ndarray, optional For 1D: Frequency bins, shape `(N,)`. Returned only if `t` or `dt` is provided. f1, f2 : ndarray, optional For 2D: Frequency bins along axes 0 and 1, shapes `(N,)` and `(M,)`, respectively. Returned only if `t` or `dt` is provided. Notes ----- - For 1D uniform sampling with N ≤ 16, a direct DFT is used. For power-of-2 lengths, the Cooley-Tukey FFT algorithm is used. - For 1D non-uniform sampling, a NUFFT algorithm with Gaussian interpolation is used, with O(N log N + M) complexity. - For 2D signals, applies 1D FFTs along rows and columns for uniform sampling. """ try: from .._helper import ispow2, fft_cooley_tukey, dft_direct, GenericError import scipy.fft as spf # Validate inputs data = _np.asarray(data, dtype=complex) if data.ndim not in (1, 2): raise ValueError("'data' must be a 1D or 2D array.") if not _np.all(_np.isfinite(data)): raise ValueError("'data' contains non-finite values (NaN or inf).") if not isinstance(oversample, int) or oversample < 1: raise TypeError("'oversample' must be a positive integer.") if data.ndim == 1: N = len(data) if N == 0: warnings.warn("'data' is empty. Returning empty array.", UserWarning) return _np.array([]) if N == 1: warnings.warn("'data' is a scalar. Returning 'data'.", UserWarning) return data, _np.array([0.0]) if (t is not None or dt is not None) else data # Validate t for 1D is_uniform = True if t is not None: t = _np.asarray(t, dtype=float) if t.ndim != 1 or t.size != N: raise ValueError("'t' must be a 1D array of length 'N' for 1D data.") if not _np.all(_np.isreal(t)): raise TypeError("'t' must contain only real numbers.") if not _np.all(_np.isfinite(t)): raise ValueError("'t' contains non-finite values (NaN or inf).") if not _np.all(_np.diff(t) > 0): raise ValueError("'t' must be monotonically increasing.") # Check uniformity if t.size > 1 and not _np.allclose(_np.diff(t), _np.diff(t)[0], rtol=1e-5): warnings.warn("Non-uniform sampling detected. Using NUFFT algorithm.", UserWarning) is_uniform = False # Validate dt for 1D if dt is not None: if not isinstance(dt, (int, float)): raise TypeError("'dt' must be a real number for 1D data.") if not _np.isfinite(dt) or dt <= 0: raise ValueError("'dt' must be a positive finite value.") # 1D Uniform sampling if is_uniform: if N <= 16: X = dft_direct(data) elif ispow2(N): X = fft_cooley_tukey(data) else: M = int(2 ** _np.ceil(_np.log2(N))) padded = _np.pad(data, (0, M - N), mode="constant") X = fft_cooley_tukey(padded) X = X[:N] # Determine dt and frequencies if t is not None: dt_final = (t[-1] - t[0]) / (N - 1) if N > 1 else 1.0 elif dt is not None: dt_final = dt else: return X f = _np.fft.fftfreq(N, d=dt_final) # Check aliasing spectrum_magnitude = _np.abs(X) threshold = _np.max(spectrum_magnitude) * 0.05 freq_components = _np.abs(f[spectrum_magnitude > threshold]) if freq_components.size > 0: f_max = _np.max(freq_components) fs = 1 / dt_final if f_max >= fs / 2: warnings.warn( f"Potential aliasing detected: the signal contains frequency components up to {f_max:.2g}, " f"which exceeds the Nyquist frequency ({fs/2:.2g}). " "Consider increasing the sampling frequency or applying an anti-aliasing filter.", UserWarning ) order = _np.argsort(f) return X[order], f[order] # 1D Non-uniform sampling: NUFFT points = t M = N t_min, t_max = points.min(), points.max() if t_max == t_min: raise ValueError("'t' must span a non-zero interval.") points_norm = (points - t_min) / (t_max - t_min) - 0.5 # Generate frequencies dt = _np.mean(_np.diff(points)) if N > 1 else 1.0 fs = 1.0 / dt frequencies = _np.linspace(-fs / 2, fs / 2 - fs / M, M) # Uniform grid for FFT N_grid = oversample * max(N, M) h = 1.0 / N_grid grid = _np.linspace(-0.5, 0.5 - h, N_grid) # Gaussian interpolation kernel sigma = 2.0 / oversample spread = int(6 * sigma * N_grid) spread = max(1, spread // 2 * 2 + 1) kernel = _np.exp(-_np.arange(-spread//2 + 1, spread//2 + 1)**2 / (2 * sigma**2)) kernel /= _np.sum(kernel) # Interpolate to uniform grid signal_grid = _np.zeros(N_grid, dtype=complex) for n in range(N): idx = int((points_norm[n] + 0.5) * N_grid) for i in range(-spread//2 + 1, spread//2 + 1): j = (idx + i) % N_grid dist = (points_norm[n] - grid[j]) * N_grid signal_grid[j] += data[n] * _np.exp(-dist**2 / (2 * sigma**2)) # FFT on uniform grid fft_grid = spf.fft(signal_grid) # Interpolate to target frequencies result = _np.zeros(M, dtype=complex) freq_norm = frequencies / (t_max - t_min) for m in range(M): for i in range(N_grid): phase = _np.exp(-2j * _np.pi * freq_norm[m] * grid[i]) result[m] += fft_grid[i] * phase * h if not _np.all(_np.isfinite(result)): raise ValueError("Computed NUFFT contains non-finite values.") scaled_frequencies = frequencies * (t_max - t_min) return result, scaled_frequencies else: # 2D data N, M = data.shape if N == 0 or M == 0: warnings.warn("'data' is empty. Returning empty array.", UserWarning) return _np.array([]) if N == 1 and M == 1: warnings.warn("'data' is a scalar. Returning 'data'.", UserWarning) return data, _np.array([0.0]), _np.array([0.0]) if (t is not None or dt is not None) else data # Validate t for 2D t1, t2 = None, None if t is not None: if not isinstance(t, tuple) or len(t) != 2: raise TypeError("'t' must be a tuple of two 1D NumPy arrays for 2D data.") t1, t2 = _np.asarray(t[0], dtype=float), _np.asarray(t[1], dtype=float) if t1.ndim != 1 or t2.ndim != 1 or t1.shape[0] != N or t2.shape[0] != M: raise ValueError("'t1' and 't2' must be 1D arrays of lengths 'N' and 'M', respectively.") if not _np.all(_np.isreal(t1)) or not _np.all(_np.isreal(t2)): raise TypeError("'t1' and 't2' must contain only real numbers.") if not _np.all(_np.isfinite(t1)) or not _np.all(_np.isfinite(t2)): raise ValueError("'t1' or 't2' contains non-finite values (NaN or inf).") if not _np.all(_np.diff(t1) > 0) or not _np.all(_np.diff(t2) > 0): raise ValueError("'t1' and 't2' must be monotonically increasing.") # Check uniformity for 2D (required by docstring) if t1.size > 1 and not _np.allclose(_np.diff(t1), _np.diff(t1)[0], rtol=1e-5): raise ValueError("Non-uniform sampling not supported for 2D data: 't1' must be uniformly spaced.") if t2.size > 1 and not _np.allclose(_np.diff(t2), _np.diff(t2)[0], rtol=1e-5): raise ValueError("Non-uniform sampling not supported for 2D data: 't2' must be uniformly spaced.") # Validate dt for 2D if dt is not None: if not isinstance(dt, tuple) or len(dt) != 2: raise TypeError("'dt' must be a tuple of two floats for 2D data.") dt1, dt2 = dt if not isinstance(dt1, (int, float)) or not isinstance(dt2, (int, float)): raise TypeError("'dt1' and 'dt2' must be real numbers.") if not _np.isfinite(dt1) or not _np.isfinite(dt2) or dt1 <= 0 or dt2 <= 0: raise ValueError("'dt1' and 'dt2' must be positive finite values.") # Suppress aliasing warnings during 1D FFT calls with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="Potential aliasing detected", category=UserWarning) # Initialize output X = _np.zeros((N, M), dtype=complex) # FFT along rows (axis 1, using t2) for i in range(N): row_t = t2 if t is not None else None row_dt = dt2 if dt is not None else None X[i, :], _ = fft(data[i, :], t=row_t, dt=row_dt, oversample=oversample) # FFT along columns (axis 0, using t1) X = X.T.copy() for j in range(M): col_t = t1 if t is not None else None col_dt = dt1 if dt is not None else None X[j, :], _ = fft(X[j, :], t=col_t, dt=col_dt, oversample=oversample) X = X.T # Compute frequencies if t is not None: dt1_final = (t1[-1] - t1[0]) / (N - 1) if t1 is not None and N > 1 else 1.0 dt2_final = (t2[-1] - t2[0]) / (M - 1) if t2 is not None and M > 1 else 1.0 elif dt is not None: dt1_final, dt2_final = dt else: return X f1 = _np.fft.fftfreq(N, d=dt1_final) f2 = _np.fft.fftfreq(M, d=dt2_final) # Check aliasing spectrum_magnitude = _np.abs(X) threshold = _np.max(spectrum_magnitude) * 0.05 freq_components = _np.abs(f1[_np.any(spectrum_magnitude > threshold, axis=1)]) if freq_components.size > 0: f1_max = _np.max(freq_components) fs1 = 1 / dt1_final if f1_max >= fs1 / 2: warnings.warn( f"Potential aliasing detected on axis 0: the signal contains frequency components up to {f1_max:.2g}, " f"which exceeds the Nyquist frequency ({fs1/2:.2g}). " "Consider increasing the sampling frequency or applying an anti-aliasing filter.", UserWarning ) freq_components = _np.abs(f2[_np.any(spectrum_magnitude > threshold, axis=0)]) if freq_components.size > 0: f2_max = _np.max(freq_components) fs2 = 1 / dt2_final if f2_max >= fs2 / 2: warnings.warn( f"Potential aliasing detected on axis 1: the signal contains frequency components up to {f2_max:.2g}, " f"which exceeds the Nyquist frequency ({fs2/2:.2g}). " "Consider increasing the sampling frequency or applying an anti-aliasing filter.", UserWarning ) order1, order2 = _np.argsort(f1), _np.argsort(f2) X = X[order1][:, order2] return X, f1[order1], f2[order2] except Exception as e: raise GenericError( message=f"Error computing FFT: {str(e)}", context="executing fft", original_error=e, details={"data_shape": data.shape, "t_type": type(t), "dt_type": type(dt)} )
[docs] def ifft(data: ArrayLike, freq: Optional[Union[ArrayLike, Tuple[ArrayLike, ArrayLike]]] = None, df: Optional[Union[float, Tuple[float, float]]] = None, oversample: int = 2) -> Union[ArrayLike, Tuple[ArrayLike, ArrayLike], Tuple[ArrayLike, ArrayLike, ArrayLike]]: """ Compute the Inverse Fast Fourier Transform (IFFT) of a signal. For 1D signals, supports both uniformly and non-uniformly sampled frequency data using IFFT or Non-Uniform IFFT (NUIFFT). For 2D signals, supports only uniformly sampled data using 2D IFFT applied along rows and columns. Parameters ---------- data : ndarray Input frequency domain signal as a 1D or 2D NumPy array (complex). For 1D, shape `(N,)`. For 2D, shape `(N, M)`. freq : ndarray or tuple of ndarray, optional Frequency samples: - For 1D: 1D array of shape `(N,)`, monotonically increasing. - For 2D: Tuple `(f1, f2)` where `f1` (shape `(N,)`) and `f2` (shape `(M,)`) are 1D arrays, monotonically increasing. If `None`, time bins are returned only if `df` is provided. Defaults to `None`. df : float or tuple of float, optional Frequency spacing(s) for uniform sampling: - For 1D: Single float `df_interval`. - For 2D: Tuple `(df1, df2)` for spacings along axes. Ignored if `freq` is provided. Defaults to `None`. oversample : int, optional Oversampling factor for NUIFFT interpolation in non-uniform 1D case. Must be >= 1. Defaults to 2. Ignored for 2D data. Returns ------- x : ndarray The IFFT or NUIFFT of the input signal. For 1D, shape `(N,)`. For 2D, shape `(N, M)`. Complex-valued. t : ndarray, optional For 1D: Time bins, shape `(N,)`. Returned only if `freq` or `df` is provided. t1, t2 : ndarray, optional For 2D: Time bins along axes 0 and 1, shapes `(N,)` and `(M,)`, respectively. Returned only if `freq` or `df` is provided. Notes ----- - For 1D uniform sampling with N <= 16, a direct IDFT is used. For power-of-2 lengths, the Cooley-Tukey IFFT algorithm is used. - For 1D non-uniform sampling, a NUIFFT algorithm with Gaussian interpolation is used, with O(N log N + M) complexity. - For 2D signals, applies 1D IFFTs along rows and columns for uniform sampling. """ from .._helper import ispow2, idft_direct, ifft_cooley_tukey import scipy.fft as spf # Validate inputs data = _np.asarray(data, dtype=complex) if data.ndim not in (1, 2): raise ValueError("'data' must be a 1D or 2D array.") if not _np.all(_np.isfinite(data)): raise ValueError("'data' contains non-finite values (NaN or inf).") if not isinstance(oversample, int) or oversample < 1: raise TypeError("'oversample' must be a positive integer.") if data.ndim == 1: N = len(data) if N == 0: warnings.warn("'data' is empty. Returning empty array.", UserWarning) return _np.array([]) if N == 1: warnings.warn("'data' is a scalar. Returning 'data'.", UserWarning) return data, _np.array([0.0]) if (freq is not None or df is not None) else data # Validate freq for 1D is_uniform = True if freq is not None: freq = _np.asarray(freq, dtype=float) if freq.ndim != 1 or freq.size != N: raise ValueError("'freq' must be a 1D array of length 'N' for 1D data.") if not _np.all(_np.isreal(freq)): raise TypeError("'freq' must contain only real numbers.") if not _np.all(_np.isfinite(freq)): raise ValueError("'freq' contains non-finite values (NaN or inf).") if not _np.all(_np.diff(freq) > 0): raise ValueError("'freq' must be monotonically increasing.") # Check uniformity if freq.size > 1 and not _np.allclose(_np.diff(freq), _np.diff(freq)[0], rtol=1e-5): warnings.warn("Non-uniform frequency sampling detected. Using NUIFFT algorithm.", UserWarning) is_uniform = False # Validate df for 1D if df is not None: if not isinstance(df, (int, float)): raise TypeError("'df' must be a real number for 1D data.") if not _np.isfinite(df) or df <= 0: raise ValueError("'df' must be a positive finite value.") # 1D Uniform sampling if is_uniform: if N <= 16: x = idft_direct(data) elif ispow2(N): x = ifft_cooley_tukey(data) else: M = int(2 ** _np.ceil(_np.log2(N))) padded = _np.pad(data, (0, M - N), mode="constant") x = ifft_cooley_tukey(padded) x = x[:N] if freq is not None or df is not None: df_value = (freq[-1] - freq[0]) / (N - 1) if freq is not None and N > 1 else df dt = 1 / (N * df_value) if df_value is not None else 1.0 t = _np.arange(0, N) * dt return x, t return x # 1D Non-uniform sampling: NUIFFT try: points = freq M = N f_min, f_max = points.min(), points.max() if f_max == f_min: raise ValueError("'freq' must span a non-zero interval.") points_norm = (points - f_min) / (f_max - f_min) - 0.5 # Generate time points df_value = _np.mean(_np.diff(points)) if N > 1 else 1.0 fs = 1.0 / df_value times = _np.linspace(-fs / 2, fs / 2 - fs / M, M) # Uniform grid for FFT N_grid = oversample * max(N, M) h = 1.0 / N_grid grid = _np.linspace(-0.5, 0.5 - h, N_grid) # Gaussian interpolation kernel sigma = 2.0 / oversample spread = int(6 * sigma * N_grid) spread = max(1, spread // 2 * 2 + 1) kernel = _np.exp(-_np.arange(-spread//2 + 1, spread//2 + 1)**2 / (2 * sigma**2)) kernel /= _np.sum(kernel) # Interpolate to uniform grid signal_grid = _np.zeros(N_grid, dtype=complex) for n in range(N): idx = int((points_norm[n] + 0.5) * N_grid) for i in range(-spread//2 + 1, spread//2 + 1): j = (idx + i) % N_grid dist = (points_norm[n] - grid[j]) * N_grid signal_grid[j] += data[n] * _np.exp(-dist**2 / (2 * sigma**2)) # IFFT on uniform grid ifft_grid = spf.ifft(signal_grid) # Interpolate to target times result = _np.zeros(M, dtype=complex) time_norm = times / (f_max - f_min) for m in range(M): for i in range(N_grid): phase = _np.exp(2j * _np.pi * time_norm[m] * grid[i]) result[m] += ifft_grid[i] * phase * h if not _np.all(_np.isfinite(result)): raise ValueError("Computed NUIFFT contains non-finite values.") scaled_times = times * (f_max - f_min) return result, scaled_times except Exception as e: raise ValueError(f"Error computing NUIFFT: {str(e)}") else: # 2D data N, M = data.shape if N == 0 or M == 0: warnings.warn("'data' is empty. Returning empty array.", UserWarning) return _np.array([]) if N == 1 and M == 1: warnings.warn("'data' is a scalar. Returning 'data'.", UserWarning) return data, _np.array([0.0]), _np.array([0.0]) if (freq is not None or df is not None) else data # Validate freq for 2D f1, f2 = None, None if freq is not None: if not isinstance(freq, tuple) or len(freq) != 2: raise TypeError("'freq' must be a tuple of two 1D NumPy arrays for 2D data.") f1, f2 = _np.asarray(freq[0], dtype=float), _np.asarray(freq[1], dtype=float) if f1.ndim != 1 or f2.ndim != 1 or f1.shape[0] != N or f2.shape[0] != M: raise ValueError("'f1' and 'f2' must be 1D arrays of lengths 'N' and 'M', respectively.") if not _np.all(_np.isreal(f1)) or not _np.all(_np.isreal(f2)): raise TypeError("'f1' and 'f2' must contain only real numbers.") if not _np.all(_np.isfinite(f1)) or not _np.all(_np.isfinite(f2)): raise ValueError("'f1' or 'f2' contains non-finite values (NaN or inf).") if not _np.all(_np.diff(f1) > 0) or not _np.all(_np.diff(f2) > 0): raise ValueError("'f1' and 'f2' must be monotonically increasing.") # Validate df for 2D if df is not None: if not isinstance(df, tuple) or len(df) != 2: raise TypeError("'df' must be a tuple of two floats for 2D data.") df1, df2 = df if not isinstance(df1, (int, float)) or not isinstance(df2, (int, float)): raise TypeError("'df1' and 'df2' must be real numbers.") if not _np.isfinite(df1) or not _np.isfinite(df2) or df1 <= 0 or df2 <= 0: raise ValueError("'df1' and 'df2' must be positive finite values.") try: # Initialize output x = _np.zeros((N, M), dtype=complex) # IFFT along rows (axis 1, using f2) for i in range(N): row_f = f2 if f2 is not None else None row_df = df2 if df is not None else None x[i, :], _ = ifft(data[i, :], freq=row_f, df=row_df, oversample=oversample) # IFFT along columns (axis 0, using f1) x = x.T.copy() for j in range(M): col_f = f1 if f1 is not None else None col_df = df1 if df is not None else None x[j, :], _ = ifft(x[j, :], freq=col_f, df=col_df, oversample=oversample) x = x.T # Compute times if freq is not None: df1_final = (f1[-1] - f1[0]) / (N - 1) if f1 is not None and N > 1 else 1.0 df2_final = (f2[-1] - f2[0]) / (M - 1) if f2 is not None and M > 1 else 1.0 elif df is not None: df1_final, df2_final = df else: return x dt1 = 1 / (N * df1_final) dt2 = 1 / (M * df2_final) t1 = _np.arange(0, N) * dt1 t2 = _np.arange(0, M) * dt2 return x, t1, t2 except Exception as e: raise GenericError(f"Error computing IFFT2: {str(e)}")
# Removed function kept commented for reference. # def dfs( # t: ArrayLike, # data: ArrayLike, # order: int, # plot: bool = True, # showlegend: bool = True, # xlabel: str = "", # ylabel: str = "", # xscale: int = 0, # yscale: int = 0, # xlim: Optional[Tuple[float, float]] = None, # ylim: Optional[Tuple[float, float]] = None, # figsize: Tuple[float, float] = DEFAULT_FIGSIZE # ) -> Tuple[ArrayLike, float, ArrayLike, ArrayLike]: # """ # Computes the discrete Fourier series approximation of a sampled signal. # """ # try: # from scipy.integrate import simpson # data = _np.asarray(data) # t = _np.asarray(t) # if not _np.allclose(_np.diff(t), t[1] - t[0], rtol=1e-4): # raise ValueError("'t' must be uniformly spaced.") # if not isinstance(order, int): # raise TypeError("'order' must be an integer.") # if order <= 0: # raise ValueError("'order' must be equal or greater than 1.") # if t.size != data.size: # raise ValueError("'t' must have the same length as 'data'") # if not _np.issubdtype(data.dtype, _np.number): # raise TypeError("'data' must contain only numeric types (int, float, or complex).") # if not (_np.issubdtype(t.dtype, _np.floating) or _np.issubdtype(t.dtype, _np.integer)) or not _np.all(_np.isreal(t)): # raise TypeError("'t' must contain only real numbers (int or float).") # if not _np.all(_np.isfinite(data)): # raise ValueError("'data' contains non-finite values (NaN or inf).") # if not _np.all(_np.isfinite(t)): # raise ValueError("'t' contains non-finite values (NaN or inf).") # xscale = 10**xscale # yscale = 10**yscale # dt = t[1] - t[0] # fs = 1 / dt # f_nyquist = fs / 2 # max_freq = order / (t[-1] - t[0]) # if max_freq > f_nyquist: # warnings.warn( # f"Potential aliasing detected: the signal contains frequency components up to {max_freq:.2g}, " # f"which exceeds the Nyquist frequency ({f_nyquist:.2g}). " # "Consider increasing the sampling frequency or applying an anti-aliasing filter." # ) # a = t[0] # b = t[-1] # T = b - a # a0 = 2 * simpson(data, t) / T # a_n = [] # b_n = [] # for n in range(1, order): # cos_term = _np.cos(2 * _np.pi * n * t / T) # sin_term = _np.sin(2 * _np.pi * n * t / T) # an = 2 * simpson(data * cos_term, t) / T # bn = 2 * simpson(data * sin_term, t) / T # a_n.append(an) # b_n.append(bn) # f_approx = _np.full_like(t, a0 / 2) # for n in range(1, order): # f_approx += ( # a_n[n - 1] * _np.cos(2 * _np.pi * n * t / T) + # b_n[n - 1] * _np.sin(2 * _np.pi * n * t / T) # ) # if plot: # _plt.figure(figsize=figsize) # _plt.plot(t / xscale, data / yscale, label="Input data", lw=1, color="mediumblue", marker='') # _plt.plot( # t / xscale, f_approx / yscale, # label=f"Partial Fourier series\nNo. of terms = {order}", # lw=1, color="crimson", linestyle='-' # ) # if xlim: # _plt.xlim(xlim) # if ylim: # _plt.ylim(ylim) # _plt.xlabel(xlabel) # _plt.ylabel(ylabel) # if showlegend: # _plt.legend() # return f_approx, a0, _np.array(a_n), _np.array(b_n) # except Exception as e: # raise GenericError( # message="Unexpected error in discrete Fourier series computation", # context="executing dfs", # original_error=e, # details={"t_shape": t.shape, "data_shape": data.shape, "order": order} # )
[docs] def fourier_series(f: Callable[[Union[float, ArrayLike]], Union[float, ArrayLike]], interval: Tuple[float, float], order: int, num_points: int = 1000, xlabel: str = "x [ux]", ylabel: str = "y [uy]", xscale: int = 0, yscale: int = 0, figsize: Tuple[float, float] = DEFAULT_FIGSIZE) -> Tuple[ArrayLike, ArrayLike, ArrayLike, float, ArrayLike, ArrayLike]: """ Computes the Fourier series approximation of a function f(x) Parameters ---------- f : callable Function to approximate. interval : tuple of float The interval `(a, b)` over which to compute the Fourier series. order : int Number of Fourier modes to use in the approximation. num_points : int, optional Number of points for plotting. Default is 1000). xlabel : str, optional Label for the x-axis. ylabel : str, optional Label for the y-axis. xscale : int, optional Scaling factor for the x-axis used only for visualization. yscale : int, optional Scaling factor for the y-axis used only for visualization. figsize : tuple of float, optional Figure size passed to `matplotlib`. Default is `(6.4 * 1.2, 4.8 * 1.2)`. Returns ------- x : array-like 1D array representing the uniformly spaced sample points over one period. These are the evaluation points at which both the original function and the Fourier approximation are computed. f_original : array-like 1D array representing the values of the original input function evaluated at the sample points `x`. This is the reference signal used for comparison with the Fourier approximation. f_approx : array-like 1D Array containing the values of the truncated Fourier series evaluated at the same sample points `x`. This is the approximation of `f_original` using a finite number of harmonics (up to the specified order). a0 : float Zeroth Fourier coefficient (mean value component of the function over the period). It corresponds to the constant term of the Fourier series. a_n : array-like Array of cosine coefficients (Fourier coefficients of the even part of the function), corresponding to each harmonic up to the specified order (excluding a0). b_n : array-like Array of sine coefficients (Fourier coefficients of the odd part of the function), corresponding to each harmonic up to the specified order. Notes ---------- The values of `xscale` and `yscale` affect only the axis scaling in the plot. All parameters are estimated using the original input data as provided. """ from scipy.integrate import quad if not callable(f): raise TypeError("'f' must be a callable function.") if not isinstance(interval, (list, tuple)): raise TypeError("'interval' must be a list or a tuple.") if len(interval) != 2: raise ValueError("'interval' must have exactly two elements.") a, b = interval if not (_np.isscalar(a) and _np.isscalar(b)): raise TypeError("'interval' elements must be scalars.") if not (isinstance(a, (int, float)) and isinstance(b, (int, float))): raise TypeError("'interval' elements must be real numbers (int or float).") if b < a: a, b = b, a warnings.warn("Integration limits 'a' and 'b' have been swapped.", UserWarning) if a == b: raise ValueError("'a' must be not equal to 'b'.") if not isinstance(order, (int)): raise TypeError("'order' must be an integer.") if order <= 0: raise ValueError("'order' must be equal or greater than 1.") if not isinstance(xlabel, (str)): raise TypeError("'xlabel' must be a string.") if not isinstance(ylabel, (str)): raise TypeError("'ylabel' must be a string.") if num_points <= 0: raise ValueError("'num_points' must be equal or greater than 1.") if not isinstance(xscale, (int, float)): raise TypeError("'xscale' must be a real number (int or float).") if not isinstance(yscale, (int, float)): raise TypeError("'yscale' must be a real number (int or float).") if not isinstance(figsize, (list, tuple)) or len(figsize) != 2 or not all(isinstance(u, (int, float)) and _np.isfinite(u) for u in figsize): raise TypeError("'figsize' must be a list or tuple of exactly two finite real numbers.") xscale = 10**xscale yscale = 10**yscale # aggiungere nota che la yscale è solo a livello grafico T = b - a # Period L = T / 2 x = _np.linspace(a, b, num_points) # Compute a0 separately a0, _ = quad(lambda x_: f(x_), a, b) a0 /= L # Compute coefficients a_n and b_n a_n = [] b_n = [] for n in range(1, order + 1): an, _ = quad(lambda x_: f(x_) * _np.cos(n * _np.pi * (x_ - a) / L), a, b) bn, _ = quad(lambda x_: f(x_) * _np.sin(n * _np.pi * (x_ - a) / L), a, b) a_n.append(an / L) b_n.append(bn / L) # Build the Fourier approximation f_approx = _np.full_like(x, a0 / 2) for n in range(1, order + 1): f_approx += ( a_n[n - 1] * _np.cos(n * _np.pi * (x - a) / L) + b_n[n - 1] * _np.sin(n * _np.pi * (x - a) / L) ) f_original = f(x) # Plot _plt.figure(figsize=figsize) _plt.plot(x / xscale, f_original / yscale, label="Input function", lw=0.8, color = "blue") _plt.plot(x / xscale, f_approx / yscale, '--', label=f"Fourier approx. (order = {order})", lw=0.8, color = "red") _plt.xlabel(xlabel) _plt.ylabel(ylabel) _plt.legend() apply_inward_ticks(_plt.gca()) return f_original, f_approx, a0, a_n, b_n
# Removed functions kept commented for reference. # def harmonic(t: ArrayLike, y: ArrayLike, prominence: float = 0.05, n_max: Optional[int] = None, verbose: bool = True) -> List[dict]: # """ # Identifies the dominant harmonics present in a real-valued signal. # """ # from scipy.signal import find_peaks # t = _np.asarray(t) # y = _np.asarray(y) # if t.size != y.size: # raise ValueError("'t' must have the same length as 'y'") # if not (_np.issubdtype(y.dtype, _np.number)): # raise TypeError("'y' must contain only numeric types (int, float, or complex).") # if (not (_np.issubdtype(t.dtype, _np.floating) or _np.issubdtype(t.dtype, _np.integer))) or not _np.all(_np.isreal(t)): # raise TypeError("'t' must contain only real numbers (int or float).") # if not _np.all(_np.isfinite(y)): # raise ValueError("'data' contains non-finite values (NaN or inf).") # if not _np.all(_np.isfinite(t)): # raise ValueError("'t' contains non-finite values (NaN or inf).") # if not isinstance(prominence, (int, float)): # raise TypeError("'prominence' must be a real number (int or float).") # N = len(y) # yf, freqs = fft(y, t) # pos_mask = freqs > 0 # freqs = freqs[pos_mask] # yf = yf[pos_mask] # power = _np.abs(yf) # phases = _np.angle(yf) # peaks, _ = find_peaks(power, prominence=prominence * _np.max(power)) # sorted_peaks = peaks[_np.argsort(power[peaks])[::-1]] # if n_max is not None: # if not isinstance(prominence, (int)): # raise TypeError("'prominence' must be an integer.") # sorted_peaks = sorted_peaks[:n_max] # harmonics = [] # if sorted_peaks.size > 0: # reference_phase = phases[sorted_peaks[0]] # else: # reference_phase = 0 # for idx in sorted_peaks: # harmonics.append({ # "frequency": freqs[idx], # "amplitude": 2 * power[idx] / N, # "phase": phases[idx] - reference_phase # }) # return harmonics # # def decompose(t: ArrayLike, y: ArrayLike, freqs: ArrayLike, verbose: bool = True) -> List[dict]: # """ # Reconstructs specified sinusoidal components from a real-valued signal. # """ # from numpy.linalg import lstsq # t = _np.asarray(t) # y = _np.asarray(y) # freqs = _np.asarray(freqs) # if t.size != y.size: # raise ValueError("'t' must have the same length as 'y'") # if not (_np.issubdtype(y.dtype, _np.number)): # raise TypeError("'y' must contain only numeric types (int, float, or complex).") # if (not (_np.issubdtype(t.dtype, _np.floating) or _np.issubdtype(t.dtype, _np.integer))) or not _np.all(_np.isreal(t)): # raise TypeError("'t' must contain only real numbers (int or float).") # if (not (_np.issubdtype(freqs.dtype, _np.floating) or _np.issubdtype(freqs.dtype, _np.integer))) or not _np.all(_np.isreal(freqs)): # raise TypeError("'freqs' must contain only real numbers (int or float).") # if not _np.all(_np.isfinite(y)): # raise ValueError("'data' contains non-finite values (NaN or inf).") # if not _np.all(_np.isfinite(t)): # raise ValueError("'t' contains non-finite values (NaN or inf).") # A = [] # for f in freqs: # A.append(_np.sin(2 * _np.pi * f * t)) # A.append(_np.cos(2 * _np.pi * f * t)) # A = _np.vstack(A).T # coeffs, _, _, _ = lstsq(A, y, rcond=None) # components = [] # for i, f in enumerate(freqs): # sin_coef = coeffs[2 * i] # cos_coef = coeffs[2 * i + 1] # amplitude = _np.hypot(sin_coef, cos_coef) # phase = _np.arctan2(cos_coef, sin_coef) # components.append({ # "frequency": f, # "amplitude": amplitude, # "phase": phase # }) # return components
[docs] def envelope(signal: ArrayLike, method: str = 'peaks', mode: str = 'upper', filter_size: int = 31, fs: float = 1.0, remove_mean: bool = False) -> Union[ArrayLike, Tuple[ArrayLike, ArrayLike]]: """ Compute the envelope of a 1D real-valued signal. Parameters ---------- signal : array-like The input real-valued signal (1D array). method : {'hilbert', 'peaks', 'adaptive'}, optional The method to compute the envelope: - 'hilbert': Uses the analytic signal via the Hilbert transform (best for bandpass signals). - 'peaks': Interpolates a smooth curve through signal peaks (versatile for general signals). - 'adaptive': Applies a variable-width filter to abs(signal) (robust to noise). Defaults to 'peaks'. mode : {'upper', 'lower', 'both'}, optional Select which envelope to return: - 'upper': The upper envelope (default). - 'lower': The lower envelope. - 'both': Returns a tuple (upper, lower). filter_size : int, optional Size of the smoothing filter for 'adaptive' method. Must be odd and positive. Defaults to `31`. fs : float, optional Sampling frequency in Hz, used for frequency estimation in 'adaptive' method. Defaults to `1.0`. remove_mean : bool, optional If True, removes the signal mean before computing the envelope. Defaults to `False`. Returns ------- ndarray or tuple of ndarrays The computed envelope(s): - If mode='upper' or 'lower', returns a 1D array. - If mode='both', returns a tuple (upper, lower). Notes ----- - The 'hilbert' method is mathematically rigorous for bandpass signals but may fail for broadband or non-oscillatory signals. - The 'peaks' method is versatile, interpolating through local maxima/minima, and works well for most signals. - The 'adaptive' method adjusts filter width based on local signal frequency, ideal for noisy or irregular signals. """ from scipy.signal import hilbert, find_peaks, medfilt from scipy.interpolate import UnivariateSpline # Validate signal signal = _np.asarray(signal, dtype=float) if signal.ndim != 1: raise ValueError("signal must be one-dimensional.") if not _np.issubdtype(signal.dtype, _np.floating) or not _np.all(_np.isreal(signal)): raise TypeError("signal must contain real numbers (float).") if not _np.all(_np.isfinite(signal)): raise ValueError("signal contains non-finite values (NaN or inf).") # Validate method valid_methods = ['hilbert', 'peaks', 'adaptive'] if method not in valid_methods: raise ValueError(f"method must be one of {valid_methods}.") # Validate mode valid_modes = ['upper', 'lower', 'both'] if mode not in valid_modes: raise ValueError(f"mode must be one of {valid_modes}.") # Validate filter_size if not isinstance(filter_size, int): raise TypeError("filter_size must be an integer.") if filter_size % 2 == 0 or filter_size < 1: raise ValueError("filter_size must be a positive odd integer.") # Validate fs if not isinstance(fs, (int, float)) or fs <= 0: raise ValueError("fs must be a positive number.") # Validate remove_mean if not isinstance(remove_mean, bool): raise TypeError("remove_mean must be a boolean.") # Remove mean if requested if remove_mean: signal = signal - _np.mean(signal) try: if method == 'hilbert': # Compute envelope using Hilbert transform analytic = hilbert(signal) env = _np.abs(analytic) elif method == 'peaks': # Find peaks for upper envelope peaks, _ = find_peaks(signal, distance=filter_size // 2) if len(peaks) < 2: # Fallback to constant envelope if too few peaks env = _np.full_like(signal, _np.max(_np.abs(signal))) else: # Interpolate through peaks x = _np.arange(len(signal)) spline = UnivariateSpline(peaks, signal[peaks], k=3, s=0, ext='const') env = spline(x) env = _np.maximum(env, 0) # Ensure non-negative envelope elif method == 'adaptive': # Compute absolute signal abs_sig = _np.abs(signal) # Estimate local frequency using zero-crossings zero_crossings = _np.where(_np.diff(_np.sign(signal)))[0] if len(zero_crossings) > 1: mean_period = _np.mean(_np.diff(zero_crossings)) adaptive_size = max(3, int(mean_period / 2) * 2 + 1) # Ensure odd else: adaptive_size = filter_size # Apply median filter with adaptive size env = medfilt(abs_sig, kernel_size=adaptive_size) # Handle mode if mode == 'upper': return env elif mode == 'lower': # Compute lower envelope as upper envelope of -signal lower_env = envelope(-signal, method=method, mode='upper', filter_size=filter_size, fs=fs, remove_mean=False) return -lower_env elif mode == 'both': upper = env lower_env = envelope(-signal, method=method, mode='upper', filter_size=filter_size, fs=fs, remove_mean=False) return upper, -lower_env except Exception as e: raise GenericError(f"Error computing envelope: {str(e)}")