Note

このページは Jupyter Notebook から生成されました。 ノートブックをダウンロード (.ipynb)

GBD 形式の I/O: GRAPHTEC データロガー・ファイルの読み書き

Open In Colab

このチュートリアルでは、GBD ファイルの読み込みと書き出しについて解説します。GBD は、KAGRA 等の実験において PEM(環境モニタリング)に広く使用されている GRAPHTEC 製データロガー (GL シリーズ) のネイティブバイナリ形式です。

GBD とは?

GBD (GRAPHTEC Binary Data) は以下のような特徴を持つ独自形式です:

  • メタデータを含む ASCII ヘッダー(チャンネル名、サンプリングレート、ローカル時刻、振幅レンジ等)

  • バイナリデータブロック(モデルにより float32 または int16)

  • マルチチャンネル記録(最大 32 チャンネル同時記録)

  • 任意のデジタル(ロジック)チャンネル

レガシー移行ノート:
これまでの KAGRA リポジトリで使われていた gbd2gwf.pyread_gbd.py などのスクリプトは、gwexpy.timeseries.io.gbd に統合されました。
詳細は API_MAPPING.md カテゴリ 1 を参照してください。
[ ]:
import io
import struct
import tempfile
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from astropy import units as u

from gwexpy.timeseries import TimeSeries, TimeSeriesDict, TimeSeriesMatrix

1. デモンストレーション用合成 GBD ファイルの作成

実機がなくても実行できるように、最小限の有効な GBD ファイルを合成します。

GBD の構造は以下の通りです:

[ASCII ヘッダー]  (HeaderSize バイトの固定長)
[バイナリデータ]   (サンプル数 × チャンネル数 × itemsize バイト)
[ ]:
def make_synthetic_gbd(path: Path, n_samples: int = 10000, fs: float = 1000.0):
    """最小限の合成 GBD ファイル (GRAPHTEC GL シリーズ形式) を書き出します。"""
    rng = np.random.default_rng(7)
    t = np.arange(n_samples) / fs

    ch0 = 2.0 * np.sin(2 * np.pi * 10.0 * t) + 0.1 * rng.normal(0, 1, n_samples)   # 10 Hz
    ch1 = 1.5 * np.sin(2 * np.pi * 50.0 * t) + 0.1 * rng.normal(0, 1, n_samples)   # 50 Hz
    ch2 = np.where(np.sin(2 * np.pi * 1.0 * t) > 0, 1.0, 0.0).astype(np.float32)   # デジタルトリガー

    local_start = "2023-05-20 14:30:00.000"
    local_stop  = f"2023-05-20 14:30:{n_samples / fs:.3f}"

    header_lines = [
        "[Header]",
        f"HeaderSize = 1024",
        f"Start = {local_start}",
        f"Stop  = {local_stop}",
        f"Sample = {1000.0 / fs:.3f}ms",
        "Type = little,float32",
        "Order = CH0, CH1, CH2_LOGIC",
        f"Counts = {n_samples}",
        "$AMP",
        "CH0 = , , 10V",
        "CH1 = , , 5V",
        "CH2_LOGIC = , , 1V",
    ]
    header_str = "\r\n".join(header_lines) + "\r\n"
    header_bytes = header_str.encode("ascii")
    # 1024 バイトまでパディング
    header_bytes = header_bytes.ljust(1024, b"\x00")

    # データブロック: インターリーブされた float32 (サンプル数 × チャンネル数)
    data = np.stack([ch0, ch1, ch2], axis=1).astype(np.float32)  # (N, 3)

    with open(path, "wb") as f:
        f.write(header_bytes)
        f.write(data.tobytes())

    print(f"作成完了: {path}  ({path.stat().st_size / 1024:.1f} KB)")


# 一時ディレクトリに書き出し
tmpdir = Path(tempfile.mkdtemp())
gbd_path = tmpdir / "synthetic_pem.gbd"
make_synthetic_gbd(gbd_path, n_samples=10000, fs=1000.0)

2. TimeSeriesDict への読み込み

TimeSeriesDict.read()format='gbd' を指定します。GBD 内の時刻はローカルタイムで記録されているため、timezone パラメータが必要になります。

[ ]:
# 全チャンネルの読み込み
tsd = TimeSeriesDict.read(
    str(gbd_path),
    format='gbd',
    timezone='Asia/Tokyo',   # JST (UTC+9) – GPS 時刻への変換に必要
)

print("読み込んだチャンネル:", list(tsd.keys()))
for name, ts in tsd.items():
    print(f"  {name:15s}: {len(ts.value):6d} samples, dt={ts.dt}, unit={ts.unit}")
[ ]:
# 特定のチャンネルのみ読み込み
tsd_subset = TimeSeriesDict.read(
    str(gbd_path),
    format='gbd',
    timezone='Asia/Tokyo',
    channels=['CH0', 'CH1'],   # アナログチャンネルのみ
)

print("フィルタ後のチャンネル:", list(tsd_subset.keys()))
[ ]:
# 単一の TimeSeries として読み込み
ts_ch0 = TimeSeries.read(
    str(gbd_path),
    format='gbd',
    timezone='Asia/Tokyo',
    channels=['CH0'],
)

print(f"CH0: t0={ts_ch0.t0:.3f}, dt={ts_ch0.dt}, N={len(ts_ch0.value)}")

ts_ch0.plot(xscale='seconds');

3. GBD データに対する解析

読み込まれたデータは標準的な gwexpy オブジェクトであるため、すべての解析メソッドが利用可能です。

[ ]:
ts_ch0 = tsd['CH0']
ts_ch1 = tsd['CH1']

# ASD 比較
asd_ch0 = ts_ch0.asd(fftlength=1.0, overlap=0.5)
asd_ch1 = ts_ch1.asd(fftlength=1.0, overlap=0.5)

fig, ax = plt.subplots(figsize=(8, 4))
ax.loglog(asd_ch0.frequencies.value, asd_ch0.value, label='CH0 (10 Hz)')
ax.loglog(asd_ch1.frequencies.value, asd_ch1.value, label='CH1 (50 Hz)')
ax.axvline(10,  color='C0', linestyle='--', alpha=0.5)
ax.axvline(50,  color='C1', linestyle='--', alpha=0.5)
ax.set_xlabel('Frequency [Hz]')
ax.set_ylabel(f'ASD [{asd_ch0.unit}]')
ax.set_title('GBD Channel ASD')
ax.legend()
ax.grid(True, which='both', alpha=0.3)
plt.tight_layout()
[ ]:
# スペクトログラム (CH0)
sg = ts_ch0.spectrogram(stride=0.5, fftlength=0.4, overlap=0.2)

plot = sg.plot()
ax = plot.gca()
ax.set_title('CH0 Spectrogram')
plot.colorbar(mappable=plt.gca().get_images()[-1] if plt.gca().get_images() else plt.gca().collections[-1], label=f'PSD [V²/Hz]');
[ ]:
# デジタル(ロジック)チャンネル: トリガーとして使用
ts_logic = tsd['CH2_LOGIC']

# 立ち上がりエッジ (0→1) を検出
logic_val = ts_logic.value
transitions = np.where(np.diff(logic_val) > 0.5)[0]

t_axis = np.arange(len(logic_val)) * ts_logic.dt.value
trigger_times = t_axis[transitions]

print(f"ロジックトリガー回数: {len(trigger_times)}")

fig, axes = plt.subplots(2, 1, figsize=(10, 5), sharex=True)
axes[0].plot(t_axis, ts_ch0.value, lw=0.5, label='CH0')
for tt in trigger_times[:5]:
    axes[0].axvline(tt, color='red', alpha=0.6, lw=1.0)
axes[0].set_ylabel('Amplitude [V]')
axes[0].legend()

axes[1].step(t_axis, logic_val, where='post', color='C2', lw=1.0, label='Logic trigger')
axes[1].set_xlabel('Time [s]')
axes[1].set_ylabel('State')
axes[1].set_ylim(-0.1, 1.3)
axes[1].legend()

plt.suptitle('CH0 with Logic Trigger Overlay')
plt.tight_layout()

4. TimeSeriesMatrix による読み込み

一括分析を行う場合は、TimeSeriesMatrix に直接読み込むことができます。

[ ]:
# 全アナログチャンネルを Matrix として読み込み
tsm = TimeSeriesMatrix.read(
    str(gbd_path),
    format='gbd',
    timezone='Asia/Tokyo',
    channels=['CH0', 'CH1'],
)

print("TimeSeriesMatrix shape:", tsm.shape)  # (2, 1, N)

# 全チャンネルの ASD を一括計算
asd_all = tsm.asd(fftlength=1.0, overlap=0.5)
asd_all.plot(subplots=True);

5. タイムゾーンの扱い

GBD ファイルにはローカル時刻が格納されています。gwexpy は timezone パラメータを使用してこれらを自動的に GPS 時刻(UTC ベース)に変換します。

タイムゾーン

IANA 名

UTC オフセット

日本標準時 (KAGRA)

'Asia/Tokyo'

+09:00

米国東部 (LIGO Hanford/LLO)

'US/Eastern'

−05:00 / −04:00

イタリア/欧州 (Virgo)

'Europe/Rome'

+01:00 / +02:00

UTC

'UTC'

±00:00

[ ]:
# タイムゾーン指定による GPS 時刻の違いを確認
tsd_jst = TimeSeriesDict.read(str(gbd_path), format='gbd', timezone='Asia/Tokyo')
tsd_utc = TimeSeriesDict.read(str(gbd_path), format='gbd', timezone='UTC')

t0_jst = tsd_jst['CH0'].t0
t0_utc = tsd_utc['CH0'].t0
print(f"t0 (JST→GPS): {t0_jst:.3f}")
print(f"t0 (UTC→GPS): {t0_utc:.3f}")
print(f"差分: {(t0_jst - t0_utc).value:.1f} s  (期待値: 9×3600 = 32400 s)")

6. 移行ガイド: 従来の gbd2gwf.py → gwexpy

Before (従来のスクリプト):

# gbd2gwf.py スタイル
import numpy as np
from gwpy.timeseries import TimeSeries, TimeSeriesDict

# 手動でのヘッダーパース
with open('data.gbd', 'rb') as f:
    header = f.read(1024).decode('ascii')
    # ... Start, Sample, Order, Counts 等を自前で抽出 ...
    n_ch = len(channels)
    data = np.frombuffer(f.read(), dtype='<f4').reshape(-1, n_ch)

# 手動での GPS 変換
import pytz
from gwpy.time import to_gps
tz = pytz.timezone('Asia/Tokyo')
t_local = datetime.strptime(start_str, '%Y-%m-%d %H:%M:%S')
t_utc = tz.localize(t_local).astimezone(pytz.utc)
gps_start = to_gps(t_utc.strftime('%Y-%m-%dT%H:%M:%S'))

# 手動でのコンテナ構築
tsd = TimeSeriesDict()
for i, ch in enumerate(channels):
    tsd[ch] = TimeSeries(data[:, i] * scale[i], t0=gps_start, dt=dt)

After (gwexpy):

from gwexpy.timeseries import TimeSeriesDict

tsd = TimeSeriesDict.read(
    'data.gbd',
    format='gbd',
    timezone='Asia/Tokyo',
)

まとめ

タスク

API

全チャンネル読み込み

TimeSeriesDict.read(path, format='gbd', timezone='Asia/Tokyo')

一部のみ読み込み

channels=['CH0', 'CH1'] パラメータを追加

単一チャンネル読み込み

TimeSeries.read(path, format='gbd', timezone=..., channels=['CH0'])

Matrix として読み出し

TimeSeriesMatrix.read(path, format='gbd', timezone=..., channels=...)

デジタルチャンネル自動検出

ALARM, LOGIC*, PULSE* 等の名称は自動的に 0/1 に変換

GPS 起点の明示

epoch=1234567890 (GPS 秒)

参考: