Source code for labtoolbox.signals.signals

import numpy as _np
import matplotlib.pyplot as _plt
import warnings
from typing import Union, Tuple, Optional, Callable, List
from numpy.typing import ArrayLike
from .._helper import GenericError

# __all__ = ["fft", "ifft", "dfs", "fourier_series", "decompose", "harmonic", "envelope"]

[docs] def fft( data: _np.ndarray, t: Optional[Union[_np.ndarray, Tuple[_np.ndarray, _np.ndarray]]] = None, dt: Optional[Union[float, Tuple[float, float]]] = None, oversample: int = 2 ) -> Union[_np.ndarray, Tuple[_np.ndarray, _np.ndarray], Tuple[_np.ndarray, _np.ndarray, _np.ndarray]]: """ Compute the Fast Fourier Transform (FFT) of a signal. For 1D signals, supports both uniformly and non-uniformly sampled data using FFT or Non-Uniform FFT (NUFFT). For 2D signals, supports only uniformly sampled data using 2D FFT applied along rows and columns. Parameters ---------- data : ndarray Input signal as a 1D or 2D NumPy array (real or complex). For 1D, shape `(N,)`. For 2D, shape `(N, M)`. t : ndarray or tuple of ndarray, optional Time samples: - For 1D: 1D array of shape (N,), monotonically increasing. - For 2D: Tuple `(t1, t2)` where `t1` and `t2` are 1D arrays, monotonically increasing. If `None`, only `X` is returned unless `dt` is provided. Defaults to `None`. dt : float or tuple of float, optional Time interval(s) for uniform sampling: - For 1D: Single float `dt_interval`. - For 2D: Tuple `(dt1, dt2)` for intervals along axes. Ignored if `t` is provided. Used to calculate frequency bins if `t` is `None`. Defaults to `None`. oversample : int, optional Oversampling factor for NUFFT interpolation in non-uniform 1D case. Must be >= 1. Defaults to `2`. Ignored for 2D data. Returns ------- X : ndarray The FFT or NUFFT of the input signal. For 1D, shape `(N,)`. For 2D, shape `(N, M)`. Complex-valued. f : ndarray, optional For 1D: Frequency bins, shape `(N,)`. Returned only if `t` or `dt` is provided. f1, f2 : ndarray, optional For 2D: Frequency bins along axes 0 and 1, shapes `(N,)` and `(M,)`, respectively. Returned only if `t` or `dt` is provided. Notes ----- - For 1D uniform sampling with N ≤ 16, a direct DFT is used. For power-of-2 lengths, the Cooley-Tukey FFT algorithm is used. - For 1D non-uniform sampling, a NUFFT algorithm with Gaussian interpolation is used, with O(N log N + M) complexity. - For 2D signals, applies 1D FFTs along rows and columns for uniform sampling. """ try: from .._helper import ispow2, fft_cooley_tukey, dft_direct, GenericError import scipy.fft as spf # Validate inputs data = _np.asarray(data, dtype=complex) if data.ndim not in (1, 2): raise ValueError("'data' must be a 1D or 2D array.") if not _np.all(_np.isfinite(data)): raise ValueError("'data' contains non-finite values (NaN or inf).") if not isinstance(oversample, int) or oversample < 1: raise TypeError("'oversample' must be a positive integer.") if data.ndim == 1: N = len(data) if N == 0: warnings.warn("'data' is empty. Returning empty array.", UserWarning) return _np.array([]) if N == 1: warnings.warn("'data' is a scalar. Returning 'data'.", UserWarning) return data, _np.array([0.0]) if (t is not None or dt is not None) else data # Validate t for 1D is_uniform = True if t is not None: t = _np.asarray(t, dtype=float) if t.ndim != 1 or t.size != N: raise ValueError("'t' must be a 1D array of length 'N' for 1D data.") if not _np.all(_np.isreal(t)): raise TypeError("'t' must contain only real numbers.") if not _np.all(_np.isfinite(t)): raise ValueError("'t' contains non-finite values (NaN or inf).") if not _np.all(_np.diff(t) > 0): raise ValueError("'t' must be monotonically increasing.") # Check uniformity if t.size > 1 and not _np.allclose(_np.diff(t), _np.diff(t)[0], rtol=1e-5): warnings.warn("Non-uniform sampling detected. Using NUFFT algorithm.", UserWarning) is_uniform = False # Validate dt for 1D if dt is not None: if not isinstance(dt, (int, float)): raise TypeError("'dt' must be a real number for 1D data.") if not _np.isfinite(dt) or dt <= 0: raise ValueError("'dt' must be a positive finite value.") # 1D Uniform sampling if is_uniform: if N <= 16: X = dft_direct(data) elif ispow2(N): X = fft_cooley_tukey(data) else: M = int(2 ** _np.ceil(_np.log2(N))) padded = _np.pad(data, (0, M - N), mode="constant") X = fft_cooley_tukey(padded) X = X[:N] # Determine dt and frequencies if t is not None: dt_final = (t[-1] - t[0]) / (N - 1) if N > 1 else 1.0 elif dt is not None: dt_final = dt else: return X f = _np.fft.fftfreq(N, d=dt_final) # Check aliasing spectrum_magnitude = _np.abs(X) threshold = _np.max(spectrum_magnitude) * 0.05 freq_components = _np.abs(f[spectrum_magnitude > threshold]) if freq_components.size > 0: f_max = _np.max(freq_components) fs = 1 / dt_final if f_max >= fs / 2: warnings.warn( f"Potential aliasing detected: the signal contains frequency components up to {f_max:.2g}, " f"which exceeds the Nyquist frequency ({fs/2:.2g}). " "Consider increasing the sampling frequency or applying an anti-aliasing filter.", UserWarning ) order = _np.argsort(f) return X[order], f[order] # 1D Non-uniform sampling: NUFFT points = t M = N t_min, t_max = points.min(), points.max() if t_max == t_min: raise ValueError("'t' must span a non-zero interval.") points_norm = (points - t_min) / (t_max - t_min) - 0.5 # Generate frequencies dt = _np.mean(_np.diff(points)) if N > 1 else 1.0 fs = 1.0 / dt frequencies = _np.linspace(-fs / 2, fs / 2 - fs / M, M) # Uniform grid for FFT N_grid = oversample * max(N, M) h = 1.0 / N_grid grid = _np.linspace(-0.5, 0.5 - h, N_grid) # Gaussian interpolation kernel sigma = 2.0 / oversample spread = int(6 * sigma * N_grid) spread = max(1, spread // 2 * 2 + 1) kernel = _np.exp(-_np.arange(-spread//2 + 1, spread//2 + 1)**2 / (2 * sigma**2)) kernel /= _np.sum(kernel) # Interpolate to uniform grid signal_grid = _np.zeros(N_grid, dtype=complex) for n in range(N): idx = int((points_norm[n] + 0.5) * N_grid) for i in range(-spread//2 + 1, spread//2 + 1): j = (idx + i) % N_grid dist = (points_norm[n] - grid[j]) * N_grid signal_grid[j] += data[n] * _np.exp(-dist**2 / (2 * sigma**2)) # FFT on uniform grid fft_grid = spf.fft(signal_grid) # Interpolate to target frequencies result = _np.zeros(M, dtype=complex) freq_norm = frequencies / (t_max - t_min) for m in range(M): for i in range(N_grid): phase = _np.exp(-2j * _np.pi * freq_norm[m] * grid[i]) result[m] += fft_grid[i] * phase * h if not _np.all(_np.isfinite(result)): raise ValueError("Computed NUFFT contains non-finite values.") scaled_frequencies = frequencies * (t_max - t_min) return result, scaled_frequencies else: # 2D data N, M = data.shape if N == 0 or M == 0: warnings.warn("'data' is empty. Returning empty array.", UserWarning) return _np.array([]) if N == 1 and M == 1: warnings.warn("'data' is a scalar. Returning 'data'.", UserWarning) return data, _np.array([0.0]), _np.array([0.0]) if (t is not None or dt is not None) else data # Validate t for 2D t1, t2 = None, None if t is not None: if not isinstance(t, tuple) or len(t) != 2: raise TypeError("'t' must be a tuple of two 1D NumPy arrays for 2D data.") t1, t2 = _np.asarray(t[0], dtype=float), _np.asarray(t[1], dtype=float) if t1.ndim != 1 or t2.ndim != 1 or t1.shape[0] != N or t2.shape[0] != M: raise ValueError("'t1' and 't2' must be 1D arrays of lengths 'N' and 'M', respectively.") if not _np.all(_np.isreal(t1)) or not _np.all(_np.isreal(t2)): raise TypeError("'t1' and 't2' must contain only real numbers.") if not _np.all(_np.isfinite(t1)) or not _np.all(_np.isfinite(t2)): raise ValueError("'t1' or 't2' contains non-finite values (NaN or inf).") if not _np.all(_np.diff(t1) > 0) or not _np.all(_np.diff(t2) > 0): raise ValueError("'t1' and 't2' must be monotonically increasing.") # Check uniformity for 2D (required by docstring) if t1.size > 1 and not _np.allclose(_np.diff(t1), _np.diff(t1)[0], rtol=1e-5): raise ValueError("Non-uniform sampling not supported for 2D data: 't1' must be uniformly spaced.") if t2.size > 1 and not _np.allclose(_np.diff(t2), _np.diff(t2)[0], rtol=1e-5): raise ValueError("Non-uniform sampling not supported for 2D data: 't2' must be uniformly spaced.") # Validate dt for 2D if dt is not None: if not isinstance(dt, tuple) or len(dt) != 2: raise TypeError("'dt' must be a tuple of two floats for 2D data.") dt1, dt2 = dt if not isinstance(dt1, (int, float)) or not isinstance(dt2, (int, float)): raise TypeError("'dt1' and 'dt2' must be real numbers.") if not _np.isfinite(dt1) or not _np.isfinite(dt2) or dt1 <= 0 or dt2 <= 0: raise ValueError("'dt1' and 'dt2' must be positive finite values.") # Suppress aliasing warnings during 1D FFT calls with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="Potential aliasing detected", category=UserWarning) # Initialize output X = _np.zeros((N, M), dtype=complex) # FFT along rows (axis 1, using t2) for i in range(N): row_t = t2 if t is not None else None row_dt = dt2 if dt is not None else None X[i, :], _ = fft(data[i, :], t=row_t, dt=row_dt, oversample=oversample) # FFT along columns (axis 0, using t1) X = X.T.copy() for j in range(M): col_t = t1 if t is not None else None col_dt = dt1 if dt is not None else None X[j, :], _ = fft(X[j, :], t=col_t, dt=col_dt, oversample=oversample) X = X.T # Compute frequencies if t is not None: dt1_final = (t1[-1] - t1[0]) / (N - 1) if t1 is not None and N > 1 else 1.0 dt2_final = (t2[-1] - t2[0]) / (M - 1) if t2 is not None and M > 1 else 1.0 elif dt is not None: dt1_final, dt2_final = dt else: return X f1 = _np.fft.fftfreq(N, d=dt1_final) f2 = _np.fft.fftfreq(M, d=dt2_final) # Check aliasing spectrum_magnitude = _np.abs(X) threshold = _np.max(spectrum_magnitude) * 0.05 freq_components = _np.abs(f1[_np.any(spectrum_magnitude > threshold, axis=1)]) if freq_components.size > 0: f1_max = _np.max(freq_components) fs1 = 1 / dt1_final if f1_max >= fs1 / 2: warnings.warn( f"Potential aliasing detected on axis 0: the signal contains frequency components up to {f1_max:.2g}, " f"which exceeds the Nyquist frequency ({fs1/2:.2g}). " "Consider increasing the sampling frequency or applying an anti-aliasing filter.", UserWarning ) freq_components = _np.abs(f2[_np.any(spectrum_magnitude > threshold, axis=0)]) if freq_components.size > 0: f2_max = _np.max(freq_components) fs2 = 1 / dt2_final if f2_max >= fs2 / 2: warnings.warn( f"Potential aliasing detected on axis 1: the signal contains frequency components up to {f2_max:.2g}, " f"which exceeds the Nyquist frequency ({fs2/2:.2g}). " "Consider increasing the sampling frequency or applying an anti-aliasing filter.", UserWarning ) order1, order2 = _np.argsort(f1), _np.argsort(f2) X = X[order1][:, order2] return X, f1[order1], f2[order2] except Exception as e: raise GenericError( message=f"Error computing FFT: {str(e)}", context="executing fft", original_error=e, details={"data_shape": data.shape, "t_type": type(t), "dt_type": type(dt)} )
[docs] def ifft(data: ArrayLike, freq: Optional[Union[ArrayLike, Tuple[ArrayLike, ArrayLike]]] = None, df: Optional[Union[float, Tuple[float, float]]] = None, oversample: int = 2) -> Union[ArrayLike, Tuple[ArrayLike, ArrayLike], Tuple[ArrayLike, ArrayLike, ArrayLike]]: """ Compute the Inverse Fast Fourier Transform (IFFT) of a signal. For 1D signals, supports both uniformly and non-uniformly sampled frequency data using IFFT or Non-Uniform IFFT (NUIFFT). For 2D signals, supports only uniformly sampled data using 2D IFFT applied along rows and columns. Parameters ---------- data : ndarray Input frequency domain signal as a 1D or 2D NumPy array (complex). For 1D, shape `(N,)`. For 2D, shape `(N, M)`. freq : ndarray or tuple of ndarray, optional Frequency samples: - For 1D: 1D array of shape `(N,)`, monotonically increasing. - For 2D: Tuple `(f1, f2)` where `f1` (shape `(N,)`) and `f2` (shape `(M,)`) are 1D arrays, monotonically increasing. If `None`, time bins are returned only if `df` is provided. Defaults to `None`. df : float or tuple of float, optional Frequency spacing(s) for uniform sampling: - For 1D: Single float `df_interval`. - For 2D: Tuple `(df1, df2)` for spacings along axes. Ignored if `freq` is provided. Defaults to `None`. oversample : int, optional Oversampling factor for NUIFFT interpolation in non-uniform 1D case. Must be >= 1. Defaults to 2. Ignored for 2D data. Returns ------- x : ndarray The IFFT or NUIFFT of the input signal. For 1D, shape `(N,)`. For 2D, shape `(N, M)`. Complex-valued. t : ndarray, optional For 1D: Time bins, shape `(N,)`. Returned only if `freq` or `df` is provided. t1, t2 : ndarray, optional For 2D: Time bins along axes 0 and 1, shapes `(N,)` and `(M,)`, respectively. Returned only if `freq` or `df` is provided. Notes ----- - For 1D uniform sampling with N <= 16, a direct IDFT is used. For power-of-2 lengths, the Cooley-Tukey IFFT algorithm is used. - For 1D non-uniform sampling, a NUIFFT algorithm with Gaussian interpolation is used, with O(N log N + M) complexity. - For 2D signals, applies 1D IFFTs along rows and columns for uniform sampling. """ from .._helper import ispow2, idft_direct, ifft_cooley_tukey import scipy.fft as spf # Validate inputs data = _np.asarray(data, dtype=complex) if data.ndim not in (1, 2): raise ValueError("'data' must be a 1D or 2D array.") if not _np.all(_np.isfinite(data)): raise ValueError("'data' contains non-finite values (NaN or inf).") if not isinstance(oversample, int) or oversample < 1: raise TypeError("'oversample' must be a positive integer.") if data.ndim == 1: N = len(data) if N == 0: warnings.warn("'data' is empty. Returning empty array.", UserWarning) return _np.array([]) if N == 1: warnings.warn("'data' is a scalar. Returning 'data'.", UserWarning) return data, _np.array([0.0]) if (freq is not None or df is not None) else data # Validate freq for 1D is_uniform = True if freq is not None: freq = _np.asarray(freq, dtype=float) if freq.ndim != 1 or freq.size != N: raise ValueError("'freq' must be a 1D array of length 'N' for 1D data.") if not _np.all(_np.isreal(freq)): raise TypeError("'freq' must contain only real numbers.") if not _np.all(_np.isfinite(freq)): raise ValueError("'freq' contains non-finite values (NaN or inf).") if not _np.all(_np.diff(freq) > 0): raise ValueError("'freq' must be monotonically increasing.") # Check uniformity if freq.size > 1 and not _np.allclose(_np.diff(freq), _np.diff(freq)[0], rtol=1e-5): warnings.warn("Non-uniform frequency sampling detected. Using NUIFFT algorithm.", UserWarning) is_uniform = False # Validate df for 1D if df is not None: if not isinstance(df, (int, float)): raise TypeError("'df' must be a real number for 1D data.") if not _np.isfinite(df) or df <= 0: raise ValueError("'df' must be a positive finite value.") # 1D Uniform sampling if is_uniform: if N <= 16: x = idft_direct(data) elif ispow2(N): x = ifft_cooley_tukey(data) else: M = int(2 ** _np.ceil(_np.log2(N))) padded = _np.pad(data, (0, M - N), mode="constant") x = ifft_cooley_tukey(padded) x = x[:N] if freq is not None or df is not None: df_value = (freq[-1] - freq[0]) / (N - 1) if freq is not None and N > 1 else df dt = 1 / (N * df_value) if df_value is not None else 1.0 t = _np.arange(0, N) * dt return x, t return x # 1D Non-uniform sampling: NUIFFT try: points = freq M = N f_min, f_max = points.min(), points.max() if f_max == f_min: raise ValueError("'freq' must span a non-zero interval.") points_norm = (points - f_min) / (f_max - f_min) - 0.5 # Generate time points df_value = _np.mean(_np.diff(points)) if N > 1 else 1.0 fs = 1.0 / df_value times = _np.linspace(-fs / 2, fs / 2 - fs / M, M) # Uniform grid for FFT N_grid = oversample * max(N, M) h = 1.0 / N_grid grid = _np.linspace(-0.5, 0.5 - h, N_grid) # Gaussian interpolation kernel sigma = 2.0 / oversample spread = int(6 * sigma * N_grid) spread = max(1, spread // 2 * 2 + 1) kernel = _np.exp(-_np.arange(-spread//2 + 1, spread//2 + 1)**2 / (2 * sigma**2)) kernel /= _np.sum(kernel) # Interpolate to uniform grid signal_grid = _np.zeros(N_grid, dtype=complex) for n in range(N): idx = int((points_norm[n] + 0.5) * N_grid) for i in range(-spread//2 + 1, spread//2 + 1): j = (idx + i) % N_grid dist = (points_norm[n] - grid[j]) * N_grid signal_grid[j] += data[n] * _np.exp(-dist**2 / (2 * sigma**2)) # IFFT on uniform grid ifft_grid = spf.ifft(signal_grid) # Interpolate to target times result = _np.zeros(M, dtype=complex) time_norm = times / (f_max - f_min) for m in range(M): for i in range(N_grid): phase = _np.exp(2j * _np.pi * time_norm[m] * grid[i]) result[m] += ifft_grid[i] * phase * h if not _np.all(_np.isfinite(result)): raise ValueError("Computed NUIFFT contains non-finite values.") scaled_times = times * (f_max - f_min) return result, scaled_times except Exception as e: raise ValueError(f"Error computing NUIFFT: {str(e)}") else: # 2D data N, M = data.shape if N == 0 or M == 0: warnings.warn("'data' is empty. Returning empty array.", UserWarning) return _np.array([]) if N == 1 and M == 1: warnings.warn("'data' is a scalar. Returning 'data'.", UserWarning) return data, _np.array([0.0]), _np.array([0.0]) if (freq is not None or df is not None) else data # Validate freq for 2D f1, f2 = None, None if freq is not None: if not isinstance(freq, tuple) or len(freq) != 2: raise TypeError("'freq' must be a tuple of two 1D NumPy arrays for 2D data.") f1, f2 = _np.asarray(freq[0], dtype=float), _np.asarray(freq[1], dtype=float) if f1.ndim != 1 or f2.ndim != 1 or f1.shape[0] != N or f2.shape[0] != M: raise ValueError("'f1' and 'f2' must be 1D arrays of lengths 'N' and 'M', respectively.") if not _np.all(_np.isreal(f1)) or not _np.all(_np.isreal(f2)): raise TypeError("'f1' and 'f2' must contain only real numbers.") if not _np.all(_np.isfinite(f1)) or not _np.all(_np.isfinite(f2)): raise ValueError("'f1' or 'f2' contains non-finite values (NaN or inf).") if not _np.all(_np.diff(f1) > 0) or not _np.all(_np.diff(f2) > 0): raise ValueError("'f1' and 'f2' must be monotonically increasing.") # Validate df for 2D if df is not None: if not isinstance(df, tuple) or len(df) != 2: raise TypeError("'df' must be a tuple of two floats for 2D data.") df1, df2 = df if not isinstance(df1, (int, float)) or not isinstance(df2, (int, float)): raise TypeError("'df1' and 'df2' must be real numbers.") if not _np.isfinite(df1) or not _np.isfinite(df2) or df1 <= 0 or df2 <= 0: raise ValueError("'df1' and 'df2' must be positive finite values.") try: # Initialize output x = _np.zeros((N, M), dtype=complex) # IFFT along rows (axis 1, using f2) for i in range(N): row_f = f2 if f2 is not None else None row_df = df2 if df is not None else None x[i, :], _ = ifft(data[i, :], freq=row_f, df=row_df, oversample=oversample) # IFFT along columns (axis 0, using f1) x = x.T.copy() for j in range(M): col_f = f1 if f1 is not None else None col_df = df1 if df is not None else None x[j, :], _ = ifft(x[j, :], freq=col_f, df=col_df, oversample=oversample) x = x.T # Compute times if freq is not None: df1_final = (f1[-1] - f1[0]) / (N - 1) if f1 is not None and N > 1 else 1.0 df2_final = (f2[-1] - f2[0]) / (M - 1) if f2 is not None and M > 1 else 1.0 elif df is not None: df1_final, df2_final = df else: return x dt1 = 1 / (N * df1_final) dt2 = 1 / (M * df2_final) t1 = _np.arange(0, N) * dt1 t2 = _np.arange(0, M) * dt2 return x, t1, t2 except Exception as e: raise GenericError(f"Error computing IFFT2: {str(e)}")
[docs] def dfs( t: ArrayLike, data: ArrayLike, order: int, plot: bool = True, showlegend: bool = True, xlabel: str = "", ylabel: str = "", xscale: int = 0, yscale: int = 0, xlim: Optional[Tuple[float, float]] = None, ylim: Optional[Tuple[float, float]] = None ) -> Tuple[ArrayLike, float, ArrayLike, ArrayLike]: """ Computes the discrete Fourier series approximation of a sampled signal. Parameters ---------- t : array-like 1D array of sample points (must be uniformly spaced). data : array-like 1D array of sampled signal. order : int Order of the Fourier approximation (number of harmonics). plot : bool, optional If `True`, plots the original function and its Fourier approximation. showlegend : bool, optional If `True`, ... xlabel : str, optional Label for the x-axis, including units in square brackets (e.g., "Time [s]"). ylabel : str, optional Label for the y-axis, including units in square brackets (e.g., "Intensity [V]"). xscale : int, optional Scaling factor for the x-axis (e.g., `xscale = -3` corresponds to 1e-3, to convert seconds to milliseconds). yscale : int, optional Scaling factor for the y-axis. xlim : tuple, optional Limits for the x-axis, in the form `(xmin, xmax)`. The values should already be scaled with respect to `xscale`. If `None` or an empty tuple, the default limits will be automatically determined from the data. ylim : tuple, optional Limits for the y-axis, in the form `(ymin, ymax)`. The values should already be scaled with respect to `yscale`. If `None` or an empty tuple, the default limits will be automatically determined from the data. Returns ------- f_approx : numpy.array Array of the same shape as `t`, containing the values of the Fourier series approximation of the input function at each point in `t`. a0 : float Zeroth Fourier coefficient (mean value component of the function over the period). It corresponds to the constant term of the Fourier series. a_n : numpy.array Array of cosine coefficients (Fourier coefficients of the even part of the function), corresponding to each harmonic up to the specified order (excluding a0). b_n : numpy.array Array of sine coefficients (Fourier coefficients of the odd part of the function), corresponding to each harmonic up to the specified order. Notes ---------- The values of `xscale` and `yscale` affect only the axis scaling in the plot. All outputs are estimated using the original input data as provided. """ try: from scipy.integrate import simpson # import matplotlib.cm as _cm # Validazione input data = _np.asarray(data) t = _np.asarray(t) if not _np.allclose(_np.diff(t), t[1] - t[0], rtol=1e-4): raise ValueError("'t' must be uniformly spaced.") if not isinstance(order, int): raise TypeError("'order' must be an integer.") if order <= 0: raise ValueError("'order' must be equal or greater than 1.") if t.size != data.size: raise ValueError("'t' must have the same length as 'data'") if not _np.issubdtype(data.dtype, _np.number): raise TypeError("'data' must contain only numeric types (int, float, or complex).") if not (_np.issubdtype(t.dtype, _np.floating) or _np.issubdtype(t.dtype, _np.integer)) or not _np.all(_np.isreal(t)): raise TypeError("'t' must contain only real numbers (int or float).") if not _np.all(_np.isfinite(data)): raise ValueError("'data' contains non-finite values (NaN or inf).") if not _np.all(_np.isfinite(t)): raise ValueError("'t' contains non-finite values (NaN or inf).") if not isinstance(xscale, (int, float)): raise TypeError("'xscale' must be a real number (int or float).") if not isinstance(yscale, (int, float)): raise TypeError("'yscale' must be a real number (int or float).") if xlim is not None: if not isinstance(xlim, (list, tuple)): raise TypeError("'xlim' must be a list or tuple (either empty or containing two real numbers).") if len(xlim) != 0 and (len(xlim) != 2 or not all(isinstance(u, (int, float)) and _np.isfinite(u) for u in xlim)): raise TypeError("'xlim' must be empty or a list/tuple of exactly two finite real numbers.") if ylim is not None: if not isinstance(ylim, (list, tuple)): raise TypeError("'ylim' must be a list or tuple (either empty or containing two real numbers).") if len(ylim) != 0 and (len(ylim) != 2 or not all(isinstance(u, (int, float)) and _np.isfinite(u) for u in ylim)): raise TypeError("'ylim' must be empty or a list/tuple of exactly two finite real numbers.") if not isinstance(xlabel, str): raise TypeError("'xlabel' must be a string.") if not isinstance(ylabel, str): raise TypeError("'ylabel' must be a string.") xscale = 10**xscale yscale = 10**yscale N = len(t) dt = t[1] - t[0] fs = 1 / dt f_nyquist = fs / 2 max_freq = order / (t[-1] - t[0]) if max_freq > f_nyquist: warnings.warn( f"Potential aliasing detected: the signal contains frequency components up to {max_freq:.2g}, " f"which exceeds the Nyquist frequency ({f_nyquist:.2g}). " "Consider increasing the sampling frequency or applying an anti-aliasing filter." ) a = t[0] b = t[-1] T = b - a # Zeroth coefficient a0 = 2 * simpson(data, t) / T a_n = [] b_n = [] for n in range(1, order): cos_term = _np.cos(2 * _np.pi * n * t / T) sin_term = _np.sin(2 * _np.pi * n * t / T) an = 2 * simpson(data * cos_term, t) / T bn = 2 * simpson(data * sin_term, t) / T # if apply_filter: # decay = _np.exp(- (n / order)**2) # an *= decay # bn *= decay a_n.append(an) b_n.append(bn) # Construct approximation f_approx = _np.full_like(t, a0 / 2) for n in range(1, order): f_approx += ( a_n[n - 1] * _np.cos(2 * _np.pi * n * t / T) + b_n[n - 1] * _np.sin(2 * _np.pi * n * t / T) ) if plot: # Plot armoniche con alpha ridotto se showpanel=True # if showharmonics: # if shift: # i = a0 # max_harmonics = min(order - 1, 5) # Limita a 10 armoniche per chiarezza # # cmap = _cm.viridis # # norm = _plt.Normalize(1, max_harmonics) # for n in range(1, max_harmonics + 1): # harmonic = ( # a_n[n-1] * _np.cos(2 * _np.pi * n * t / T) + # b_n[n-1] * _np.sin(2 * _np.pi * n * t / T) # ) # _plt.plot( # t / xscale, ((harmonic + i) / yscale), # color="dodgerblue", lw=0.8, alpha=0.3 # ) # color=cmap(norm(n)) # Plot dati originali e approssimazione _plt.plot( t / xscale, data / yscale, label="Input data", lw=1, color="mediumblue", marker='' ) _plt.plot( t / xscale, f_approx / yscale, label=f"Partial Fourier series\nNo. of terms = {order}", lw=1, color="crimson", linestyle='-' ) # Imposta limiti e etichette if xlim: _plt.xlim(xlim) if ylim: _plt.ylim(ylim) _plt.xlabel(xlabel) _plt.ylabel(ylabel) if showlegend: _plt.legend() return f_approx, a0, _np.array(a_n), _np.array(b_n) except Exception as e: raise GenericError( message="Unexpected error in discrete Fourier series computation", context="executing dfs", original_error=e, details={"t_shape": t.shape, "data_shape": data.shape, "order": order} )
[docs] def fourier_series(f: Callable[[Union[float, ArrayLike]], Union[float, ArrayLike]], interval: Tuple[float, float], order: int, num_points: int = 1000, xlabel: str = "x [ux]", ylabel: str = "y [uy]", xscale: int = 0, yscale: int = 0) -> Tuple[ArrayLike, ArrayLike, ArrayLike, float, ArrayLike, ArrayLike]: """ Computes the Fourier series approximation of a function f(x) Parameters ---------- f : callable Function to approximate. interval : tuple of float The interval `(a, b)` over which to compute the Fourier series. order : int Number of Fourier modes to use in the approximation. num_points : int, optional Number of points for plotting. Default is 1000). Returns ------- x : array-like 1D array representing the uniformly spaced sample points over one period. These are the evaluation points at which both the original function and the Fourier approximation are computed. f_original : array-like 1D array representing the values of the original input function evaluated at the sample points `x`. This is the reference signal used for comparison with the Fourier approximation. f_approx : array-like 1D Array containing the values of the truncated Fourier series evaluated at the same sample points `x`. This is the approximation of `f_original` using a finite number of harmonics (up to the specified order). a0 : float Zeroth Fourier coefficient (mean value component of the function over the period). It corresponds to the constant term of the Fourier series. a_n : array-like Array of cosine coefficients (Fourier coefficients of the even part of the function), corresponding to each harmonic up to the specified order (excluding a0). b_n : array-like Array of sine coefficients (Fourier coefficients of the odd part of the function), corresponding to each harmonic up to the specified order. Notes ---------- The values of `xscale` and `yscale` affect only the axis scaling in the plot. All parameters are estimated using the original input data as provided. """ from scipy.integrate import quad if not callable(f): raise TypeError("'f' must be a callable function.") if not isinstance(interval, (list, tuple)): raise TypeError("'interval' must be a list or a tuple.") if len(interval) != 2: raise ValueError("'interval' must have exactly two elements.") a, b = interval if not (_np.isscalar(a) and _np.isscalar(b)): raise TypeError("'interval' elements must be scalars.") if not (isinstance(a, (int, float)) and isinstance(b, (int, float))): raise TypeError("'interval' elements must be real numbers (int or float).") if b < a: a, b = b, a warnings.warn("Integration limits 'a' and 'b' have been swapped.", UserWarning) if a == b: raise ValueError("'a' must be not equal to 'b'.") if not isinstance(order, (int)): raise TypeError("'order' must be an integer.") if order <= 0: raise ValueError("'order' must be equal or greater than 1.") if not isinstance(xlabel, (str)): raise TypeError("'xlabel' must be a string.") if not isinstance(ylabel, (str)): raise TypeError("'ylabel' must be a string.") if num_points <= 0: raise ValueError("'num_points' must be equal or greater than 1.") if not isinstance(xscale, (int, float)): raise TypeError("'xscale' must be a real number (int or float).") if not isinstance(yscale, (int, float)): raise TypeError("'yscale' must be a real number (int or float).") xscale = 10**xscale yscale = 10**yscale # aggiungere nota che la yscale è solo a livello grafico T = b - a # Period L = T / 2 x = _np.linspace(a, b, num_points) # Compute a0 separately a0, _ = quad(lambda x_: f(x_), a, b) a0 /= L # Compute coefficients a_n and b_n a_n = [] b_n = [] for n in range(1, order + 1): an, _ = quad(lambda x_: f(x_) * _np.cos(n * _np.pi * (x_ - a) / L), a, b) bn, _ = quad(lambda x_: f(x_) * _np.sin(n * _np.pi * (x_ - a) / L), a, b) a_n.append(an / L) b_n.append(bn / L) # Build the Fourier approximation f_approx = _np.full_like(x, a0 / 2) for n in range(1, order + 1): f_approx += ( a_n[n - 1] * _np.cos(n * _np.pi * (x - a) / L) + b_n[n - 1] * _np.sin(n * _np.pi * (x - a) / L) ) f_original = f(x) # Plot _plt.plot(x / xscale, f_original / yscale, label="Input function", lw=0.8, color = "blue") _plt.plot(x / xscale, f_approx / yscale, '--', label=f"Fourier approx. (order = {order})", lw=0.8, color = "red") _plt.xlabel(xlabel) _plt.ylabel(ylabel) _plt.legend() return f_original, f_approx, a0, a_n, b_n
[docs] def harmonic(t: ArrayLike, y: ArrayLike, prominence: float = 0.05, n_max: Optional[int] = None, verbose: bool = True) -> List[dict]: """ Identifies the dominant harmonics present in a real-valued signal. This function computes the FFT of the input signal y(t), finds the peaks in its amplitude spectrum, and returns their frequencies, amplitudes, and phases. The phase of each harmonic is calculated relative to the phase of the first (most prominent) harmonic. It does *not* reconstruct the time-domain components—only reports what harmonics are strongest. Parameters ---------- t : array-like Time array. y : array-like Signal samples corresponding to `t`. prominence : float, optional Minimum prominence of peaks in the power spectrum. Default is 0.05. n_max : int or None, optional If given, return at most `n_max` harmonics. verbose : bool, optional If `True`, prints a formatted table with the frequency, amplitude, and phase of the harmonics. Returns ------- harmonics : list of dict Each dict contains frequency, amplitude, and phase (rad, relative to the first harmonic) of a harmonic component. """ from scipy.signal import find_peaks t = _np.asarray(t) y = _np.asarray(y) if t.size != y.size: raise ValueError("'t' must have the same length as 'y'") if not (_np.issubdtype(y.dtype, _np.number)): raise TypeError("'y' must contain only numeric types (int, float, or complex).") if (not (_np.issubdtype(t.dtype, _np.floating) or _np.issubdtype(t.dtype, _np.integer))) or not _np.all(_np.isreal(t)): raise TypeError("'t' must contain only real numbers (int or float).") if not _np.all(_np.isfinite(y)): raise ValueError("'data' contains non-finite values (NaN or inf).") if not _np.all(_np.isfinite(t)): raise ValueError("'t' contains non-finite values (NaN or inf).") if not isinstance(prominence, (int, float)): raise TypeError("'prominence' must be a real number (int or float).") N = len(y) yf, freqs = fft(y, t) pos_mask = freqs > 0 freqs = freqs[pos_mask] yf = yf[pos_mask] power = _np.abs(yf) phases = _np.angle(yf) peaks, _ = find_peaks(power, prominence=prominence * _np.max(power)) sorted_peaks = peaks[_np.argsort(power[peaks])[::-1]] if n_max is not None: if not isinstance(prominence, (int)): raise TypeError("'prominence' must be an integer.") sorted_peaks = sorted_peaks[:n_max] harmonics = [] if sorted_peaks.size > 0: # Phase of the first (most prominent) harmonic reference_phase = phases[sorted_peaks[0]] else: reference_phase = 0 # Fallback if no peaks are found for idx in sorted_peaks: harmonics.append({ "frequency": freqs[idx], "amplitude": 2 * power[idx] / N, "phase": phases[idx] - reference_phase # Phase relative to the first harmonic }) if verbose: def format_dynamic_number(val): if val == 0: return "0" abs_val = abs(val) if abs_val >= 1e4 or abs_val <= 1e-3: return f"{val:.3e}" # Scientific format elif abs_val >= 1: return f"{val:.4f}".rstrip('0').rstrip('.') # Decimal up to 4 places else: return f"{val:.4g}" # Significant figures headers = ["Harmonic", "Frequency", "Amplitude", "Phase"] # Prepare rows with formatted values rows = [] for i, h in enumerate(harmonics, 1): row = [ f"H#{i}", format_dynamic_number(h["frequency"]), format_dynamic_number(h["amplitude"]), format_dynamic_number(h["phase"]) ] rows.append(row) # Calculate maximum width for each column col_widths = [] for col_idx, header in enumerate(headers): max_data_width = max(len(row[col_idx]) for row in rows) if rows else len(header) col_width = max(len(header), max_data_width) col_widths.append(col_width) # Build header line header_line = f"{headers[0]:<{col_widths[0]}} | " + " | ".join( f"{headers[i]:<{col_widths[i]}}" for i in range(1, len(headers)) ) divider = "-" * len(header_line) # Pretty print print() print("=" * len(header_line)) print(header_line) print(divider) for row in rows: line = f"{row[0]:<{col_widths[0]}} | " + " | ".join( f"{row[i]:<{col_widths[i]}}" for i in range(1, len(row)) ) print(line) print("=" * len(header_line)) return harmonics
[docs] def decompose(t: ArrayLike, y: ArrayLike, freqs: ArrayLike, verbose: bool = True) -> List[dict]: """ Reconstructs specified sinusoidal components from a real-valued signal. Given a set of target frequencies, this function fits the time‑domain data y(t) to a sum of sinusoids at those frequencies (via linear least squares), and returns the amplitude and phase of each component. Use this when you already know which harmonics to extract. Parameters ---------- t : array-like Time array. y : array-like Signal samples. freqs : array-like Frequencies of the components to extract. verbose : bool, optional If `True`, prints a formatted table of the components. Returns ------- components : list of dict Each dict contains frequency (Hz), amplitude, and phase (rad). """ from numpy.linalg import lstsq t = _np.asarray(t) y = _np.asarray(y) freqs = _np.asarray(freqs) if t.size != y.size: raise ValueError("'t' must have the same length as 'y'") if not (_np.issubdtype(y.dtype, _np.number)): raise TypeError("'y' must contain only numeric types (int, float, or complex).") if (not (_np.issubdtype(t.dtype, _np.floating) or _np.issubdtype(t.dtype, _np.integer))) or not _np.all(_np.isreal(t)): raise TypeError("'t' must contain only real numbers (int or float).") if (not (_np.issubdtype(freqs.dtype, _np.floating) or _np.issubdtype(freqs.dtype, _np.integer))) or not _np.all(_np.isreal(freqs)): raise TypeError("'freqs' must contain only real numbers (int or float).") if not _np.all(_np.isfinite(y)): raise ValueError("'data' contains non-finite values (NaN or inf).") if not _np.all(_np.isfinite(t)): raise ValueError("'t' contains non-finite values (NaN or inf).") A = [] for f in freqs: A.append(_np.sin(2 * _np.pi * f * t)) A.append(_np.cos(2 * _np.pi * f * t)) A = _np.vstack(A).T coeffs, _, _, _ = lstsq(A, y, rcond=None) components = [] for i, f in enumerate(freqs): sin_coef = coeffs[2 * i] cos_coef = coeffs[2 * i + 1] amplitude = _np.hypot(sin_coef, cos_coef) phase = _np.arctan2(cos_coef, sin_coef) components.append({ "frequency": f, "amplitude": amplitude, "phase": phase }) if verbose: def format_dynamic_number(val): if val == 0: return "0" abs_val = abs(val) if abs_val >= 1e4 or abs_val <= 1e-3: return f"{val:.3e}" # formato scientifico elif abs_val >= 1: return f"{val:.4f}".rstrip('0').rstrip('.') # decimali fino a 4 cifre else: return f"{val:.4g}" # cifre significative headers = ["Frequency", "Amplitude", "Phase"] # Prepara righe con valori formattati rows = [] for i, h in enumerate(components, 1): row = [ format_dynamic_number(h["frequency"]), format_dynamic_number(h["amplitude"]), format_dynamic_number(h["phase"]) ] rows.append(row) # Calcola larghezza massima per ogni colonna col_widths = [] for col_idx, header in enumerate(headers): max_data_width = max(len(row[col_idx]) for row in rows) col_width = max(len(header), max_data_width) col_widths.append(col_width) # Costruisci la riga dell'intestazione header_line = f"{headers[0]:<{col_widths[0]}} | " + " | ".join( f"{headers[i]:<{col_widths[i]}}" for i in range(1, len(headers)) ) divider = "-" * len(header_line) # Stampa elegante print() print("=" * len(header_line)) print(header_line) print(divider) for row in rows: line = f"{row[0]:<{col_widths[0]}} | " + " | ".join( f"{row[i]:<{col_widths[i]}}" for i in range(1, len(row)) ) print(line) print("=" * len(header_line)) return components
[docs] def envelope(signal: ArrayLike, method: str = 'peaks', mode: str = 'upper', filter_size: int = 31, fs: float = 1.0, remove_mean: bool = False) -> Union[ArrayLike, Tuple[ArrayLike, ArrayLike]]: """ Compute the envelope of a 1D real-valued signal. Parameters ---------- signal : array-like The input real-valued signal (1D array). method : {'hilbert', 'peaks', 'adaptive'}, optional The method to compute the envelope: - 'hilbert': Uses the analytic signal via the Hilbert transform (best for bandpass signals). - 'peaks': Interpolates a smooth curve through signal peaks (versatile for general signals). - 'adaptive': Applies a variable-width filter to abs(signal) (robust to noise). Defaults to 'peaks'. mode : {'upper', 'lower', 'both'}, optional Select which envelope to return: - 'upper': The upper envelope (default). - 'lower': The lower envelope. - 'both': Returns a tuple (upper, lower). filter_size : int, optional Size of the smoothing filter for 'adaptive' method. Must be odd and positive. Defaults to `31`. fs : float, optional Sampling frequency in Hz, used for frequency estimation in 'adaptive' method. Defaults to `1.0`. remove_mean : bool, optional If True, removes the signal mean before computing the envelope. Defaults to `False`. Returns ------- ndarray or tuple of ndarrays The computed envelope(s): - If mode='upper' or 'lower', returns a 1D array. - If mode='both', returns a tuple (upper, lower). Notes ----- - The 'hilbert' method is mathematically rigorous for bandpass signals but may fail for broadband or non-oscillatory signals. - The 'peaks' method is versatile, interpolating through local maxima/minima, and works well for most signals. - The 'adaptive' method adjusts filter width based on local signal frequency, ideal for noisy or irregular signals. """ from scipy.signal import hilbert, find_peaks, medfilt from scipy.interpolate import UnivariateSpline # Validate signal signal = _np.asarray(signal, dtype=float) if signal.ndim != 1: raise ValueError("signal must be one-dimensional.") if not _np.issubdtype(signal.dtype, _np.floating) or not _np.all(_np.isreal(signal)): raise TypeError("signal must contain real numbers (float).") if not _np.all(_np.isfinite(signal)): raise ValueError("signal contains non-finite values (NaN or inf).") # Validate method valid_methods = ['hilbert', 'peaks', 'adaptive'] if method not in valid_methods: raise ValueError(f"method must be one of {valid_methods}.") # Validate mode valid_modes = ['upper', 'lower', 'both'] if mode not in valid_modes: raise ValueError(f"mode must be one of {valid_modes}.") # Validate filter_size if not isinstance(filter_size, int): raise TypeError("filter_size must be an integer.") if filter_size % 2 == 0 or filter_size < 1: raise ValueError("filter_size must be a positive odd integer.") # Validate fs if not isinstance(fs, (int, float)) or fs <= 0: raise ValueError("fs must be a positive number.") # Validate remove_mean if not isinstance(remove_mean, bool): raise TypeError("remove_mean must be a boolean.") # Remove mean if requested if remove_mean: signal = signal - _np.mean(signal) try: if method == 'hilbert': # Compute envelope using Hilbert transform analytic = hilbert(signal) env = _np.abs(analytic) elif method == 'peaks': # Find peaks for upper envelope peaks, _ = find_peaks(signal, distance=filter_size // 2) if len(peaks) < 2: # Fallback to constant envelope if too few peaks env = _np.full_like(signal, _np.max(_np.abs(signal))) else: # Interpolate through peaks x = _np.arange(len(signal)) spline = UnivariateSpline(peaks, signal[peaks], k=3, s=0, ext='const') env = spline(x) env = _np.maximum(env, 0) # Ensure non-negative envelope elif method == 'adaptive': # Compute absolute signal abs_sig = _np.abs(signal) # Estimate local frequency using zero-crossings zero_crossings = _np.where(_np.diff(_np.sign(signal)))[0] if len(zero_crossings) > 1: mean_period = _np.mean(_np.diff(zero_crossings)) adaptive_size = max(3, int(mean_period / 2) * 2 + 1) # Ensure odd else: adaptive_size = filter_size # Apply median filter with adaptive size env = medfilt(abs_sig, kernel_size=adaptive_size) # Handle mode if mode == 'upper': return env elif mode == 'lower': # Compute lower envelope as upper envelope of -signal lower_env = envelope(-signal, method=method, mode='upper', filter_size=filter_size, fs=fs, remove_mean=False) return -lower_env elif mode == 'both': upper = env lower_env = envelope(-signal, method=method, mode='upper', filter_size=filter_size, fs=fs, remove_mean=False) return upper, -lower_env except Exception as e: raise GenericError(f"Error computing envelope: {str(e)}")