Online DMD を NumPy で実装する
話すこと
1次元ストリーミングデータに対する Online Dynamic Mode Decomoposition (Online DMD) を NumPy で実装します。データ列を Hankel Matrix に変換したものを Online DMD に当てはめ、データ列の主要モードへの分解とそれらを使った再構成を行います。
重み付きの Online DMD を行い、過去データの影響を小さくします。
処理結果
周波数が時間変化する 1次元データに対して Online DMD を行い、主要モードから信号を再構成したものは以下です。複数回に分けて再構成し、それぞれを色分けしています。
通常のデータ全域に対する DMD では周波数が変化するデータ列の主要モード分解は難しいですが、周波数の変化がある場合も再構成ができていることが分かります。

話さないこと
- DMD の理論の詳細
- 特異値分解を使った Online DMD
はじめに: Online DMD のモチベーション
通常の DMD (Dynamic Mode Decomposition) では、データ全体が事前に準備されている必要があります。しかし現実の信号処理ではセンサーデータが時系列でストリーミングされる場面が多くあります。
Online DMD は新しいデータごとに既存結果を更新する手法のため、全データを再計算する必要がなくリアルタイム処理に適しています。
重み付き Online DMD の利点
重み付き Online DMD では過去データに対して指数減衰の重みをかけ時間変化する信号特性に適応します。(例: 周波数が時間とともに変化するチャープ信号)
重み係数
準備: DMD と Hankel Matrix
DMD の仕組み
DMD はシステムの時間発展を線形演算子で近似し、時系列データのモードを抽出します。
基本的な流れ
- データ行列
を構築 (X, Y はY を1ステップ進めたもの)X - 線形関係
を満たす行列Y \simeq AX を推定A -
の固有値, 固有ベクトルから動的モードを抽出A - 各モードの時間発展から信号を再構成
1.
時刻
(
2.
次式を最小化する問題を解きます。
3.
行列
固有ベクトル
4.
固有値の整数べき乗による時間発展と、それらの足し合わせから観測したベクトルを近似します。
Hankel Matrix (ハンケル行列)
1次元時系列データの DMD は各列を多次元状態空間として扱います。
前述のベクトル
Online DMD の解き方
問題設定 (Online DMD と重み付き Online DMD)
最小二乗問題
通常の DMD では決まった時間内での
新しいデータが加わり
重み付き最小二乗問題
新たなデータが取得できる時系列データでは直近の出来事が興味の対象になります。
前述の
オンライン更新アルゴリズム
行列 A の分解
前述のように 擬似逆行列
時系列データとして扱うため
重み付き最小二乗解の A と 重みが考慮された X, Y
この式を Frobenius Norm を使った式で表現するために、
重みがついていないものと同様に
k \rightarrow k+1 の更新
新たなデータが加わり
はじめに
最終的に
また、
NumPy 実装
以下の関数を持つクラス WeightedOnlineDmd を作ります。以降ではこの関数を実装します。
入力データを 1次元の実数列とします。
from numpy.typing import NDArray
class WeightedOnlineDmd:
def __init__(self, window_size: int, rho: float) -> None:
self._window_size = window_size # Hankel行列の行数
self._rho = rho # 重み係数 (0 < rho <= 1)
def set_initial_data(self, data_array: NDArray, low_rank_threshold: float) -> None:
"""初期データからA行列とP行列を計算."""
pass
def update(self, new_data: float) -> None:
"""新データを追加し A行列とP行列を更新."""
def reconstruct(self, valid_number: int, time_index: int, threshold: float) -> list[NDArray]:
"""現在の状態から各モードで信号を再構成."""
データ列 ←→ Hankel Matrix
1次元データ列を Hankel Matrix にし Online DMD を扱います。データ列を Hankel Matrix に変換, Hankel Matrix をデータ列に変換する関数を実装します。
import import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from numpy.typing import NDArray
def make_hankel_matrix(data: NDArray, n_rows: int) -> NDArray:
# 1つずつデータ列をずらしたものを得る numpy sliding_window_view 関数を使います
return sliding_window_view(data, len(data) - n_rows + 1)
def hankel_to_signal(hankel_mat: NDArray) -> NDArray:
# Hankel Matrix は右上, 左下の要素と同じデータ列のインデックスになります。登場回数を数え平均をとります。
n_rows, n_cols = hankel_mat.shape
row_mat = np.arange(n_rows)[:, None]
col_mat = np.arange(n_cols)[None, :]
indices = (row_mat + col_mat).ravel()
sums = np.bincount(indices, weights=hankel_mat.ravel().real, minlength=n_rows + n_cols - 1)
counts = np.bincount(indices, minlength=n_rows + n_cols - 1)
return sums / counts
初期データから行列 A, P の初期化
- 初期データ列から Hankel Matrix を作り, 行列
を求める\tilde{X}, \tilde{Y} - 行列
の算出P, Q, A
行列
def _hermitian(mat_data: NDArray) -> NDArray:
# 随伴行列 (複素共役と転置)
return np.conjugate(mat_data.T)
def _apply_svd(mat_data: NDArray) -> tuple[NDArray, NDArray, NDArray]:
# SVD を行う
u_mat, sigmas, vh_mat = np.linalg.svd(mat_data, full_matrices=False)
return u_mat, sigmas, vh_mat
def _lower_svd(
mat_u: NDArray, sigmas: NDArray, mat_vh: NDArray, rank: int = 0,
threshold: float = 0.95) -> tuple[NDArray, NDArray, NDArray]:
# SVD の結果に対して低ランク化する
## rank が指定されていればそれを返す
if rank >= 1:
return mat_u[:, :rank], sigmas[:rank], mat_vh[:rank, :]
## 累積寄与率から低ランクを判断する
else:
variance = sigmas ** 2
cumulated = np.cumsum(variance) / variance.sum()
_rank = np.searchsorted(cumulated, threshold, side='left') + 1
return mat_u[:, :_rank], sigmas[:_rank], mat_vh[:_rank, :]
def _predict_p_and_q_matrix(mat_x: NDArray, mat_y: NDArray, threshold: float) -> tuple[NDArray, NDArray]:
# Q = Y.(XT)
mat_q = mat_y @ mat_x.T
# SVD
svd_u, svd_sigmas, svd_vh = _apply_svd(mat_x)
# 低ランク近似
svd_u, svd_sigmas, _ = _lower_svd(
svd_u, svd_sigmas, svd_vh, rank=-1, threshold=threshold)
# P = U (S.S^T)^(-1) U.T
mat_p = svd_u @ np.diag(1. / (svd_sigmas**2)) @ _hermitian(svd_u)
return mat_p, mat_q
class WeightedOnlineDmd:
def set_initial_data(self, data_array: NDArray, low_rank_threshold: float) -> None:
"""初期データからA行列とP行列を計算."""
# Hankel Matrix
hankel_mat = make_hankel_matrix(data_array, self._window_size)
mat_x = hankel_mat[:, :-1]
mat_y = hankel_mat[:, 1:]
# rho^(k-i) をかける X, Y を求める
rhos = np.array([self._rho ** i for i in range(mat_x.shape[1])])[::-1]
mat_x = mat_x * rhos[None, :]
mat_y = mat_y * rhos[None, :]
# P, Q, A を求める
_mat_p, _mat_q = _predict_p_and_q_matrix(
mat_x, mat_y, low_rank_threshold)
self._mat_a = _mat_q @ _mat_p
self._mat_p = _mat_p / self._rho
# 行列Yの最終列を保存
self._last_array = mat_y[:, -1]
新データを追加し Online DMD の更新
行列
class WeightedOnlineDmd:
def update(self, new_data: float) -> None:
"""新データを追加し A行列とP行列を更新."""
# 過去データの最終列を取得
vector_x = self._last_array
vector_y = np.r_[self._last_array[1:], new_data]
# 係数更新
coeff = 1. / (1. + vector_x.T @ self._mat_p @ vector_x)
self._mat_a = self._mat_a + coeff * np.outer((vector_y - self._mat_a @ vector_x), vector_x) @ self._mat_p
self._mat_p = 1. / self._rho * (self._mat_p - coeff * self._mat_p @ np.outer(vector_x, vector_x) @ self._mat_p)
# 最終列を保存
self._last_array = vector_y
モード分解とデータ列の再構成
行列
def _valid_eigens(eigens: NDArray, threshold: float) -> int:
# 累積寄与率から閾値以下のインデックスを取得
values = eigens * eigens.conj()
cumulated = np.cumsum(values) / values.sum()
index = np.searchsorted(cumulated, threshold, side='left') + 1
return int(index)
class WeightedOnlineDmd:
def reconstruct(self, valid_number: int, time_index: int, threshold: float) -> list[NDArray]:
"""現在の状態から各モードで信号を再構成."""
# 固有値分解とベクトル b の算出
eigens, phi_mat = np.linalg.eig(self._mat_a)
bn = np.linalg.solve(_hermitian(phi_mat) @ phi_mat, _hermitian(phi_mat) @ self._last_array)
wave_list = []
# 入力 valid_number が 0 以下であれば累積寄与率から必要なモード数を求める
if valid_number <= 0:
valid_number = _valid_eigens(eigens, threshold)
# 再構成 (Hankel Matrix が作られる。これを 1次元に変換する)
# 出力は各モードの信号であり, 元データを再構成する場合はこれらの和を求める
window_size = phi_mat.shape[0]
for i in range(valid_number):
phi_i = phi_mat[:, [i]]
coeff = bn[i] * (1 / (eigens[i] ** np.arange(time_index - window_size + 1)[::-1]))
hankel_mat = phi_i @ coeff[None, :]
xs = hankel_to_signal(hankel_mat)
wave_list.append(xs)
return wave_list
動作例
擬似入力データを作り Online DMD を実行します。
入力データの 2割を初期データとし初期化し、それ以降のデータは 1サンプルずつ更新します。区間ごとに信号の再構成を行い周波数変化に対応できることを確認します。
入力データ: チャープ信号
処理結果 で示した周波数が時間変化するチャープ信号を入力データとします。
def generate_data(
t: NDArray, trend_slope: float = 0.05, f0: float = 2.0, f1: float = 3.0) -> NDArray:
k = (f1 - f0) / t[-1]
phase = 2 * np.pi * (f0 * t + 0.5 * k * t**2)
data = trend_slope * t + np.sin(phase)
return data
# 使用例
fs = 100 # sampling rate
duration = 10.0 # データ時間
times = np.linspace(0, duration, int(fs * duration), endpoint=False)
data = generate_data(times, trend_slope=0.05, f0=2.0, f1=3.0)
データ列を Online DMD に入力しデータ列の再構成を行う
Online DMD ではオンライン更新の前に初期の行列
はじめに、パラメータ設定と初期化用オンライン更新用へのデータ分離を行います。
# 設定
window_size = 40 # Hankel Matrix の行数
rho = 0.98 # 重み ρ
n_blocks = 6 # オンライン更新を行うデータを n_blocks に分けて処理する
first_data_ratio = 0.2 # 初期化に使うデータサイズの割合
# データ (前節の times, data) を初期化用, オンライン更新用に分ける
remain_size = int((1. - first_data_ratio) * len(data))
first_size = len(data) - remain_size
first_data = data[:first_size]
remain_data = data[first_size:]
# クラスのインスタンス化
dmd = WeightedOnlineDmd(window_size, rho=rho)
次に Online DMD の初期化 (行列
# モードごとのデータ列とその和を求める
def reconstruct(
dmd: WeightedOnlineDmd, length: int, threshold: float = 0.98) \
-> tuple[list[NDArray], NDArray]:
wave_list = dmd.reconstruct(valid_number=0, time_index=length, threshold=threshold)
reconstructed = np.sum(np.array(wave_list), axis=0)
return wave_list, reconstructed
# plot 用の保存
wave_keeper = []
reconstructed_list = []
# 初期化とこの時点での再構成
dmd.set_initial_data(first_data, low_rank_threshold=0.99)
_wave_list, _reconstructed = reconstruct(dmd, len(first_data))
wave_keeper.append(_wave_list)
reconstructed_list.append(_reconstructed)
# 残りデータを n_blocks 回に分けて処理する
one_block = remain_size // n_blocks
for i in range(n_blocks):
input_data = remain_data[i * one_block:(i + 1) * one_block]
for one_data in input_data:
dmd.update(float(one_data))
_wave_list, _reconstructed = reconstruct(dmd, len(input_data))
wave_keeper.append(_wave_list)
reconstructed_list.append(_reconstructed)
上記の結果をプロットします。1つ目のプロットはブロックごとにモード分解されたデータ列の和を求め、元のデータ列と近しくなるか確認します。
count = 0
last_time = times[0] # plot の繋ぎ目用
last_value = reconstructed_list[0][0] # plot の繋ぎ目用
plt.figure(figsize=(14, 6))
for i, one_block in enumerate(reconstructed_list):
sub_times = times[count:count + len(one_block)]
count += len(one_block)
# 各ブロックごとに色を付ける
plt.plot(
np.r_[last_time, sub_times], np.r_[last_value, one_block[:len(sub_times)]],
alpha=0.7, c=f'C{i}', label=f'Block {i}')
# for next
last_time = sub_times[-1]
last_value = one_block[len(sub_times)-1]
# 元データを薄い黒色でプロットする
plt.plot(times[:count], data[:count], alpha=0.5, c='black', label='original')
plt.legend()
plt.show()

次に各ブロックごとに和を求めずにモードそれぞれのデータ列をプロットします。今回の結果では 3つの主要モードに分解されました。1つはトレンドを意味するデータ列です。残り 2つは各ブロックで振動するデータ列を捉えています。2つの固有値は互いに共役であり、復元される実部は同一になります。このためプロットは 2種のみの波形が見られます。
count = 0
plt.figure(figsize=(14, 6))
for i, wave_list in enumerate(wave_keeper):
size = len(wave_list[0])
sub_times = times[count:count + size]
count += size
for one_wave in wave_list:
plt.plot(sub_times, one_wave[:len(sub_times)], alpha=0.7, c=f'C{i}')
plt.show()

まとめ、次に行うこと
1次元データ列を Hankel Matrix に変換し Online DMD を実行する機能を NumPy で実装しました。連続的に周波数が変わるデータに対して Online DMD を適用することで短期間において単純なモードに分解できることが分かりました。
こちらの記事 では特異値分解 (SVD) を使い DMD を行いますが、ここで説明した Online DMD は固有値固有ベクトルを直接求めています。今後は計算コストが低く、ノイズ耐性に強いと思われる SVD を使った Online DMD を実装したいと思います。(参考: Extended Online DMD and Weighted Modifications for Streaming Data Analysis)
参考文献
以下を参考にしました。
Discussion