Source code for gwpy.detector.channel

# Copyright (c) 2014-2017 Louisiana State University
#               2017-2025 Cardiff University
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""The `Channel` and `ChannelList`."""

from __future__ import annotations

import re
from copy import copy
from math import ceil
from typing import (
    TYPE_CHECKING,
    overload,
)

import numpy
from astropy import units
from urllib3.util import parse_url

from ..io import nds2 as io_nds2
from ..io.registry import UnifiedReadWriteMethod
from ..time import to_gps
from .connect import (
    ChannelListRead,
    ChannelListWrite,
)
from .units import parse_unit

if TYPE_CHECKING:
    import builtins
    from typing import (
        Literal,
        Self,
        SupportsFloat,
    )

    import arrakis
    import nds2
    from astropy.units import (
        Quantity,
        UnitBase,
    )

    from ..segments import SegmentListDict
    from ..time import SupportsToGps
    from ..typing import UnitLike

__author__ = "Duncan Macleod <duncan.macleod@ligo.org>"

# NOTE: when we depend on numpy>=2.0, numpy.bool_ here can
#       be replaced with numpy.bool.
BOOL_TYPES = {bool, numpy.bool_, numpy.dtype(bool), "bool"}

QUOTE_REGEX = re.compile(r'^[\s\"\']+|[\s\"\']+$')


[docs] class Channel: """Representation of a gravitational-wave detectory data channel. Parameters ---------- name : `str`, `Channel` Name of this Channel (or another Channel itself). If a `Channel` is given, all other parameters not set explicitly will be copied over. sample_rate : `float`, `~astropy.units.Quantity`, optional Rate at which data are sampled for this channel, simple floats must be given in Hz. unit : `~astropy.units.Unit`, `str`, optional Name of the unit for the data of this channel. frequency_range : `tuple` of `float` ``[low, high)`` spectral frequency range of interest for this channel. safe : `bool`, optional If `True` this channel is 'safe' to use as a witness of non-gravitational-wave noise in the detector. dtype : `numpy.dtype`, optional Numeric type of data for this channel. frametype : `str`, optional LDAS name for frames that contain this channel. model : `str`, optional Name of the SIMULINK front-end model that produces this channel. Notes ----- The `Channel` structure implemented here is designed to match the data recorded in the LIGO Channel Information System (https://cis.ligo.org) for which a query interface is provided. """ MATCH = re.compile( r"((?:(?P<ifo>[A-Z]\d))?|[\w-]+):" # match IFO prefix r"(?:(?P<system>[a-zA-Z0-9]+))?" # match system r"(?:[-_](?P<subsystem>[a-zA-Z0-9]+))?" # match subsystem r"(?:[-_](?P<signal>[a-zA-Z0-9_-]+?))?" # match signal r"(?:[\.-](?P<trend>[a-z]+))?" # match trend type r"(?:,(?P<type>([a-z]-)?[a-z]+))?$", # match channel type ) def __init__( self, name: str | Self, **params, ) -> None: """Create a new `Channel`. Parameters ---------- name : `str`, `Channel` The name of the new channel, or an existing channel to copy. params ``(key, value)`` pairs for attributes of new channel. """ # init properties self._name: str # set in _init_name() self._ifo: str | None = None self._system: str | None = None self._subsystem: str | None = None self._signal: str | None = None self._trend: str | None = None self._sample_rate: Quantity | None = None self._unit: UnitBase | None = None self._frequency_range: Quantity | None = None self._safe: bool | None = None self._type: str | None = None self._dtype: numpy.dtype | None = None self._frametype: str | None = None self._model: str | None = None self._url: str | None = None # copy existing Channel if isinstance(name, Channel): self._init_from_channel(name) # parse name into component parts else: self._init_name(name) # set metadata for key, value in params.items(): setattr(self, key, value) def _init_from_channel(self, other: Self) -> None: """Copy attributes from ``other`` to this channel.""" for key, value in vars(other).items(): setattr(self, key, copy(value)) def _init_name(self, name: str) -> None: """Initialise the name of this `Channel`.""" try: parts = self.parse_channel_name(name) except (TypeError, ValueError): # failed to parse self.name = str(name) else: self.name = str(name).split(",")[0] for key, val in parts.items(): try: setattr(self, key, val) except AttributeError: setattr(self, f"_{key}", val) # -- properties ----------------------------- @property def name(self) -> str: """Name of this channel. This should follow the naming convention, with the following format: 'IFO:SYSTEM-SUBSYSTEM_SIGNAL' """ return self._name @name.setter def name(self, name: str) -> None: self._name = str(name) @property def sample_rate(self) -> Quantity | None: """Rate of samples (Hertz) for this channel.""" return self._sample_rate @sample_rate.setter def sample_rate(self, rate: SupportsFloat | Quantity | None) -> None: if rate is None: self._sample_rate = None elif isinstance(rate, units.Quantity): self._sample_rate = rate else: # pylint: disable=no-member self._sample_rate = units.Quantity(float(rate), unit=units.Hertz) @sample_rate.deleter def sample_rate(self) -> None: self._sample_rate = None @property def unit(self) -> UnitBase | None: """Data unit for this channel.""" return self._unit @unit.setter def unit(self, u: UnitLike) -> None: if u is None: self._unit = None else: self._unit = parse_unit(u) @unit.deleter def unit(self) -> None: self._unit = None @property def frequency_range(self) -> Quantity | None: """Frequency range of interest (Hertz) for this channel.""" return self._frequency_range @frequency_range.setter def frequency_range(self, frange: tuple[float, float] | None) -> None: if frange is None: self._frequency_range = None else: low, hi = frange self._frequency_range = units.Quantity((low, hi), unit="Hz") @frequency_range.deleter def frequency_range(self) -> None: self._frequency_range = None @property def safe(self) -> bool | None: """Whether this channel is 'safe' to use as a noise witness. Any channel that records part or all of a GW signal as it interacts with the interferometer is not safe to use as a noise witness A safe value of `None` simply indicates that the safety of this channel has not been determined """ return self._safe @safe.setter def safe(self, s: bool | None) -> None: if s is None: self._safe = None else: self._safe = bool(s) @safe.deleter def safe(self) -> None: self._safe = None @property def model(self) -> str | None: """Name of the SIMULINK front-end model that defines this channel.""" return self._model @model.setter def model(self, mdl: str | None) -> None: if mdl is None: self._model = None else: self._model = mdl.lower() @model.deleter def model(self) -> None: self._model = None @property def type(self) -> str | None: """DAQ data type for this channel. Valid values for this field are restricted to those understood by the NDS2 client sofware, namely: 'm-trend', 'online', 'raw', 'reduced', 's-trend', 'static', 'test-pt' """ return self._type @type.setter def type(self, type_: str | int | None) -> None: if type_ is None: self._type = None else: self._type = io_nds2.Nds2ChannelType.find(type_).nds2name @type.deleter def type(self) -> None: self._type = None @property def ndstype(self) -> int: """NDS type integer for this channel. This property is mapped to the `Channel.type` string. """ if self.type is None: return 0 # UNKNOWN return io_nds2.Nds2ChannelType.find(self.type).value @ndstype.setter def ndstype(self, type_: str | int | None) -> None: self.type = type_ @ndstype.deleter def ndstype(self) -> None: del self.type @property def dtype(self) -> numpy.dtype | None: """Numeric type for data in this channel.""" return self._dtype @dtype.setter def dtype(self, type_: str | int | builtins.type | None) -> None: if type_ is None: self._dtype = None elif type_ in BOOL_TYPES: # NDS2 doesn't support bool self._dtype = numpy.dtype(bool) else: self._dtype = io_nds2.Nds2DataType.find(type_).dtype @dtype.deleter def dtype(self) -> None: self._dtype = None @property def url(self) -> str | None: """CIS browser url for this channel.""" return self._url @url.setter def url(self, href: str | None) -> None: if href is None: self._url = None else: url = parse_url(href) if url.scheme not in ("http", "https", "file"): msg = f"Invalid URL '{href}'" raise ValueError(msg) self._url = href @url.deleter def url(self) -> None: self._url = None @property def frametype(self) -> str | None: """LDAS type description for frame files containing this channel.""" return self._frametype @frametype.setter def frametype(self, ft: str | None) -> None: self._frametype = ft @frametype.deleter def frametype(self) -> None: self._frametype = None # -- read-only properties ------------------- @property def ifo(self) -> str | None: """Interferometer prefix for this channel.""" return self._ifo @property def system(self) -> str | None: """Instrumental system for this channel.""" return self._system @property def subsystem(self) -> str | None: """Instrumental sub-system for this channel.""" return self._subsystem @property def signal(self) -> str | None: """Instrumental signal for this channel.""" return self._signal @property def trend(self) -> str | None: """Trend type for this channel.""" return self._trend @property def texname(self) -> str: """Name of this channel in LaTeX printable format.""" return str(self).replace("_", r"\_") @property def ndsname(self) -> str: """Name of this channel as stored in the NDS database.""" if self.type not in {None, "raw", "reduced", "online"}: return f"{self.name},{self.type}" return self.name # -- classmethods ---------------------------
[docs] @classmethod def query( cls, name: str, *, kerberos: bool | None = None, **kwargs, ) -> Self: """Query the LIGO Channel Information System for ``name``. Parameters ---------- name : `str` Name of channel for which to query. kerberos : `bool`, optional If `True` use an existing Kerberos ticket as the authentication credential. Default (`None`) is to check for credentials and request username and password if none are found. kwargs Other keyword arguments are passed directly to :func:`ciecplib.get`. Returns ------- channel : `Channel` A new `Channel` containing all of the attributes set from its entry in the CIS. """ channellist = ChannelList.query(name, kerberos=kerberos, **kwargs) if not channellist: msg = f"No channels found matching '{name}'" raise ValueError(msg) if len(channellist) > 1: msg = ( f"{len(channellist)} channels found matching '{name}', " "please refine search, or use `ChannelList.query` to " "return all results" ) raise ValueError(msg) return channellist[0]
[docs] @classmethod def query_nds2( cls, name: str, host: str | None = None, port: int | None = None, connection: nds2.connection | None = None, type: str | int | None = None, # noqa: A002 ) -> Self: """Query an NDS server for channel information. Parameters ---------- name : `str` Name of channel for which to query. host : `str`, optional Name of NDS2 server. port : `int`, optional Port number for NDS2 connection. connection : `nds2.connection` Open connection to use for query. type : `str`, `int` NDS2 channel type with which to restrict query Returns ------- channel : `Channel` Channel with metadata retrieved from NDS2 server. Raises ------ ValueError If multiple channels are found for a given name. Notes ----- .. warning:: One of ``host`` or ``connection`` is required. """ return ChannelList.query_nds2( [name], host=host, port=port, connection=connection, type=type, unique=True, )[0]
[docs] @classmethod def from_nds2( cls, nds2channel: nds2.channel, ) -> Self: """Generate a new channel using an existing nds2.channel object.""" return cls( nds2channel.name, sample_rate=nds2channel.sample_rate, unit=nds2channel.signal_units or None, dtype=nds2channel.data_type, type=nds2channel.channel_type, )
[docs] @classmethod def from_arrakis( cls, arrakischannel: arrakis.Channel, ) -> Self: """Generate a new channel using an existing `arrakis.Channel`. Parameters ---------- arrakischannel : `arrakis.Channel` The input channel from Arrakis to parse. Returns ------- channel : `gwpy.detector.Channel` A new `Channel`. """ return cls( arrakischannel.name, sample_rate=arrakischannel.sample_rate, dtype=arrakischannel.data_type, )
# -- methods --------------------------------
[docs] @classmethod def parse_channel_name( cls, name: str, *, strict: bool = True, ) -> dict[str, str | None]: """Decompose a channel name string into its components. This method parses channels acccording to the LIGO Channel Naming Convention :dcc:`LIGO-T990033`. Parameters ---------- name : `str` Name to parse. strict : `bool`, optional If `True` (default) require exact matching of format, with no surrounding text. Returns ------- match : `dict` `dict` of channel name components with the following keys: - `'ifo'`: the letter-number interferometer prefix - `'system'`: the top-level system name - `'subsystem'`: the second-level sub-system name - `'signal'`: the remaining underscore-delimited signal name - `'trend'`: the trend type - `'ndstype'`: the NDS2 channel suffix Any optional keys that aren't found will return a value of `None` Raises ------ ValueError if the name cannot be parsed with at least an IFO and SYSTEM Examples -------- >>> Channel.parse_channel_name("L1:LSC-DARM_IN1_DQ") {'ifo': 'L1', 'ndstype': None, 'signal': 'IN1_DQ', 'subsystem': 'DARM', 'system': 'LSC', 'trend': None} >>> Channel.parse_channel_name( ... "H1:ISI-BS_ST1_SENSCOR_GND_STS_X_BLRMS_100M_300M.rms,m-trend", ... ) {'ifo': 'H1', 'ndstype': 'm-trend', 'signal': 'ST1_SENSCOR_GND_STS_X_BLRMS_100M_300M', 'subsystem': 'BS', 'system': 'ISI', 'trend': 'rms'} """ match = cls.MATCH.search(name) if match is None or ( strict and (match.start() != 0 or match.end() != len(name)) ): msg = "Cannot parse channel name according to LIGO-T990033" raise ValueError(msg) return match.groupdict()
@overload def find_frametype( self, gpstime: SupportsToGps | None = None, *, frametype_match: str | re.Pattern | None = None, return_all: Literal[False] = False, allow_tape: bool = True, **kwargs, ) -> str: ... @overload def find_frametype( self, gpstime: SupportsToGps | None = None, *, frametype_match: str | re.Pattern | None = None, return_all: Literal[True] = True, allow_tape: bool = True, **kwargs, ) -> list[str]: ...
[docs] def find_frametype( self, gpstime: SupportsToGps | None = None, *, frametype_match: str | re.Pattern | None = None, return_all: bool = False, allow_tape: bool = True, **kwargs, ) -> str | list[str]: """Find the containing frametype(s) for this `Channel`. Parameters ---------- gpstime : `int`, optional A reference GPS time at which to search for frame files. Default is to search in the latest available data for each discoverable dataset. frametype_match : `str` A regular expression string to use to down-select from the list of all available datasets. return_all: `bool`, optional If `True` return all matched datasets; if `False` (default) only the first match is returned allow_tape : `bool`, default: `True` If `True` (default) include datasets whose files are stored on slow magnetic tape. kwargs Other keyword arguments are passed directly to :func:`gwpy.io.datafind.find_frametype`. Returns ------- frametype : `str` or `list[str]` If ``return_all=False`` a single `str` containing the 'best' dataset name. If ``return_all=True`` a `list` of dataset names. """ from ..io import datafind return datafind.find_frametype( self, gpstime=gpstime, frametype_match=frametype_match, return_all=return_all, allow_tape=allow_tape, **kwargs, )
[docs] def copy(self) -> Self: """Return a copy of this `Channel`.""" return type(self)(self)
def __str__(self) -> str: """Return the name of this `Channel`.""" return self.name or "" def __repr__(self) -> str: """Return a printable representation of this `Channel`.""" repr_ = f'<Channel("{self}"' if self.type: repr_ += f" [{self.type}]" repr_ += f", {self.sample_rate}" return repr_ + f") at {hex(id(self))}>" def __eq__(self, other: object) -> bool: """Return `True` if all attributes of this channel match ``other``.""" try: for attr in ("name", "sample_rate", "unit", "url", "type", "dtype"): if getattr(self, attr) != getattr(other, attr): return False except ( AttributeError, # no such attribute TypeError, # attribute values can't be compared ): return False return True def __hash__(self) -> int: """Return a hash of this `Channel`.""" hash_ = 0 for attr in ("name", "sample_rate", "unit", "url", "type", "dtype"): hash_ += hash(getattr(self, attr)) return hash_
[docs] class ChannelList(list): """A `list` of `channels <Channel>`, with parsing utilities.""" # -- properties ------------------ @property def ifos(self) -> set[str]: """The `set` of interferometer prefixes used in this `ChannelList`.""" return {c.ifo for c in self} # -- i/o ------------------------- read = UnifiedReadWriteMethod(ChannelListRead) write = UnifiedReadWriteMethod(ChannelListWrite) # -- methods ---------------------
[docs] @classmethod def from_names(cls, *names: str) -> Self: """Create a new `ChannelList` from a list of names. The list of names can include comma-separated sets of names, in which case the return will be a flattened list of all parsed channel names. """ new = cls() for namestr in names: for name in cls._split_names(namestr): new.append(Channel(name)) return new
@staticmethod def _split_names(namestr: str) -> list[str]: """Split a comma-separated list of channel names.""" out = [] namestr = QUOTE_REGEX.sub("", namestr) while True: namestr = namestr.strip("' \n") if "," not in namestr: break for nds2type in [*io_nds2.Nds2ChannelType.nds2names(), ""]: if nds2type and f",{nds2type}" in namestr: try: channel, ctype, namestr = namestr.split(",", 2) except ValueError: channel, ctype = namestr.split(",") namestr = "" out.append(f"{channel},{ctype}") break if nds2type == "" and "," in namestr: channel, namestr = namestr.split(",", 1) out.append(channel) break if namestr: out.append(namestr) return out
[docs] def find(self, name: str) -> int: """Find the `Channel` with a specific name in this `ChannelList`. Parameters ---------- name : `str` name of the `Channel` to find Returns ------- index : `int` the position of the first `Channel` in this `ChannelList` whose `~Channel.name` matches the search key. Raises ------ ValueError if no matching `Channel` is found. """ for i, chan in enumerate(self): if name == chan.name: return i raise ValueError(name)
[docs] def sieve( self, name: str | re.Pattern | None = None, sample_rate: float | None = None, *, sample_range: tuple[float, float] | None = None, exact_match: bool = False, **others, ) -> Self: """Find all channels in this list matching the specified criteria. Parameters ---------- name : `str`, or regular expression Any part of the channel name against which to match (or full name if `exact_match=False` is given). sample_rate : `float` Rate (number of samples per second) to match exactly. sample_range : 2-`tuple` `[low, high]` closed interval or rates to match within. exact_match : `bool` If `True` return channels matching ``name`` exactly. If `False` (default) allow partial matches. others: Other ``(key, value)`` attribute pairs to match. Returns ------- new : `ChannelList` A new `ChannelList` containing the matching channels. """ # format name regex if isinstance(name, re.Pattern): flags = name.flags name = str(name.pattern) else: flags = 0 if name is not None: if exact_match: name = name if name.startswith(r"\A") else fr"\A{name}" name = name if name.endswith(r"\Z") else fr"{name}\Z" name_regexp = re.compile(name, flags=flags) def _match(channel: Channel) -> bool: if name is not None and name_regexp.search(channel.name) is None: return False if sample_rate is not None and channel.sample_rate != sample_rate: return False if sample_range is not None and ( channel.sample_rate is None or sample_range[0] > channel.sample_rate.value or sample_range[1] <= channel.sample_rate.value ): return False for key, val in others.items(): if val is not None and getattr(channel, key, None) != val: return False return True return type(self)(filter(_match, self))
[docs] @classmethod def query( cls, name: str, *, kerberos: bool | None = None, **kwargs, ) -> Self: """Query the LIGO Channel Information System a `ChannelList`. Parameters ---------- name : `str` name of channel, or part of it. kerberos : `bool`, optional If `True` use an existing Kerberos ticket as the authentication credential. Default (`None`) is to check for credentials and request username and password if none are found. kwargs Other keyword arguments are passed directly to :func:`ciecplib.get`. Returns ------- channels : `ChannelList` A new list containing all `Channels <Channel>` found. """ from .io import cis return cls(cis.query( name, kerberos=kerberos, **kwargs, ))
[docs] @classmethod def query_nds2( cls, names: list[str], *, host: str | None = None, port: int | None = None, connection: nds2.connection | None = None, type: str | int | None = io_nds2.NDS2_CHANNEL_TYPE_ANY, # noqa: A002 unique: bool = False, ) -> Self: """Query an NDS server for channel information. Parameters ---------- names : `str` Names of requested channels. host : `str`, optional Name of NDS2 server. port : `int`, optional Port number for NDS2 connection. connection : `nds2.connection`, optional Open connection to use for query. type : `str`, `int`, optional NDS2 channel type with which to restrict query unique : `bool`, optional If `True` require a unique query result for each name given. Default is `False`. Returns ------- channellist : `ChannelList` list of `Channels <Channel>` with metadata retrieved from NDS2 server Raises ------ ValueError If multiple channels are found for a given name and `unique=True` is given. Notes ----- .. warning:: One of ``host`` or ``connection`` is required. """ ndschannels = io_nds2.find_channels( names, host=host, port=port, connection=connection, type=type, unique=unique, ) return cls(map(Channel.from_nds2, ndschannels))
[docs] @classmethod def query_nds2_availability( cls, channels: list[str | Channel], start: SupportsToGps, end: SupportsToGps, ctype: int | str = io_nds2.Nds2ChannelType.any().value, connection: nds2.connection | None = None, host: str | None = None, port: int | None = None, ) -> SegmentListDict: """Query for when data are available for these channels in NDS2. Parameters ---------- channels : `list` of `str` or `Channel` List of `Channel` or `str` for which to search. start : `~gwpy.time.LIGOTimeGPS`, `float`, `str` GPS start time of search. Any input parseable by `~gwpy.time.to_gps` is fine. end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional GPS end time of search. Any input parseable by `~gwpy.time.to_gps` is fine. ctype : `int`, `str` The NDS2 channel type name or enum ID against which to restrict results. Default is ``127`` which means 'all'. host : `str`, optional Name of NDS2 server to use. port : `int`, optional Port number for NDS2 connection. connection : `nds2.connection`, optional Open connection to use for query. Returns ------- segdict : `~gwpy.segments.SegmentListDict` dict of ``(name, SegmentList)`` pairs """ start = int(to_gps(start)) end = ceil(to_gps(end)) with io_nds2._connection(connection=connection, host=host, port=port) as conn: chans = io_nds2.find_channels( channels, unique=True, epoch=(start, end), type=ctype, connection=conn, ) availability = io_nds2.get_availability( chans, start, end, connection=conn, ) return type(availability)(zip( channels, availability.values(), strict=True, ))