Source code for gwpy.segments.flag

# Copyright (c) 2014-2017 Louisiana State University
#               2017-2025 Cardiff University
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""The `DataQualityFlag` and `DataQualityDict`.

The `DataQualityFlag` represents an annotated set of data-quality segments
indicating something about the state of a laser-interferometer
gravitational-wave detector in a given time interval.

The `DataQualityDict` is just a `dict` of flags, provided as a convenience
for handling multiple flags over the same global time interval.
"""

from __future__ import annotations

import contextlib
import json
import operator
import os
import re
import textwrap
import warnings
from concurrent.futures import ThreadPoolExecutor
from copy import (
    copy as shallowcopy,
    deepcopy,
)
from functools import (
    reduce,
    wraps,
)
from io import BytesIO
from math import (
    ceil,
    floor,
)
from typing import TYPE_CHECKING
from urllib.parse import urlparse

from astropy.table import Row as AstropyTableRow
from dqsegdb2.query import query_segments
from dqsegdb2.utils import get_default_host
from gwosc import timeline
from numpy import inf
from requests import codes as http_codes
from requests.exceptions import (
    HTTPError,
    RequestException,
)

from ..io.registry import UnifiedReadWriteMethod
from ..time import (
    LIGOTimeGPS,
    to_gps,
)
from ..utils.misc import if_not_none
from .connect import (
    DataQualityDictRead,
    DataQualityDictWrite,
    DataQualityFlagRead,
    DataQualityFlagWrite,
)
from .segments import (
    Segment,
    SegmentList,
)

if TYPE_CHECKING:
    from collections.abc import (
        Callable,
        Iterable,
    )
    from typing import (
        ParamSpec,
        Self,
        SupportsFloat,
        TypeVar,
    )

    import astropy.table
    import igwn_ligolw

    from ...plot import Plot
    from ...time import SupportsToGps

    P = ParamSpec("P")
    R = TypeVar("R")

    SegmentListLike = Iterable[tuple[SupportsFloat, SupportsFloat]]

__author__ = "Duncan Macleod <duncan.macleod@ligo.org>"
__all__ = [
    "DataQualityDict",
    "DataQualityFlag",
]

DEFAULT_SEGMENT_SERVER = get_default_host()

IFO_TAG_VERSION_REGEX = re.compile(
    r"\A(?P<ifo>[A-Z]\d):(?P<tag>[^/]+):(?P<version>\d+)\Z")
IFO_TAG_REGEX = re.compile(r"\A(?P<ifo>[A-Z]\d):(?P<tag>[^/]+)\Z")
TAG_VERSION_REGEX = re.compile(r"\A(?P<tag>[^/]+):(?P<version>\d+)\Z")


# -- utilities -----------------------

def _parse_query_segments(
    args: tuple[
        SegmentList | tuple[SupportsToGps, SupportsToGps]
    ] | tuple[SupportsToGps, SupportsToGps],
    func: Callable,
) -> SegmentList:
    """Parse ``args`` for query_dqsegdb() or query_segdb().

    Returns a SegmentList in all cases
    """
    # user passed SegmentList
    if len(args) == 1 and isinstance(args[0], SegmentList):
        return args[0]

    # otherwise unpack two arguments as a segment
    if len(args) == 1:
        args = args[0]

    # if not two arguments, panic
    try:
        start, end = args
    except ValueError as exc:
        exc.args = (f"{func.__name__}() takes 2 arguments for start and end GPS time, "
                    "or 1 argument containing a Segment or SegmentList",)
        raise

    # return list with one Segment
    return SegmentList([Segment(to_gps(start), to_gps(end))])


def _check_on_error(func: Callable[P, R]) -> Callable[P, R]:
    @wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        on_error = kwargs.get("on_error", "raise")
        # check on_error flag
        if on_error not in {"raise", "warn", "ignore"}:
            msg = (
                "on_error must be one of 'raise', 'warn', or 'ignore', "
                f"not '{on_error}'"
            )
            raise ValueError(msg)
        return func(*args, **kwargs)

    return wrapper



# -- DataQualityFlag -----------------

[docs] class DataQualityFlag: """A representation of a named set of segments. Parameters ---------- name : str, optional The name of this flag. This should be of the form {ifo}:{tag}:{version}, e.g. 'H1:DMT-SCIENCE:1'. Use `label` for human-readable names. active : `SegmentList`, optional A list of active segments for this flag known : `SegmentList`, optional A list of known segments for this flag label : `str`, optional Human-readable name for this flag, e.g. ``'Science-mode'`` category : `int`, optional Veto category for this flag. description : `str`, optional Human-readable description of why this flag was created. isgood : `bool`, optional Do active segments mean the IFO was in a good state? """ _EntryClass: type[Segment] = Segment _ListClass: type[SegmentList] = SegmentList
[docs] def __init__( self, name: str | None = None, active: SegmentListLike | None = None, known: SegmentListLike | None = None, *, label: str | None = None, category: int | None = None, description: str | None = None, isgood: bool = True, padding: tuple[float | None, float | None] | None = None, ) -> None: """Define a new DataQualityFlag.""" self.name = name self.known = known self.active = active self.label = label self.category = category self.description = description self.isgood = isgood self.padding = padding
# -- utilities ------------------- def _to_segmentlist( self, segmentlist: SegmentListLike | None, ) -> SegmentList: """Convert input to SegmentList of the correct type.""" if segmentlist is None: return self._ListClass() return self._ListClass(self._EntryClass(*seg) for seg in segmentlist) # -- properties ------------------ @property def name(self) -> str | None: """The name associated with this flag. This normally takes the form {ifo}:{tag}:{version}. If found, each component is stored separately the associated attributes. :type: `str` """ return self._name @name.setter def name(self, name: str | None) -> None: self._name = name try: self._parse_name(name) except ValueError: self._parse_name(None) @property def ifo(self) -> str | None: """The interferometer associated with this flag. This should be a single uppercase letter and a single number, e.g. ``'H1'``. """ return self._ifo @ifo.setter def ifo(self, ifoname: str | None) -> None: self._ifo = ifoname @property def tag(self) -> str | None: """The tag (name) associated with this flag. This should take the form ``'AAA-BBB_CCC_DDD'``, i.e. where each component is an uppercase acronym of alphanumeric characters only, e.g. ``'DCH-IMC_BAD_CALIBRATION'`` or ``'DMT-SCIENCE'``. """ return self._tag @tag.setter def tag(self, tag: str | None) -> None: self._tag = tag @property def version(self) -> int | None: """The version number of this flag. Each flag in the segment database is stored with a version integer, with each successive version representing a more accurate dataset for its known segments than any previous. """ return self._version @version.setter def version(self, v: int | None) -> None: self._version = if_not_none(int, v) @property def label(self) -> str | None: """A human-readable label for this flag. For example: ``'Science-mode'``. """ return self._label @label.setter def label(self, lab: str | None) -> None: self._label = lab @property def active(self) -> SegmentList: """The set of segments during which this flag was active.""" return self._active @active.setter def active(self, segmentlist: SegmentListLike | None) -> None: self._active = self._to_segmentlist(segmentlist) @active.deleter def active(self) -> None: self._active = self._ListClass() @property def known(self) -> SegmentList: """The segments during which this flag was known.""" return self._known @known.setter def known(self, segmentlist: SegmentListLike | None) -> None: self._known = self._to_segmentlist(segmentlist) @known.deleter def known(self) -> None: self._known = self._ListClass() @property def category(self) -> int | None: """Veto category for this flag.""" return self._category @category.setter def category(self, cat: int | None) -> None: self._category = if_not_none(int, cat) @property def description(self) -> str | None: """Description of why/how this flag was generated.""" return self._description @description.setter def description(self, desc: str | None) -> None: self._description = desc @property def isgood(self) -> bool: """Whether `active` segments mean the instrument was in a good state.""" return self._isgood @isgood.setter def isgood(self, good: bool) -> None: self._isgood = bool(good) @property def padding(self) -> tuple[float, float]: """(start, end) padding for this flag's active segments.""" return self._padding @padding.setter def padding(self, pad: None | tuple[float | None, float | None]) -> None: if pad is None: self._padding = (0, 0) else: self._padding = (float(pad[0] or 0.), float(pad[1] or 0.)) @padding.deleter def padding(self) -> None: self._padding = (0., 0.) # -- read-only properties -------- @property def texname(self) -> str | None: """Name of this flag in LaTeX printable format.""" if self.name is None: return None return self.name.replace("_", r"\_") @property def extent(self) -> Segment: """The GPS ``[start, stop)`` enclosing segment of this `DataQualityFlag`.""" return self.known.extent() @property def livetime(self) -> float: """Amount of time this flag was `active`.""" return abs(self.active) @property def regular(self) -> bool: """`True` if the `active` segments are a proper subset of the `known`.""" return abs(self.active - self.known) == 0 # -- i/o ------------------------- read = UnifiedReadWriteMethod(DataQualityFlagRead) write = UnifiedReadWriteMethod(DataQualityFlagWrite) # -- classmethods ----------------
[docs] @classmethod def query_dqsegdb( cls, flag: str, *args: SupportsToGps | Segment | SegmentList, host: str | None = DEFAULT_SEGMENT_SERVER, **kwargs, ) -> Self: """Query a DQSegDB server for a flag. Parameters ---------- flag : `str` The name of the flag for which to query args Either, two `float`-like numbers indicating the GPS [start, stop) interval, or a `SegmentList` defining a number of summary segments host : `str`, optional Name or URL of the DQSegDB instance to talk to. Defaults to :func:`dqsegdb2.utils.get_default_host`. kwargs All other keyword arguments are passed to :func:`dqsegdb2.query.query_segments`. Returns ------- flag : `DataQualityFlag` A new `DataQualityFlag`, with the `known` and `active` lists filled appropriately. Examples -------- The GPS interval(s) of interest can be passed as two arguments specifing the start and end of a single interval: >>> DataQualityDict.query_dqsegdb(["X1:OBSERVING:1", "Y1:OBSERVING:1"], start, end) Or, as a single `Segment`: >>> DataQualityDict.query_dqsegdb(["X1:OBSERVING:1", "Y1:OBSERVING:1"], interval) Or, as a `SegmentList` specifying multiple intervals. >>> DataQualityDict.query_dqsegdb(["X1:OBSERVING:1", "Y1:OBSERVING:1"], intervals) """ # noqa: E501 # parse arguments qsegs = _parse_query_segments(args, cls.query_dqsegdb) # parse deprecated 'url' keyword as 'host' url = kwargs.pop("url", None) if url: warnings.warn( "the `url` keyword argument for `query_dqsegdb` " "has been renamed `host`; this warning will become " "an error in the future", DeprecationWarning, stacklevel=2, ) host = url # parse flag out = cls(name=flag) if out.ifo is None or out.tag is None: msg = f"Cannot parse ifo or tag (name) for flag '{flag}'" raise ValueError(msg) # process query for start, end in qsegs: # handle infinities if float(end) == +inf: end = int(to_gps("now")) # noqa: PLW2901 # query try: data = query_segments( flag, int(start), int(end), host=host, **kwargs, ) except HTTPError as exc: if exc.response.status_code == http_codes.NOT_FOUND: exc.args = (exc.args[0] + f" [{flag}]",) raise # read from json buffer new = cls.read( BytesIO(json.dumps(data).encode("utf-8")), format="json", ) # restrict to query segments segl = SegmentList([Segment(start, end)]) new.known &= segl new.active &= segl out += new # replace metadata out.description = new.description out.isgood = new.isgood return out
# alias for compatibility query = query_dqsegdb
[docs] @classmethod def fetch_open_data( cls, flag: str, start: SupportsToGps, end: SupportsToGps, **kwargs, ) -> Self: """Fetch Open Data timeline segments into a flag. flag : `str` The name of the flag to query start : `~gwpy.time.LIGOTimeGPS`, `float`, `str` GPS start time of required data, any input parseable by `~gwpy.time.to_gps` is fine end : `~gwpy.time.LIGOTimeGPS`, `float`, `str` GPS end time of required data, any input parseable by `~gwpy.time.to_gps` is fine timeout : `int`, optional Timeout for download (seconds). host : `str`, optional URL of GWOSC host, default: ``'https://gwosc.org'``. Returns ------- flag : `DataQualityFlag` a new flag with `active` segments filled from Open Data Examples -------- >>> from gwpy.segments import DataQualityFlag >>> print(DataQualityFlag.fetch_open_data( ... "H1_DATA", ... "Sep 14 2015", ... "Sep 15 2015", ... )) <DataQualityFlag('H1:DATA', known=[[1126224017 ... 1126310417)] active=[[1126251604 ... 1126252133) [1126252291 ... 1126274322) [1126276234 ... 1126281754) ... [1126308670 ... 1126309577) [1126309637 ... 1126309817) [1126309877 ... 1126310417)] description=None)> """ start = int(to_gps(start)) end = ceil(to_gps(end)) known = [(start, end)] active = timeline.get_segments(flag, start, end, **kwargs) return cls( flag.replace("_", ":", 1), known=known, active=active, label=flag, )
[docs] @classmethod def from_veto_def( cls, veto: igwn_ligolw.lsctables.VetoDef | astropy.table.Row, ) -> Self: """Define a `DataQualityFlag` from a `VetoDef`. Parameters ---------- veto : `~igwn_ligolw.lsctables.VetoDef`, `~astropy.table.Row`. Veto definition to convert from. """ # handle getting by item name (astropy) or attribute name (igwn_ligolw) if isinstance(veto, AstropyTableRow): get = veto.get else: def get(name, default=None): # noqa: ANN001,ANN202 return getattr(veto, name, default) name = f"{get('ifo')}:{get('name')}" with contextlib.suppress(TypeError): name += f":{int(get('version'))}" known = Segment(get("start_time"), get("end_time") or +inf) pad = (get("start_pad") or 0, get("end_pad") or 0) return cls( name=name, known=[known], category=get("category", None), description=get("comment", None), padding=pad, )
# -- methods ---------------------
[docs] def populate( self, source: str | None = DEFAULT_SEGMENT_SERVER, segments: SegmentList | None = None, *, pad: bool = True, **kwargs, ) -> Self: """Query the segment database for this flag's active segments. This method assumes all of the metadata for each flag have been filled. Minimally, the following attributes must be filled .. autosummary:: ~DataQualityFlag.name ~DataQualityFlag.known Segments will be fetched from the database, with any :attr:`~DataQualityFlag.padding` added on-the-fly. This `DataQualityFlag` will be modified in-place. Parameters ---------- source : `str` Source of segments for this flag. This must be either a URL for a segment database or a path to a file on disk. segments : `SegmentList`, optional A list of segments during which to query, if not given, existing known segments for this flag will be used. pad : `bool`, optional, default: `True` apply the `~DataQualityFlag.padding` associated with this flag, default: `True`. **kwargs any other keyword arguments to be passed to :meth:`DataQualityFlag.query` or :meth:`DataQualityFlag.read`. Returns ------- self : `DataQualityFlag` a reference to this flag """ tmp = DataQualityDict() tmp[self.name] = self tmp.populate(source=source, segments=segments, pad=pad, **kwargs) return tmp[self.name]
[docs] def contract(self, x: float) -> SegmentList: """Contract each of the ``active`` segments by ``x`` seconds. This method adds ``x`` to each segment's lower bound, and subtracts ``x`` from the upper bound. The :attr:`~DataQualityFlag.active` `SegmentList` is modified in place. Parameters ---------- x : `float` Number of seconds by which to contract each `Segment`. """ self.active = self.active.contract(x) return self.active
[docs] def protract(self, x: float) -> SegmentList: """Protract each of the ``active`` segments by ``x`` seconds. This method subtracts ``x`` from each segment's lower bound, and adds ``x`` to the upper bound, while maintaining that each `Segment` stays within the `known` bounds. The :attr:`~DataQualityFlag.active` `SegmentList` is modified in place. Parameters ---------- x : `float` Number of seconds by which to protact each `Segment`. """ self.active = self.active.protract(x) return self.active
[docs] def pad( self, *args: float, inplace: bool = False, ) -> Self: """Apply a padding to each segment in this `DataQualityFlag`. This method either takes no arguments, in which case the value of the :attr:`~DataQualityFlag.padding` attribute will be used, or two values representing the padding for the start and end of each segment. For both the `start` and `end` paddings, a positive value means pad forward in time, so that a positive `start` pad or negative `end` padding will contract a segment at one or both ends, and vice-versa. This method will apply the same padding to both the `~DataQualityFlag.known` and `~DataQualityFlag.active` lists, but will not :meth:`~DataQualityFlag.coalesce` the result. Parameters ---------- args : `float`, optional Two floats giving the start and end padding to apply. If not given, `self.padding` will be used. inplace : `bool`, optional Modify this object in-place. Default is `False`, i.e. return a copy of the original object with padded segments. Returns ------- paddedflag : `DataQualityFlag` A view of the modified flag. """ if not args: start, end = self.padding else: start, end = args if inplace: new = self else: new = self.copy() def _pad(s): return type(s)(s[0]+start, s[1]+end) new.known = type(self.known)(map(_pad, self.known)) new.active = type(self.active)(map(_pad, self.active)) return new
[docs] def round(self, *, contract: bool = False) -> Self: """Round this flag to integer segments. Parameters ---------- contract : `bool`, optional If `False` (default) expand each segment to the containing integer boundaries, otherwise contract each segment to the contained boundaries. Returns ------- roundedflag : `DataQualityFlag` A copy of the original flag with the `active` and `known` segments padded out to integer boundaries.. """ def _round(seg: Segment) -> Segment: if contract: # round inwards a = type(seg[0])(ceil(seg[0])) b = type(seg[1])(floor(seg[1])) else: # round outwards a = type(seg[0])(floor(seg[0])) b = type(seg[1])(ceil(seg[1])) if a >= b: # if segment is too short, return 'null' segment return type(seg)(0, 0) # will get coalesced away return type(seg)(a, b) new = self.copy() new.active = type(new.active)(map(_round, new.active)) new.known = type(new.known)(map(_round, new.known)) return new.coalesce()
[docs] def coalesce(self) -> Self: """Coalesce the segments for this flag. This method does two things: - `coalesces <SegmentList.coalesce>` the `~DataQualityFlag.known` and `~DataQualityFlag.active` segment lists - forces the `active` segments to be a proper subset of the `known` segments .. note:: This operation is performed in-place. Returns ------- self A view of this flag, not a copy. """ self.known = self.known.coalesce() self.active = self.active.coalesce() self.active = (self.known & self.active).coalesce() return self
def __repr__(self) -> str: """Return a representation of this flag.""" prefix = f"<{type(self).__name__}(" suffix = ")>" indent = " " * len(prefix) # format segment lists parts = {} for attr in ("known", "active"): parts[attr] = rep = textwrap.indent( str(getattr(self, attr)), f"{indent} ", ).strip().splitlines() if len(rep) > 10: # use ellipsis rep = [*rep[:3], f"{indent} ...", *rep[-3:]] # print the thing return "".join(( prefix, f"{os.linesep}{indent}".join([ f"'{self.name}',", f"known={os.linesep.join(parts['known'])}", f"active={os.linesep.join(parts['active'])}", f"description='{self.description}'", ]), suffix, ))
[docs] def copy(self) -> Self: """Build an exact copy of this flag. Returns ------- flag : `DataQualityFlag` A deepcopy of the original flag. """ return deepcopy(self)
[docs] def plot( self, figsize: tuple[float, float] = (12, 4), xscale: str = "auto-gps", **kwargs, ) -> Plot: """Plot this flag on a segments projection. Parameters ---------- figsize : `tuple` of `float` The size (width, height) of the figure to create. xscale: `str` The scaling to use for the X-axis (time axis). Default is ``"auto-gps"`` to dynamically choose the right scaling based on how much time is covered by the visible span. kwargs Other keyword arguments are passed to the :class:`~gwpy.plot.Plot` constructor. Returns ------- figure : `~matplotlib.figure.Figure` The newly created figure, with populated Axes. See Also -------- matplotlib.pyplot.figure For documentation of keyword arguments used to create the figure. matplotlib.figure.Figure.add_subplot For documentation of keyword arguments used to create the axes gwpy.plot.SegmentAxes.plot_segmentlist For documentation of keyword arguments used in rendering the data. """ from matplotlib import rcParams from ..plot import Plot # get the right default label if "label" not in kwargs: if self.label: kwargs["label"] = self.label elif rcParams["text.usetex"]: kwargs["label"] = self.texname else: kwargs["label"] = self.name return Plot( self, projection="segments", figsize=figsize, xscale=xscale, **kwargs, )
def _parse_name( self, name: str | None, ) -> tuple[str | None, str | None, str | None]: """Parse a flag name and set properties of this flag. Parameters ---------- name : `str`, `None` The full name of a `DataQualityFlag` to parse, e.g. ``"H1:DMT-SCIENCE:1"``, or `None` to set all components to `None`. Returns ------- ifo : `str` or `None` The IFO prefix for this flag. tag : `str` or `None` The name tag for this flag. version : `int` or `None` The version number for this flag. Raises ------ ValueError If the input ``name`` cannot be parsed into {ifo}:{tag}:{version} format. """ if name is None: self.ifo = None self.tag = None self.version = None elif match := IFO_TAG_VERSION_REGEX.match(name): self.ifo = match["ifo"] self.tag = match["tag"] self.version = int(match["version"]) elif match := IFO_TAG_REGEX.match(name): self.ifo = match["ifo"] self.tag = match["tag"] self.version = None elif match := TAG_VERSION_REGEX.match(name): self.ifo = None self.tag = match["tag"] self.version = int(match["version"]) else: msg = ( f"no flag name structure detected in '{name}', flags " "should be named as '{ifo}:{tag}:{version}'; " "for arbitrary strings, use the " "`DataQualityFlag.label` attribute" ) raise ValueError(msg) return self.ifo, self.tag, self.version def __and__(self, other: Self) -> Self: """Find the intersection of this one and ``other``.""" return self.copy().__iand__(other) def __iand__(self, other: Self) -> Self: """Intersect this flag with ``other`` in-place.""" self.known &= other.known self.active &= other.active return self def __sub__(self, other: Self) -> Self: """Find the difference between this flag and another.""" return self.copy().__isub__(other) def __isub__(self, other: Self) -> Self: """Subtract the ``other`` `DataQualityFlag` from this one in-place.""" self.known &= other.known self.active -= other.active self.active &= self.known return self def __or__(self, other: Self) -> Self: """Find the union of this flag and ``other``.""" return self.copy().__ior__(other) def __ior__(self, other: Self) -> Self: """Add the ``other`` `DataQualityFlag` to this one in-place.""" self.known |= other.known self.active |= other.active return self __add__ = __or__ __iadd__ = __ior__ def __xor__(self, other: Self) -> Self: """Find the exclusive OR of this one and ``other``.""" return self.copy().__ixor__(other) def __ixor__(self, other: Self) -> Self: """Exclusive OR this flag with ``other`` in-place.""" self.known &= other.known self.active ^= other.active return self def __invert__(self) -> Self: """Return the logical inverse of this flag.""" new = self.copy() new.active = ~self.active new.active &= new.known return new
[docs] class DataQualityDict(dict): """An `dict` of (key, `DataQualityFlag`) pairs.""" _EntryClass: type[DataQualityFlag] = DataQualityFlag # -- i/o ------------------------- read = UnifiedReadWriteMethod(DataQualityDictRead) write = UnifiedReadWriteMethod(DataQualityDictWrite) # -- classmethods ----------------
[docs] @classmethod @_check_on_error def query_dqsegdb( cls, flags: list[str], *args: SupportsToGps | Segment | SegmentList, host: str | None = DEFAULT_SEGMENT_SERVER, on_error: str = "raise", parallel: int = 10, **kwargs, ) -> Self: """Query the advanced LIGO DQSegDB for a list of flags. Parameters ---------- flags : `iterable` A list of flag names for which to query. args Either, two `float`-like numbers indicating the GPS [start, stop) interval, or a `SegmentList` defining a number of summary segments. host : `str`, optional Name or URL of the DQSegDB instance to talk to. Defaults to :func:`dqsegdb2.utils.get_default_host`. on_error : `str`, optional how to handle an error querying for one flag, one of - `'raise'` (default): raise the Exception - `'warn'`: print a warning - `'ignore'`: move onto the next flag as if nothing happened parallel : `int`, optional Maximum number of threads to use for parallel connections to the DQSegDB host. kwargs All other keyword arguments are passed to :func:`dqsegdb2.query.query_segments`. Returns ------- flagdict : `DataQualityDict` An ordered `DataQualityDict` of (name, `DataQualityFlag`) pairs. Examples -------- The GPS interval(s) of interest can be passed as two arguments specifing the start and end of a single interval: >>> DataQualityDict.query_dqsegdb(["X1:OBSERVING:1", "Y1:OBSERVING:1"], start, end) Or, as a single `Segment`: >>> DataQualityDict.query_dqsegdb(["X1:OBSERVING:1", "Y1:OBSERVING:1"], interval) Or, as a `SegmentList` specifying multiple intervals. >>> DataQualityDict.query_dqsegdb(["X1:OBSERVING:1", "Y1:OBSERVING:1"], intervals) """ # noqa: E501 # parse segments qsegs = _parse_query_segments(args, cls.query_dqsegdb) # thread function def _query(flag: str) -> DataQualityFlag | None: try: return cls._EntryClass.query_dqsegdb( flag, qsegs, host=host, **kwargs, ) except Exception as exc: exc.args = (f"{exc} [{flag}]",) if on_error == "raise": raise if on_error == "warn": warnings.warn(str(exc), stacklevel=2) return None # execute queries in threads out = cls() parallel = min(parallel, len(flags)) with ThreadPoolExecutor( max_workers=parallel, thread_name_prefix=f"{cls.__name__}.query_dqsegdb", ) as pool: for flag in filter(None, pool.map(_query, flags)): out[flag.name] = flag return out
# alias for compatibility query = query_dqsegdb
[docs] @classmethod def from_veto_definer_file( cls, source: str, start: SupportsToGps | None = None, end: SupportsToGps | None = None, ifo: str | None = None, **read_kw, ) -> Self: """Read a `DataQualityDict` from a LIGO_LW XML VetoDefinerTable. Parameters ---------- source : `str`, `Path`, `file` Path or URL of veto definer file to read, or open file. start : `~gwpy.time.LIGOTimeGPS`, `float`, `str` GPS start time of required data, any input parseable by `~gwpy.time.to_gps` is fine end : `~gwpy.time.LIGOTimeGPS`, `float`, `str` GPS end time of required data, any input parseable by `~gwpy.time.to_gps` is fine ifo : `str`, optional Interferometer prefix whose flags you want to read. Default is `None` (read all flags). read_kw Other keyword arguments are passed to `~astropy.table.Table.read` when reading the veto definer file. Returns ------- flags : `DataQualityDict` A `DataQualityDict` of flags parsed from the `veto_def_table` of the input file. Notes ----- This method does not automatically `~DataQualityDict.populate` the `active` segment list of any flags, a separate call should be made for that as follows >>> flags = DataQualityDict.from_veto_definer_file('/path/to/file.xml') >>> flags.populate() """ from ..table import Table # read veto definer file as a table read_kw.setdefault("tablename", "veto_definer") read_kw.setdefault("columns", [ "ifo", "name", "version", "category", "comment", "start_time", "start_pad", "end_time", "end_pad", ]) veto_def_table = Table.read(source, **read_kw) # handle GPS types startgps = None if start is None else to_gps(start) endgps = None if end is None else to_gps(end) def _keep(row: astropy.table.Row) -> bool: """Return `True` if this row is relevant.""" if ifo and row["ifo"] != ifo: return False if startgps and 0 < row["end_time"] <= startgps: return False return not (endgps and row["start_time"] >= endgps) # parse flag definitions out = cls() for row in filter(_keep, veto_def_table): # pin the times if startgps: row["start_time"] = max(row["start_time"], startgps) if endgps and not row["end_time"]: row["end_time"] = endgps elif endgps: row["end_time"] = min(row["end_time"], endgps) flag = DataQualityFlag.from_veto_def(row) if flag.name in out: out[flag.name] |= flag else: out[flag.name] = flag return out
[docs] @classmethod def from_ligolw_tables( cls, segmentdeftable: igwn_ligolw.lsctables.SegmentDefTable, segmentsumtable: igwn_ligolw.lsctables.SegmentSumTable, segmenttable: igwn_ligolw.lsctables.SegmentTable, names: list[str] | None = None, gpstype: type[SupportsFloat] | Callable[[float], SupportsFloat] = LIGOTimeGPS, on_missing: str = "error", ) -> Self: """Build a `DataQualityDict` from a set of LIGO_LW segment tables. Parameters ---------- segmentdeftable : :class:`~igwn_ligolw.lsctables.SegmentDefTable` The ``segment_definer`` table to read. segmentsumtable : :class:`~igwn_ligolw.lsctables.SegmentSumTable` The ``segment_summary`` table to read. segmenttable : :class:`~igwn_ligolw.lsctables.SegmentTable` The ``segment`` table to read. names : `list` of `str`, optional A list of flag names to read. Default is to read all names. gpstype : `type`, `callable`, optional Class to use for GPS times in returned objects, can be a function to convert GPS time to something else. Default is `~gwpy.time.LIGOTimeGPS`. on_missing : `str`, optional Action to take when a one or more ``names`` are not found in the ``segment_definer`` table, one of - ``'ignore'`` : do nothing - ``'warn'`` : print a warning - ``error'`` : raise a `ValueError` Returns ------- dqdict : `DataQualityDict` A dict of `DataQualityFlag` objects populated from the LIGO_LW tables. """ out = cls() id_: dict[str, list[int]] = {} # need to record relative IDs from LIGO_LW # read segment definers and generate DataQualityFlag object for row in segmentdeftable: ifos = sorted(row.instruments) ifo = "".join(ifos) if ifos else None tag = row.name version = row.version name = ":".join( str(k) for k in (ifo, tag, version) if k is not None ) if names is None or name in names: out[name] = DataQualityFlag(name) thisid = int(row.segment_def_id) try: id_[name].append(thisid) except KeyError: id_[name] = [thisid] # verify all requested flags were found for flag in names or []: if flag not in out and on_missing != "ignore": msg = f"no segment definition found for flag='{flag}' in file" if on_missing == "warn": warnings.warn(msg, stacklevel=2) else: raise ValueError(msg) def _parse_segments( table: igwn_ligolw.ligolw.Table, listattr: str, ) -> None: """Parse a table into the target DataQualityDict.""" # handle missing *_ns columns in LIGO_LW XML # (LIGO DMT doesn't/didn't write them) if "start_time_ns" in table.columnnames: row_segment = operator.attrgetter("segment") else: row_segment = operator.attrgetter("start_time", "end_time") for row in table: for flag in out: # match row ID to list of IDs found for this flag if int(row.segment_def_id) in id_[flag]: getattr(out[flag], listattr).append( Segment(*map(gpstype, row_segment(row))), ) break # read segment summary table as 'known' _parse_segments(segmentsumtable, "known") # read segment table as 'active' _parse_segments(segmenttable, "active") return out
[docs] def to_ligolw_tables( self, **attrs, ) -> tuple[ igwn_ligolw.lsctables.SegmentDefTable, igwn_ligolw.lsctables.SegmentSumTable, igwn_ligolw.lsctables.SegmentTable, ]: """Convert this `DataQualityDict` into a trio of LIGO_LW segment tables. Parameters ---------- attrs Other attributes to add to all rows in all tables (e.g. ``'process_id'``). Returns ------- segmentdeftable : :class:`~igwn_ligolw.lsctables.SegmentDefTable` The ``segment_definer`` table. segmentsumtable : :class:`~igwn_ligolw.lsctables.SegmentSumTable` The ``segment_summary`` table. segmenttable : :class:`~igwn_ligolw.lsctables.SegmentTable` The ``segment`` table. """ from igwn_ligolw import lsctables from ..io.ligolw import to_table_type as to_ligolw_table_type segdeftab_class = lsctables.SegmentDefTable segsumtab_class = lsctables.SegmentSumTable segtab_class = lsctables.SegmentTable segdeftab = segdeftab_class.new() segsumtab = segsumtab_class.new() segtab = segtab_class.new() def _write_attrs( table: igwn_ligolw.ligolw.Table, row: igwn_ligolw.ligolw.Row, ) -> None: """Write custom attributes to this row.""" for key, val in attrs.items(): setattr(row, key, to_ligolw_table_type(val, table, key)) # write flags to tables for flag in self.values(): # segment definer segdef = segdeftab.RowType() for col in segdeftab.columnnames: # default all columns to None setattr(segdef, col, None) segdef.instruments = {flag.ifo} segdef.name = flag.tag segdef.version = flag.version segdef.comment = flag.description segdef.insertion_time = to_gps("now").gpsSeconds segdef.segment_def_id = segdeftab_class.get_next_id() _write_attrs(segdeftab, segdef) segdeftab.append(segdef) # write segment summary (known segments) for vseg in flag.known: segsum = segsumtab.RowType() for col in segsumtab.columnnames: # default columns to None setattr(segsum, col, None) segsum.segment_def_id = segdef.segment_def_id segsum.segment = map(LIGOTimeGPS, vseg) segsum.comment = None segsum.segment_sum_id = segsumtab_class.get_next_id() _write_attrs(segsumtab, segsum) segsumtab.append(segsum) # write segment table (active segments) for aseg in flag.active: seg = segtab.RowType() for col in segtab.columnnames: # default all columns to None setattr(seg, col, None) seg.segment_def_id = segdef.segment_def_id seg.segment = map(LIGOTimeGPS, aseg) seg.segment_id = segtab_class.get_next_id() _write_attrs(segtab, seg) segtab.append(seg) return segdeftab, segsumtab, segtab
# -- methods ---------------------
[docs] def coalesce(self) -> Self: """Coalesce all segments lists in this `DataQualityDict`. **This method modifies this object in-place.** Returns ------- self A view of this flag, not a copy. """ for flag in self: self[flag].coalesce() return self
[docs] @_check_on_error def populate( self, source: str | None = DEFAULT_SEGMENT_SERVER, segments: SegmentList | None = None, *, pad: bool = True, on_error: str = "raise", **kwargs, ) -> Self: """Query the segment database for each flag's active segments. This method assumes all of the metadata for each flag have been filled. Minimally, the following attributes must be filled .. autosummary:: ~DataQualityFlag.name ~DataQualityFlag.known Segments will be fetched from the database, with any :attr:`~DataQualityFlag.padding` added on-the-fly. Entries in this dict will be modified in-place. Parameters ---------- source : `str` Source of segments for this flag. This must be either a URL for a segment database or a path to a file on disk. segments : `SegmentList`, optional A list of known segments during which to query, if not given, existing known segments for flags will be used. pad : `bool`, optional, default: `True` Apply the `~DataQualityFlag.padding` associated with each flag, default: `True`. on_error : `str` How to handle an error querying for one flag, one of - `'raise'` (default): raise the Exception - `'warn'`: print a warning - `'ignore'`: move onto the next flag as if nothing happened kwargs Any other keyword arguments to be passed to :meth:`DataQualityFlag.query` or :meth:`DataQualityFlag.read`. Returns ------- self : `DataQualityDict` A reference to the modified DataQualityDict """ # format source url = urlparse(source) # query DQSegDB if the URL is just a hostname use_dqsegdb = bool(url.netloc and url.path in ("", "/")) # if given the segments to use, query DQSegDB for everthing now if not use_dqsegdb: tmp = type(self).read(source, **kwargs) elif segments is not None: segments = SegmentList(map(Segment, segments)) tmp = type(self).query_dqsegdb( self.keys(), segments, host=source, on_error=on_error, **kwargs, ) # apply padding and wrap to given known segments for key in self: if use_dqsegdb and segments is None: # get segments for this flag now try: tmp = {key: self[key].query_dqsegdb( self[key].name, self[key].known, host=source, **kwargs, )} except RequestException as exc: if on_error == "raise": raise if on_error == "warn": warnings.warn( f"Error querying for '{key}': {exc}", stacklevel=2, ) continue self[key].known &= tmp[key].known self[key].active = tmp[key].active if pad: self[key] = self[key].pad(inplace=True) if segments is not None: self[key].known &= segments self[key].active &= segments return self
[docs] def copy(self, *, deep: bool = False) -> Self: """Build a copy of this dictionary. Parameters ---------- deep : `bool`, optional, default: `False` perform a deep copy of the original dictionary with a fresh memory address Returns ------- flag2 : `DataQualityFlag` a copy of the original dictionary """ if deep: return deepcopy(self) return shallowcopy(self)
def __iand__(self, other: Self) -> Self: """Intersect this dict with ``other`` in-place.""" for key, value in other.items(): if key in self: self[key] &= value else: self[key] = self._EntryClass() return self def __and__(self, other: Self) -> Self: """Find the intersection of this dict and ``other``.""" if ( sum(len(s.active) for s in self.values()) <= sum(len(s.active) for s in other.values()) ): return self.copy(deep=True).__iand__(other) return other.copy(deep=True).__iand__(self) def __ior__(self, other: Self) -> Self: """Add the ``other`` `DataQualityDict` to this one in-place.""" for key, value in other.items(): if key in self: self[key] |= value else: self[key] = shallowcopy(value) return self def __or__(self, other: Self) -> Self: """Find the union of this dict and ``other``.""" if ( sum(len(s.active) for s in self.values()) >= sum(len(s.active) for s in other.values()) ): return self.copy(deep=True).__ior__(other) return other.copy(deep=True).__ior__(self) __iadd__ = __ior__ __add__ = __or__ def __isub__(self, other: Self) -> Self: """Subtract the ``other`` `DataQualityDict` from this one in-place.""" for key, value in other.items(): if key in self: self[key] -= value return self def __sub__(self, other: Self) -> Self: """Find the difference between this dict and another.""" return self.copy(deep=True).__isub__(other) def __ixor__(self, other: Self) -> Self: """Exclusive OR this dict with ``other`` in-place.""" for key, value in other.items(): if key in self: self[key] ^= value return self def __xor__(self, other: Self) -> Self: """Find the exclusive OR of this dict and ``other``.""" return self.copy(deep=True).__ixor__(other) def __invert__(self) -> Self: """Return the logical inverse of this dict.""" new = self.copy(deep=True) for key, value in new.items(): new[key] = ~value return new
[docs] def union(self) -> DataQualityFlag: """Return the union of all flags in this dict. Returns ------- union : `DataQualityFlag` a new `DataQualityFlag` who's active and known segments are the union of those of the values of this dict """ usegs = reduce(operator.or_, self.values()) usegs.name = " | ".join(self.keys()) return usegs
[docs] def intersection(self) -> DataQualityFlag: """Return the intersection of all flags in this dict. Returns ------- intersection : `DataQualityFlag` a new `DataQualityFlag` who's active and known segments are the intersection of those of the values of this dict """ isegs = reduce(operator.and_, self.values()) isegs.name = " & ".join(self.keys()) return isegs
[docs] def plot( self, label: str = "key", **kwargs, ) -> Plot: """Plot this dict on a segments projection. Parameters ---------- label : `str`, optional Labelling system to use, or fixed label for all flags, special values include - ``'key'``: use the key of the `DataQualityDict`, - ``'name'``: use the :attr:`~DataQualityFlag.name` of the flag If anything else, that fixed label will be used for all lines. kwargs All keyword arguments are passed to the :class:`~gwpy.plot.Plot` constructor. Returns ------- figure : `~matplotlib.figure.Figure` the newly created figure, with populated Axes. See Also -------- matplotlib.pyplot.figure For documentation of keyword arguments used to create the figure. matplotlib.figure.Figure.add_subplot For documentation of keyword arguments used to create the axes. gwpy.plot.SegmentAxes.plot_segmentlist For documentation of keyword arguments used in rendering the data. """ # make plot from ..plot import Plot return Plot( [self], projection="segments", label=label, **kwargs, )