🤖

【初心者向け】PyTorchで異常検知:VAE(Variational Autoencoder)による時系列解析入門

に公開

はじめに

なんで異常検知が必要なの?
私たちのまわりには、たくさんの「時系列データ」があります。たとえば:

  • IoTセンサーの温度や湿度の記録

  • Webサービスのアクセスログ

  • 工場の機械の動き

これらのデータから「いつもと違う動き(=異常)」を見つけられると、色んなところで役に立ちます。たとえば、機械の故障予知、不正アクセスの検知などです。

最近では、深層学習(ディープラーニング)の力を使って、より柔軟に異常を見つけられるようになってきました。その中でも「VAE(Variational Autoencoder)」というモデルが注目されています。

今回のテーマ

今回は、VAEを使って、シンプルな時系列データから異常を見つける流れをやってみます!

  • VAEってどんな風に異常検知に使えるの?

  • Pythonでサンプルコードを書いてみたい!

  • 結果をグラフで見たい!

という方に向けた、実践的な入門記事です。

どんなことをやるの?

疑似データで練習!

まずは正弦波(サインカーブ)にちょっとだけノイズを足した「正常なデータ」を作ります。そして、そこにスパイク(ピョンと跳ねる異常)を混ぜて「異常なデータ」を作ります。

import numpy as np
import matplotlib.pyplot as plt

# 正常な正弦波 + ノイズ
x = np.linspace(0, 20 * np.pi, 500)
normal = np.sin(x) + 0.1 * np.random.randn(500)

# 異常セグメントを注入(例:スパイク)
anomaly = normal.copy()
anomaly[300:320] += 2.5

# グラフ描画
plt.figure(figsize=(12, 4))
plt.plot(anomaly, label="Anomalous signal", color='blue')
plt.axvspan(300, 320, color='red', alpha=0.3, label="Injected anomaly")
plt.title("Sinusoidal Signal with Injected Anomaly")
plt.xlabel("Time")
plt.ylabel("Amplitude")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

こんな感じで「いつも通り」と「少し異常な動き」のデータを用意します。

時系列を小分けにして使いやすく

時系列データはそのままだと扱いにくいので、「スライディングウィンドウ」という手法を使って、50個ずつに区切っていきます。これで、VAEに入力しやすい形にできます!

import torch
# 正常データと異常データをスライディングウィンドウで分割
def create_windows(data, window_size=50):
    return np.array([data[i:i+window_size] for i in range(len(data) - window_size)])

data_train = create_windows(normal, 50)
data_test = create_windows(anomaly, 50)

# pytorchで使えるようにTensor型に変換
data_train = torch.tensor(data_train, dtype=torch.float32)
data_test = torch.tensor(data_test, dtype=torch.float32)

モデルをつくって学習させる!

VAEは、入力されたデータを 圧縮(エンコード)→再構成(デコード) するモデルです。

  • 入力:50個の連続データ

  • エンコーダー:50 → 32 → 10次元に圧縮

  • デコーダー:10 → 32 → 50次元に復元

こんなふうに情報を小さくして、もう一度元に戻せるように学習します。

ポイントは:

  • 正常データなら、うまく元通りになる

  • 異常なデータは、再構成がうまくいかない

というところ。これを使って、異常を検出していきます。

import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        self._linear = nn.Linear(input_dim, hidden_dim)
        self._mu = nn.Linear(hidden_dim, latent_dim)
        self._logvar = nn.Linear(hidden_dim, latent_dim)

    def forward(self, x):
        h = self._linear(x)
        h = F.relu(h)
        mu = self._mu(h)
        logvar = self._logvar(h)
        return mu, logvar

class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, output_dim):
        super().__init__()
        self._linear = nn.Linear(latent_dim, hidden_dim)
        self._output = nn.Linear(hidden_dim, output_dim)

    def forward(self, z):
        h = self._linear(z)
        h = F.relu(h)
        x_hat = self._output(h)
        return x_hat

def reparameterize(mu, logvar):
    sigma = torch.exp(0.5 * logvar)
    epsilon = torch.randn_like(sigma)
    z = mu + epsilon * sigma
    return z

class VAE(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        self._encoder = Encoder(input_dim, hidden_dim, latent_dim)
        self._decoder = Decoder(latent_dim, hidden_dim, input_dim)

    def forward(self, x):
        mu, logvar = self._encoder(x)
        z = reparameterize(mu, logvar)
        x_hat = self._decoder(z)
        return x_hat, mu, logvar

    def get_loss(self, x):
        x_hat, mu, logvar = self.forward(x)

        recon_loss = F.mse_loss(x_hat, x, reduction='sum')
        kl_loss = - torch.sum(1 + logvar - mu ** 2 - logvar.exp())
        
        total_loss = recon_loss + kl_loss
        return total_loss, recon_loss, kl_loss

from torch.utils.data import DataLoader, TensorDataset

vae = VAE(input_dim=50, hidden_dim=32, latent_dim=10)
optimizer = torch.optim.Adam(vae.parameters(), lr=1e-3)

epochs = 100
batch_size = 32

train_loader = DataLoader(TensorDataset(data_train), batch_size=batch_size, shuffle=True)

for epoch in range(epochs):
    vae.train()
    total_loss = 0
    for batch in train_loader:
        x = batch[0]
        loss, recon_loss, kl_loss = vae.get_loss(x)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    if epoch % 10 == 0:
        print(f"Epoch {epoch} | Loss: {total_loss:.2f}")

異常はどこにある?

学習が終わったら、異常なデータをVAEに入力してみます。

まず、VAEが内部で使っている「潜在変数(特徴ベクトル)」をPCAで2次元にして可視化すると、正常と異常の違いがなんとなく見えてきます!

さらに、VAEが出力した再構成された波形と、元のデータを比べてみましょう。

異常なところでは、VAEはうまく元通りにできていません。そこが「怪しい」と判断できるわけです!

from sklearn.decomposition import PCA

# モデルを推論モードに
vae.eval()

with torch.no_grad():
    mu_train, logvar_train = vae._encoder(data_train)
    z_train = reparameterize(mu_train, logvar_train)

    mu_test, logvar_test = vae._encoder(data_test)
    z_test = reparameterize(mu_test, logvar_test)

# NumPyへ変換
z_train_np = z_train.numpy()
# z_valid_np = z_valid.numpy()
z_test_np = z_test.numpy()

# すでに reparameterize された z_train, z_test がある前提
z_all = np.vstack([z_train_np, z_test_np])
pca_2d = PCA(n_components=2)
z_all_2d = pca_2d.fit_transform(z_all)

# データを分割
z_train_2d = z_all_2d[:len(z_train)]
z_test_2d = z_all_2d[len(z_train):]

# プロット
plt.figure(figsize=(8, 6))
plt.scatter(z_train_2d[:, 0], z_train_2d[:, 1], label="Train (Normal)", alpha=0.5, color='blue')
plt.scatter(z_test_2d[:, 0], z_test_2d[:, 1], label="Test (Anomalous)", alpha=0.6, color='red')
plt.title("2D Latent Space (PCA)")
plt.xlabel("PCA 1")
plt.ylabel("PCA 2")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# 再構成
vae.eval()
with torch.no_grad():
    recon_train = vae(data_train)[0]
    recon_test = vae(data_test)[0]

# numpy 変換
data_train_np = data_train.numpy()
recon_train_np = recon_train.numpy()
data_test_np = data_test.numpy()
recon_test_np = recon_test.numpy()

# 窓サイズと合成関数(各時点の平均をとる)
def reconstruct_full_sequence(windows):
    length = windows.shape[0] + windows.shape[1] - 1
    full = np.zeros(length)
    count = np.zeros(length)

    for i in range(windows.shape[0]):
        full[i:i+windows.shape[1]] += windows[i]
        count[i:i+windows.shape[1]] += 1

    return full / count

# 再構成した波形の生成
full_original_train = reconstruct_full_sequence(data_train_np)
full_recon_train = reconstruct_full_sequence(recon_train_np)

full_original_test = reconstruct_full_sequence(data_test_np)
full_recon_test = reconstruct_full_sequence(recon_test_np)

# 可視化
def plot_reconstruction_full(original, reconstructed, title=""):
    plt.figure(figsize=(14, 4))
    plt.plot(original, label='Original', linewidth=2)
    plt.plot(reconstructed, label='Reconstructed', linestyle='--')
    plt.title(title)
    plt.xlabel("Time")
    plt.ylabel("Amplitude")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

plot_reconstruction_full(full_original_train, full_recon_train, title="Train (Full Sequence)")
plot_reconstruction_full(full_original_test, full_recon_test, title="Test (Anomalous, Full Sequence)")

おわりに

今回はお試しということで、正弦波+スパイクという簡単なデータで検証しました。次は、実際に公開されている異常検知のデータセットを使って、もう少しリアルなタスクに挑戦してみたいと思っています!

もしこの記事が参考になったら「いいね」してもらえるとうれしいです!

Discussion