Note
このページは Jupyter Notebook から生成されました。 ノートブックをダウンロード (.ipynb)
PCA / ICA 分解チュートリアル
このチュートリアルでは TimeSeriesMatrix を使った 主成分分析(PCA) と 独立成分分析(ICA) を解説します。
どちらも tsm.pca() / tsm.ica() という高レベル API として利用可能で、内部では scikit-learn をラップしています。
代表的なユースケース:
PCA: 次元削減、相関センサーチャンネルの非相関化
ICA: 盲目信号分離(BSS)— 環境ノイズから物理信号を分離する
sklearn.decomposition.PCA / FastICA を直接使っていたコードは matrix.pca() / matrix.ica() に置き換えられます。[ ]:
import matplotlib.pyplot as plt
import numpy as np
from astropy import units as u
from gwexpy.timeseries import TimeSeries, TimeSeriesMatrix
1. マルチチャンネル合成データの準備
現実的なシナリオを再現します:
隠れた信号源 2本: 10 Hz サイン波とブロードバンドノイズ
観測チャンネル 4本: 信号源の線形混合
これは重力波検出器で複数のセンサーに物理信号が混合して現れる状況を模倣しています。
[ ]:
rng = np.random.default_rng(42)
fs = 512 # サンプリングレート [Hz]
T = 16.0 # 継続時間 [s]
n = int(fs * T)
t = np.arange(n) / fs
# 独立した信号源 3 本
src1 = np.sin(2 * np.pi * 10.0 * t) # 10 Hz トーン
src2 = rng.normal(0, 1, n) # ブロードバンドノイズ
src3 = np.sin(2 * np.pi * 40.0 * t + 0.7) # 40 Hz トーン
# 混合行列(3 信号源 → 4 チャンネル)
A = np.array([
[0.8, 0.3, 0.1],
[0.4, 0.7, 0.2],
[0.2, 0.5, 0.9],
[0.6, -0.2, 0.5],
])
sources = np.stack([src1, src2, src3], axis=0) # (3, n)
mixed = A @ sources # (4, n)
# 計器ノイズを加算
mixed += 0.05 * rng.normal(0, 1, mixed.shape)
# TimeSeriesMatrix 構築 (4 channels × 1 col × n samples)
dt = (1 / fs) * u.s
t0 = 0 * u.s
data_3d = mixed[:, np.newaxis, :] # (4, 1, n)
tsm = TimeSeriesMatrix(data_3d, dt=dt, t0=t0, units=np.full((4, 1), u.ct))
tsm.channel_names = [f"CH{i}" for i in range(4)]
print("TimeSeriesMatrix shape:", tsm.shape)
tsm.plot(subplots=True, xscale="seconds");
2. PCA – 次元削減
TimeSeriesMatrix.pca() はスコア行列と、オプションで PCAResult オブジェクトを返します。
[ ]:
# PCA で 4 チャンネルを 2 主成分に圧縮
pca_scores, pca_res = tsm.pca(n_components=2, return_model=True)
print("PCA スコア shape:", pca_scores.shape) # (2, 1, n)
print("寄与率:", pca_res.explained_variance_ratio)
pca_scores.plot(subplots=True, xscale="seconds");
[ ]:
# スクリープロット(寄与率の可視化)
fig, ax = plt.subplots(figsize=(5, 3))
evr = pca_res.explained_variance_ratio
ax.bar(range(1, len(evr) + 1), evr * 100, color="steelblue")
ax.set_xlabel("主成分")
ax.set_ylabel("寄与率 [%]")
ax.set_title("スクリープロット")
ax.set_xticks(range(1, len(evr) + 1))
plt.tight_layout()
[ ]:
# 元チャンネル vs PC スコアの ASD 比較
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
asd_orig = tsm.asd(fftlength=4, overlap=0.5)
for i in range(asd_orig.shape[0]):
axes[0].loglog(asd_orig[i, 0].frequencies.value, asd_orig[i, 0].value, label=f"CH{i}")
axes[0].set_xlabel("周波数 [Hz]")
axes[0].set_ylabel("ASD [ct/√Hz]")
axes[0].set_title("元チャンネル")
axes[0].legend()
asd_pca = pca_scores.asd(fftlength=4, overlap=0.5)
for i in range(asd_pca.shape[0]):
axes[1].loglog(asd_pca[i, 0].frequencies.value, asd_pca[i, 0].value, label=f"PC{i+1}")
axes[1].set_xlabel("周波数 [Hz]")
axes[1].set_title("PCA スコアチャンネル")
axes[1].legend()
plt.tight_layout()
3. ICA – 盲目信号分離
TimeSeriesMatrix.ica() は FastICA を実行し、独立成分行列を返します。
[ ]:
# ICA で 3 独立成分を復元
ica_sources = tsm.ica(n_components=3, random_state=0)
print("ICA ソース shape:", ica_sources.shape) # (3, 1, n)
ica_sources.plot(subplots=True, xscale="seconds");
[ ]:
# ICA 復元成分 vs 真の信号源(周波数領域で比較)
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
src_3d = np.stack([src1, src2, src3], axis=0)[:, np.newaxis, :]
tsm_src = TimeSeriesMatrix(src_3d, dt=dt, t0=t0)
tsm_src.channel_names = ["src_10Hz", "src_noise", "src_40Hz"]
asd_src = tsm_src.asd(fftlength=4, overlap=0.5)
for i in range(3):
axes[0].loglog(asd_src[i, 0].frequencies.value, asd_src[i, 0].value, label=tsm_src.channel_names[i])
axes[0].set_title("真の信号源")
axes[0].set_xlabel("周波数 [Hz]")
axes[0].legend()
asd_ica = ica_sources.asd(fftlength=4, overlap=0.5)
for i in range(asd_ica.shape[0]):
axes[1].loglog(asd_ica[i, 0].frequencies.value, asd_ica[i, 0].value, label=f"IC{i+1}")
axes[1].set_title("ICA 復元成分")
axes[1].set_xlabel("周波数 [Hz]")
axes[1].legend()
plt.tight_layout()
4. 前処理:標準化とホワイトニング
ICA・PCA にはスケーリングが重要です。TimeSeriesMatrix には組み込み前処理が備わっています:
メソッド |
説明 |
|---|---|
|
各チャンネルを平均 0、分散 1 に正規化 |
|
外れ値に強い中央値/MAD スケーリング |
|
PCA ホワイトニング(チャンネル間相関の除去) |
pca() / ica() には scale パラメータで内部適用も可能です。
[ ]:
# 手動前処理パイプライン
tsm_z = tsm.standardize(method='zscore')
print("平均 ≈ 0:", np.allclose(tsm_z.value.mean(axis=-1), 0, atol=1e-10))
print("標準偏差 ≈ 1:", np.allclose(tsm_z.value.std(axis=-1), 1, atol=0.01))
# ICA に内部 z-score スケーリングを適用
ica_scaled = tsm.ica(n_components=3, scale='zscore', random_state=0)
print("ICA (with zscore) shape:", ica_scaled.shape)
# PCA にロバストスケーリングを適用
pca_robust, pca_res2 = tsm.pca(n_components=2, scale='robust', return_model=True)
print("PCA (robust) 寄与率:", pca_res2.explained_variance_ratio)
5. 欠損値 (NaN) の取り扱い
実際のデータには観測ギャップがある場合があります。nan_policy='impute' で分解前に補完できます。
[ ]:
# NaN ギャップを挿入(データギャップのシミュレーション)
tsm_nan = tsm.copy()
tsm_nan.value[0, 0, 100:120] = np.nan
tsm_nan.value[2, 0, 500:510] = np.nan
print("NaN 数:", np.sum(np.isnan(tsm_nan.value)))
# 線形補間でNaNを補完してから PCA
pca_nanfill, _ = tsm_nan.pca(
n_components=2,
nan_policy='impute',
impute_kwargs={'method': 'interpolate'},
return_model=True,
)
print("PCA (nan impute) shape:", pca_nanfill.shape)
6. scikit-learn からの移行ガイド
移行前(scikit-learn 直接使用):
from sklearn.decomposition import PCA, FastICA
import numpy as np
# 生の numpy 配列 (n_samples, n_channels)
X = data.reshape(n_channels, n_samples).T
pca = PCA(n_components=2)
scores = pca.fit_transform(X) # (n_samples, 2)
ica = FastICA(n_components=3, random_state=0)
sources = ica.fit_transform(X) # (n_samples, 3)
移行後(gwexpy):
# dt/t0/unit を保持したまま処理
pca_scores, pca_res = tsm.pca(n_components=2, return_model=True)
ica_sources = tsm.ica(n_components=3, random_state=0)
gwexpy API が自動で行うこと:
時間軸(
dt、t0、times)を出力行列に引き継ぎ単位メタデータを保持
.plot()で直接プロット可能NaN 処理・スケーリングを内部で処理
まとめ
メソッド |
主要パラメータ |
返り値 |
|---|---|---|
|
|
|
|
|
|
どちらのメソッドも TimeSeriesMatrix を返すので、.plot(), .asd(), .coherence() などの後続操作がそのまま使えます。
関連チュートリアル: