Note
このページは Jupyter Notebook から生成されました。 ノートブックをダウンロード (.ipynb)
TimeSeriesMatrix チュートリアル
TimeSeriesMatrix は SeriesMatrix を TimeSeries 用に拡張した 3 次元配列コンテナです(shape: Nrow × Ncol × Nsample)。
dt / t0 / timesなど TimeSeries 互換のエイリアス要素アクセスで
TimeSeriesを返す(tsm[i, j])時間領域処理(detrend/bandpass/resample …)を要素ごとに適用
FFT/PSD/ASD や coherence などのスペクトル解析(返り値は
FrequencySeriesMatrix)表示系 (
plot,step,repr,_repr_html_) をそのまま使用
このノートブックでは追加のユーティリティ関数は定義せず、クラスメソッドを直接呼び出して動作を確認します。
[1]:
import matplotlib.pyplot as plt
import numpy as np
from astropy import units as u
from gwexpy.timeseries import TimeSeries, TimeSeriesMatrix
代表データを用意
2×2 の
TimeSeriesMatrixを作り、dt/t0/timesやメタデータ(unit/name/channel)を確認します。
[2]:
rng = np.random.default_rng(0)
# SampleSet
n = 1024
dt = (1 / 256) * u.s
t0 = 0 * u.s
t = (np.arange(n) * dt).to_value(u.s)
tone10 = np.sin(2 * np.pi * 10 * t)
tone20 = np.sin(2 * np.pi * 20 * t + 0.3)
data = np.empty((2, 2, n), dtype=float)
data[0, 0] = 0.5 * tone10 + 0.05 * rng.normal(size=n)
data[0, 1] = 0.5 * tone20 + 0.05 * rng.normal(size=n)
data[1, 0] = 0.3 * tone10 + 0.3 * tone20 + 0.05 * rng.normal(size=n)
data[1, 1] = 0.2 * tone10 - 0.4 * tone20 + 0.05 * rng.normal(size=n)
units = np.full((2, 2), u.V)
names = [["ch00", "ch01"], ["ch10", "ch11"]]
channels = [["X:A", "X:B"], ["Y:A", "Y:B"]]
tsm = TimeSeriesMatrix(
data,
dt=dt,
t0=t0,
units=units,
names=names,
channels=channels,
rows={"r0": {"name": "row0"}, "r1": {"name": "row1"}},
cols={"c0": {"name": "col0"}, "c1": {"name": "col1"}},
name="demo",
)
display(tsm)
tsm.plot();
SeriesMatrix: shape=(2, 2, 1024), name='demo'
- epoch: 0.0 s
- x0: 0.0 s, dx: 0.00390625 s, N_samples: 1024
- xunit: s
Row Metadata
| name | channel | unit | |
|---|---|---|---|
| key | |||
| r0 | row0 | ||
| r1 | row1 |
Column Metadata
| name | channel | unit | |
|---|---|---|---|
| key | |||
| c0 | col0 | ||
| c1 | col1 |
Element Metadata
| unit | name | channel | row | col | |
|---|---|---|---|---|---|
| 0 | V | ch00 | X:A | 0 | 0 |
| 1 | V | ch01 | X:B | 0 | 1 |
| 2 | V | ch10 | Y:A | 1 | 0 |
| 3 | V | ch11 | Y:B | 1 | 1 |
多様な入力パターン(コンストラクタ例)
dt/t0 のほか、sample_rate や times、TimeSeries の 2D リスト、Quantity 入力などを確認します。
[3]:
print("=== TimeSeriesMatrix ===")
times = tsm.times
# 1: sample_rate (dt )
tsm_sr = TimeSeriesMatrix(
data,
sample_rate=256 * u.Hz,
t0=t0,
units=units,
names=names,
channels=channels,
)
print("case1 sample_rate", "dt", tsm_sr.dt, "sample_rate", tsm_sr.sample_rate)
# 2: times ( Sample )
tsm_times = TimeSeriesMatrix(data, times=times, units=units, names=names)
print("case2 times", "dt", tsm_times.dt, "t0", tsm_times.t0, "N", tsm_times.N_samples)
# 3: TimeSeries 2D Listfrom
ts00 = TimeSeries(data[0, 0], times=times, unit=u.V, name="ch00", channel="X:A")
ts01 = TimeSeries(data[0, 1], times=times, unit=u.V, name="ch01", channel="X:B")
ts10 = TimeSeries(data[1, 0], times=times, unit=u.V, name="ch10", channel="Y:A")
ts11 = TimeSeries(data[1, 1], times=times, unit=u.V, name="ch11", channel="Y:B")
tsm_from_ts = TimeSeriesMatrix([[ts00, ts01], [ts10, ts11]])
print("case3 from TimeSeries", tsm_from_ts.shape, "dt", tsm_from_ts.dt)
# 4: Quantity (units Set)
tsm_q = TimeSeriesMatrix((data * u.mV), dt=dt, t0=t0)
print("case4 Quantity meta unit", tsm_q.meta[0, 0].unit)
# : dt sample_rate / t0 epoch
for kwargs in [
dict(dt=dt, sample_rate=256 * u.Hz, t0=t0),
dict(dt=dt, t0=t0, epoch=t0),
]:
try:
_ = TimeSeriesMatrix(data, **kwargs)
except Exception as e:
print("error", kwargs, "->", type(e).__name__, e)
=== TimeSeriesMatrix ===
case1 sample_rate dt 0.00390625 s sample_rate 256.0 Hz
case2 times dt 0.00390625 s t0 0.0 s N 1024
case3 from TimeSeries (2, 2, 1024) dt 0.00390625 s
case4 Quantity meta unit mV
error {'dt': <Quantity 0.00390625 s>, 'sample_rate': <Quantity 256. Hz>, 't0': <Quantity 0. s>} -> ValueError give only one of sample_rate or dt
error {'dt': <Quantity 0.00390625 s>, 't0': <Quantity 0. s>, 'epoch': <Quantity 0. s>} -> ValueError give only one of epoch or t0
参照・切り出し
tsm[i, j]はTimeSeriesを返すスライスは
TimeSeriesMatrixを返すrow/col ラベルでもアクセスできる
[4]:
s00 = tsm[0, 0]
print("[0,0]", "type", type(s00), "dt", s00.dt, "t0", s00.t0, "unit", s00.unit)
print("[0,0]", "name", s00.name, "channel", s00.channel)
s00.plot(xscale="seconds")
s01 = tsm["r0", "c1"]
print("[r0,c1]", "name", s01.name, "channel", s01.channel)
sub = tsm[:, 0]
print("tsm[:,0] ->", type(sub), sub.shape)
sub.plot(subplots=True, xscale="seconds");
[0,0] type <class 'gwexpy.timeseries.timeseries.TimeSeries'> dt 0.00390625 s t0 0.0 s unit V
[0,0] name ch00 channel X:A
[r0,c1] name ch01 channel X:B
tsm[:,0] -> <class 'gwexpy.timeseries.matrix.TimeSeriesMatrix'> (2, 1, 1024)
サンプル軸編集
diff/padはTimeSeriesMatrixを返すcropは現在SeriesMatrixを返すので、view(TimeSeriesMatrix)で戻します(中身は同じ)
[5]:
cropped = tsm.crop(start=1 * u.s, end=2 * u.s).view(TimeSeriesMatrix)
print("cropped", type(cropped), cropped.shape, "span", cropped.span)
cropped.plot(subplots=True, xscale="seconds")
diffed = tsm.diff(n=1)
print("diffed", diffed.shape, "dt", diffed.dt)
diffed.plot(subplots=True, xscale="seconds")
padded = tsm.pad(50)
print("padded", padded.shape, "t0", padded.t0)
padded.plot(subplots=True, xscale="seconds");
cropped <class 'gwexpy.timeseries.matrix.TimeSeriesMatrix'> (2, 2, 256) span (<Quantity 1. s>, <Quantity 2. s>)
diffed (2, 2, 1023) dt 0.00390625 s
padded (2, 2, 1124) t0 -0.1953125 s
時間領域処理(TimeSeries メソッドの要素ごと適用)
TimeSeries のメソッド(detrend, bandpass, resample など)を行列要素ごとに適用します。
[6]:
detr = tsm.detrend("constant")
bp = detr.bandpass(5, 40)
rs = bp.resample(128)
print("original", tsm.shape, "sample_rate", tsm.sample_rate)
print("bandpass", bp.shape, "sample_rate", bp.sample_rate)
print("resample", rs.shape, "sample_rate", rs.sample_rate)
bp.plot(subplots=True, xscale="seconds")
rs.plot(subplots=True, xscale="seconds");
original (2, 2, 1024) sample_rate 256.0 Hz
bandpass (2, 2, 1024) sample_rate 256.0 Hz
resample (2, 2, 512) sample_rate 128.0 Hz
欠損値処理 (Imputation)
Matrix内の欠損値 (NaN) を補完します。各チャンネルごとに適用されます。
[7]:
# Data Create ( )
tsm_miss = tsm.copy()
# NaN do ( : tsm[0,0] time series)
tsm_miss[0, 0].value[50:60] = np.nan
print("Before impute (Has NaN):", np.isnan(tsm_miss[0, 0].value).any())
# ( )
tsm_imp = tsm_miss.impute(method="interpolate")
print("After impute (Has NaN):", np.isnan(tsm_imp[0, 0].value).any())
Before impute (Has NaN): False
After impute (Has NaN): False
前処理: 標準化と白色化
多変量解析の前処理として有用です。
[8]:
# (Standardize): Mean0, minutes 1
tsm_std = tsm.standardize(method="zscore")
# Color (Whiten): Correlation(Covariance) Subtraction
# PCAusing Correlation
tsm_white, w_model = tsm_std.whiten_channels(return_model=True)
print("Standardized shape:", tsm_std.shape)
print("Whitened shape:", tsm_white.shape)
Standardized shape: (2, 2, 1024)
Whitened shape: (4, 1, 1024)
成分分解 (PCA / ICA)
効率的な次元圧縮や、信号源分離を行います。
[9]:
# minutesminutes (PCA)
# Dimension ( : 3 -> 2 minutes)
n_comp = min(2, tsm.shape[0])
pca_scores, pca_res = tsm_std.pca(n_components=n_comp, return_model=True)
print("PCA Scores shape:", pca_scores.shape)
print("Explained variance ratio:", pca_res.explained_variance_ratio)
pca_scores.plot()
plt.show()
plt.close()
# minutesminutes (ICA)
# Signal minutes (Blind Source Separation)
ica_sources = tsm_std.ica(n_components=n_comp, random_state=42)
print("ICA Sources shape:", ica_sources.shape)
ica_sources.plot()
plt.show()
plt.close()
PCA Scores shape: (2, 1, 1024)
Explained variance ratio: [0.57128681 0.41742564]
ICA Sources shape: (2, 1, 1024)
相互運用性 (Interop) 入門
PyTorchなどの外部ライブラリへデータを渡すことができます。
[10]:
try:
# PyTorch Tensor Transform
import torch
_ = torch
tensor = tsm.to_torch()
print("Converted to PyTorch:", tensor.shape)
except ImportError:
print("PyTorch not installed")
Converted to PyTorch: torch.Size([2, 2, 1024])
スペクトル解析(FFT / PSD / ASD)
fft/psd/asd は FrequencySeriesMatrix を返します。
[11]:
fft = tsm.fft()
print("fft", type(fft), fft.shape, "df", fft.df, "f0", fft.f0)
psd = tsm.psd(fftlength=1, overlap=0.5)
asd = tsm.asd(fftlength=1, overlap=0.5)
print("psd", psd.shape, "unit", psd[0, 0].unit)
print("asd", asd.shape, "unit", asd[0, 0].unit)
asd.plot(subplots=True)
plt.xscale("log")
plt.yscale("log")
fft <class 'gwexpy.frequencyseries.matrix.FrequencySeriesMatrix'> (2, 2, 513) df 0.25 Hz f0 0.0 Hz
psd (2, 2, 129) unit V2 / Hz
asd (2, 2, 129) unit V / Hz(1/2)
bivariate スペクトル解析(coherence 例)
TimeSeriesMatrix.coherence(other) は要素ごとに coherence を計算し、FrequencySeriesMatrix を返します。
otherにTimeSeriesを渡すと「全要素 vs 同一参照」の比較になりますotherにTimeSeriesMatrixを渡す場合は shape (Nrow, Ncol) が一致している必要があります
[12]:
ref = tsm[0, 0]
coh = tsm.coherence(ref, fftlength=1, overlap=0.5)
print("coherence", type(coh), coh.shape, "unit", coh[0, 0].unit)
coh.plot(subplots=True)
try:
_ = tsm.coherence(tsm[:, :1], fftlength=1)
except Exception as e:
print("shape mismatch ->", type(e).__name__, e)
coherence <class 'gwexpy.frequencyseries.matrix.FrequencySeriesMatrix'> (2, 2, 129) unit coherence
shape mismatch -> IndexError index 1 is out of bounds for axis 1 with size 1
表示系: repr / plot / step
repr: テキスト表示_repr_html_: ノートブックではdisplay(tsm)で表形式plot/step: クラスメソッドで直接描画
[13]:
print("repr:\n", tsm)
display(tsm)
tsm.plot(subplots=True, xscale="seconds")
tsm.step(where="post", xscale="seconds");
repr:
SeriesMatrix(shape=(2, 2, 1024), name='demo')
epoch : 0.0 s
x0 : 0.0 s
dx : 0.00390625 s
xunit : s
samples : 1024
[ Row metadata ]
name channel unit
key
r0 row0
r1 row1
[ Column metadata ]
name channel unit
key
c0 col0
c1 col1
[ Elements metadata ]
unit name channel row col
0 V ch00 X:A 0 0
1 V ch01 X:B 0 1
2 V ch10 Y:A 1 0
3 V ch11 Y:B 1 1
SeriesMatrix: shape=(2, 2, 1024), name='demo'
- epoch: 0.0 s
- x0: 0.0 s, dx: 0.00390625 s, N_samples: 1024
- xunit: s
Row Metadata
| name | channel | unit | |
|---|---|---|---|
| key | |||
| r0 | row0 | ||
| r1 | row1 |
Column Metadata
| name | channel | unit | |
|---|---|---|---|
| key | |||
| c0 | col0 | ||
| c1 | col1 |
Element Metadata
| unit | name | channel | row | col | |
|---|---|---|---|---|---|
| 0 | V | ch00 | X:A | 0 | 0 |
| 1 | V | ch01 | X:B | 0 | 1 |
| 2 | V | ch10 | Y:A | 1 | 0 |
| 3 | V | ch11 | Y:B | 1 | 1 |
まとめ
TimeSeriesMatrixは TimeSeries 互換の軸情報(dt/t0/times)と要素メタデータを保ったまま、行列として一括処理できる。時間領域処理(detrend/bandpass/resample 等)やスペクトル解析(asd/psd/coherence 等)を、要素ごとにまとめて適用できる。
まず
tsm[i, j]でTimeSeriesを取り出して挙動を確認し、慣れたら行列のまま処理するのが簡単。