Source code for gwexpy.interop.openems_

"""
gwexpy.interop.openems_
-----------------------

Interoperability with openEMS field dump HDF5 files.

openEMS writes electromagnetic field data via ``CSPropDumpBox``.
The HDF5 layout is::

    /Mesh/
        x   # 1-D node coordinates along x
        y
        z
    /FieldData/
        TD/          # time-domain dumps
            <step_0> # 4-D array (Nx, Ny, Nz, 3)  — 3 = x/y/z components
            <step_1>
            ...
        FD/          # frequency-domain dumps
            f0_real  # 4-D (Nx, Ny, Nz, 3)
            f0_imag
            f1_real
            ...

The DumpType value (set in the CSXCAD ``AddDump`` call) controls which physical
quantity is stored and in which domain.  Only ``h5py`` is required; the openEMS
Python bindings are not needed.

HDF5 dataset attributes
-----------------------
If time-domain datasets carry a ``"Time"`` attribute (float, seconds), the
returned axis0 uses those physical time values.  If frequency-domain datasets
carry a ``"frequency"`` attribute (float, Hz), axis0 uses those physical
frequency values.  Both fall back to integer indices when the attribute is
absent.

References
----------
https://openems.de/index.php/HDF5_Field_Dumps.html
"""

from __future__ import annotations

import re
from pathlib import Path
from typing import Any, Literal

import numpy as np

from gwexpy.fields import ScalarField, VectorField

from ._optional import require_optional

__all__ = ["from_openems_hdf5", "DUMP_TYPE_MAP"]

# ---------------------------------------------------------------------------
# DumpType → (quantity_name, unit_string, domain)
# ---------------------------------------------------------------------------

DUMP_TYPE_MAP: dict[int, tuple[str, str, str]] = {
    0: ("E-field", "V/m", "time"),
    1: ("H-field", "A/m", "time"),
    2: ("current density J", "A/m2", "time"),
    3: ("current density rot(H)", "A/m2", "time"),
    10: ("E-field", "V/m", "frequency"),
    11: ("H-field", "A/m", "frequency"),
    20: ("SAR local", "W/kg", "frequency"),
    21: ("SAR 1g", "W/kg", "frequency"),
    22: ("SAR 10g", "W/kg", "frequency"),
}

# SAR types produce scalar (no spatial component axis)
_SAR_DUMP_TYPES = {20, 21, 22}

# Component axis → label
_COMP_LABELS = {0: "x", 1: "y", 2: "z"}


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _read_openems_mesh(
    h5file: Any,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Read mesh node coordinates from an openEMS HDF5 file.

    Parameters
    ----------
    h5file : h5py.File
        Open HDF5 file.

    Returns
    -------
    tuple of ndarray
        ``(x, y, z)`` coordinate arrays (1-D each).

    Raises
    ------
    KeyError
        If the ``/Mesh`` group or any coordinate dataset is absent.
    """
    mesh = h5file["Mesh"]
    x = np.asarray(mesh["x"], dtype=np.float64)
    y = np.asarray(mesh["y"], dtype=np.float64)
    z = np.asarray(mesh["z"], dtype=np.float64)
    return x, y, z


def _read_openems_td(
    h5file: Any,
    timestep: int | None,
) -> tuple[np.ndarray, np.ndarray]:
    """Load time-domain field data from ``/FieldData/TD``.

    Parameters
    ----------
    h5file : h5py.File
        Open HDF5 file.
    timestep : int or None
        If an integer, load that single time step.
        If ``None``, concatenate all steps along a new leading axis.

    Returns
    -------
    data : ndarray
        Shape ``(n_time, Nx, Ny, Nz, 3)`` or ``(1, Nx, Ny, Nz, 3)``.
    times : ndarray
        Time values (1-D, length ``n_time``), taken from dataset names or
        sequential integers when attributes are unavailable.
    """
    td = h5file["FieldData/TD"]
    # Sort keys numerically
    keys = sorted(td.keys(), key=lambda k: int(k) if k.isdigit() else 0)

    if not keys:
        raise ValueError("No time steps found in /FieldData/TD.")

    if timestep is not None:
        key = str(timestep)
        if key not in td:
            raise ValueError(
                f"Timestep {timestep!r} not found in /FieldData/TD. Available: {keys}"
            )
        arr = np.asarray(td[key], dtype=np.float64)[
            np.newaxis, ...
        ]  # (1, Nx, Ny, Nz, 3)
        t_attr = td[key].attrs.get("Time", None)
        times = np.array(
            [float(t_attr) if t_attr is not None else timestep], dtype=np.float64
        )
    else:
        slices = [np.asarray(td[k], dtype=np.float64) for k in keys]
        arr = np.stack(slices, axis=0)  # (n_time, Nx, Ny, Nz, 3)
        # Try to read physical time values from dataset attributes
        time_vals = [td[k].attrs.get("Time", None) for k in keys]
        if all(v is not None for v in time_vals):
            times = np.array([float(v) for v in time_vals], dtype=np.float64)
        else:
            times = np.arange(len(keys), dtype=np.float64)

    return arr, times


def _read_openems_fd(
    h5file: Any,
    freq_idx: int | None,
) -> tuple[np.ndarray, np.ndarray]:
    """Load frequency-domain field data from ``/FieldData/FD``.

    Parameters
    ----------
    h5file : h5py.File
        Open HDF5 file.
    freq_idx : int or None
        If an integer, load that single frequency index.
        If ``None``, concatenate all frequencies.

    Returns
    -------
    data : ndarray
        Complex array of shape ``(n_freq, Nx, Ny, Nz, 3)`` or
        ``(1, Nx, Ny, Nz, 3)``.
    freqs : ndarray
        Frequency index values (1-D).
    """
    fd = h5file["FieldData/FD"]
    # Collect frequency indices from dataset names like "f0_real", "f1_real"
    real_keys = {k for k in fd.keys() if re.match(r"f\d+_real$", k)}
    indices = sorted(
        {
            int(match.group(1))
            for k in real_keys
            for match in [re.search(r"f(\d+)_real", k)]
            if match is not None
        }
    )

    if not indices:
        raise ValueError("No frequency datasets found in /FieldData/FD.")

    if freq_idx is not None:
        if freq_idx not in indices:
            raise ValueError(
                f"Frequency index {freq_idx} not found. Available: {indices}"
            )
        rk = f"f{freq_idx}_real"
        ik = f"f{freq_idx}_imag"
        real = np.asarray(fd[rk], dtype=np.float64)
        imag = np.asarray(fd[ik], dtype=np.float64)
        arr = (real + 1j * imag)[np.newaxis, ...]  # (1, Nx, Ny, Nz, 3)
        f_attr = fd[rk].attrs.get("frequency", None)
        freqs = np.array(
            [float(f_attr) if f_attr is not None else freq_idx], dtype=np.float64
        )
    else:
        slices = []
        freq_vals = []
        for idx in indices:
            rk = f"f{idx}_real"
            real = np.asarray(fd[rk], dtype=np.float64)
            imag = np.asarray(fd[f"f{idx}_imag"], dtype=np.float64)
            slices.append(real + 1j * imag)
            f_attr = fd[rk].attrs.get("frequency", None)
            freq_vals.append(float(f_attr) if f_attr is not None else None)
        arr = np.stack(slices, axis=0)  # (n_freq, Nx, Ny, Nz, 3)
        if all(v is not None for v in freq_vals):
            freqs = np.array(freq_vals, dtype=np.float64)
        else:
            freqs = np.array(indices, dtype=np.float64)

    return arr, freqs


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


[docs] def from_openems_hdf5( cls: type, filepath: str | Path, *, dump_type: int = 0, timestep: int | None = None, frequency_index: int | None = None, component: Literal["x", "y", "z"] | None = None, unit: Any | None = None, ) -> ScalarField | VectorField: """Read an openEMS field dump HDF5 file into a ScalarField or VectorField. openEMS dumps field data as HDF5 files containing mesh coordinates under ``/Mesh/x``, ``/Mesh/y``, ``/Mesh/z`` and field values under ``/FieldData/TD`` (time domain) or ``/FieldData/FD`` (frequency domain). Each time step or frequency bin stores a 4-D array of shape ``(Nx, Ny, Nz, 3)`` where the last axis holds the x, y, z components. Parameters ---------- cls : type ``ScalarField`` or ``VectorField`` class. filepath : str or Path Path to the HDF5 dump file. dump_type : int, default 0 openEMS ``DumpType`` value used when the dump was configured. Determines the physical quantity and domain: - 0 → E-field, time domain - 1 → H-field, time domain - 10 → E-field, frequency domain - 11 → H-field, frequency domain - 20-22 → SAR (scalar, frequency domain) timestep : int, optional For time-domain dumps: which step index to extract. ``None`` loads all steps along axis 0. frequency_index : int, optional For frequency-domain dumps: which frequency index to extract. ``None`` loads all frequencies along axis 0. component : {"x", "y", "z"}, optional Return a single vector component as ``ScalarField``. ``None`` returns all three components as ``VectorField``. unit : str or astropy.units.Unit, optional Override the physical unit. Defaults to the unit implied by *dump_type* (see ``DUMP_TYPE_MAP``). Returns ------- ScalarField When *component* is given, dump type is SAR, or *cls* is ``ScalarField``. VectorField When *component* is ``None`` and *cls* is ``VectorField``. Raises ------ ValueError If the required field group is absent, or if a requested time step / frequency index does not exist. ImportError If ``h5py`` is not installed. Examples -------- Read all components of a time-domain E-field dump: >>> from gwexpy.fields import VectorField >>> vf = VectorField.from_openems_hdf5("e_dump.h5", dump_type=0) Read only the z-component: >>> from gwexpy.fields import ScalarField >>> sf = ScalarField.from_openems_hdf5("e_dump.h5", component="z") """ h5py = require_optional("h5py") filepath = Path(filepath) type_info = DUMP_TYPE_MAP.get(dump_type, (f"DumpType{dump_type}", "", "time")) _quantity, auto_unit_str, domain = type_info axis0_domain: Literal["time", "frequency"] = ( "frequency" if domain == "frequency" else "time" ) effective_unit = unit if unit is not None else (auto_unit_str or None) with h5py.File(str(filepath), "r") as f: # Validate required groups if "Mesh" not in f: raise ValueError(f"'Mesh' group not found in '{filepath}'.") x_coords, y_coords, z_coords = _read_openems_mesh(f) is_sar = dump_type in _SAR_DUMP_TYPES if axis0_domain == "time": if "FieldData/TD" not in f: raise ValueError( f"'/FieldData/TD' group not found in '{filepath}'. " "Check that dump_type is correct (time-domain types: 0,1,2,3)." ) data, axis0_vals = _read_openems_td(f, timestep) else: if "FieldData/FD" not in f: raise ValueError( f"'/FieldData/FD' group not found in '{filepath}'. " "Check that dump_type is correct (freq-domain types: 10,11,20-22)." ) data, axis0_vals = _read_openems_fd(f, frequency_index) # data shape: (n_axis0, Nx, Ny, Nz, 3) for vector, or (n_axis0, Nx, Ny, Nz) for SAR # SAR data has no component axis — treat as scalar if is_sar or data.ndim == 4: # Already (n_axis0, Nx, Ny, Nz) return ScalarField( data, unit=effective_unit, axis0=axis0_vals, axis0_domain=axis0_domain, axis1=x_coords, axis2=y_coords, axis3=z_coords, ) # Vector data: (n_axis0, Nx, Ny, Nz, 3) comp_idx = {"x": 0, "y": 1, "z": 2} if component is not None: if component not in comp_idx: raise ValueError( f"Invalid component {component!r}. Expected 'x', 'y', or 'z'." ) c = comp_idx[component] arr = data[..., c] # (n_axis0, Nx, Ny, Nz) return ScalarField( arr, unit=effective_unit, axis0=axis0_vals, axis0_domain=axis0_domain, axis1=x_coords, axis2=y_coords, axis3=z_coords, ) # Build one ScalarField per component scalar_fields: dict[str, ScalarField] = {} for ci, label in _COMP_LABELS.items(): arr = data[..., ci] # (n_axis0, Nx, Ny, Nz) scalar_fields[label] = ScalarField( arr, unit=effective_unit, axis0=axis0_vals, axis0_domain=axis0_domain, axis1=x_coords, axis2=y_coords, axis3=z_coords, ) if issubclass(cls, VectorField): return VectorField(scalar_fields) # Fallback: return x component as ScalarField return scalar_fields["x"]