【初心者向け】PyTorchで異常検知:VAE(Variational Autoencoder)による時系列解析入門
はじめに
なんで異常検知が必要なの?
私たちのまわりには、たくさんの「時系列データ」があります。たとえば:
-
IoTセンサーの温度や湿度の記録
-
Webサービスのアクセスログ
-
工場の機械の動き
これらのデータから「いつもと違う動き(=異常)」を見つけられると、色んなところで役に立ちます。たとえば、機械の故障予知、不正アクセスの検知などです。
最近では、深層学習(ディープラーニング)の力を使って、より柔軟に異常を見つけられるようになってきました。その中でも「VAE(Variational Autoencoder)」というモデルが注目されています。
今回のテーマ
今回は、VAEを使って、シンプルな時系列データから異常を見つける流れをやってみます!
-
VAEってどんな風に異常検知に使えるの?
-
Pythonでサンプルコードを書いてみたい!
-
結果をグラフで見たい!
という方に向けた、実践的な入門記事です。
どんなことをやるの?
疑似データで練習!
まずは正弦波(サインカーブ)にちょっとだけノイズを足した「正常なデータ」を作ります。そして、そこにスパイク(ピョンと跳ねる異常)を混ぜて「異常なデータ」を作ります。
import numpy as np
import matplotlib.pyplot as plt
# 正常な正弦波 + ノイズ
x = np.linspace(0, 20 * np.pi, 500)
normal = np.sin(x) + 0.1 * np.random.randn(500)
# 異常セグメントを注入(例:スパイク)
anomaly = normal.copy()
anomaly[300:320] += 2.5
# グラフ描画
plt.figure(figsize=(12, 4))
plt.plot(anomaly, label="Anomalous signal", color='blue')
plt.axvspan(300, 320, color='red', alpha=0.3, label="Injected anomaly")
plt.title("Sinusoidal Signal with Injected Anomaly")
plt.xlabel("Time")
plt.ylabel("Amplitude")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

こんな感じで「いつも通り」と「少し異常な動き」のデータを用意します。
時系列を小分けにして使いやすく
時系列データはそのままだと扱いにくいので、「スライディングウィンドウ」という手法を使って、50個ずつに区切っていきます。これで、VAEに入力しやすい形にできます!
import torch
# 正常データと異常データをスライディングウィンドウで分割
def create_windows(data, window_size=50):
return np.array([data[i:i+window_size] for i in range(len(data) - window_size)])
data_train = create_windows(normal, 50)
data_test = create_windows(anomaly, 50)
# pytorchで使えるようにTensor型に変換
data_train = torch.tensor(data_train, dtype=torch.float32)
data_test = torch.tensor(data_test, dtype=torch.float32)
モデルをつくって学習させる!
VAEは、入力されたデータを 圧縮(エンコード)→再構成(デコード) するモデルです。
-
入力:50個の連続データ
-
エンコーダー:50 → 32 → 10次元に圧縮
-
デコーダー:10 → 32 → 50次元に復元
こんなふうに情報を小さくして、もう一度元に戻せるように学習します。
ポイントは:
-
正常データなら、うまく元通りになる
-
異常なデータは、再構成がうまくいかない
というところ。これを使って、異常を検出していきます。
import torch.nn as nn
import torch.nn.functional as F
class Encoder(nn.Module):
def __init__(self, input_dim, hidden_dim, latent_dim):
super().__init__()
self._linear = nn.Linear(input_dim, hidden_dim)
self._mu = nn.Linear(hidden_dim, latent_dim)
self._logvar = nn.Linear(hidden_dim, latent_dim)
def forward(self, x):
h = self._linear(x)
h = F.relu(h)
mu = self._mu(h)
logvar = self._logvar(h)
return mu, logvar
class Decoder(nn.Module):
def __init__(self, latent_dim, hidden_dim, output_dim):
super().__init__()
self._linear = nn.Linear(latent_dim, hidden_dim)
self._output = nn.Linear(hidden_dim, output_dim)
def forward(self, z):
h = self._linear(z)
h = F.relu(h)
x_hat = self._output(h)
return x_hat
def reparameterize(mu, logvar):
sigma = torch.exp(0.5 * logvar)
epsilon = torch.randn_like(sigma)
z = mu + epsilon * sigma
return z
class VAE(nn.Module):
def __init__(self, input_dim, hidden_dim, latent_dim):
super().__init__()
self._encoder = Encoder(input_dim, hidden_dim, latent_dim)
self._decoder = Decoder(latent_dim, hidden_dim, input_dim)
def forward(self, x):
mu, logvar = self._encoder(x)
z = reparameterize(mu, logvar)
x_hat = self._decoder(z)
return x_hat, mu, logvar
def get_loss(self, x):
x_hat, mu, logvar = self.forward(x)
recon_loss = F.mse_loss(x_hat, x, reduction='sum')
kl_loss = - torch.sum(1 + logvar - mu ** 2 - logvar.exp())
total_loss = recon_loss + kl_loss
return total_loss, recon_loss, kl_loss
from torch.utils.data import DataLoader, TensorDataset
vae = VAE(input_dim=50, hidden_dim=32, latent_dim=10)
optimizer = torch.optim.Adam(vae.parameters(), lr=1e-3)
epochs = 100
batch_size = 32
train_loader = DataLoader(TensorDataset(data_train), batch_size=batch_size, shuffle=True)
for epoch in range(epochs):
vae.train()
total_loss = 0
for batch in train_loader:
x = batch[0]
loss, recon_loss, kl_loss = vae.get_loss(x)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
if epoch % 10 == 0:
print(f"Epoch {epoch} | Loss: {total_loss:.2f}")
異常はどこにある?
学習が終わったら、異常なデータをVAEに入力してみます。
まず、VAEが内部で使っている「潜在変数(特徴ベクトル)」をPCAで2次元にして可視化すると、正常と異常の違いがなんとなく見えてきます!
さらに、VAEが出力した再構成された波形と、元のデータを比べてみましょう。
異常なところでは、VAEはうまく元通りにできていません。そこが「怪しい」と判断できるわけです!
from sklearn.decomposition import PCA
# モデルを推論モードに
vae.eval()
with torch.no_grad():
mu_train, logvar_train = vae._encoder(data_train)
z_train = reparameterize(mu_train, logvar_train)
mu_test, logvar_test = vae._encoder(data_test)
z_test = reparameterize(mu_test, logvar_test)
# NumPyへ変換
z_train_np = z_train.numpy()
# z_valid_np = z_valid.numpy()
z_test_np = z_test.numpy()
# すでに reparameterize された z_train, z_test がある前提
z_all = np.vstack([z_train_np, z_test_np])
pca_2d = PCA(n_components=2)
z_all_2d = pca_2d.fit_transform(z_all)
# データを分割
z_train_2d = z_all_2d[:len(z_train)]
z_test_2d = z_all_2d[len(z_train):]
# プロット
plt.figure(figsize=(8, 6))
plt.scatter(z_train_2d[:, 0], z_train_2d[:, 1], label="Train (Normal)", alpha=0.5, color='blue')
plt.scatter(z_test_2d[:, 0], z_test_2d[:, 1], label="Test (Anomalous)", alpha=0.6, color='red')
plt.title("2D Latent Space (PCA)")
plt.xlabel("PCA 1")
plt.ylabel("PCA 2")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# 再構成
vae.eval()
with torch.no_grad():
recon_train = vae(data_train)[0]
recon_test = vae(data_test)[0]
# numpy 変換
data_train_np = data_train.numpy()
recon_train_np = recon_train.numpy()
data_test_np = data_test.numpy()
recon_test_np = recon_test.numpy()
# 窓サイズと合成関数(各時点の平均をとる)
def reconstruct_full_sequence(windows):
length = windows.shape[0] + windows.shape[1] - 1
full = np.zeros(length)
count = np.zeros(length)
for i in range(windows.shape[0]):
full[i:i+windows.shape[1]] += windows[i]
count[i:i+windows.shape[1]] += 1
return full / count
# 再構成した波形の生成
full_original_train = reconstruct_full_sequence(data_train_np)
full_recon_train = reconstruct_full_sequence(recon_train_np)
full_original_test = reconstruct_full_sequence(data_test_np)
full_recon_test = reconstruct_full_sequence(recon_test_np)
# 可視化
def plot_reconstruction_full(original, reconstructed, title=""):
plt.figure(figsize=(14, 4))
plt.plot(original, label='Original', linewidth=2)
plt.plot(reconstructed, label='Reconstructed', linestyle='--')
plt.title(title)
plt.xlabel("Time")
plt.ylabel("Amplitude")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
plot_reconstruction_full(full_original_train, full_recon_train, title="Train (Full Sequence)")
plot_reconstruction_full(full_original_test, full_recon_test, title="Test (Anomalous, Full Sequence)")


おわりに
今回はお試しということで、正弦波+スパイクという簡単なデータで検証しました。次は、実際に公開されている異常検知のデータセットを使って、もう少しリアルなタスクに挑戦してみたいと思っています!
もしこの記事が参考になったら「いいね」してもらえるとうれしいです!
Discussion