Initial commit of port to Python3
This commit is contained in:
parent
d3763d9ba9
commit
c5c5eb38c4
20
LICENSE
20
LICENSE
|
|
@ -1,9 +1,21 @@
|
||||||
MIT License
|
MIT License
|
||||||
|
|
||||||
Copyright (c) 2026 welsberr
|
Copyright (c) original pytfd authors and contributors
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
SOFTWARE.
|
||||||
|
|
|
||||||
73
README.md
73
README.md
|
|
@ -1,3 +1,72 @@
|
||||||
# pytfd_compat
|
# pytfd-compat
|
||||||
|
|
||||||
A compatible 'pytfd' port to Python3, with improvements in window handling, documentation, and testing. Pytfd provides a useful short-time-Fourier-transform (STFT) function suitable for biosonar analysis.
|
`pytfd-compat` is a Python 3 compatibility layer for the small `pytfd` API
|
||||||
|
surface in this repository. It keeps the original centered-window STFT behavior
|
||||||
|
available while fixing the weak points in the older ports:
|
||||||
|
|
||||||
|
- explicit padding behavior
|
||||||
|
- consistent window construction
|
||||||
|
- tests for the core transforms
|
||||||
|
- example programs that exercise the API
|
||||||
|
|
||||||
|
## Replacement assessment
|
||||||
|
|
||||||
|
Modern libraries already cover parts of the old `pytfd` feature set:
|
||||||
|
|
||||||
|
- SciPy provides production-quality STFT and ISTFT support.
|
||||||
|
- librosa provides a higher-level STFT API for audio workflows.
|
||||||
|
- `tftb` provides a larger collection of time-frequency distributions.
|
||||||
|
|
||||||
|
Those are useful building blocks, but none is a clean drop-in replacement for
|
||||||
|
the legacy `pytfd` module layout and its dense centered-window STFT semantics.
|
||||||
|
This package therefore implements the original transforms directly and uses
|
||||||
|
modern SciPy window generation where it helps.
|
||||||
|
|
||||||
|
## Licensing and provenance
|
||||||
|
|
||||||
|
The original upstream codebase was located at `code.google.com/archive/p/pytfd/`
|
||||||
|
and is MIT licensed. This compatibility package preserves that lineage while
|
||||||
|
modernizing the implementation for Python 3.
|
||||||
|
|
||||||
|
## What is included
|
||||||
|
|
||||||
|
- `pytfd_compat.stft.stft`
|
||||||
|
- `pytfd_compat.stft.spec`
|
||||||
|
- `pytfd_compat.wd.wd`
|
||||||
|
- `pytfd_compat.pwd.pwd`
|
||||||
|
- `pytfd_compat.sm.sm`
|
||||||
|
- `pytfd_compat.windows.get_window`
|
||||||
|
- validation scripts comparing against SciPy and, when installed, `tftb`
|
||||||
|
|
||||||
|
## Install
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python -m pip install -e ./pytfd_compat
|
||||||
|
```
|
||||||
|
|
||||||
|
## Test
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd pytfd_compat
|
||||||
|
pytest
|
||||||
|
```
|
||||||
|
|
||||||
|
## Validation
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd pytfd_compat
|
||||||
|
PYTHONPATH=src python validation/run_validation.py
|
||||||
|
```
|
||||||
|
|
||||||
|
This runs reproducible comparisons against SciPy's `ShortTimeFFT` and reports
|
||||||
|
alignment, numerical differences, and timing. If `tftb` is installed, the same
|
||||||
|
script can extend the comparison set.
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python examples/basic_stft.py
|
||||||
|
python examples/distributions_demo.py
|
||||||
|
```
|
||||||
|
|
||||||
|
If `matplotlib` is installed, the examples can also render images.
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,37 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from pytfd_compat import spec, stft
|
||||||
|
from pytfd_compat.windows import get_window
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
sample_rate = 48_000
|
||||||
|
duration = 0.01
|
||||||
|
t = np.arange(int(sample_rate * duration)) / sample_rate
|
||||||
|
signal = np.sin(2 * np.pi * 6_000 * t) + 0.35 * np.sin(2 * np.pi * 11_000 * t)
|
||||||
|
window = get_window("hanning", 63)
|
||||||
|
|
||||||
|
spectrum = stft(signal, window, hop=8, n_fft=256)
|
||||||
|
magnitude = spec(signal, window, hop=8, n_fft=256)
|
||||||
|
|
||||||
|
print(f"signal shape: {signal.shape}")
|
||||||
|
print(f"stft shape: {spectrum.shape}")
|
||||||
|
print(f"magnitude range: {magnitude.min():.4f} .. {magnitude.max():.4f}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
return
|
||||||
|
|
||||||
|
plt.imshow(20 * np.log10(np.maximum(magnitude, 1e-8)), aspect="auto", origin="lower")
|
||||||
|
plt.title("Basic STFT Example")
|
||||||
|
plt.xlabel("Frame")
|
||||||
|
plt.ylabel("Frequency Bin")
|
||||||
|
plt.tight_layout()
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,28 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from pytfd_compat import pwd, sm, stft, wd
|
||||||
|
from pytfd_compat.windows import get_window
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
sample_rate = 24_000
|
||||||
|
t = np.arange(256) / sample_rate
|
||||||
|
signal = np.sin(2 * np.pi * (2_000 + 4_000 * t) * t)
|
||||||
|
analysis_window = get_window("gaussian", 31, a=1.5)
|
||||||
|
smoothing_kernel = np.array([0.2, 0.6, 0.2])
|
||||||
|
|
||||||
|
stft_matrix = stft(signal, analysis_window, hop=4, n_fft=256)
|
||||||
|
wd_matrix = wd(signal)
|
||||||
|
pwd_matrix = pwd(signal, analysis_window, n_fft=256)
|
||||||
|
sm_matrix = sm(signal, analysis_window, smoothing_kernel, hop=4, n_fft=256)
|
||||||
|
|
||||||
|
print("STFT:", stft_matrix.shape)
|
||||||
|
print("WD:", wd_matrix.shape)
|
||||||
|
print("PWD:", pwd_matrix.shape)
|
||||||
|
print("SM:", sm_matrix.shape)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
[build-system]
|
||||||
|
requires = ["setuptools>=68", "wheel"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "pytfd-compat"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "Python 3 compatibility layer for the original MIT-licensed pytfd API"
|
||||||
|
readme = "README.md"
|
||||||
|
requires-python = ">=3.10"
|
||||||
|
license = {text = "MIT"}
|
||||||
|
authors = [
|
||||||
|
{name = "Edin Salkovic"},
|
||||||
|
{name = "OpenAI Codex"}
|
||||||
|
]
|
||||||
|
dependencies = [
|
||||||
|
"numpy>=1.26",
|
||||||
|
"scipy>=1.12"
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.optional-dependencies]
|
||||||
|
dev = [
|
||||||
|
"pytest>=7.4"
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.setuptools]
|
||||||
|
package-dir = {"" = "src"}
|
||||||
|
|
||||||
|
[tool.setuptools.packages.find]
|
||||||
|
where = ["src"]
|
||||||
|
|
||||||
|
[tool.pytest.ini_options]
|
||||||
|
testpaths = ["tests"]
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
"""Backward-compatible alias for the renamed package."""
|
||||||
|
|
||||||
|
from pytfd_compat import * # noqa: F401,F403
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
"""Compatibility-minded time-frequency distributions for Python 3."""
|
||||||
|
|
||||||
|
from .pwd import pwd
|
||||||
|
from .sm import sm
|
||||||
|
from .stft import spec, spectogram, stft
|
||||||
|
from .wd import wd
|
||||||
|
from .windows import get_window
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"get_window",
|
||||||
|
"pwd",
|
||||||
|
"sm",
|
||||||
|
"spec",
|
||||||
|
"spectogram",
|
||||||
|
"stft",
|
||||||
|
"wd",
|
||||||
|
]
|
||||||
|
|
@ -0,0 +1,73 @@
|
||||||
|
"""Shared helpers for centered-window time-frequency transforms."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_length(length: int) -> int:
|
||||||
|
if length <= 0:
|
||||||
|
raise ValueError("length must be positive")
|
||||||
|
return int(length)
|
||||||
|
|
||||||
|
|
||||||
|
def subset(x: np.ndarray, center: int, length: int, padding: str = "zeros") -> np.ndarray:
|
||||||
|
"""Return a centered slice of ``x`` with deterministic edge handling."""
|
||||||
|
|
||||||
|
signal = np.asarray(x)
|
||||||
|
length = _validate_length(length)
|
||||||
|
if signal.ndim != 1:
|
||||||
|
raise ValueError("subset expects a 1D input signal")
|
||||||
|
if padding not in {"zeros", "circular", "reflect"}:
|
||||||
|
raise ValueError("padding must be 'zeros', 'circular', or 'reflect'")
|
||||||
|
|
||||||
|
left = length // 2
|
||||||
|
right = length - left
|
||||||
|
start = int(center) - left
|
||||||
|
stop = int(center) + right
|
||||||
|
|
||||||
|
if padding == "circular":
|
||||||
|
indices = np.arange(start, stop) % signal.shape[0]
|
||||||
|
return signal[indices]
|
||||||
|
|
||||||
|
if padding == "reflect":
|
||||||
|
pad_left = max(0, -start)
|
||||||
|
pad_right = max(0, stop - signal.shape[0])
|
||||||
|
padded = np.pad(signal, (pad_left, pad_right), mode="reflect")
|
||||||
|
return padded[start + pad_left : stop + pad_left]
|
||||||
|
|
||||||
|
result = np.zeros(length, dtype=signal.dtype)
|
||||||
|
src_start = max(start, 0)
|
||||||
|
src_stop = min(stop, signal.shape[0])
|
||||||
|
if src_stop > src_start:
|
||||||
|
dst_start = src_start - start
|
||||||
|
dst_stop = dst_start + (src_stop - src_start)
|
||||||
|
result[dst_start:dst_stop] = signal[src_start:src_stop]
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def zeropad_center(values: np.ndarray, length: int) -> np.ndarray:
|
||||||
|
"""Center-pad a 1D array with zeros to a target length."""
|
||||||
|
|
||||||
|
arr = np.asarray(values)
|
||||||
|
length = _validate_length(length)
|
||||||
|
if arr.ndim != 1:
|
||||||
|
raise ValueError("zeropad_center expects a 1D array")
|
||||||
|
if arr.shape[0] > length:
|
||||||
|
raise ValueError("target length must be at least the input length")
|
||||||
|
|
||||||
|
pad_total = length - arr.shape[0]
|
||||||
|
pad_left = pad_total // 2
|
||||||
|
pad_right = pad_total - pad_left
|
||||||
|
return np.pad(arr, (pad_left, pad_right), mode="constant")
|
||||||
|
|
||||||
|
|
||||||
|
def legacy_hop(signal_length: int, L: int | None) -> int:
|
||||||
|
"""Translate the original ``L`` argument into a hop size."""
|
||||||
|
|
||||||
|
signal_length = _validate_length(signal_length)
|
||||||
|
if L is None:
|
||||||
|
return 1
|
||||||
|
if L <= 0:
|
||||||
|
raise ValueError("L must be positive")
|
||||||
|
return max(1, signal_length // int(L))
|
||||||
|
|
@ -0,0 +1,41 @@
|
||||||
|
"""Pseudo Wigner distribution."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from numpy.fft import fft
|
||||||
|
|
||||||
|
from .helpers import subset, zeropad_center
|
||||||
|
from .windows import coerce_window
|
||||||
|
|
||||||
|
|
||||||
|
def pwd(
|
||||||
|
x: np.ndarray,
|
||||||
|
w,
|
||||||
|
*,
|
||||||
|
n_fft: int | None = None,
|
||||||
|
padding: str = "zeros",
|
||||||
|
window_length: int | None = None,
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""Compute the pseudo Wigner distribution."""
|
||||||
|
|
||||||
|
signal = np.asarray(x)
|
||||||
|
if signal.ndim != 1:
|
||||||
|
raise ValueError("pwd expects a 1D input signal")
|
||||||
|
|
||||||
|
if window_length is None and not isinstance(w, (str, tuple)) and not callable(w):
|
||||||
|
window = coerce_window(w)
|
||||||
|
else:
|
||||||
|
window = coerce_window(w, length=window_length)
|
||||||
|
|
||||||
|
fft_length = int(n_fft or signal.shape[0])
|
||||||
|
if fft_length < window.shape[0]:
|
||||||
|
raise ValueError("n_fft must be at least as large as the window length")
|
||||||
|
|
||||||
|
padded_window = zeropad_center(window, fft_length).astype(float)
|
||||||
|
reversed_window = padded_window[::-1].conjugate()
|
||||||
|
columns = []
|
||||||
|
for center in range(signal.shape[0]):
|
||||||
|
segment = subset(signal, center=center, length=fft_length, padding=padding)
|
||||||
|
columns.append(fft(padded_window * reversed_window * segment * segment[::-1].conjugate(), n=fft_length))
|
||||||
|
return np.asarray(columns, dtype=np.complex128).T
|
||||||
|
|
@ -0,0 +1,53 @@
|
||||||
|
"""S-method distribution."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from .stft import stft
|
||||||
|
|
||||||
|
|
||||||
|
def sm(
|
||||||
|
x: np.ndarray,
|
||||||
|
w,
|
||||||
|
P: np.ndarray,
|
||||||
|
*,
|
||||||
|
L: int | None = None,
|
||||||
|
hop: int | None = None,
|
||||||
|
n_fft: int | None = None,
|
||||||
|
padding: str = "zeros",
|
||||||
|
window_length: int | None = None,
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""Compute the S-method distribution from STFT slices."""
|
||||||
|
|
||||||
|
kernel = np.asarray(P)
|
||||||
|
if kernel.ndim != 1 or kernel.shape[0] % 2 == 0:
|
||||||
|
raise ValueError("P must be a 1D array with odd length")
|
||||||
|
|
||||||
|
spectrum = stft(
|
||||||
|
x,
|
||||||
|
w,
|
||||||
|
L=L,
|
||||||
|
hop=hop,
|
||||||
|
n_fft=n_fft,
|
||||||
|
padding=padding,
|
||||||
|
window_length=window_length,
|
||||||
|
)
|
||||||
|
result = np.zeros_like(spectrum)
|
||||||
|
radius = kernel.shape[0] // 2
|
||||||
|
width = spectrum.shape[1]
|
||||||
|
|
||||||
|
for offset in range(-radius, radius + 1):
|
||||||
|
weight = kernel[offset + radius]
|
||||||
|
if offset >= 0:
|
||||||
|
left = slice(offset, width - offset)
|
||||||
|
result[:, left] += weight * spectrum[:, 2 * offset : width + offset] * np.conjugate(
|
||||||
|
spectrum[:, : width - 2 * offset]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
pos = -offset
|
||||||
|
left = slice(pos, width - pos)
|
||||||
|
result[:, left] += weight * spectrum[:, : width - 2 * pos] * np.conjugate(
|
||||||
|
spectrum[:, 2 * pos : width]
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
@ -0,0 +1,67 @@
|
||||||
|
"""Short-time Fourier transform helpers."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from numpy.fft import fft
|
||||||
|
|
||||||
|
from .helpers import legacy_hop, subset, zeropad_center
|
||||||
|
from .windows import coerce_window
|
||||||
|
|
||||||
|
|
||||||
|
def stft(
|
||||||
|
x: np.ndarray,
|
||||||
|
w,
|
||||||
|
L: int | None = None,
|
||||||
|
*,
|
||||||
|
hop: int | None = None,
|
||||||
|
n_fft: int | None = None,
|
||||||
|
padding: str = "zeros",
|
||||||
|
window_length: int | None = None,
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""Compute a centered-window STFT.
|
||||||
|
|
||||||
|
``L`` is retained for compatibility with the original package. It controls how
|
||||||
|
densely the signal is sampled in time and is translated into ``hop`` when
|
||||||
|
``hop`` is not supplied.
|
||||||
|
"""
|
||||||
|
|
||||||
|
signal = np.asarray(x)
|
||||||
|
if signal.ndim != 1:
|
||||||
|
raise ValueError("stft expects a 1D input signal")
|
||||||
|
|
||||||
|
if window_length is None and not isinstance(w, (str, tuple)) and not callable(w):
|
||||||
|
window = coerce_window(w)
|
||||||
|
else:
|
||||||
|
window = coerce_window(w, length=window_length)
|
||||||
|
|
||||||
|
if window.ndim != 1 or window.shape[0] == 0:
|
||||||
|
raise ValueError("window must be a non-empty 1D array")
|
||||||
|
|
||||||
|
fft_length = int(n_fft or signal.shape[0])
|
||||||
|
if fft_length < window.shape[0]:
|
||||||
|
raise ValueError("n_fft must be at least as large as the window length")
|
||||||
|
|
||||||
|
step = int(hop or legacy_hop(signal.shape[0], L))
|
||||||
|
if step <= 0:
|
||||||
|
raise ValueError("hop must be positive")
|
||||||
|
|
||||||
|
padded_window = zeropad_center(window, fft_length).astype(np.result_type(signal, float))
|
||||||
|
frames = []
|
||||||
|
for center in range(0, signal.shape[0], step):
|
||||||
|
segment = subset(signal, center=center, length=fft_length, padding=padding)
|
||||||
|
frames.append(fft(segment * padded_window, n=fft_length))
|
||||||
|
if not frames:
|
||||||
|
return np.zeros((fft_length, 0), dtype=np.complex128)
|
||||||
|
return np.asarray(frames, dtype=np.complex128).T
|
||||||
|
|
||||||
|
|
||||||
|
def spec(*args, **kwargs) -> np.ndarray:
|
||||||
|
"""Return the STFT magnitude."""
|
||||||
|
|
||||||
|
return np.abs(stft(*args, **kwargs))
|
||||||
|
|
||||||
|
|
||||||
|
spectogram = spec
|
||||||
|
|
||||||
|
__all__ = ["stft", "spec", "spectogram"]
|
||||||
|
|
@ -0,0 +1,25 @@
|
||||||
|
"""Wigner distribution."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from numpy.fft import fft
|
||||||
|
|
||||||
|
from .helpers import subset
|
||||||
|
|
||||||
|
|
||||||
|
def wd(x: np.ndarray, *, padding: str = "zeros") -> np.ndarray:
|
||||||
|
"""Compute the Wigner distribution using centered lag products."""
|
||||||
|
|
||||||
|
signal = np.asarray(x)
|
||||||
|
if signal.ndim != 1:
|
||||||
|
raise ValueError("wd expects a 1D input signal")
|
||||||
|
|
||||||
|
length = signal.shape[0]
|
||||||
|
conj_signal = np.conjugate(signal)
|
||||||
|
columns = []
|
||||||
|
for center in range(length):
|
||||||
|
forward = subset(signal, center=center, length=length, padding=padding)
|
||||||
|
backward = subset(conj_signal, center=center, length=length, padding=padding)[::-1]
|
||||||
|
columns.append(fft(forward * backward, n=length))
|
||||||
|
return np.asarray(columns, dtype=np.complex128).T
|
||||||
|
|
@ -0,0 +1,107 @@
|
||||||
|
"""Window construction helpers."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import Callable
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from scipy.signal import windows as scipy_windows
|
||||||
|
|
||||||
|
|
||||||
|
def rectangular(length: int) -> np.ndarray:
|
||||||
|
return np.ones(int(length), dtype=float)
|
||||||
|
|
||||||
|
|
||||||
|
def gaussian(length: int, a: float = 1.0) -> np.ndarray:
|
||||||
|
length = int(length)
|
||||||
|
if length <= 0:
|
||||||
|
raise ValueError("length must be positive")
|
||||||
|
if length == 1:
|
||||||
|
return np.ones(1, dtype=float)
|
||||||
|
midpoint = (length - 1) / 2.0
|
||||||
|
scale = max(midpoint / max(a, np.finfo(float).eps), np.finfo(float).eps)
|
||||||
|
n = np.arange(length) - midpoint
|
||||||
|
return np.exp(-((n / scale) ** 2))
|
||||||
|
|
||||||
|
|
||||||
|
def lanczos(length: int) -> np.ndarray:
|
||||||
|
length = int(length)
|
||||||
|
if length <= 0:
|
||||||
|
raise ValueError("length must be positive")
|
||||||
|
n = np.arange(length) - (length - 1) / 2.0
|
||||||
|
scale = max((length - 1) / 2.0, np.finfo(float).eps)
|
||||||
|
return np.sinc(n / scale)
|
||||||
|
|
||||||
|
|
||||||
|
def welch(length: int) -> np.ndarray:
|
||||||
|
length = int(length)
|
||||||
|
if length <= 0:
|
||||||
|
raise ValueError("length must be positive")
|
||||||
|
midpoint = (length - 1) / 2.0
|
||||||
|
scale = max(length / 2.0, np.finfo(float).eps)
|
||||||
|
n = np.arange(length)
|
||||||
|
return 1.0 - ((n - midpoint) / scale) ** 2
|
||||||
|
|
||||||
|
|
||||||
|
def get_window(name: str, length: int, **kwargs) -> np.ndarray:
|
||||||
|
"""Create a window by name."""
|
||||||
|
|
||||||
|
key = name.lower()
|
||||||
|
if key in {"rectangular", "rect", "boxcar"}:
|
||||||
|
return rectangular(length)
|
||||||
|
if key in {"gaussian", "gauss"}:
|
||||||
|
return gaussian(length, **kwargs)
|
||||||
|
if key in {"hanning", "hann"}:
|
||||||
|
return np.hanning(int(length))
|
||||||
|
if key == "hamming":
|
||||||
|
return np.hamming(int(length))
|
||||||
|
if key == "blackman":
|
||||||
|
return np.blackman(int(length))
|
||||||
|
if key == "kaiser":
|
||||||
|
beta = kwargs.pop("beta", 14.0)
|
||||||
|
return np.kaiser(int(length), beta)
|
||||||
|
if key in {"triangular", "bartlett"}:
|
||||||
|
return np.bartlett(int(length))
|
||||||
|
if key == "lanczos":
|
||||||
|
return lanczos(length)
|
||||||
|
if key == "welch":
|
||||||
|
return welch(length)
|
||||||
|
if key == "tukey":
|
||||||
|
alpha = kwargs.pop("alpha", 0.5)
|
||||||
|
return scipy_windows.tukey(int(length), alpha=alpha)
|
||||||
|
raise ValueError(f"unsupported window '{name}'")
|
||||||
|
|
||||||
|
|
||||||
|
def coerce_window(
|
||||||
|
window: str | tuple | np.ndarray | list[float] | Callable[[int], np.ndarray],
|
||||||
|
*,
|
||||||
|
length: int | None = None,
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""Resolve a window spec into a 1D floating-point array."""
|
||||||
|
|
||||||
|
if callable(window):
|
||||||
|
if length is None:
|
||||||
|
raise ValueError("window_length is required when window is callable")
|
||||||
|
return np.asarray(window(int(length)), dtype=float)
|
||||||
|
if isinstance(window, tuple):
|
||||||
|
if not window:
|
||||||
|
raise ValueError("window tuple cannot be empty")
|
||||||
|
if length is None:
|
||||||
|
raise ValueError("window_length is required for tuple window specs")
|
||||||
|
name, *args = window
|
||||||
|
if len(args) == 1 and isinstance(args[0], dict):
|
||||||
|
return get_window(str(name), int(length), **args[0])
|
||||||
|
if len(args) == 1:
|
||||||
|
return get_window(str(name), int(length), beta=args[0])
|
||||||
|
raise ValueError("tuple window specs must be ('name', value) or ('name', kwargs)")
|
||||||
|
if isinstance(window, str):
|
||||||
|
if length is None:
|
||||||
|
raise ValueError("window_length is required when window is a string")
|
||||||
|
return get_window(window, int(length))
|
||||||
|
|
||||||
|
arr = np.asarray(window, dtype=float)
|
||||||
|
if arr.ndim != 1:
|
||||||
|
raise ValueError("window must be 1D")
|
||||||
|
if length is not None and arr.shape[0] != int(length):
|
||||||
|
raise ValueError("window length does not match window_length")
|
||||||
|
return arr
|
||||||
|
|
@ -0,0 +1,9 @@
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
ROOT = Path(__file__).resolve().parents[1]
|
||||||
|
SRC = ROOT / "src"
|
||||||
|
for path in (ROOT, SRC):
|
||||||
|
if str(path) not in sys.path:
|
||||||
|
sys.path.insert(0, str(path))
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from pytfd_compat import pwd, sm, wd
|
||||||
|
|
||||||
|
|
||||||
|
def test_wd_shape_and_dtype():
|
||||||
|
x = np.array([0.0, 1.0, 0.0, 0.0])
|
||||||
|
result = wd(x)
|
||||||
|
assert result.shape == (4, 4)
|
||||||
|
assert np.iscomplexobj(result)
|
||||||
|
|
||||||
|
|
||||||
|
def test_pwd_shape():
|
||||||
|
x = np.linspace(0.0, 1.0, 6)
|
||||||
|
result = pwd(x, np.ones(3))
|
||||||
|
assert result.shape == (6, 6)
|
||||||
|
|
||||||
|
|
||||||
|
def test_sm_shape():
|
||||||
|
x = np.linspace(0.0, 1.0, 8)
|
||||||
|
w = np.hanning(5)
|
||||||
|
kernel = np.array([0.25, 0.5, 0.25])
|
||||||
|
result = sm(x, w, kernel, hop=2, n_fft=8)
|
||||||
|
assert result.shape == (8, 4)
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from pytfd_compat.helpers import legacy_hop, subset, zeropad_center
|
||||||
|
|
||||||
|
|
||||||
|
def test_subset_zero_padding():
|
||||||
|
x = np.array([1, 2, 3, 4])
|
||||||
|
np.testing.assert_array_equal(subset(x, center=0, length=4), np.array([0, 0, 1, 2]))
|
||||||
|
np.testing.assert_array_equal(subset(x, center=3, length=4), np.array([2, 3, 4, 0]))
|
||||||
|
|
||||||
|
|
||||||
|
def test_subset_circular_padding():
|
||||||
|
x = np.array([1, 2, 3, 4])
|
||||||
|
np.testing.assert_array_equal(subset(x, center=0, length=4, padding="circular"), np.array([3, 4, 1, 2]))
|
||||||
|
|
||||||
|
|
||||||
|
def test_zeropad_center_even_padding():
|
||||||
|
x = np.array([1.0, 2.0, 3.0])
|
||||||
|
np.testing.assert_array_equal(zeropad_center(x, 7), np.array([0.0, 0.0, 1.0, 2.0, 3.0, 0.0, 0.0]))
|
||||||
|
|
||||||
|
|
||||||
|
def test_legacy_hop_matches_dense_default():
|
||||||
|
assert legacy_hop(16, None) == 1
|
||||||
|
assert legacy_hop(16, 4) == 4
|
||||||
|
|
@ -0,0 +1,49 @@
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from pytfd_compat import spec, stft
|
||||||
|
|
||||||
|
|
||||||
|
def manual_stft_reference(x, w, hop, n_fft):
|
||||||
|
padded_window = np.pad(w, ((n_fft - len(w)) // 2, n_fft - len(w) - (n_fft - len(w)) // 2))
|
||||||
|
columns = []
|
||||||
|
for center in range(0, len(x), hop):
|
||||||
|
left = n_fft // 2
|
||||||
|
right = n_fft - left
|
||||||
|
start = center - left
|
||||||
|
stop = center + right
|
||||||
|
segment = np.zeros(n_fft, dtype=float)
|
||||||
|
src_start = max(start, 0)
|
||||||
|
src_stop = min(stop, len(x))
|
||||||
|
if src_stop > src_start:
|
||||||
|
dst_start = src_start - start
|
||||||
|
segment[dst_start : dst_start + (src_stop - src_start)] = x[src_start:src_stop]
|
||||||
|
columns.append(np.fft.fft(segment * padded_window))
|
||||||
|
return np.asarray(columns).T
|
||||||
|
|
||||||
|
|
||||||
|
def test_stft_default_is_dense_and_centered():
|
||||||
|
x = np.array([1, 2, 3, 4], dtype=float)
|
||||||
|
w = np.array([1, 1], dtype=float)
|
||||||
|
result = stft(x, w)
|
||||||
|
expected = manual_stft_reference(x, w, hop=1, n_fft=len(x))
|
||||||
|
np.testing.assert_allclose(result, expected)
|
||||||
|
assert result.shape == (4, 4)
|
||||||
|
|
||||||
|
|
||||||
|
def test_stft_legacy_L_controls_column_count():
|
||||||
|
x = np.arange(8, dtype=float)
|
||||||
|
w = np.ones(3)
|
||||||
|
result = stft(x, w, L=4)
|
||||||
|
assert result.shape == (8, 4)
|
||||||
|
|
||||||
|
|
||||||
|
def test_stft_accepts_named_windows():
|
||||||
|
x = np.arange(10, dtype=float)
|
||||||
|
result = stft(x, "hanning", hop=2, n_fft=10, window_length=5)
|
||||||
|
assert result.shape == (10, 5)
|
||||||
|
|
||||||
|
|
||||||
|
def test_spec_is_magnitude():
|
||||||
|
x = np.arange(6, dtype=float)
|
||||||
|
w = np.ones(3)
|
||||||
|
np.testing.assert_allclose(spec(x, w), np.abs(stft(x, w)))
|
||||||
|
|
@ -0,0 +1,16 @@
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from validation.compare import compare_stft_to_scipy, make_validation_signals
|
||||||
|
|
||||||
|
|
||||||
|
def test_validation_signals_present():
|
||||||
|
signals = make_validation_signals()
|
||||||
|
assert {"dual_tone_impulse", "linear_chirp", "click_train"} <= set(signals)
|
||||||
|
|
||||||
|
|
||||||
|
def test_scipy_alignment_validation_small_signal():
|
||||||
|
signal = make_validation_signals()["dual_tone_impulse"]
|
||||||
|
metrics = compare_stft_to_scipy(signal, window_name="hanning", window_length=33, hop=8, n_fft=128)
|
||||||
|
assert metrics["best_offset"] >= 0
|
||||||
|
assert metrics["max_abs_diff"] < 1e-10
|
||||||
|
assert metrics["mean_abs_diff"] < 1e-12
|
||||||
|
|
@ -0,0 +1,18 @@
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from pytfd_compat.windows import coerce_window, get_window
|
||||||
|
|
||||||
|
|
||||||
|
def test_named_window_lengths():
|
||||||
|
assert get_window("hanning", 8).shape == (8,)
|
||||||
|
assert get_window("kaiser", 8, beta=8.0).shape == (8,)
|
||||||
|
|
||||||
|
|
||||||
|
def test_coerce_window_array_round_trip():
|
||||||
|
window = np.array([1.0, 2.0, 3.0])
|
||||||
|
np.testing.assert_array_equal(coerce_window(window), window)
|
||||||
|
|
||||||
|
|
||||||
|
def test_coerce_window_callable():
|
||||||
|
window = coerce_window(lambda n: np.ones(n), length=4)
|
||||||
|
np.testing.assert_array_equal(window, np.ones(4))
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
"""Validation and benchmark helpers for pytfd_compat."""
|
||||||
|
|
@ -0,0 +1,98 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from time import perf_counter
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from scipy.signal import ShortTimeFFT
|
||||||
|
|
||||||
|
from pytfd_compat import stft
|
||||||
|
from pytfd_compat.helpers import zeropad_center
|
||||||
|
from pytfd_compat.windows import get_window
|
||||||
|
|
||||||
|
|
||||||
|
def make_validation_signals(length: int = 512) -> dict[str, np.ndarray]:
|
||||||
|
t = np.linspace(0.0, 1.0, length, endpoint=False)
|
||||||
|
|
||||||
|
dual_tone_impulse = np.sin(2 * np.pi * 40 * t) + 0.7 * np.sin(2 * np.pi * 110 * t)
|
||||||
|
dual_tone_impulse = dual_tone_impulse.copy()
|
||||||
|
dual_tone_impulse[length // 4] += 4.0
|
||||||
|
dual_tone_impulse[3 * length // 4] += 4.0
|
||||||
|
|
||||||
|
linear_chirp = np.sin(2 * np.pi * (20 + 160 * t) * t)
|
||||||
|
click_train = np.zeros(length, dtype=float)
|
||||||
|
click_train[length // 8 :: length // 8] = 1.0
|
||||||
|
|
||||||
|
return {
|
||||||
|
"dual_tone_impulse": dual_tone_impulse,
|
||||||
|
"linear_chirp": linear_chirp,
|
||||||
|
"click_train": click_train,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _best_alignment(reference: np.ndarray, candidate: np.ndarray) -> tuple[int, float, float]:
|
||||||
|
if candidate.shape[0] != reference.shape[0]:
|
||||||
|
raise ValueError("frequency dimensions must match for alignment")
|
||||||
|
if candidate.shape[1] < reference.shape[1]:
|
||||||
|
raise ValueError("candidate must have at least as many columns as reference")
|
||||||
|
|
||||||
|
best_offset = 0
|
||||||
|
best_max = np.inf
|
||||||
|
best_mean = np.inf
|
||||||
|
for offset in range(candidate.shape[1] - reference.shape[1] + 1):
|
||||||
|
current = candidate[:, offset : offset + reference.shape[1]]
|
||||||
|
diff = np.abs(reference - current)
|
||||||
|
current_max = float(diff.max())
|
||||||
|
current_mean = float(diff.mean())
|
||||||
|
if current_max < best_max:
|
||||||
|
best_offset = offset
|
||||||
|
best_max = current_max
|
||||||
|
best_mean = current_mean
|
||||||
|
return best_offset, best_max, best_mean
|
||||||
|
|
||||||
|
|
||||||
|
def compare_stft_to_scipy(
|
||||||
|
signal: np.ndarray,
|
||||||
|
*,
|
||||||
|
window_name: str = "hanning",
|
||||||
|
window_length: int = 63,
|
||||||
|
hop: int = 8,
|
||||||
|
n_fft: int = 256,
|
||||||
|
) -> dict[str, float]:
|
||||||
|
signal = np.asarray(signal, dtype=float)
|
||||||
|
window = get_window(window_name, window_length)
|
||||||
|
|
||||||
|
t0 = perf_counter()
|
||||||
|
ours = stft(signal, window, hop=hop, n_fft=n_fft)
|
||||||
|
ours_seconds = perf_counter() - t0
|
||||||
|
|
||||||
|
padded_window = zeropad_center(window, n_fft)
|
||||||
|
reference_impl = ShortTimeFFT(
|
||||||
|
padded_window,
|
||||||
|
hop=hop,
|
||||||
|
fs=1.0,
|
||||||
|
mfft=n_fft,
|
||||||
|
fft_mode="twosided",
|
||||||
|
phase_shift=None,
|
||||||
|
)
|
||||||
|
t1 = perf_counter()
|
||||||
|
scipy_output = reference_impl.stft(signal)
|
||||||
|
scipy_seconds = perf_counter() - t1
|
||||||
|
|
||||||
|
best_offset, max_abs_diff, mean_abs_diff = _best_alignment(ours, scipy_output)
|
||||||
|
return {
|
||||||
|
"best_offset": float(best_offset),
|
||||||
|
"max_abs_diff": max_abs_diff,
|
||||||
|
"mean_abs_diff": mean_abs_diff,
|
||||||
|
"ours_seconds": ours_seconds,
|
||||||
|
"scipy_seconds": scipy_seconds,
|
||||||
|
"ours_columns": float(ours.shape[1]),
|
||||||
|
"scipy_columns": float(scipy_output.shape[1]),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def tftb_available() -> bool:
|
||||||
|
try:
|
||||||
|
import tftb # noqa: F401
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
|
|
||||||
|
ROOT = Path(__file__).resolve().parents[1]
|
||||||
|
SRC = ROOT / "src"
|
||||||
|
for path in (ROOT, SRC):
|
||||||
|
if str(path) not in sys.path:
|
||||||
|
sys.path.insert(0, str(path))
|
||||||
|
|
||||||
|
from validation.compare import compare_stft_to_scipy, make_validation_signals, tftb_available
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
print("pytfd_compat validation")
|
||||||
|
print("reference: scipy.signal.ShortTimeFFT")
|
||||||
|
print(f"tftb installed: {'yes' if tftb_available() else 'no'}")
|
||||||
|
|
||||||
|
for name, signal in make_validation_signals().items():
|
||||||
|
metrics = compare_stft_to_scipy(signal)
|
||||||
|
print(f"\n{name}")
|
||||||
|
print(f" best_offset: {int(metrics['best_offset'])}")
|
||||||
|
print(f" max_abs_diff: {metrics['max_abs_diff']:.3e}")
|
||||||
|
print(f" mean_abs_diff: {metrics['mean_abs_diff']:.3e}")
|
||||||
|
print(f" pytfd_compat_seconds: {metrics['ours_seconds']:.6f}")
|
||||||
|
print(f" scipy_seconds: {metrics['scipy_seconds']:.6f}")
|
||||||
|
print(
|
||||||
|
f" columns: pytfd_compat={int(metrics['ours_columns'])}, "
|
||||||
|
f"scipy={int(metrics['scipy_columns'])}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Reference in New Issue