numpyで移動平均・移動標準偏差をとり、時系列データを逐次的に標準化する

に公開

以下の記事を拡張し、移動標準偏差を作成する。
https://zenn.dev/bluepost/articles/1b7b580ab54e95

実現したいこと:
オンライン異常検知のようなケースで、新たなデータ(以下、testデータ)の直前n個のデータ(以下、trainデータ)の移動平均・移動標準偏差を算出し、testデータを標準化する。

import numpy as np

def moving_average(data, size):
    """
    data: (n, d)
    size: 移動平均のサイズ
    return: data[i-size:i]のデータの平均。i < sizeの場合は最初からi行目までで計算。
    """
    result = []
    for i in range(data.shape[1]):
        xx = data[:, i]
        b = np.ones(size)/size
        xx_mean = np.convolve(xx, b, mode="full")[:len(xx)]
        # 補正部分
        for j in range(min(size-1, len(xx))):
            xx_mean[j] *= size/(j + 1)
        result.append(xx_mean.reshape(-1, 1))
    return np.hstack(result)

def moving_std(data, size, return_average=True):
    """
    data: (n, d)
    size: 移動平均のサイズ
    return_average: 平均も返すかどうか
    return: data[i-size:i]のデータの標準偏差(と平均)。i < sizeの場合は最初からi行目までで計算。
        (n, d) or (n, d) * 2
    """
    result = []
    ma = moving_average(data, size)
    
    for i in range(data.shape[1]):
        xx = data[:, i]
        xx_std = np.zeros(len(xx))
        
        # 各点での移動標準偏差を計算
        for j in range(len(xx)):
            if j < size:
                # ウィンドウサイズより小さい場合は、最初からj+1個のデータで計算
                window_data = xx[:j+1]
                xx_std[j] = np.std(window_data, ddof=0)  # ddof=0で母標準偏差
            else:
                # ウィンドウサイズ以上の場合は、直前size個のデータで計算
                window_data = xx[j-size+1:j+1]
                xx_std[j] = np.std(window_data, ddof=0)
        
        result.append(xx_std.reshape(-1, 1))
    
    ms = np.hstack(result)
    if return_average:
        return ms, ma
    else:
        return ms

def moving_std_fast(data, size, return_average=True):
    """
    高速版の移動標準偏差計算(畳み込みを使用)
    data: (n, d)
    size: 移動平均のサイズ
    return_average: 平均も返すかどうか
    return: data[i-size:i]のデータの標準偏差(と平均)。
    """
    result = []
    ma = moving_average(data, size)
    
    for i in range(data.shape[1]):
        xx = data[:, i]
        xx_mean = ma[:, i]
        
        # E[X^2]を計算
        b = np.ones(size)/size
        xx_squared_mean = np.convolve(xx**2, b, mode="full")[:len(xx)]
        
        # 補正部分(最初のsize-1個)
        for j in range(min(size-1, len(xx))):
            xx_squared_mean[j] *= size/(j + 1)
        
        # Var(X) = E[X^2] - E[X]^2
        variance = xx_squared_mean - xx_mean**2
        
        # 数値誤差で負になることがあるので、0でクリップ
        variance = np.maximum(variance, 0)
        
        xx_std = np.sqrt(variance)
        result.append(xx_std.reshape(-1, 1))
    
    ms = np.hstack(result)
    if return_average:
        return ms, ma
    else:
        return ms

def moving_standardization(data, size, return_full=True):
    """
    data: (n, d)
    size: 移動平均のサイズ
    return_full: Trueの場合、返り値のサイズはdataと一致する。
    return: (n, d)
    """
    ms, ma = moving_std_fast(data, size)
    
    if return_full:
        # 各データポイントを、その直前のウィンドウの統計量で標準化
        result = np.zeros_like(data)
        
        for i in range(len(data)):
            if i == 0:
                # 最初のデータはそのまま
                result[i] = data[i]
            else:
                # i番目のデータは、i-1番目までのデータの統計量で標準化
                # 標準偏差が0の場合は標準化しない
                mask = ms[i-1] > 1e-10
                result[i] = np.where(mask, (data[i] - ma[i-1]) / ms[i-1], data[i] - ma[i-1])
        
        return result
    else:
        # size以降のデータのみ返す
        mask = ms[size-1:-1] > 1e-10
        return np.where(mask, (data[size:] - ma[size-1:-1]) / ms[size-1:-1], 
                       data[size:] - ma[size-1:-1])

# 使用例とテスト
if __name__ == "__main__":
    # 1次元データでのテスト
    print("=== 1次元データのテスト ===")
    data = np.arange(10).reshape(10, 1).astype(float)
    print("元データ:")
    print(data.flatten())
    
    # 移動平均のテスト
    ma = moving_average(data, 3)
    print("\n移動平均 (window size=3):")
    print(ma.flatten())
    
    # 移動標準偏差のテスト
    ms, _ = moving_std_fast(data, 3)
    print("\n移動標準偏差 (window size=3):")
    print(ms.flatten())
    
    # 標準化のテスト
    data_standardized = moving_standardization(data, 3)
    print("\n標準化後:")
    print(data_standardized.flatten())
    
    # 多次元データでのテスト
    print("\n\n=== 多次元データのテスト ===")
    np.random.seed(42)
    data_2d = np.random.randn(100, 3) * np.array([1, 5, 10]) + np.array([0, 10, -5])
    
    data_2d_standardized = moving_standardization(data_2d, 10)
    
    print(f"元データの形状: {data_2d.shape}")
    print(f"標準化後の形状: {data_2d_standardized.shape}")
    print(f"\n元データの統計量:")
    print(f"平均: {np.mean(data_2d, axis=0)}")
    print(f"標準偏差: {np.std(data_2d, axis=0)}")
    
    print(f"\n標準化後のデータ(最後の10行)の統計量:")
    print(f"平均: {np.mean(data_2d_standardized[-10:], axis=0)}")
    print(f"標準偏差: {np.std(data_2d_standardized[-10:], axis=0)}")
    
    # オンライン異常検知の使用例
    print("\n\n=== オンライン異常検知の例 ===")
    # 正常データ
    normal_data = np.random.randn(50, 1)
    # 異常データ(外れ値)
    anomaly_data = np.array([[10], [-8], [0.5]])
    
    # 正常データで移動統計量を計算
    window_size = 20
    ms, ma = moving_std_fast(normal_data, window_size)
    
    # 最後のウィンドウの統計量を使って異常データを標準化
    last_mean = ma[-1, 0]
    last_std = ms[-1, 0]
    
    standardized_anomaly = (anomaly_data - last_mean) / last_std
    
    print(f"正常データの最後のウィンドウの統計量:")
    print(f"平均: {last_mean:.3f}, 標準偏差: {last_std:.3f}")
    print(f"\n異常データ: {anomaly_data.flatten()}")
    print(f"標準化後: {standardized_anomaly.flatten()}")
    print(f"\n|z-score| > 3 の異常判定:")
    for i, z in enumerate(standardized_anomaly.flatten()):
        is_anomaly = abs(z) > 3
        print(f"データ{i+1}: z-score = {z:.3f}, 異常: {is_anomaly}")

Discussion