⛳
numpyで移動平均・移動標準偏差をとり、時系列データを逐次的に標準化する
以下の記事を拡張し、移動標準偏差を作成する。
実現したいこと:
オンライン異常検知のようなケースで、新たなデータ(以下、testデータ)の直前n個のデータ(以下、trainデータ)の移動平均・移動標準偏差を算出し、testデータを標準化する。
import numpy as np
def moving_average(data, size):
"""
data: (n, d)
size: 移動平均のサイズ
return: data[i-size:i]のデータの平均。i < sizeの場合は最初からi行目までで計算。
"""
result = []
for i in range(data.shape[1]):
xx = data[:, i]
b = np.ones(size)/size
xx_mean = np.convolve(xx, b, mode="full")[:len(xx)]
# 補正部分
for j in range(min(size-1, len(xx))):
xx_mean[j] *= size/(j + 1)
result.append(xx_mean.reshape(-1, 1))
return np.hstack(result)
def moving_std(data, size, return_average=True):
"""
data: (n, d)
size: 移動平均のサイズ
return_average: 平均も返すかどうか
return: data[i-size:i]のデータの標準偏差(と平均)。i < sizeの場合は最初からi行目までで計算。
(n, d) or (n, d) * 2
"""
result = []
ma = moving_average(data, size)
for i in range(data.shape[1]):
xx = data[:, i]
xx_std = np.zeros(len(xx))
# 各点での移動標準偏差を計算
for j in range(len(xx)):
if j < size:
# ウィンドウサイズより小さい場合は、最初からj+1個のデータで計算
window_data = xx[:j+1]
xx_std[j] = np.std(window_data, ddof=0) # ddof=0で母標準偏差
else:
# ウィンドウサイズ以上の場合は、直前size個のデータで計算
window_data = xx[j-size+1:j+1]
xx_std[j] = np.std(window_data, ddof=0)
result.append(xx_std.reshape(-1, 1))
ms = np.hstack(result)
if return_average:
return ms, ma
else:
return ms
def moving_std_fast(data, size, return_average=True):
"""
高速版の移動標準偏差計算(畳み込みを使用)
data: (n, d)
size: 移動平均のサイズ
return_average: 平均も返すかどうか
return: data[i-size:i]のデータの標準偏差(と平均)。
"""
result = []
ma = moving_average(data, size)
for i in range(data.shape[1]):
xx = data[:, i]
xx_mean = ma[:, i]
# E[X^2]を計算
b = np.ones(size)/size
xx_squared_mean = np.convolve(xx**2, b, mode="full")[:len(xx)]
# 補正部分(最初のsize-1個)
for j in range(min(size-1, len(xx))):
xx_squared_mean[j] *= size/(j + 1)
# Var(X) = E[X^2] - E[X]^2
variance = xx_squared_mean - xx_mean**2
# 数値誤差で負になることがあるので、0でクリップ
variance = np.maximum(variance, 0)
xx_std = np.sqrt(variance)
result.append(xx_std.reshape(-1, 1))
ms = np.hstack(result)
if return_average:
return ms, ma
else:
return ms
def moving_standardization(data, size, return_full=True):
"""
data: (n, d)
size: 移動平均のサイズ
return_full: Trueの場合、返り値のサイズはdataと一致する。
return: (n, d)
"""
ms, ma = moving_std_fast(data, size)
if return_full:
# 各データポイントを、その直前のウィンドウの統計量で標準化
result = np.zeros_like(data)
for i in range(len(data)):
if i == 0:
# 最初のデータはそのまま
result[i] = data[i]
else:
# i番目のデータは、i-1番目までのデータの統計量で標準化
# 標準偏差が0の場合は標準化しない
mask = ms[i-1] > 1e-10
result[i] = np.where(mask, (data[i] - ma[i-1]) / ms[i-1], data[i] - ma[i-1])
return result
else:
# size以降のデータのみ返す
mask = ms[size-1:-1] > 1e-10
return np.where(mask, (data[size:] - ma[size-1:-1]) / ms[size-1:-1],
data[size:] - ma[size-1:-1])
# 使用例とテスト
if __name__ == "__main__":
# 1次元データでのテスト
print("=== 1次元データのテスト ===")
data = np.arange(10).reshape(10, 1).astype(float)
print("元データ:")
print(data.flatten())
# 移動平均のテスト
ma = moving_average(data, 3)
print("\n移動平均 (window size=3):")
print(ma.flatten())
# 移動標準偏差のテスト
ms, _ = moving_std_fast(data, 3)
print("\n移動標準偏差 (window size=3):")
print(ms.flatten())
# 標準化のテスト
data_standardized = moving_standardization(data, 3)
print("\n標準化後:")
print(data_standardized.flatten())
# 多次元データでのテスト
print("\n\n=== 多次元データのテスト ===")
np.random.seed(42)
data_2d = np.random.randn(100, 3) * np.array([1, 5, 10]) + np.array([0, 10, -5])
data_2d_standardized = moving_standardization(data_2d, 10)
print(f"元データの形状: {data_2d.shape}")
print(f"標準化後の形状: {data_2d_standardized.shape}")
print(f"\n元データの統計量:")
print(f"平均: {np.mean(data_2d, axis=0)}")
print(f"標準偏差: {np.std(data_2d, axis=0)}")
print(f"\n標準化後のデータ(最後の10行)の統計量:")
print(f"平均: {np.mean(data_2d_standardized[-10:], axis=0)}")
print(f"標準偏差: {np.std(data_2d_standardized[-10:], axis=0)}")
# オンライン異常検知の使用例
print("\n\n=== オンライン異常検知の例 ===")
# 正常データ
normal_data = np.random.randn(50, 1)
# 異常データ(外れ値)
anomaly_data = np.array([[10], [-8], [0.5]])
# 正常データで移動統計量を計算
window_size = 20
ms, ma = moving_std_fast(normal_data, window_size)
# 最後のウィンドウの統計量を使って異常データを標準化
last_mean = ma[-1, 0]
last_std = ms[-1, 0]
standardized_anomaly = (anomaly_data - last_mean) / last_std
print(f"正常データの最後のウィンドウの統計量:")
print(f"平均: {last_mean:.3f}, 標準偏差: {last_std:.3f}")
print(f"\n異常データ: {anomaly_data.flatten()}")
print(f"標準化後: {standardized_anomaly.flatten()}")
print(f"\n|z-score| > 3 の異常判定:")
for i, z in enumerate(standardized_anomaly.flatten()):
is_anomaly = abs(z) > 3
print(f"データ{i+1}: z-score = {z:.3f}, 異常: {is_anomaly}")
Discussion