# Copyright (c) 2016-2017 Louisiana State University
# 2017-2025 Cardiff University
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy. If not, see <http://www.gnu.org/licenses/>.
"""Python implementation of the tiled Q-transform scan.
This is a re-implementation of the original Q-transform scan from the Omega
pipeline, all credits for the original algorithm go to its
authors.
"""
from __future__ import annotations
import warnings
from math import (
ceil,
exp,
isinf,
log,
pi,
)
from typing import TYPE_CHECKING
import numpy
from numpy import fft as npfft
from ..segments import Segment
from ..utils import round_to_power
if TYPE_CHECKING:
from collections.abc import Iterator
from numpy.typing import NDArray
from ..frequencyseries import FrequencySeries
from ..spectrogram import Spectrogram
from ..table import EventTable
from ..time import LIGOTimeGPS
from ..timeseries import TimeSeries
__author__ = "Duncan Macleod <duncan.macleod@ligo.org>"
__credits__ = [
"Scott Coughlin <scott.coughlin@ligo.org>, ",
"Alex Urban <alexander.urban@ligo.org>",
]
__all__ = [
"QGram",
"QPlane",
"QTile",
"QTiling",
"q_scan",
]
# -- q-transform defaults
#: Default ``(fmin, fmax)`` frequency range for Q-transform.
DEFAULT_FRANGE = (0, float("inf"))
#: Default maximum fractional mismatch between neighbouring tiles.
DEFAULT_MISMATCH = 0.2
#: Default ``(qlow, qhigh)`` Q range for Q-transform.
DEFAULT_QRANGE = (4, 64)
# constants
_SQRT_2 = 2 ** (1 / 2.)
# -- object class definitions --------
class QObject:
"""Base class for Q-transform objects.
This object exists just to provide basic methods for all other
Q-transform objects.
"""
def __init__(
self,
duration: float,
sampling: float,
mismatch: float = DEFAULT_MISMATCH,
) -> None:
"""Initialize a `QObject`."""
self.duration = float(duration)
self.sampling = float(sampling)
self.mismatch = float(mismatch)
@property
def deltam(self) -> float:
"""Fractional mismatch between neighbouring tiles."""
return 2 * (self.mismatch / 3.) ** (1 / 2.)
class QBase(QObject):
"""Base class for Q-transform objects with fixed Q.
This class just provides a property for Q-prime = Q / sqrt(11)
"""
def __init__(
self,
q: float,
duration: float,
sampling: float,
mismatch: float = DEFAULT_MISMATCH,
) -> None:
"""Initialize a `QBase`."""
super().__init__(
duration,
sampling,
mismatch=mismatch,
)
self.q = float(q)
@property
def qprime(self) -> float:
"""Normalized Q `(q/sqrt(11))`."""
return self.q / 11 ** (1 / 2.)
[docs]
class QTiling(QObject):
"""Iterable constructor of `QPlane` objects.
For a given Q-range, each of the resulting `QPlane` objects can
be iterated over.
Parameters
----------
duration : `float`
The duration of the data to be Q-transformed.
qrange : `tuple` of `float`
``(low, high)`` pair of Q extrema.
frange : `tuple` of `float`
``(low, high)`` pair of frequency extrema.
sampling : `float`
Sampling rate (in Hertz) of data to be Q-transformed.
mismatch : `float`
Maximum fractional mismatch between neighbouring tiles.
"""
def __init__(
self,
duration: float,
sampling: float,
qrange: tuple[float, float] = DEFAULT_QRANGE,
frange: tuple[float, float] = DEFAULT_FRANGE,
mismatch: float = DEFAULT_MISMATCH,
) -> None:
"""Initialize a `QTiling` object."""
super().__init__(
duration,
sampling,
mismatch=mismatch,
)
self.qrange = (float(qrange[0]), float(qrange[1]))
# format (min, max) frequency range
qlist = list(self._iter_qs())
fmin, fmax = float(frange[0]), float(frange[1])
if fmin == 0: # set non-zero lower frequency
fmin = 50 * max(qlist) / (2 * pi * self.duration)
maxf = (
self.sampling
/ 2
/ (1 + 11 ** (1 / 2.) / min(qlist))
)
if isinf(fmax):
fmax = maxf
elif fmax > maxf: # truncate upper frequency to maximum
warnings.warn(
f"upper frequency of {fmax} Hz is too high for the given "
f"Q range, resetting to {maxf} Hz",
stacklevel=2,
)
fmax = maxf
self.frange = (fmin, fmax)
@property
def qs(self) -> numpy.ndarray:
"""Array of Q values for this `QTiling`."""
return numpy.array(list(self._iter_qs()))
@property
def whitening_duration(self) -> float:
"""The recommended data duration required for whitening."""
return max(t.whitening_duration for t in self)
def _iter_qs(self) -> Iterator[float]:
"""Iterate over the Q values."""
# work out how many Qs we need
sqrt_2 = _SQRT_2
cumum = log(self.qrange[1] / self.qrange[0]) / sqrt_2
nplanes = int(max(ceil(cumum / self.deltam), 1))
delta_q = cumum / nplanes
for i in range(nplanes):
yield self.qrange[0] * exp(sqrt_2 * delta_q * (i + .5))
def __iter__(self) -> Iterator[QPlane]:
"""Iterate over this `QTiling`.
Yields a `QPlane` at each Q value.
"""
for q in self._iter_qs():
yield QPlane(
q,
self.frange,
self.duration,
self.sampling,
mismatch=self.mismatch,
)
[docs]
class QPlane(QBase):
"""Iterable representation of a Q-transform plane.
For a given Q, an array of frequencies can be iterated over, yielding
a `QTile` each time.
Parameters
----------
q : `float`
The Q-value for this plane.
frange : `tuple` of `float`
``(low, high)`` range of frequencies for this plane.
duration : `float`
The duration of the data to be Q-transformed.
sampling : `float`
Sampling rate (in Hertz) of data to be Q-transformed.
mismatch : `float`
Maximum fractional mismatch between neighbouring tiles.
"""
def __init__(
self,
q: float,
frange: tuple[float, float],
duration: float,
sampling: float,
mismatch: float = DEFAULT_MISMATCH,
) -> None:
"""Initialize a `QPlane` object."""
super().__init__(
q,
duration,
sampling,
mismatch=mismatch,
)
# format (min, max) frequency range
fmin, fmax = float(frange[0]), float(frange[1])
if fmin == 0: # set non-zero lower frequency
fmin = 50 * self.q / (2 * pi * self.duration)
if isinf(fmax): # set non-infinite upper frequency
fmax = self.sampling / 2 / (1 + 1 / self.qprime)
self.frange = (fmin, fmax)
def __iter__(self) -> Iterator[QTile]:
"""Iterate over this `QPlane`.
Yields a `QTile` at each frequency.
"""
# for each frequency, yield a QTile
for freq in self._iter_frequencies():
yield QTile(
self.q,
freq,
self.duration,
self.sampling,
mismatch=self.mismatch,
)
def _iter_frequencies(self) -> Iterator[float]:
"""Iterate over the frequencies of this `QPlane`."""
# work out how many frequencies we need
minf, maxf = self.frange
fcum_mismatch = log(maxf / minf) * (2 + self.q ** 2) ** (1 / 2.) / 2.
nfreq = int(max(1, ceil(fcum_mismatch / self.deltam)))
fstep = fcum_mismatch / nfreq
fstepmin = 1 / self.duration
# for each frequency, yield a QTile
last = None
for i in range(nfreq):
this = (
minf
* exp(2 / (2 + self.q ** 2) ** (1 / 2.) * (i + .5) * fstep)
// fstepmin * fstepmin
)
if this != last: # yield only unique elements
yield this
last = this
@property
def frequencies(self) -> NDArray[numpy.float64]:
"""Array of central frequencies for this `QPlane`."""
return numpy.array(list(self._iter_frequencies()))
@property
def farray(self) -> NDArray[numpy.float64]:
"""Array of frequencies for the lower-edge of each frequency bin."""
bandwidths = 2 * pi ** (1 / 2.) * self.frequencies / self.q
return self.frequencies - bandwidths / 2.
@property
def whitening_duration(self) -> float:
"""The recommended data duration required for whitening."""
return round_to_power(
self.q / (2 * self.frange[0]),
base=2,
which=None,
)
[docs]
class QTile(QBase):
"""Representation of a tile with fixed Q and frequency."""
def __init__(
self,
q: float,
frequency: float,
duration: float,
sampling: float,
mismatch: float = DEFAULT_MISMATCH,
) -> None:
"""Initialize a `QTile` object."""
super().__init__(
q,
duration,
sampling,
mismatch=mismatch,
)
self.frequency = frequency
@property
def bandwidth(self) -> float:
"""The bandwidth for tiles in this row."""
return 2 * pi ** (1 / 2.) * self.frequency / self.q
@property
def ntiles(self) -> int:
"""The number of tiles in this row."""
tcum_mismatch = self.duration * 2 * pi * self.frequency / self.q
return int(round_to_power(
tcum_mismatch / self.deltam,
base=2,
which="upper",
))
@property
def windowsize(self) -> int:
"""The size of the frequency-domain window for this row."""
return 2 * int(
self.frequency
/ self.qprime
* self.duration,
) + 1
def _get_indices(self) -> NDArray[numpy.int64]:
half = int((self.windowsize - 1) / 2)
return numpy.arange(-half, half + 1)
[docs]
def get_window(self) -> NDArray[numpy.float64]:
"""Generate the bi-square window for this row.
Returns
-------
window : `numpy.ndarray`
"""
# real frequencies
wfrequencies = self._get_indices() / self.duration
# dimensionless frequencies
xfrequencies = wfrequencies * self.qprime / self.frequency
# normalize and generate bi-square window
norm = (
self.ntiles
/ (self.duration * self.sampling)
* (315 * self.qprime / (128 * self.frequency)) ** (1 / 2.)
)
return (1 - xfrequencies ** 2) ** 2 * norm
[docs]
def get_data_indices(self) -> numpy.ndarray[tuple[int], numpy.dtype[numpy.int64]]:
"""Return the index array of interesting frequencies for this row."""
return numpy.round(
self._get_indices() + 1 + self.frequency * self.duration,
).astype(int)
@property
def padding(self) -> tuple[int, int]:
"""The `(left, right)` padding required for the IFFT."""
pad = self.ntiles - self.windowsize
return (int((pad - 1) / 2.), int((pad + 1) / 2.))
[docs]
class QGram:
"""Store tile energies over an irregularly gridded plane.
Parameters
----------
plane : `QPlane`
The time-frequency plane over which to populate.
energies : `list` of `TimeSeries`
A list of signal energies for each row of tiles.
search : `~gwpy.segments.Segment`
Search window of interest to determine the loudest tile.
"""
def __init__(
self,
plane: QPlane,
energies: list[TimeSeries],
search: tuple[float, float] | None,
) -> None:
"""Initialize a `QGram` object."""
self.plane = plane
self.energies = energies
self.peak = self._find_peak(search)
def _find_peak(self, search: tuple[float, float] | None) -> dict[str, float]:
peak: dict[str, float] = {
"energy": 0.,
"snr": 0.,
"time": 0.,
"frequency": 0.,
}
for freq, energy in zip(self.plane.frequencies, self.energies, strict=True):
if search is not None:
energy = energy.crop(*search) # noqa: PLW2901
maxidx = energy.value.argmax()
maxe = energy.value[maxidx]
if maxe > peak["energy"]:
peak.update({
"energy": maxe,
"snr": (2 * maxe) ** (1 / 2.),
"time": energy.t0.value + energy.dt.value * maxidx,
"frequency": freq,
})
return peak
[docs]
def interpolate(
self,
tres: float | None | str = "<default>",
fres: float | None | str = "<default>",
*,
logf: bool = False,
outseg: Segment | None = None,
) -> Spectrogram:
"""Interpolate this `QGram` over a regularly-gridded spectrogram.
Parameters
----------
tres : `float`
Desired time resolution (seconds) of output `Spectrogram`,
default is `abs(outseg) / 1000.`
fres : `float`, `int`, `None`
Desired frequency resolution (Hertz) of output `Spectrogram`,
or, if ``logf=True``, the number of frequency samples;
give `None` to skip this step and return the original resolution,
default is 0.5 Hz or 500 frequency samples.
logf : `bool`
Boolean switch to enable (`True`) or disable (`False`) use of
log-sampled frequencies in the output `Spectrogram`.
outseg : `~gwpy.segments.Segment`
GPS `[start, stop)` segment for output `Spectrogram`,
default is the full duration of the input.
Returns
-------
out : `~gwpy.spectrogram.Spectrogram`
Output `Spectrogram` of normalised Q energy.
See Also
--------
scipy.interpolate
This method uses `~scipy.interpolate.InterpolatedUnivariateSpline`
to cast all frequency rows to a common time-axis, and then
`~scipy.interpolate.interp2d` to apply the desired frequency
resolution across the band.
Notes
-----
This method will return a `Spectrogram` of dtype ``float32`` if
``norm`` is given, and ``float64`` otherwise.
To optimize plot rendering with `~matplotlib.axes.Axes.pcolormesh`,
the output `~gwpy.spectrogram.Spectrogram` can be given a log-sampled
frequency axis by passing `logf=True` at runtime. The `fres` argument
is then the number of points on the frequency axis. Note, this is
incompatible with `~matplotlib.axes.Axes.imshow`.
It is also highly recommended to use the `outseg` keyword argument
when only a small window around a given GPS time is of interest.
"""
from scipy.interpolate import (
InterpolatedUnivariateSpline,
RectBivariateSpline,
)
from ..spectrogram import Spectrogram
if outseg is None:
outseg = self.energies[0].span
frequencies = self.plane.frequencies
dtype = self.energies[0].dtype
# build regular Spectrogram from peak-Q data by interpolating each
# (Q, frequency) `TimeSeries` to have the same time resolution
if tres == "<default>":
tres = abs(Segment(outseg)) / 1000.
xout: numpy.ndarray[tuple[int], numpy.dtype[numpy.float64]] = numpy.arange(
outseg[0],
outseg[1],
step=tres,
)
nx = xout.size
ny = frequencies.size
out = Spectrogram(
numpy.empty((nx, ny), dtype=dtype),
t0=outseg[0],
dt=tres,
frequencies=frequencies,
)
# record Q in output
out.q = self.plane.q
# interpolate rows
for i, row in enumerate(self.energies):
xrow = numpy.arange(
row.x0.value,
(row.x0 + row.duration).value,
row.dx.value,
)
interp = InterpolatedUnivariateSpline(xrow, row.value)
out[:, i] = interp(xout).astype(
dtype,
casting="same_kind",
copy=False,
)
if fres is None:
return out
# interpolate the spectrogram to increase its frequency resolution
# --- this is done because Duncan doesn't like interpolated images
# since they don't support log scaling
interp = RectBivariateSpline(xout, frequencies, out.value)
if not logf:
if fres == "<default>":
fres = .5
outfreq = numpy.arange(
self.plane.frange[0],
self.plane.frange[1],
fres,
dtype=dtype,
)
else:
if fres == "<default>":
fres = 500
outfreq = numpy.geomspace(
self.plane.frange[0],
self.plane.frange[1],
num=int(fres),
)
new = type(out)(
interp(xout, outfreq).astype(
dtype,
casting="same_kind",
copy=False,
),
t0=outseg[0],
dt=tres,
frequencies=outfreq,
)
new.q = self.plane.q
return new
[docs]
def table(self, snrthresh: float = 5.5) -> EventTable:
"""Represent this `QPlane` as an `EventTable`.
Parameters
----------
snrthresh : `float`
Lower inclusive threshold on individual tile SNR to keep in the
table, default: 5.5.
Returns
-------
out : `~gwpy.table.EventTable`
A table of time-frequency tiles on this `QPlane`.
Notes
-----
Only tiles with signal energy greater than or equal to
`snrthresh ** 2 / 2` will be stored in the output `EventTable`.
"""
from ..table import EventTable
# get plane properties
freqs = self.plane.frequencies
bws = 2 * (freqs - self.plane.farray)
# collect table data as a recarray
names = (
"time",
"frequency",
"duration",
"bandwidth",
"energy",
)
rec = numpy.recarray((0,), names=names, formats=["f8"] * len(names))
for f, bw, row in zip(freqs, bws, self.energies, strict=True):
ind, = (row.value >= snrthresh ** 2 / 2.).nonzero()
new = ind.size
if new > 0:
rec.resize((rec.size + new,), refcheck=False)
rec["time"][-new:] = row.times.value[ind]
rec["frequency"][-new:] = f
rec["duration"][-new:] = row.dt.to("s").value
rec["bandwidth"][-new:] = bw
rec["energy"][-new:] = row.value[ind]
# save to a table
out = EventTable(rec, copy=False)
out.meta["q"] = self.plane.q
return out
# -- utilities -----------------------
[docs]
def q_scan(
data: TimeSeries,
mismatch: float = DEFAULT_MISMATCH,
qrange: tuple[float, float] = DEFAULT_QRANGE,
frange: tuple[float, float] = DEFAULT_FRANGE,
duration: float | None = None,
sampling: float | None = None,
**kwargs,
) -> tuple[QGram, float]:
"""Transform data by scanning over a `QTiling`.
This utility is provided mainly to allow direct manipulation of the
`QTiling.transform` output. Most users probably just want to use
:meth:`~gwpy.timeseries.TimeSeries.q_transform`, which wraps around this.
Parameters
----------
data : `~gwpy.timeseries.TimeSeries` or `ndarray`
The time- or frequency-domain input data.
mismatch : `float`
Maximum allowed fractional mismatch between neighbouring tiles.
qrange : `tuple` of `float`
``(low, high)`` range of Qs to scan.
frange : `tuple` of `float`
``(low, high)`` range of frequencies to scan.
duration : `float`
Duration (seconds) of input, required if `data` is not a `TimeSeries`.
sampling : `float`
Sample rate (Hertz) of input, required if `data` is not a `TimeSeries`.
kwargs
Other keyword arguments to be passed to :meth:`QTiling.transform`,
including ``'epoch'`` and ``'search'``
Returns
-------
qgram : `QGram`
The raw output of :meth:`QTiling.transform`.
far : `float`
Expected false alarm rate (Hertz) of white Gaussian noise with the
same peak energy and total duration as `qgram`.
"""
from gwpy.timeseries import TimeSeries
# prepare input
if isinstance(data, TimeSeries):
duration = abs(data.span)
sampling = data.sample_rate.to("Hz").value
kwargs.setdefault("epoch", data.x0.value)
data = data.fft().value
# return a raw Q-transform and its significance
return QTiling(
duration,
sampling,
mismatch=mismatch,
qrange=qrange,
frange=frange,
).transform(data, **kwargs)