🌀

Online DMD を NumPy で実装する

に公開

話すこと

1次元ストリーミングデータに対する Online Dynamic Mode Decomoposition (Online DMD) を NumPy で実装します。データ列を Hankel Matrix に変換したものを Online DMD に当てはめ、データ列の主要モードへの分解とそれらを使った再構成を行います。
重み付きの Online DMD を行い、過去データの影響を小さくします。

処理結果

周波数が時間変化する 1次元データに対して Online DMD を行い、主要モードから信号を再構成したものは以下です。複数回に分けて再構成し、それぞれを色分けしています。
通常のデータ全域に対する DMD では周波数が変化するデータ列の主要モード分解は難しいですが、周波数の変化がある場合も再構成ができていることが分かります。

\begin{align*} y &= at + \sin(2\pi (f_0 t + 0.5 k t^2)) \\ k &= (f_1 - f_0) / T \\ (a &= 0.05, \ f_0 = 2.0, \ f_1 = 3.0, \ T = 10.0) \end{align*}

チャープ信号の Online DMD の信号再構成

話さないこと

  • DMD の理論の詳細
  • 特異値分解を使った Online DMD

はじめに: Online DMD のモチベーション

通常の DMD (Dynamic Mode Decomposition) では、データ全体が事前に準備されている必要があります。しかし現実の信号処理ではセンサーデータが時系列でストリーミングされる場面が多くあります。
Online DMD は新しいデータごとに既存結果を更新する手法のため、全データを再計算する必要がなくリアルタイム処理に適しています。

重み付き Online DMD の利点

重み付き Online DMD では過去データに対して指数減衰の重みをかけ時間変化する信号特性に適応します。(例: 周波数が時間とともに変化するチャープ信号)
重み係数 \rho (0 < \rho ≤ 1) により、古いデータの影響を小さくし最近の信号の影響を大きくします。

準備: DMD と Hankel Matrix

DMD の仕組み

DMD はシステムの時間発展を線形演算子で近似し、時系列データのモードを抽出します。

基本的な流れ

  1. データ行列 X, Y を構築 (YX を1ステップ進めたもの)
  2. 線形関係 Y \simeq AX を満たす行列 A を推定
  3. A の固有値, 固有ベクトルから動的モードを抽出
  4. 各モードの時間発展から信号を再構成

1.
時刻 k で観測した状態ベクトル \bm{x}_k \in \mathbb{R}^m を列方向に並べ行列を定義します。先頭と末尾を 1つずらし現在と 1ステップ後の 2つの行列を作ります。

\begin{align*} X &= (\bm{x}_0, \bm{x}_1, \cdots, \bm{x}_{m-1} ) \\ Y &= (\bm{x}_1, \bm{x}_2, \cdots, \bm{x}_{m} ) \\ &= (\bm{y}_0, \bm{y}_1, \cdots, \bm{y}_{m-1} ) \end{align*}

(X, Y の要素を区別するために \bm{y} を使っています)

2.
次式を最小化する問題を解きます。

\begin{align*} J &= \sum_{i} \|\bm{y}_i - A\bm{x}_i\|^2 \\ &= \| Y - AX \|^2_{\text{F}} \ \ (\| \cdot \|_{\text{F}}: \ \text{Frobenius Norm}) \end{align*}

X^{+} を行列 X に関する擬似逆行列として、 A = YX^{+} となります。

3.
行列 Ai 番目の固有値, 固有ベクトルを \lambda_i, \phi_i を求めます。
固有ベクトル \phi_i が DMD モードと呼ばれます。固有値 \lambda_i を対応する成長(増大, 減衰), 振動数と解釈できます。
|\lambda_i| > 1 であれば増大, |\lambda_i|<1 ならば減衰, \arg(\lambda_i) から振動周波数 f_i=\dfrac{\arg(\lambda_i)}{2\pi\Delta t} が得られます。

4.
固有値の整数べき乗による時間発展と、それらの足し合わせから観測したベクトルを近似します。

\begin{align*} \bm{x}_k &\simeq \sum_{i=0}^{r-1} b_i \bm{\phi}_i \lambda_{i}^{k-1} \\ \bm{b} &= \Phi^{+}\bm{x}_1 = (b_0, b_1, \cdots)^T \\ \Phi &=[\bm{\phi}_0\,\dots\,\bm{\phi}_{r-1}] \end{align*}

Hankel Matrix (ハンケル行列)

N 個の 1次元時系列データ (x_0, x_1, x_2, \cdots, x_{n-1})^T を DMD で扱うために Hankel Matrix に変換します。 以下は窓サイズ m の Hankel Matrix の例です。サイズ m x (n-m+1) の行列になります。列が進むたびに時間が進んでいます。
1次元時系列データの DMD は各列を多次元状態空間として扱います。

\begin{pmatrix} x_0 & x_1 & x_2 & \dots & x_{\,n-m} \\ x_1 & x_2 & x_3 & \dots & x_{\,n-m+1} \\ x_2 & x_3 & x_4 & \dots & x_{\,n-m+2} \\ \vdots & \vdots & \vdots & \ddots & \vdots \\ x_{m-1} & x_{m} & x_{m+1} & \dots & x_{\,n-1} \end{pmatrix}

前述のベクトル \bm{x}_k を例にとると、\bm{x}_0 = (x_0, x_1, \cdots, x_{m-1})^T, \bm{x}_1 = (x_1, x_2, \cdots, x_{m})^T になります。

Online DMD の解き方

問題設定 (Online DMD と重み付き Online DMD)

最小二乗問題

通常の DMD では決まった時間内での X, Y を使いますが、Online DMD では次々新しいデータが入ってくるため X, Y の列数が増えていきます。そこで X, Y の列数を k としたときの行列 A を使った最小化する問題を次式で表します。このときの行列 X, Y, AX_k, Y_k, A_k とします。

\begin{align*} J_k &= \sum_{i}^{k} \|\bm{y}_i - A_{k}\bm{x}_i\|^2 \\ &= \| Y_k - A_{k}X_{k} \|^2_{\text{F}} \\ X_k &= (\bm{x}_0, \bm{x}_1, \cdots, \bm{x}_{k-1}) \\ Y_k &= (\bm{x}_1, \bm{x}_2, \cdots, \bm{x}_{k}) \\ &= (\bm{y}_0, \bm{y}_1, \cdots, \bm{y}_{k-1}) \end{align*}

新しいデータが加わり k \rightarrow k+1 \rightarrow \cdots となり、J_k を最小化する A_k を次々求めることが Online DMD になります。

重み付き最小二乗問題

新たなデータが取得できる時系列データでは直近の出来事が興味の対象になります。
前述の J_k は各列は同じ重み付けがされています。そこで過去のデータほど J が小さくなるように指数的な重みを設けます。重みのパラメータ \rho (0 < \rho \leq 1) とし J_k を再定義します。

\begin{align*} J_k &= \sum_{i}^{k} \rho^{k-i} \|\bm{y}_i - A_{k}\bm{x}_i\|^2 \end{align*}

オンライン更新アルゴリズム

行列 A の分解

前述のように 擬似逆行列 X^{+} を使い A = YX^{+} となります。X^{+} を展開し新たに行列 P, Q を用いて A を表します。

\begin{align*} A &= YX^{+} \\ &= Y X^T (X X^T)^{-1} \\ &= Q P \\ P &= (X X^T)^{-1}, \ Q = Y X^T \end{align*}

時系列データとして扱うため X, Y の列サイズを k とし A, P, Q に添え字 k をつけます。

A_k = Q_k P_k

重み付き最小二乗解の A と 重みが考慮された X, Y

J_k\sigma^2 = \rho を用いて表します。

J_k = \sum_{i}^{k} \| \sigma^{k-i} \bm{y}_i - \sigma^{k-i} A_{k}\bm{x}_i\|^2

この式を Frobenius Norm を使った式で表現するために、\tilde{X}, \tilde{Y} を次のように定義します。

\begin{align*} \tilde{X_k} &= (\sigma^{k-1} \bm{x}_0, \sigma^{k-2} \bm{x}_1, \cdots, \sigma \bm{x}_{k-2}, \bm{x}_{k-1}) \\ \tilde{Y_k} &= (\sigma^{k-1} \bm{y}_0, \sigma^{k-2} \bm{y}_1, \cdots, \sigma \bm{y}_{k-2}, \bm{y}_{k-1}) \\ J_k &= \| \tilde{Y_k} - A_k \tilde{X_k}\|^2_{\text{F}} \end{align*}

重みがついていないものと同様に A_k = Q_k P_k と表せます。

k \rightarrow k+1 の更新

新たなデータが加わり P_{k+1}, Q_{k+1}, A_{k+1} が更新されます。漸化式のように過去データを使いこれらを求めることでオンライン更新ができます。
はじめに \tilde{X}, \tilde{Y} に関して更新式を求め、次に P, Q, A の更新式を求めます。

\begin{align*} \tilde{X}_{k+1} &= (\sigma^{k} \bm{x}_0\ \ \sigma^{k-1} \bm{x}_1\ \ \cdots\ \ \sigma \bm{x}_{k-1}\ \ \bm{x}_{k}) \\ &= (\sigma \tilde{X}_{k}\ \ \bm{x}_k) \\ \tilde{Y}_{k+1} &= (\sigma^{k} \bm{y}_0\ \ \sigma^{k-1} \bm{y}_1\ \ \cdots\ \ \sigma \bm{y}_{k-1}\ \ \bm{y}_{k}) \\ &= (\sigma \tilde{Y}_{k}\ \ \bm{y}_k) \end{align*}

Q_{k+1}, P_{k+1}^{-1} は次のように更新されます。

\begin{align*} Q_{k+1} &= \tilde{Y}_{k+1} \tilde{X}_{k+1}^T \\ &= (\sigma \tilde{Y}_{k}\ \ \bm{y}_k) (\sigma \tilde{X}_k\ \ \bm{x}_{k})^T \\ &= \sigma^2 \tilde{Y}_{k} \tilde{X}_{k} + \bm{y}_k \bm{x}_{k}^T \\ &= \rho Q_{k} + \bm{y}_k \bm{x}_{k}^T \\ P_{k+1}^{-1} &= \tilde{X}_{k+1} \tilde{X}_{k+1}^T \\ &= (\sigma \tilde{X}_{k}\ \ \bm{x}_k) (\sigma \tilde{X}_k\ \ \bm{x}_{k})^T \\ &= \sigma^2 \tilde{X}_{k} \tilde{X}_{k} + \bm{x}_k \bm{x}_{k}^T \\ &= \rho P_{k}^{-1} + \bm{x}_k \bm{x}_{k}^T \\ P_{k+1} &= (\rho P_{k}^{-1} + \bm{x}_k \bm{x}_{k}^T)^{-1} \end{align*}

P_{k+1} の更新には 次式 Sherman–Morrison formula を使います (Wikipedia: Sherman–Morrison formula)。

(B + \bm{u}\bm{v}^T)^{-1} = B^{-1} - \frac{B^{-1} \bm{u} \bm{v}^T B^{-1}}{1 + \bm{v}^T B^{-1} \bm{u}}

最終的に A_{k+1}, P_{k+1} を更新すればよいことがわかります。(導出は Online DMD の論文 を参考にしてください)
また、\hat{P}_k = P_k / \rho と定義します。

\begin{align*} \hat{P}_{k+1} &= \frac{1}{\rho} \left( \hat{P}_k - \gamma_{k+1} \hat{P}_k \bm{x}_k \bm{x}_k ^T \hat{P}_k \right) \\ \gamma_{k+1} &= \frac{1}{1 + \bm{x}_{k}^T \hat{P}_k \bm{x}_k} \\ A_{k+1} &= A_{k} + \gamma_{k+1} (\bm{y}_k - A_k \bm{x}_k) \bm{x}_{k}^T \hat{P}_k \\ \end{align*}

NumPy 実装

以下の関数を持つクラス WeightedOnlineDmd を作ります。以降ではこの関数を実装します。
入力データを 1次元の実数列とします。

from numpy.typing import NDArray

class WeightedOnlineDmd:
    def __init__(self, window_size: int, rho: float) -> None:
        self._window_size = window_size  # Hankel行列の行数
        self._rho = rho  # 重み係数 (0 < rho <= 1)

    def set_initial_data(self, data_array: NDArray, low_rank_threshold: float) -> None:
        """初期データからA行列とP行列を計算."""
        pass

    def update(self, new_data: float) -> None:
        """新データを追加し A行列とP行列を更新."""

    def reconstruct(self, valid_number: int, time_index: int, threshold: float) -> list[NDArray]:
        """現在の状態から各モードで信号を再構成."""

データ列 ←→ Hankel Matrix

1次元データ列を Hankel Matrix にし Online DMD を扱います。データ列を Hankel Matrix に変換, Hankel Matrix をデータ列に変換する関数を実装します。

import import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from numpy.typing import NDArray

def make_hankel_matrix(data: NDArray, n_rows: int) -> NDArray:
    # 1つずつデータ列をずらしたものを得る numpy sliding_window_view 関数を使います
    return sliding_window_view(data, len(data) - n_rows + 1)

def hankel_to_signal(hankel_mat: NDArray) -> NDArray:
    # Hankel Matrix は右上, 左下の要素と同じデータ列のインデックスになります。登場回数を数え平均をとります。
    n_rows, n_cols = hankel_mat.shape
    row_mat = np.arange(n_rows)[:, None]
    col_mat = np.arange(n_cols)[None, :]
    indices = (row_mat + col_mat).ravel()
    sums   = np.bincount(indices, weights=hankel_mat.ravel().real, minlength=n_rows + n_cols - 1)
    counts = np.bincount(indices, minlength=n_rows + n_cols - 1)
    return sums / counts

初期データから行列 A, P の初期化

  1. 初期データ列から Hankel Matrix を作り, 行列 \tilde{X}, \tilde{Y} を求める
  2. 行列 P, Q, A の算出

行列 P の初期値は X の特異値分解 (SVD) から求めます。

\begin{align*} X &= U \Sigma V^{*} \ \ (V^{*}: V \text{の随伴行列}) \\ X X^T &= (U \Sigma V^{*}) (U \Sigma V^{*})^T \\ &= U (\Sigma \Sigma^T) U^T \\ P &= (X X^T)^{-1}\\ &= U (\Sigma \Sigma^T)^{-1} U^T \\ (\Sigma \Sigma^T)^{-1} &= \text{diag}(\sigma_0 ^{-2}, \sigma_1^{-2}, \cdots) \end{align*}
def _hermitian(mat_data: NDArray) -> NDArray:
    # 随伴行列 (複素共役と転置)
    return np.conjugate(mat_data.T)

def _apply_svd(mat_data: NDArray) -> tuple[NDArray, NDArray, NDArray]:
    # SVD を行う
    u_mat, sigmas, vh_mat = np.linalg.svd(mat_data, full_matrices=False)
    return u_mat, sigmas, vh_mat

def _lower_svd(
        mat_u: NDArray, sigmas: NDArray, mat_vh: NDArray, rank: int = 0,
        threshold: float = 0.95) -> tuple[NDArray, NDArray, NDArray]:
    # SVD の結果に対して低ランク化する
    ## rank が指定されていればそれを返す
    if rank >= 1:
        return mat_u[:, :rank], sigmas[:rank], mat_vh[:rank, :]
    ## 累積寄与率から低ランクを判断する
    else:
        variance = sigmas ** 2
        cumulated = np.cumsum(variance) / variance.sum()
        _rank = np.searchsorted(cumulated, threshold, side='left') + 1
        return mat_u[:, :_rank], sigmas[:_rank], mat_vh[:_rank, :]

def _predict_p_and_q_matrix(mat_x: NDArray, mat_y: NDArray, threshold: float) -> tuple[NDArray, NDArray]:
    # Q = Y.(XT)
    mat_q = mat_y @ mat_x.T
    # SVD
    svd_u, svd_sigmas, svd_vh = _apply_svd(mat_x)
    # 低ランク近似
    svd_u, svd_sigmas, _ = _lower_svd(
        svd_u, svd_sigmas, svd_vh, rank=-1, threshold=threshold)
    # P = U (S.S^T)^(-1) U.T
    mat_p = svd_u @ np.diag(1. / (svd_sigmas**2)) @ _hermitian(svd_u)
    return mat_p, mat_q

class WeightedOnlineDmd:
    def set_initial_data(self, data_array: NDArray, low_rank_threshold: float) -> None:
        """初期データからA行列とP行列を計算."""
        # Hankel Matrix
        hankel_mat = make_hankel_matrix(data_array, self._window_size)
        mat_x = hankel_mat[:, :-1]
        mat_y = hankel_mat[:, 1:]
        # rho^(k-i) をかける X, Y を求める
        rhos = np.array([self._rho ** i for i in range(mat_x.shape[1])])[::-1]
        mat_x = mat_x * rhos[None, :]
        mat_y = mat_y * rhos[None, :]
        # P, Q, A を求める
        _mat_p, _mat_q = _predict_p_and_q_matrix(
            mat_x, mat_y, low_rank_threshold)
        self._mat_a = _mat_q @ _mat_p
        self._mat_p = _mat_p / self._rho
        # 行列Yの最終列を保存
        self._last_array = mat_y[:, -1]

新データを追加し Online DMD の更新

行列 P, A を更新します。(下記式は再掲)

\begin{align*} \hat{P}_{k+1} &= \frac{1}{\rho} \left( \hat{P}_k - \gamma_{k+1} \hat{P}_k \bm{x}_k \bm{x}_k ^T \hat{P}_k \right) \\ \gamma_{k+1} &= \frac{1}{1 + \bm{x}_{k}^T \hat{P}_k \bm{x}_k} \\ A_{k+1} &= A_{k} + \gamma_{k+1} (\bm{y}_k - A_k \bm{x}_k) \bm{x}_{k}^T \hat{P}_k \\ \end{align*}
class WeightedOnlineDmd:
    def update(self, new_data: float) -> None:
        """新データを追加し A行列とP行列を更新."""
        # 過去データの最終列を取得
        vector_x = self._last_array
        vector_y = np.r_[self._last_array[1:], new_data]
        # 係数更新
        coeff = 1. / (1. + vector_x.T @ self._mat_p @ vector_x)
        self._mat_a = self._mat_a + coeff * np.outer((vector_y - self._mat_a @ vector_x), vector_x) @ self._mat_p
        self._mat_p = 1. / self._rho * (self._mat_p - coeff * self._mat_p @ np.outer(vector_x, vector_x) @ self._mat_p)
        # 最終列を保存
        self._last_array = vector_y

モード分解とデータ列の再構成

行列 A の固有値を \lambda_i, 固有ベクトルを \bm{\phi}_i とし、分解とそれぞれのデータ列を復元します。(下記式は再掲)

\begin{align*} \bm{x}_k &\simeq \sum_{i=0}^{r-1} b_i \bm{\phi}_i \lambda_{i}^{k-1} \\ \bm{b} &= \Phi^{+}\bm{x}_1 = (b_0, b_1, \cdots)^T \\ \Phi &=[\bm{\phi}_0\,\dots\,\bm{\phi}_{r-1}] \end{align*}
def _valid_eigens(eigens: NDArray, threshold: float) -> int:
    # 累積寄与率から閾値以下のインデックスを取得
    values = eigens * eigens.conj()
    cumulated = np.cumsum(values) / values.sum()
    index = np.searchsorted(cumulated, threshold, side='left') + 1
    return int(index)

class WeightedOnlineDmd:
    def reconstruct(self, valid_number: int, time_index: int, threshold: float) -> list[NDArray]:
        """現在の状態から各モードで信号を再構成."""
        # 固有値分解とベクトル b の算出
        eigens, phi_mat = np.linalg.eig(self._mat_a)
        bn = np.linalg.solve(_hermitian(phi_mat) @ phi_mat, _hermitian(phi_mat) @ self._last_array)
        wave_list = []
        # 入力 valid_number が 0 以下であれば累積寄与率から必要なモード数を求める
        if valid_number <= 0:
            valid_number = _valid_eigens(eigens, threshold)

        # 再構成 (Hankel Matrix が作られる。これを 1次元に変換する)
        # 出力は各モードの信号であり, 元データを再構成する場合はこれらの和を求める
        window_size = phi_mat.shape[0]
        for i in range(valid_number):
            phi_i = phi_mat[:, [i]]
            coeff = bn[i] * (1 / (eigens[i] ** np.arange(time_index - window_size + 1)[::-1]))
            hankel_mat = phi_i @ coeff[None, :]
            xs = hankel_to_signal(hankel_mat)
            wave_list.append(xs)
        return wave_list

動作例

擬似入力データを作り Online DMD を実行します。
入力データの 2割を初期データとし初期化し、それ以降のデータは 1サンプルずつ更新します。区間ごとに信号の再構成を行い周波数変化に対応できることを確認します。

入力データ: チャープ信号

処理結果 で示した周波数が時間変化するチャープ信号を入力データとします。

\begin{align*} y &= at + \sin(2\pi (f_0 t + 0.5 k t^2)) \\ k &= (f_1 - f_0) / T \end{align*}
def generate_data(
        t: NDArray, trend_slope: float = 0.05, f0: float = 2.0, f1: float = 3.0) -> NDArray:
    k = (f1 - f0) / t[-1]
    phase = 2 * np.pi * (f0 * t + 0.5 * k * t**2)
    data = trend_slope * t + np.sin(phase)
    return data

# 使用例
fs = 100  # sampling rate
duration = 10.0  # データ時間
times = np.linspace(0, duration, int(fs * duration), endpoint=False)
data = generate_data(times, trend_slope=0.05, f0=2.0, f1=3.0)

データ列を Online DMD に入力しデータ列の再構成を行う

Online DMD ではオンライン更新の前に初期の行列 P, A を求める必要があります。初めにこの初期化を行い、以降は 1サンプルずつオンライン更新を行います。ただし、初期化以降のデータを複数に分けそれぞれのブロック内の再構成を行うことで効果を確認します。

はじめに、パラメータ設定と初期化用オンライン更新用へのデータ分離を行います。

# 設定
window_size = 40  # Hankel Matrix の行数
rho = 0.98  # 重み ρ
n_blocks = 6  # オンライン更新を行うデータを n_blocks に分けて処理する
first_data_ratio = 0.2  # 初期化に使うデータサイズの割合

# データ (前節の times, data) を初期化用, オンライン更新用に分ける
remain_size = int((1. - first_data_ratio) * len(data))
first_size = len(data) - remain_size
first_data = data[:first_size]
remain_data = data[first_size:]

# クラスのインスタンス化
dmd = WeightedOnlineDmd(window_size, rho=rho)

次に Online DMD の初期化 (行列 P, A) と残データに対して更新を行います。各ブロックごとにデータのモード分解したデータ列と全体の再構成結果を求めます。

# モードごとのデータ列とその和を求める
def reconstruct(
        dmd: WeightedOnlineDmd, length: int, threshold: float = 0.98) \
            -> tuple[list[NDArray], NDArray]:
    wave_list = dmd.reconstruct(valid_number=0, time_index=length, threshold=threshold)
    reconstructed = np.sum(np.array(wave_list), axis=0)
    return wave_list, reconstructed

# plot 用の保存
wave_keeper = []
reconstructed_list = []

# 初期化とこの時点での再構成
dmd.set_initial_data(first_data, low_rank_threshold=0.99)
_wave_list, _reconstructed = reconstruct(dmd, len(first_data))
wave_keeper.append(_wave_list)
reconstructed_list.append(_reconstructed)

# 残りデータを n_blocks 回に分けて処理する
one_block = remain_size // n_blocks
for i in range(n_blocks):
    input_data = remain_data[i * one_block:(i + 1) * one_block]
    for one_data in input_data:
        dmd.update(float(one_data))
    _wave_list, _reconstructed = reconstruct(dmd, len(input_data))
    wave_keeper.append(_wave_list)
    reconstructed_list.append(_reconstructed)

上記の結果をプロットします。1つ目のプロットはブロックごとにモード分解されたデータ列の和を求め、元のデータ列と近しくなるか確認します。

count = 0
last_time = times[0]  # plot の繋ぎ目用
last_value = reconstructed_list[0][0]  # plot の繋ぎ目用

plt.figure(figsize=(14, 6))
for i, one_block in enumerate(reconstructed_list):
    sub_times = times[count:count + len(one_block)]
    count += len(one_block)
    # 各ブロックごとに色を付ける
    plt.plot(
        np.r_[last_time, sub_times], np.r_[last_value, one_block[:len(sub_times)]],
        alpha=0.7, c=f'C{i}', label=f'Block {i}')
    # for next
    last_time = sub_times[-1]
    last_value = one_block[len(sub_times)-1]

# 元データを薄い黒色でプロットする
plt.plot(times[:count], data[:count], alpha=0.5, c='black', label='original')
plt.legend()
plt.show()

チャープ信号の Online DMD の信号再構成

次に各ブロックごとに和を求めずにモードそれぞれのデータ列をプロットします。今回の結果では 3つの主要モードに分解されました。1つはトレンドを意味するデータ列です。残り 2つは各ブロックで振動するデータ列を捉えています。2つの固有値は互いに共役であり、復元される実部は同一になります。このためプロットは 2種のみの波形が見られます。

count = 0
plt.figure(figsize=(14, 6))
for i, wave_list in enumerate(wave_keeper):
    size = len(wave_list[0])
    sub_times = times[count:count + size]
    count += size
    for one_wave in wave_list:
        plt.plot(sub_times, one_wave[:len(sub_times)], alpha=0.7, c=f'C{i}')
plt.show()

Online DMD の各モード

まとめ、次に行うこと

1次元データ列を Hankel Matrix に変換し Online DMD を実行する機能を NumPy で実装しました。連続的に周波数が変わるデータに対して Online DMD を適用することで短期間において単純なモードに分解できることが分かりました。
こちらの記事 では特異値分解 (SVD) を使い DMD を行いますが、ここで説明した Online DMD は固有値固有ベクトルを直接求めています。今後は計算コストが低く、ノイズ耐性に強いと思われる SVD を使った Online DMD を実装したいと思います。(参考: Extended Online DMD and Weighted Modifications for Streaming Data Analysis)

参考文献

以下を参考にしました。

Discussion