🌀

特異値分解 (SVD) を利用した Online DMD を NumPy で実装する

に公開

話すこと

Dynamic Mode Decomposition では時系列データを複数のモードの組み合わせとして表現します。主成分分析 (PCA) のように主要な信号を抽出できます。
1次元ストリーミングデータに対する Online Dynamic Mode Decomposition (Online DMD) を NumPy で実装します。特異値分解 (SVD) をオンライン処理として使い、低ランク近似によるノイズに対するロバスト性がある Online DMD を得ます。

データ列を Hankel Matrix に変換したものを Online DMD に当てはめ、データ列の主要モードに分解します。重み付きの Online DMD を行い、過去データの影響を小さくします。
以下の過去記事に SVD を使わない Online DMD を記載しています。
記事: Online DMD を NumPy で実装する

話さないこと

  • Dynamic Mode Decomposition (DMD) の数学的な話

試したこと

周波数と振幅が単調変化する時系列データに対して OnlineDMD を行い、時刻ごとの主となる周波数を推定します。

input chirp signal

はじめに: 特異値分解 (SVD) を使用する Online DMD のモチベーション

特異値分解を使用する Online DMD のモチベーション

Online DMD はセンサデータのような時系列で変化するものに使われます。全体ではなく直近の分析に活用できます。
低ランク近似した行列を更新することによりノイズによる影響を抑え、元の行列を求める固有値計算量の低下にもつながります。

DMD では Y=AX となるような行列 A を求め、固有値・固有ベクトルでモードを求めます。
時刻 t で観測した状態ベクトル \bm{x}_t \in \mathbb{R}^m を列方向に k 個並べ行列を定義します。先頭と末尾を 1つずらし現在と 1ステップ後の 2つの行列を作ります。

\begin{align*} X &= (\bm{x}_1, \bm{x}_2, \cdots, \bm{x}_{k} ) \\ Y &= (\bm{x}_2, \bm{x}_3, \cdots, \bm{x}_{k+1} ) \\ &= (\bm{y}_1, \bm{y}_2, \cdots, \bm{y}_{k} ) \end{align*}

この行列 A を低ランク近似した \~{A} を求めます。A, \~{A} の固有値は共通であり、\~{A} の固有ベクトルから A の固有ベクトルを算出できます。これによりサイズが小さい \~{A} でモード分解ができます。

SVD を使った Online DMD の解き方

解き方: X の特異値分解と低ランク近似した行列 \~{A}

X を特異値分解し X=U\Sigma V^T とします。低ランク近似を考え、U は行の数より列の数の方が小さいとします。行列 A に左から U^T, 右から U をかけることで元のサイズより小さい低ランク近似した \~{A} を得ます。
ランク r まで有効にした時、U, \Sigma, V のサイズはそれぞれ m \times r, r \times r, k \times r になります。\~{A} のサイズは r \times r です。

\~{A} = U^T A U

Y=AX の最小二乗解 (\| Y - AX \|^2_{\text{F}} \ \ (\| \cdot \|_{\text{F}}: \ \text{Frobenius Norm}) を最小にする A) は疑似逆行列 X^{+} を使い A=YX^{+} となります。X=U\Sigma V^T より以下を得ます。

\begin{align*} A &= YX^{+} \\ &= Y(U\Sigma V^T)^{+} \\ &= Y V \Sigma^{-1} U^T \\ \~{A} &= U^T A U \\ &= U^T Y V \Sigma^{-1} U^T U \\ &= U^T Y V \Sigma^{-1} \\ &= H \Sigma^{-1} \,\, (H = U^T Y V) \end{align*}

以降では時系列で増えていくデータに対して H\Sigma を更新していきます。

更新アルゴリズム

X の更新

時系列データとしての XX_k = \left(\bm{x}_1, \bm{x}_2, \cdots \bm{x}_k\right) とし X_{k+1} = \left(X_k, \bm{x}_{k+1}\right)X_k の特異値分解 U_k, \Sigma_k, V_k を使い表します。

以下の3つのベクトル (\bm{p}, \bm{r}, \bm{q}) について考えます。

  1. \bm{x}_{k+1}U_k の張る部分空間で表したものを \bm{p} = U_k^T \bm{x}_{k+1} とします
  2. 残差ベクトル \bm{r}\bm{x}_{k+1}U_k に含まれない成分として \bm{r} = \bm{x}_{k+1} - U_k \bm{p} とします
  3. 大きさを 1 にした残差ベクトル \bm{q} = \frac{r}{\gamma} \, (\gamma = \|\bm{r}\|) とします

X_{k+1} = \left(X_k, \bm{x}_{k+1}\right) は (U_k, \Sigma_k, V_k) 使った次の表現ができます。

\begin{align*} (U_k, \ \bm{q}) \begin{pmatrix} \Sigma_k & \bm{p}\\ \bm{0}^T & \gamma \end{pmatrix} \begin{pmatrix} V_k & \bm{0} \\ \bm{0}^T & 1\end{pmatrix}^T &= (U_k \Sigma_k, \ U_k \bm{p} + \gamma \bm{q}) \begin{pmatrix} V_k & \bm{0} \\ \bm{0}^T & 1\end{pmatrix}^T \\ &= (U_k \Sigma_k V_k^T, \ U_k \bm{p} + \gamma \bm{q}) \\ &= (X_k, \ U_k U_k^T \bm{x}_{k+1} + \bm{x}_{k+1} - U_k \bm{p}) \\ &= (X_k, \bm{x}_{k+1}) \\ &= X_{k+1} \end{align*}

行列 K = \begin{pmatrix} \Sigma_k & \bm{p}\\ \bm{0}^T & \gamma \end{pmatrix} を特異値分解により \~{U} \~{\Sigma} \~{V}^T とし、 X_{k+1} を求めます。

\begin{align*} X_{k+1} &= (U_k, \ \bm{q}) \~{U} \~{\Sigma} \~{V}^T \begin{pmatrix} V_k & \bm{0} \\ \bm{0}^T & 1\end{pmatrix}^T \\ &= \begin{pmatrix} (U_k, \ \bm{q}) \~{U}\end{pmatrix} \~{\Sigma} \begin{pmatrix} \begin{pmatrix} V_k &\bm{0}\\ \bm{0} & 1 \end{pmatrix} \~{V}^T\end{pmatrix}^T \\ &= U_{k+1} \Sigma_{k+1} V_{k+1}^T \end{align*}

上記のように X_{k+1} が 行列 \begin{pmatrix} \Sigma_k & \bm{p}\\ \bm{0}^T & \gamma \end{pmatrix} の特異値分解による更新で求まります。

行列のサイズ

X_k を特異値分解した行列 U_k, \Sigma_k, V_k のサイズは m \times r, r \times r, k \times r であり、行列 K の特異値分解 \~{U}, \~{\Sigma}, \~{V}^T のサイズは (r+1) \times (r+1) です。更新された U_{k+1} \Sigma_{k+1} V_{k+1} はそれぞれ m \times (r+1), (r+1) \times (r+1), k \times (r+1) の大きさの行列になります。

H の更新

H_{k+1} = U_{k+1}^T Y_{k+1} V_{k+1}H_k を使い更新します。

\begin{align*} H_{k+1} &= \~{U}^T \begin{pmatrix} U_k^T\\ \bm{q}^T \end{pmatrix} \left(Y_k, \bm{y}_{k+1}\right) \begin{pmatrix} V_k & \bm{0}\\ \bm{0}^T & 1 \end{pmatrix} \~{V} \\ &= \~{U}^T \begin{pmatrix} U_k^T Y_k V_k & U_k^T \bm{y}_{k+1} \\ \bm{q}^T Y_k V_k & \bm{q}^T \bm{y}_{k+1} \end{pmatrix} \~{V} \\ &= \~{U}^T \begin{pmatrix} H_k & U_k^T \bm{y}_{k+1} \\ \bm{q}^T Y_k V_k & \bm{q}^T \bm{y}_{k+1} \end{pmatrix} \~{V} \end{align*}

いいかげんな表現になりますが Y_k \simeq X_k であれば \bm{q}^T Y_k V_k \simeq \bm{0} と近似できます (残差ベクトル \bm{r} は行列 U の列空間と直交します)。そのため H_{k+1} の更新式を以下のように表現します。

H_{k+1} = \~{U}^T \begin{pmatrix} H_k & U_k^T \bm{y}_{k+1} \\ \bm{0}^T & \bm{q}^T \bm{y}_{k+1} \end{pmatrix} \~{V}

H_{k+1} を更新する際には行列 H_k, U_k を保存しておき、行列 \~{U}, \~{V} は行列 K の特異値分解で毎回計算します。データ列の特異値分解の行列 V_k の列数がデータ数になりますが、これを保存する必要はありません。

\~{A} の更新

更新された H_{k+1}, \Sigma_{k+1} より \~{A} = H_{k+1} \Sigma_{k+1}^{-1} になります。

重み付き (忘却) について

入力データは \bm{x}_1, \bm{x}_2, \cdots と増えていき行列 X, Y も列方向に大きくなります。上記では全データが同様に扱われますが、時系列データを次々解釈したいときは直近に価値があり、過去データほど比重が小さくなります (過去を忘れていきます)。
そこで重み係数 (忘却係数) として \lambda を使い時間ごとの重みを変えます。

\~{X}_k = (\lambda^{k-1} \bm{x}_1, \cdots, \lambda^{k-i} \bm{x}_i, \cdots, \lambda \bm{x}_{k-1}, \bm{x}_{k})

これより \~{X}_{k+1} = (\lambda \~{X}_k, \bm{x}_{k+1}) が成り立ちます。

\~{X}_{k+1} の更新式は X_{k+1} と同様に行うと、U, V の更新は変わりませんが \Sigma_{k+1} = \lambda \~{\Sigma} となります。
先ほどの手続きで H_{k+1} = U_{k+1}^T \~Y_{k+1} V_{k+1} となり \~{Y}_{k+1} = (\lambda \~{Y}_k, \bm{y}_{k+1}) から以下の更新式を得ます。

H_{k+1} = \~{U}^T \begin{pmatrix} \lambda H_k & U_k^T \bm{y}_{k+1} \\ \bm{0}^T & \bm{q}^T \bm{y}_{k+1} \end{pmatrix} \~{V}

NumPy 実装

以下の関数を持つクラス OnlineDMD を作ります。以降ではこの関数を実装します。入力データを 1次元の実数列とします。

class OnlineDMD:
    def __init__(
            self, n_dim: int, r_max: int = 10, lambda_: float = 1.0, tau_add: float = 1e-2, tau_rel: float = 1e-3, tau_energy: float = 0.99) -> None:
        self.n = int(n_dim)  # 入力ベクトルの次元数
        self.r_max = int(r_max)  # 低ランク近似の最大ランク数
        self.lambda_ = float(lambda_)  # 忘却係数
        self.tau_add = float(tau_add)  # ランクを増やすかどうかの閾値
        self.tau_rel = float(tau_rel)  # ランク削減の相対特異値閾値
        self.tau_energy = float(tau_energy)  # ランク削減の累積エネルギー閾値

        # 保持するのは U, S, H のみ
        self.U = np.empty(0)  # (n, r)
        self.S = np.empty(0)  # (r,) 特異値
        self.H = np.empty(0)  # (r, r) = U^T X' V

        # ストリーミング用バッファ
        self._x_prev = np.empty(0)  # 次に取り込む列

    def initialize(self, X: NDArray) -> None:
        """最初の DMD 実行"""
        pass

    def update(self, x_new: NDArray) -> None:
        """新しいデータ x_new を入力し行列 H を更新する"""
        pass

    def A_tilde(self) -> None|NDArray:
        """低ランク小行列 tilde A(r,r)"""
        if self.U.size == 0:
            return None
        Sinv = 1.0 / np.maximum(self.S, 1e-15)
        return self.H @ np.diag(Sinv)

    def eigs(self):
        """元の行列 A の固有値固有ベクトル"""
        A = self.A_tilde()
        if A is None:
            return None, None
        evals, W = np.linalg.eig(A)
        modes = self.U @ W
        return evals, modes

    def _truncate_rank(self, U: NDArray, S: NDArray, H: NDArray):
        """特異値の最大との比, 累積値からランク調整"""
        pass

    def get_mode_amplitudes(self, x_init: NDArray|None = None) -> NDArray:
        """Get DMD mode amplitudes for given initial state."""
        _, modes = self.eigs()
        if x_init is None:
            x_init = np.array(self._x_prev)
        if modes is None:
            raise ValueError("No modes available")
        return np.linalg.lstsq(modes, x_init, rcond=None)[0]

    def get_mode_frequencies(self, dt: float = 1.0) -> NDArray:
        """Get frequencies of DMD modes.

        Args:
            dt: Time step size

        Returns:
        Frequencies in Hz (if dt is in seconds)
        """
        eigvals = self.eigs()[0]
        if eigvals is None:
            raise ValueError("No modes available")
        return np.imag(np.log(eigvals)) / (2 * np.pi * dt)

初期化

初めに m 個のデータ列を持つ行列 X を入力とし行列 U, S, H の初期化を行います。

    def initialize(self, X: NDArray) -> None:
        """X.shape = (n_dim, データ数)"""
        n, m = X.shape
        if n != self.n:
            raise ValueError(f"Input X has incompatible dimension {n}, expected {self.n}")

        # 忘却係数処理
        coeff_array = self.lambda_ ** np.arange(m - 1, -1, -1)
        weighted_X = X * coeff_array

        # 特異値分解
        U, S, Vt = np.linalg.svd(weighted_X[:, :-1] * self.lambda_, full_matrices=True)
        r = min(self.r_max, S.size)
        self.U = U[:, :r]
        self.S = S[:r]
        V = Vt.T[:, :r]

        if m > 1:
            Y = weighted_X[:, 1:]
            self.H = self.U.T @ Y @ V
        else:
            self.H = np.zeros((r, r))

        # 次回の更新用に最終列を保存
        self._x_prev = weighted_X[:, -1].copy()

更新

3つのベクトル \bm{p}, \bm{r}, \bm{q} を求めます。この更新では常にランクが 1つずつ増えてしまうためランクを増やすかどうかの判定を行います。残差ベクトル \bm{r} の大きさが小さければランクを増やさずに行列 H を更新します。

    def update(self, x_new: NDArray) -> None:
        # データサイズ確認
        x_new = np.asarray(x_new, dtype=float).reshape(-1)
        assert x_new.shape[0] == self.n
        if self.U.size == 0:
            raise RuntimeError("OnlineDMD must be initialized with initialize() before update().")

        # x_prev を取り込み、x_new で H を更新
        x_prev = self._x_prev
        U, S = self.U, self.S
        r = S.shape[0]

        # 既存空間への射影と残差
        p = U.T @ x_prev  # (r,)
        r_vec = x_prev - U @ p  # (n,)
        rho = np.linalg.norm(r_vec)
        q_vec = r_vec / rho if rho > 0 else np.zeros_like(r_vec)

        # ランク増加の判定(相対閾値 & 上限)
        add_rank = (rho > self.tau_add * max(np.linalg.norm(x_prev), 1e-12)) and (r < self.r_max)

        # 忘却係数適用
        self.H *= self.lambda_
        S *= self.lambda_

        if add_rank:
            K = np.block([
                [np.diag(S), p.reshape(-1, 1)],
                [np.zeros((1, r)), np.array([[rho]])]
            ])  # (r+1, r+1)

            Ut_r, St_r, Vt_r = np.linalg.svd(K, full_matrices=True)
            U_aug = np.column_stack((U, q_vec))

            # update H
            z_old = U.T @ x_new  # (r,)
            z_rho = np.dot(q_vec, x_new)  # scalar
            H_aug = np.block([
                [self.H, z_old[:, None]],
                [np.zeros((1, self.H.shape[1])), np.array([[z_rho]])]
            ])
            U_new = U_aug @ Ut_r  # (n, r)
            S_new = St_r
        else:
            Kthin = np.hstack([np.diag(S), p.reshape(-1, 1)])  # (r, r+1)
            Ut_r, St_r, Vt_r = np.linalg.svd(Kthin, full_matrices=False)  # Vt_r: (r, r+1)
            U_new = U @ Ut_r  # (n, r)
            S_new = St_r  # (r,)

            z_old = U.T @ x_new  # (r,)
            H_aug = np.column_stack([self.H, z_old[:, None]])  # (r, r+1)

        # H
        H_new = Ut_r.conj().T @ H_aug @ Vt_r.conj().T

        # ランク縮小(相対特異値 + エネルギー閾)
        U_new, S_new, H_new = self._truncate_rank(U_new, S_new, H_new)

        # 保存
        self.U, self.S, self.H = U_new, S_new, H_new
        self._x_prev = x_new.copy()

    def _truncate_rank(self, U: NDArray, S: NDArray, H: NDArray):
        """特異値の最大との比, 累積値からランク調整"""
        if S.size == 0:
            return U, S, H

        # 相対比の閾値比較
        lead = float(S[0])
        if lead <= 0:
            r_rel = 1
        else:
            rel_mask = (S / lead) >= self.tau_rel
            if not np.any(rel_mask):
                rel_mask[0] = True
            r_rel = int(np.max(np.nonzero(rel_mask))) + 1

        # 特異値の累積比較
        energy = float(np.sum(S ** 2))
        if energy <= 0:
            r_energy = 1
        else:
            cum_energy = np.cumsum(S ** 2) / energy
            idx = np.searchsorted(cum_energy, self.tau_energy, side="left")
            r_energy = int(idx) + 1
        r_energy = max(1, min(r_energy, S.size))

        # ランク更新
        r_limit = min(self.r_max, S.size, r_rel)
        r_keep = max(1, min(r_limit, r_energy))

        return U[:, :r_keep], S[:r_keep], H[:r_keep, :r_keep]

\~{A} 算出とモード分解

    def A_tilde(self) -> None|NDArray:
        """低ランク小行列 tilde A(r,r)"""
        if self.U.size == 0:
            return None
        Sinv = 1.0 / np.maximum(self.S, 1e-15)
        return self.H @ np.diag(Sinv)

    def get_mode_amplitudes(self, x_init: NDArray|None = None) -> NDArray:
        """初期値(入力)からモードの係数を求める"""
        _, modes = self.eigs()
        if x_init is None:
            x_init = np.array(self._x_prev)
        if modes is None:
            raise ValueError("No modes available")
        return np.linalg.lstsq(modes, x_init, rcond=None)[0]

    def get_mode_frequencies(self, dt: float = 1.0) -> NDArray:
        """モードごとの周波数算出"""
        eigvals = self.eigs()[0]
        if eigvals is None:
            raise ValueError("No modes available")
        return np.imag(np.log(eigvals)) / (2 * np.pi * dt)

動作例

入力信号

周波数が上昇し、振幅が減衰する信号を今回の処理対象にします。以下のように実装します。
今回はサンプリング周波数 100 [Hz], 開始時と終了時の周波数がそれぞれ 1.0, 1.3 [Hz], 振幅が 2.0, 1.0 になるようにします。

def generate_chirp(
        fs: float = 100.0, f0: float = 1.0, f1: float = 1.3, a0: float = 2.0, a1: float = 1.0,
        duration: float = 10.0, noise_level: float = 0.0) \
            -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Generate chirp signal sample.

    Args:
        fs: Sampling frequency
        f0: Starting frequency in Hz
        f1: Ending frequency in Hz
        duration: Total duration for sweep
        noise_level: Noise level (0-1)

    Returns:
        NDArray[np.float64]: Generated chirp signal
        NDArray[np.float64]: Time array
    """
    times = np.arange(0, duration, 1 / fs)
    # Linear frequency sweep
    amp = a0 + (a1 - a0) * (times / duration)
    # Instantaneous frequency: f(t) = f0 + (f1-f0)*t/duration
    # Phase: φ(t) = 2π * [f0*t + (f1-f0)*t²/(2*duration)]
    phase = 2 * np.pi * (f0 * times + (f1 - f0) * times**2 / (2 * duration))
    data = amp * np.sin(phase)

    if noise_level > 0:
        data += noise_level * np.random.randn(len(data))

    return data, times

1次元時系列データの OnlineDMD 対応 (Hankel Matrix)

前述の OnlineDMD の説明では入力するデータは m 次元のベクトルであり、使用する 1次元時系列信号と異なります。時系列データを m 次元ベクトルの集合とするために Hankel Matrix 化を行います。
N 個のデータを持つ入力信号 x_n (n=0, 1, \cdots N-1) とします。入力データから行数 m とする行列 X を作ります。各要素は右隣あるいは下に行くとインデックスが 1 つ増えます。このような行列を Hankel Matrix と呼びます。この行列の列を順に OnlineDMD に入力します。

\begin{align*} X = \begin{pmatrix} x_0 & x_1 & x_2 & \cdots & x_{N-m} \\ x_1 & x_2 & x_3 & \cdots & x_{N-m+1} \\ & & & \cdots \\ x_{m-1} & x_{m} & x_{m+1} & \cdots & x_{N-1} \end{pmatrix} \end{align*}

Hankel Matrix の入力列を生成するクラス HankelSignal とデータ列から Hankel Matrix を生成する関数を作ります。

class HankelSignal:
    def __init__(self, window_size: int) -> None:
        """Hankel structure for 1D signal streaming processing.

        Args:
            window_size: Size of the Hankel window
        """
        self._window_size = window_size
        self._array = np.zeros(self._window_size)

    def initialize(self, values: NDArray[np.float64]) -> NDArray[np.float64]:
        """Initialize the Hankel buffer with given values.

        Args:
            values: Initial values to fill the Hankel buffer
        """
        if self._array.shape != values.shape:
            raise ValueError()
        self._array[...] = values
        return self._array

    def update(self, value: float) -> NDArray[np.float64]:
        """Update the Hankel buffer with a new value.

        Args:
            value: New value to insert into the Hankel buffer
        """
        self._array[:-1] = self._array[1:]
        self._array[-1] = value
        return self._array


def array_to_hankel_matrix(data: NDArray, window_size: int) -> NDArray:
    if data.ndim != 1:
        raise ValueError("Input data must be 1D array.")
    return np.lib.stride_tricks.sliding_window_view(data, len(data) - window_size + 1)

実行

以上のプログラムから OnlineDMD を試します。

# Generate chirp signal
np.random.seed(100)
fs = 100.0
duration = 10.0
f0 = 1.0
f1 = 1.3
a0 = 2.0
a1 = 1.0
input_data, times = generate_chirp(
    fs=fs, f0=f0, f1=f1, a0=a0, a1=a1, duration=duration, noise_level=0.1)

# Prepare DMD
window_size = 100
max_rank = 4
forgetting_factor = 0.99
tau_add = 0.01
dmd = OnlineDMD(
    n_dim=window_size, r_max=max_rank, lambda_=forgetting_factor, tau_add=tau_add)

# Initialize DMD with initial data
init_length = 150
init_data = array_to_hankel_matrix(input_data[:init_length], window_size)
dmd.initialize(init_data)

# Prepare Hankel structure
hankel = HankelSignal(window_size)
hankel.initialize(init_data[:, -1])

# Process streaming data
n_steps = 10
dt = 1. / fs

freq_evolution = []
amplitude_evolution = []
time_points = []

for i in range(init_length, len(input_data), n_steps):
    # Update DMD with new samples
    for j in range(n_steps):
        if i + j >= len(input_data):
            break
        new_sample = input_data[i + j]
        hankel_vector = hankel.update(new_sample)
        dmd.update(hankel_vector)

    # Get current analysis
    frequencies = dmd.get_mode_frequencies(dt=dt)
    amplitudes = dmd.get_mode_amplitudes()

    if len(frequencies) > 0 and len(amplitudes) > 0:
        positive_freqs = frequencies[frequencies >= 0]
        positive_amps = amplitudes[frequencies >= 0]
        if len(positive_freqs) == 0:
            continue
        dominant_idx = np.argmax(positive_amps)
        dominant_freq = positive_freqs[dominant_idx]
        dominant_amp = np.abs(positive_amps[dominant_idx])
        freq_evolution.append(dominant_freq)
        amplitude_evolution.append(dominant_amp)
        time_points.append(times[min(i + n_steps - 1, len(times) - 1)])

# Plot the signal and analysis results
ground_truth_freq = f0 + (f1 - f0) * times / duration

_, axes = plt.subplots(3, 1, figsize=(10, 8), sharex=True)
axes[0].plot(times, input_data)
axes[0].set_title("Chirp Signal")
axes[1].plot(time_points, freq_evolution, marker='o')
axes[1].plot(times, ground_truth_freq, linestyle='--', color='gray', label='Ground Truth')
axes[1].set_title("Dominant Frequency Evolution")
axes[1].set_ylabel("Frequency [Hz]")
axes[1].legend()
axes[2].plot(time_points, amplitude_evolution, marker='o', color='orange')
axes[2].set_title("Dominant Amplitude Evolution")
axes[2].set_xlabel("Time [s]")
axes[2].set_ylabel("Amplitude")
plt.tight_layout()
plt.show()

実行結果は以下になります。1段目が周波数が増加する入力信号、2段目が OnlineDMD で推定した時刻ごとの周波数と対応する入力信号の周波数を表示しています。3段目が 2段目で推定した周波数が持つ振幅を示しています。
推定周波数は単調増加していることが分かります。真の周波数と一致しないのは OnlineDMD は入力時刻以前のデータを使用しているためです。3段目の振幅は単調減少していることが分かります。入力データの開始時, 終了時の振幅は 2.0, 1.0 であり、推定の振幅は 1/2 ほどに減衰されています。

onlinedmd result

まとめ

特異値分解を使用した OnlineDMD の計算と NumPy による実装を行いました。周波数と振幅が時間変化するデータに対して OnlineDMD を実施し、周波数と振幅を追従していることが分かりました。

参考文献

以下を参考にしました。

Discussion