特異値分解 (SVD) を利用した Online DMD を NumPy で実装する
話すこと
Dynamic Mode Decomposition では時系列データを複数のモードの組み合わせとして表現します。主成分分析 (PCA) のように主要な信号を抽出できます。
1次元ストリーミングデータに対する Online Dynamic Mode Decomposition (Online DMD) を NumPy で実装します。特異値分解 (SVD) をオンライン処理として使い、低ランク近似によるノイズに対するロバスト性がある Online DMD を得ます。
データ列を Hankel Matrix に変換したものを Online DMD に当てはめ、データ列の主要モードに分解します。重み付きの Online DMD を行い、過去データの影響を小さくします。
以下の過去記事に SVD を使わない Online DMD を記載しています。
記事: Online DMD を NumPy で実装する
話さないこと
- Dynamic Mode Decomposition (DMD) の数学的な話
試したこと
周波数と振幅が単調変化する時系列データに対して OnlineDMD を行い、時刻ごとの主となる周波数を推定します。

はじめに: 特異値分解 (SVD) を使用する Online DMD のモチベーション
特異値分解を使用する Online DMD のモチベーション
Online DMD はセンサデータのような時系列で変化するものに使われます。全体ではなく直近の分析に活用できます。
低ランク近似した行列を更新することによりノイズによる影響を抑え、元の行列を求める固有値計算量の低下にもつながります。
DMD では
時刻
この行列
SVD を使った Online DMD の解き方
解き方: X の特異値分解と低ランク近似した行列 \~{A}
ランク
以降では時系列で増えていくデータに対して
更新アルゴリズム
X の更新
時系列データとしての
以下の3つのベクトル (
-
を\bm{x}_{k+1} の張る部分空間で表したものをU_k とします\bm{p} = U_k^T \bm{x}_{k+1} - 残差ベクトル
を\bm{r} の\bm{x}_{k+1} に含まれない成分としてU_k とします\bm{r} = \bm{x}_{k+1} - U_k \bm{p} - 大きさを 1 にした残差ベクトル
とします\bm{q} = \frac{r}{\gamma} \, (\gamma = \|\bm{r}\|)
行列
上記のように
行列のサイズ
H の更新
いいかげんな表現になりますが
\~{A} の更新
更新された
重み付き (忘却) について
入力データは
そこで重み係数 (忘却係数) として
これより
先ほどの手続きで
NumPy 実装
以下の関数を持つクラス OnlineDMD を作ります。以降ではこの関数を実装します。入力データを 1次元の実数列とします。
class OnlineDMD:
def __init__(
self, n_dim: int, r_max: int = 10, lambda_: float = 1.0, tau_add: float = 1e-2, tau_rel: float = 1e-3, tau_energy: float = 0.99) -> None:
self.n = int(n_dim) # 入力ベクトルの次元数
self.r_max = int(r_max) # 低ランク近似の最大ランク数
self.lambda_ = float(lambda_) # 忘却係数
self.tau_add = float(tau_add) # ランクを増やすかどうかの閾値
self.tau_rel = float(tau_rel) # ランク削減の相対特異値閾値
self.tau_energy = float(tau_energy) # ランク削減の累積エネルギー閾値
# 保持するのは U, S, H のみ
self.U = np.empty(0) # (n, r)
self.S = np.empty(0) # (r,) 特異値
self.H = np.empty(0) # (r, r) = U^T X' V
# ストリーミング用バッファ
self._x_prev = np.empty(0) # 次に取り込む列
def initialize(self, X: NDArray) -> None:
"""最初の DMD 実行"""
pass
def update(self, x_new: NDArray) -> None:
"""新しいデータ x_new を入力し行列 H を更新する"""
pass
def A_tilde(self) -> None|NDArray:
"""低ランク小行列 tilde A(r,r)"""
if self.U.size == 0:
return None
Sinv = 1.0 / np.maximum(self.S, 1e-15)
return self.H @ np.diag(Sinv)
def eigs(self):
"""元の行列 A の固有値固有ベクトル"""
A = self.A_tilde()
if A is None:
return None, None
evals, W = np.linalg.eig(A)
modes = self.U @ W
return evals, modes
def _truncate_rank(self, U: NDArray, S: NDArray, H: NDArray):
"""特異値の最大との比, 累積値からランク調整"""
pass
def get_mode_amplitudes(self, x_init: NDArray|None = None) -> NDArray:
"""Get DMD mode amplitudes for given initial state."""
_, modes = self.eigs()
if x_init is None:
x_init = np.array(self._x_prev)
if modes is None:
raise ValueError("No modes available")
return np.linalg.lstsq(modes, x_init, rcond=None)[0]
def get_mode_frequencies(self, dt: float = 1.0) -> NDArray:
"""Get frequencies of DMD modes.
Args:
dt: Time step size
Returns:
Frequencies in Hz (if dt is in seconds)
"""
eigvals = self.eigs()[0]
if eigvals is None:
raise ValueError("No modes available")
return np.imag(np.log(eigvals)) / (2 * np.pi * dt)
初期化
初めに
def initialize(self, X: NDArray) -> None:
"""X.shape = (n_dim, データ数)"""
n, m = X.shape
if n != self.n:
raise ValueError(f"Input X has incompatible dimension {n}, expected {self.n}")
# 忘却係数処理
coeff_array = self.lambda_ ** np.arange(m - 1, -1, -1)
weighted_X = X * coeff_array
# 特異値分解
U, S, Vt = np.linalg.svd(weighted_X[:, :-1] * self.lambda_, full_matrices=True)
r = min(self.r_max, S.size)
self.U = U[:, :r]
self.S = S[:r]
V = Vt.T[:, :r]
if m > 1:
Y = weighted_X[:, 1:]
self.H = self.U.T @ Y @ V
else:
self.H = np.zeros((r, r))
# 次回の更新用に最終列を保存
self._x_prev = weighted_X[:, -1].copy()
更新
3つのベクトル
def update(self, x_new: NDArray) -> None:
# データサイズ確認
x_new = np.asarray(x_new, dtype=float).reshape(-1)
assert x_new.shape[0] == self.n
if self.U.size == 0:
raise RuntimeError("OnlineDMD must be initialized with initialize() before update().")
# x_prev を取り込み、x_new で H を更新
x_prev = self._x_prev
U, S = self.U, self.S
r = S.shape[0]
# 既存空間への射影と残差
p = U.T @ x_prev # (r,)
r_vec = x_prev - U @ p # (n,)
rho = np.linalg.norm(r_vec)
q_vec = r_vec / rho if rho > 0 else np.zeros_like(r_vec)
# ランク増加の判定(相対閾値 & 上限)
add_rank = (rho > self.tau_add * max(np.linalg.norm(x_prev), 1e-12)) and (r < self.r_max)
# 忘却係数適用
self.H *= self.lambda_
S *= self.lambda_
if add_rank:
K = np.block([
[np.diag(S), p.reshape(-1, 1)],
[np.zeros((1, r)), np.array([[rho]])]
]) # (r+1, r+1)
Ut_r, St_r, Vt_r = np.linalg.svd(K, full_matrices=True)
U_aug = np.column_stack((U, q_vec))
# update H
z_old = U.T @ x_new # (r,)
z_rho = np.dot(q_vec, x_new) # scalar
H_aug = np.block([
[self.H, z_old[:, None]],
[np.zeros((1, self.H.shape[1])), np.array([[z_rho]])]
])
U_new = U_aug @ Ut_r # (n, r)
S_new = St_r
else:
Kthin = np.hstack([np.diag(S), p.reshape(-1, 1)]) # (r, r+1)
Ut_r, St_r, Vt_r = np.linalg.svd(Kthin, full_matrices=False) # Vt_r: (r, r+1)
U_new = U @ Ut_r # (n, r)
S_new = St_r # (r,)
z_old = U.T @ x_new # (r,)
H_aug = np.column_stack([self.H, z_old[:, None]]) # (r, r+1)
# H
H_new = Ut_r.conj().T @ H_aug @ Vt_r.conj().T
# ランク縮小(相対特異値 + エネルギー閾)
U_new, S_new, H_new = self._truncate_rank(U_new, S_new, H_new)
# 保存
self.U, self.S, self.H = U_new, S_new, H_new
self._x_prev = x_new.copy()
def _truncate_rank(self, U: NDArray, S: NDArray, H: NDArray):
"""特異値の最大との比, 累積値からランク調整"""
if S.size == 0:
return U, S, H
# 相対比の閾値比較
lead = float(S[0])
if lead <= 0:
r_rel = 1
else:
rel_mask = (S / lead) >= self.tau_rel
if not np.any(rel_mask):
rel_mask[0] = True
r_rel = int(np.max(np.nonzero(rel_mask))) + 1
# 特異値の累積比較
energy = float(np.sum(S ** 2))
if energy <= 0:
r_energy = 1
else:
cum_energy = np.cumsum(S ** 2) / energy
idx = np.searchsorted(cum_energy, self.tau_energy, side="left")
r_energy = int(idx) + 1
r_energy = max(1, min(r_energy, S.size))
# ランク更新
r_limit = min(self.r_max, S.size, r_rel)
r_keep = max(1, min(r_limit, r_energy))
return U[:, :r_keep], S[:r_keep], H[:r_keep, :r_keep]
\~{A} 算出とモード分解
def A_tilde(self) -> None|NDArray:
"""低ランク小行列 tilde A(r,r)"""
if self.U.size == 0:
return None
Sinv = 1.0 / np.maximum(self.S, 1e-15)
return self.H @ np.diag(Sinv)
def get_mode_amplitudes(self, x_init: NDArray|None = None) -> NDArray:
"""初期値(入力)からモードの係数を求める"""
_, modes = self.eigs()
if x_init is None:
x_init = np.array(self._x_prev)
if modes is None:
raise ValueError("No modes available")
return np.linalg.lstsq(modes, x_init, rcond=None)[0]
def get_mode_frequencies(self, dt: float = 1.0) -> NDArray:
"""モードごとの周波数算出"""
eigvals = self.eigs()[0]
if eigvals is None:
raise ValueError("No modes available")
return np.imag(np.log(eigvals)) / (2 * np.pi * dt)
動作例
入力信号
周波数が上昇し、振幅が減衰する信号を今回の処理対象にします。以下のように実装します。
今回はサンプリング周波数 100 [Hz], 開始時と終了時の周波数がそれぞれ 1.0, 1.3 [Hz], 振幅が 2.0, 1.0 になるようにします。
def generate_chirp(
fs: float = 100.0, f0: float = 1.0, f1: float = 1.3, a0: float = 2.0, a1: float = 1.0,
duration: float = 10.0, noise_level: float = 0.0) \
-> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Generate chirp signal sample.
Args:
fs: Sampling frequency
f0: Starting frequency in Hz
f1: Ending frequency in Hz
duration: Total duration for sweep
noise_level: Noise level (0-1)
Returns:
NDArray[np.float64]: Generated chirp signal
NDArray[np.float64]: Time array
"""
times = np.arange(0, duration, 1 / fs)
# Linear frequency sweep
amp = a0 + (a1 - a0) * (times / duration)
# Instantaneous frequency: f(t) = f0 + (f1-f0)*t/duration
# Phase: φ(t) = 2π * [f0*t + (f1-f0)*t²/(2*duration)]
phase = 2 * np.pi * (f0 * times + (f1 - f0) * times**2 / (2 * duration))
data = amp * np.sin(phase)
if noise_level > 0:
data += noise_level * np.random.randn(len(data))
return data, times
1次元時系列データの OnlineDMD 対応 (Hankel Matrix)
前述の OnlineDMD の説明では入力するデータは
Hankel Matrix の入力列を生成するクラス HankelSignal とデータ列から Hankel Matrix を生成する関数を作ります。
class HankelSignal:
def __init__(self, window_size: int) -> None:
"""Hankel structure for 1D signal streaming processing.
Args:
window_size: Size of the Hankel window
"""
self._window_size = window_size
self._array = np.zeros(self._window_size)
def initialize(self, values: NDArray[np.float64]) -> NDArray[np.float64]:
"""Initialize the Hankel buffer with given values.
Args:
values: Initial values to fill the Hankel buffer
"""
if self._array.shape != values.shape:
raise ValueError()
self._array[...] = values
return self._array
def update(self, value: float) -> NDArray[np.float64]:
"""Update the Hankel buffer with a new value.
Args:
value: New value to insert into the Hankel buffer
"""
self._array[:-1] = self._array[1:]
self._array[-1] = value
return self._array
def array_to_hankel_matrix(data: NDArray, window_size: int) -> NDArray:
if data.ndim != 1:
raise ValueError("Input data must be 1D array.")
return np.lib.stride_tricks.sliding_window_view(data, len(data) - window_size + 1)
実行
以上のプログラムから OnlineDMD を試します。
# Generate chirp signal
np.random.seed(100)
fs = 100.0
duration = 10.0
f0 = 1.0
f1 = 1.3
a0 = 2.0
a1 = 1.0
input_data, times = generate_chirp(
fs=fs, f0=f0, f1=f1, a0=a0, a1=a1, duration=duration, noise_level=0.1)
# Prepare DMD
window_size = 100
max_rank = 4
forgetting_factor = 0.99
tau_add = 0.01
dmd = OnlineDMD(
n_dim=window_size, r_max=max_rank, lambda_=forgetting_factor, tau_add=tau_add)
# Initialize DMD with initial data
init_length = 150
init_data = array_to_hankel_matrix(input_data[:init_length], window_size)
dmd.initialize(init_data)
# Prepare Hankel structure
hankel = HankelSignal(window_size)
hankel.initialize(init_data[:, -1])
# Process streaming data
n_steps = 10
dt = 1. / fs
freq_evolution = []
amplitude_evolution = []
time_points = []
for i in range(init_length, len(input_data), n_steps):
# Update DMD with new samples
for j in range(n_steps):
if i + j >= len(input_data):
break
new_sample = input_data[i + j]
hankel_vector = hankel.update(new_sample)
dmd.update(hankel_vector)
# Get current analysis
frequencies = dmd.get_mode_frequencies(dt=dt)
amplitudes = dmd.get_mode_amplitudes()
if len(frequencies) > 0 and len(amplitudes) > 0:
positive_freqs = frequencies[frequencies >= 0]
positive_amps = amplitudes[frequencies >= 0]
if len(positive_freqs) == 0:
continue
dominant_idx = np.argmax(positive_amps)
dominant_freq = positive_freqs[dominant_idx]
dominant_amp = np.abs(positive_amps[dominant_idx])
freq_evolution.append(dominant_freq)
amplitude_evolution.append(dominant_amp)
time_points.append(times[min(i + n_steps - 1, len(times) - 1)])
# Plot the signal and analysis results
ground_truth_freq = f0 + (f1 - f0) * times / duration
_, axes = plt.subplots(3, 1, figsize=(10, 8), sharex=True)
axes[0].plot(times, input_data)
axes[0].set_title("Chirp Signal")
axes[1].plot(time_points, freq_evolution, marker='o')
axes[1].plot(times, ground_truth_freq, linestyle='--', color='gray', label='Ground Truth')
axes[1].set_title("Dominant Frequency Evolution")
axes[1].set_ylabel("Frequency [Hz]")
axes[1].legend()
axes[2].plot(time_points, amplitude_evolution, marker='o', color='orange')
axes[2].set_title("Dominant Amplitude Evolution")
axes[2].set_xlabel("Time [s]")
axes[2].set_ylabel("Amplitude")
plt.tight_layout()
plt.show()
実行結果は以下になります。1段目が周波数が増加する入力信号、2段目が OnlineDMD で推定した時刻ごとの周波数と対応する入力信号の周波数を表示しています。3段目が 2段目で推定した周波数が持つ振幅を示しています。
推定周波数は単調増加していることが分かります。真の周波数と一致しないのは OnlineDMD は入力時刻以前のデータを使用しているためです。3段目の振幅は単調減少していることが分かります。入力データの開始時, 終了時の振幅は 2.0, 1.0 であり、推定の振幅は 1/2 ほどに減衰されています。

まとめ
特異値分解を使用した OnlineDMD の計算と NumPy による実装を行いました。周波数と振幅が時間変化するデータに対して OnlineDMD を実施し、周波数と振幅を追従していることが分かりました。
参考文献
以下を参考にしました。
Discussion