🌍

NASAの時系列データtelemanomの異常値を見てみる(少しだけVAEで異常検知)

に公開

本記事の目的と背景

前回は、シンプルな時系列データにVAE(Variational AutoEncoder)を適用し、異常検知と生成がうまくいくことを確認しました。
今回はさらに一歩進めるために、異常検知の候補としてNASAの実際の宇宙機テレメトリデータを選びVAEで異常検知にチャレンジしました!

前回のおさらい

前回の記事では、VAEを使った異常検知の練習をしました。モデルに学習させたのは、自作したシンプルな時系列データセットで、その正常データにスパイクのようなノイズを載せたデータの検知をさせました。結果、異常検知をすることが出来ました。
その際、学習させたモデルは特にチューニングせずに、ドメイン知識も入出力のサイズ程度しか気にせずに異常検知ができました。そのため、入出力のサイズさえ変えれば実データに対しても異常検知が出来るのではないかと思い、挑戦してみることにしました。

Telemanom データセットとは?

時系列のデータセットとして、NASA提供のTelemanomを選びました。

選定理由は次の通りです:

  • 実際の宇宙機(SMAP、Curiosity Rover)のリアルなセンサーデータ
  • 異常区間のラベルが明示されている
  • データが正規化済みで前処理が不要

データセットはKaggleのNASA Anomaly Detection Dataset SMAP & MSLからダウンロードします。
ダウンロードしてきた dataフォルダlabeled_anomalies.csv を以下のような構成にします。

.
├───data
├───labeled_anomalies.csv
└───main.ipynb

まずは異常データについての情報がある labeled_anomalies.csv を見てみます。

import pandas as pd
label = pd.read_csv('labeled_anomalies.csv')
label
chan_id spacecraft anomaly_sequences class num_values
P-1 SMAP [[2149, 2349], [4536, 4844], [3539, 3779]] [contextual, contextual, contextual] 8505
S-1 SMAP [[5300, 5747]] [point] 7331
E-1 SMAP [[5000, 5030], [5610, 6086]] [contextual, contextual] 8516
E-2 SMAP [[5598, 6995]] [point] 8532
E-3 SMAP [[5094, 8306]] [point] 8307

上記のカラム名は以下のような意味だと思われます。

カラム名 説明
chan_id チャンネルID。各センサーデータ系列の識別子。先頭文字で種類が分かる(例:P = 電源系)
spacecraft データを収集した宇宙機の名前(例:SMAP や MSL)
anomaly_sequences 異常が発生した区間のリスト。各要素は [開始インデックス, 終了インデックス] 形式
class 各異常区間の種類。point(瞬間的な異常)または contextual(文脈的異常)
num_values そのチャンネルのテストデータに含まれる総データ数(サンプル数)

まずは一行目の P-1 をプロットして異常値の範囲を赤く強調します。

import matplotlib.pyplot as plt
import numpy as np
import ast

# 対象のID
chan_id: str = 'P-1'

# 対象データの読み込み
data = np.load(f'data/test/{chan_id}.npy')

# 異常値の範囲を取得
anomaly_sequences: list = ast.literal_eval(label[label['chan_id'] == chan_id]['anomaly_sequences'].values[0])

# データをプロット
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111)
start: int = 0
end: int = data.shape[0]
for seq in anomaly_sequences:
    start_seq: int = seq[0]
    end_seq: int = seq[1]
    ax.axvspan(start_seq, end_seq, color="red", alpha=0.2)
x = np.linspace(start, end, end - start)
ax.plot(x,data[start:end, 0])
plt.show()


ざっくり見た印象では、異常値の範囲は他と比べて振動してなさそうですね。全体では分かりづらいので異常値に注目してみます。

# データをプロット
fig = plt.figure(figsize=(12, 8))
ax_len: int = len(anomaly_sequences)
for i, seq in enumerate(anomaly_sequences):
    start_seq: int = seq[0]
    end_seq: int = seq[1]
    start = start_seq - 200
    end = end_seq + 200

    ax = fig.add_subplot(ax_len, 1, i + 1)
    ax.set_ylim(-1, 1)
    ax.axvspan(start_seq, end_seq, color="red", alpha=0.2)
    x = np.linspace(start, end, end - start)
    ax.plot(x,data[start:end, 0])


注目すると確かに平たんになっていますね。いきなり電源が一定になるのは考えづらいのでデータが飛んだんでしょうか?

コラム:SMAPとは?

少し休憩もかねて、このデータを提供している SMAP について調べてみます。RESTECというサイトが日本語で概要をまとめてくれているのでわかりやすかったです。
そもそもSMAPは Soil Moisture Active/Passive Mission の略称で地球の土壌水分を調査しているのですね。
実際に得られたデータをプロットしたものを引用させていただきます。この図は地球の土壌の水分を表しており、各色は以下のような構成になっています。

  • 青色:湿っている場所
  • 黄色:乾燥している場所
  • 白色:凍っている場所


SMAP's Radiometer Captures Views of Global Soil Moisture 引用元:NASA/JPL-Caltech/GSFC

異常値の可視化

さあ、いよいよtelemanomの異常検知をしていきましょう!モデルに学習させるデータは dataフォルダ の中にある trainフォルダ で正常データが入っています。と異常値が入っている testフォルダ の2つがあります。

data
├───2018-05-19_15.00.10
├───test
└───train

今回はP系のチャネルのみtrainとtestをプロットしてみます。グラフは、青線がtrain、赤線がtest、赤い領域がcontextual、緑の領域がpointです。

def get_datas(folder_path: str) -> dict:
    datas = {}
    for file_name in os.listdir(folder_path):
        if file_name.startswith("P") and file_name.endswith(".npy"):
            file_path = os.path.join(folder_path, file_name)
            data = np.load(file_path)
            datas[file_name] = data
    return datas

# データ読み込み
train = get_datas("data/train/")
test = get_datas("data/test/")

# データをひとまとめにする
plot_data: pd.DataFrame = pd.DataFrame(columns=["value", "train_end", "anomalys"])

for file_name in train.keys():
    chan_id: str = file_name.replace(".npy", "")

    # 値を1次元化して連結
    train_values = train[file_name][:, 0].tolist()
    test_values = test[file_name][:, 0].tolist()
    all_values = train_values + test_values

    # 境界インデックス
    train_end = len(train_values) - 1

    # ラベルから異常区間と種別を抽出
    row = label[label['chan_id'] == chan_id]
    sequences = ast.literal_eval(row['anomaly_sequences'].values[0])
    raw_classes = row['class'].values[0]  # -> "[contextual, contextual, contextual]"
    classes = [s.strip() for s in raw_classes.strip("[]").split(",")]
    anomaly_df = pd.DataFrame({
        "start": [s[0] for s in sequences],
        "end": [s[1] for s in sequences],
        "class": classes
    })
    # オフセット追加:start, end に train_end + 1 を加算
    anomaly_df["start"] = anomaly_df["start"].astype(int) + (train_end + 1)
    anomaly_df["end"] = anomaly_df["end"].astype(int) + (train_end + 1)

    # plot_data に登録
    plot_data.loc[chan_id] = {
        "value": all_values,
        "train_end": train_end,
        "anomalys": anomaly_df
    }

fig, axes = plt.subplots(len(plot_data), 1, figsize=(12, 3 * len(plot_data)), sharex=False)

if len(plot_data) == 1:
    axes = [axes]  # 1つだけのときはリスト化

for i, (chan_id, row) in enumerate(plot_data.iterrows()):
    values = row["value"]
    train_end = row["train_end"]
    anomaly_df = row["anomalys"]

    x = range(len(values))
    axes[i].plot(x[:train_end + 1], values[:train_end + 1], color='blue', label='train')
    axes[i].plot(x[train_end + 1:], values[train_end + 1:], color='red', label='test')

    # 異常区間のハイライト
    for _, anomaly in anomaly_df.iterrows():
        color = 'green' if anomaly["class"].strip() == "point" else 'red'
        alpha = 0.3 if anomaly["class"].strip() == "contextual" else 0.6
        axes[i].axvspan(anomaly["start"], anomaly["end"], color=color, alpha=alpha)

    axes[i].set_title(chan_id)
    axes[i].set_ylabel("Telemetry Value")
    axes[i].set_ylim(-1.2, 1.2)
    axes[i].grid(True)
    axes[i].legend()

plt.xlabel("Time Step")
plt.tight_layout()
plt.show()

グラフをみるとチャネルによってだいぶ波形が違いますね。また、 P-1, P-7 以外はVAEを使わずとも閾値設定で異常検知が容易そうなので、対象をこの2チャネルに絞ります(ラベルにも文脈的な異常であると示唆されてますしね)。

VAEモデル設計と実装

いよいよVAEに学習させて異常検知していきます!入出力の次元は前回と同じ50にするので、データの末尾は切り捨てます。

class ChunkedDataset(Dataset):
    def __init__(self, data, chunk_size=50):
        self._chunk_size = chunk_size
        # 切り捨てる(例:2872 → 2850まで使用)
        usable_len = (len(data) // chunk_size) * chunk_size
        self._data = data[:usable_len]

    def __len__(self):
        return len(self._data) // self._chunk_size

    def __getitem__(self, idx):
        start = idx * self._chunk_size
        end = start + self._chunk_size
        chunk = self._data[start:end]
        return torch.tensor(chunk, dtype=torch.float32)

p1_train = ChunkedDataset(train['P-1.npy'])
p1_test = ChunkedDataset(test['P-1.npy'])
p7_train = ChunkedDataset(train['P-7.npy'])
p7_test = ChunkedDataset(test['P-7.npy'])

データを分割出来たら前回のVAEを引用して学習させます。

class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        self._linear = nn.Linear(input_dim, hidden_dim)
        self._mu = nn.Linear(hidden_dim, latent_dim)
        self._logvar = nn.Linear(hidden_dim, latent_dim)

    def forward(self, x):
        h = self._linear(x)
        h = F.relu(h)
        mu = self._mu(h)
        logvar = self._logvar(h)
        return mu, logvar

class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, output_dim):
        super().__init__()
        self._linear = nn.Linear(latent_dim, hidden_dim)
        self._output = nn.Linear(hidden_dim, output_dim)

    def forward(self, z):
        h = self._linear(z)
        h = F.relu(h)
        x_hat = self._output(h)
        return x_hat

def reparameterize(mu, logvar):
    sigma = torch.exp(0.5 * logvar)
    epsilon = torch.randn_like(sigma)
    z = mu + epsilon * sigma
    return z

class VAE(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        self._encoder = Encoder(input_dim, hidden_dim, latent_dim)
        self._decoder = Decoder(latent_dim, hidden_dim, input_dim)

    def forward(self, x):
        mu, logvar = self._encoder(x)
        z = reparameterize(mu, logvar)
        x_hat = self._decoder(z)
        return x_hat, mu, logvar

    def get_loss(self, x):
        x_hat, mu, logvar = self.forward(x)

        recon_loss = F.mse_loss(x_hat, x, reduction='sum')
        kl_loss = - torch.sum(1 + logvar - mu ** 2 - logvar.exp())

        total_loss = recon_loss + kl_loss
        return total_loss, recon_loss, kl_loss


vae = VAE(input_dim=50, hidden_dim=32, latent_dim=10)
optimizer = torch.optim.Adam(vae.parameters(), lr=1e-4)

epochs = 1024
batch_size = 32

train_loader = DataLoader(
    p1_train,
    batch_size=batch_size,
    shuffle=False)

train_losses = []
for epoch in range(epochs):
    vae.train()
    total_loss = 0
    for batch in train_loader:
        x = batch[0]
        loss, recon_loss, kl_loss = vae.get_loss(x)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    train_losses.append(total_loss)
    if epoch % 10 == 0:
        print(f"Epoch {epoch} | Loss: {total_loss:.2f}")

# プロット
plt.plot(train_losses)
plt.title("Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.grid(True)
plt.show()

学習と評価

プロットされた損失関数をみるとしては減少傾向ではありますが、振動していますね、、、

最後に、VAEに訓練データとテストデータをモデルに入力し生成してみます。

vae.eval()

train_originals = []
train_reconstructions = []
test_originals = []
test_reconstructions = []

train_loader = DataLoader(p1_train, batch_size=1, shuffle=False)
test_loader = DataLoader(p1_test, batch_size=1, shuffle=False)

with torch.no_grad():
    for x in train_loader:
        x = x.squeeze(0)  # shape: (50,)
        x_hat, _, _ = vae(x)
        train_originals.append(x.numpy())
        train_reconstructions.append(x_hat.numpy())

    for x in test_loader:
        x = x.squeeze(0)  # shape: (50,)
        x_hat, _, _ = vae(x)
        test_originals.append(x.numpy())
        test_reconstructions.append(x_hat.numpy())

# 1. 全チャンクを縦に結合(flatten)
plot_data: pd.DataFrame = pd.DataFrame({
    'id': [1, 7],
    'original': [
        np.concatenate(train_originals),
        np.concatenate(test_originals)],
    'reconstruction': [
        np.concatenate(train_reconstructions),
        np.concatenate(test_reconstructions)]
})

# 2. 描画
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(14, 6), sharex=False)

for i, row in plot_data.iterrows():
    original = row['original']
    recon = row['reconstruction']
    error = np.abs(original - recon)  # 絶対誤差

    # --- Original vs Reconstruction ---
    axes[0, i].plot(original, label='Original', color='blue', alpha=0.5)
    axes[0, i].plot(recon, label='Reconstruction', color='red', alpha=0.5)
    axes[0, i].set_title(f"id={row['id']} Original vs Reconstruction")
    axes[0, i].legend()
    axes[0, i].set_ylim(-1.2, 1.2)
    axes[0, i].grid(True)

    # --- Reconstruction Error ---
    axes[1, i].plot(error, label='Abs Error', color='purple', alpha=0.5)
    axes[1, i].set_title(f"id={row['id']} Reconstruction Error")
    axes[1, i].set_ylim(-0.2, 2.2)
    axes[1, i].grid(True)

plt.tight_layout()
plt.show()

for i, row in plot_data.iterrows():
    error = np.abs(row['original'] - row['reconstruction'])

    print(f"▼ id = {row['id']}")
    print(f"  Max Error   : {error.max():.6f}")
    print(f"  Min Error   : {error.min():.6f}")
    print(f"  Mean Error  : {error.mean():.6f}")
    print(f"  Std  (σ)    : {error.std():.6f}")
    print("-" * 40)

プロットされた波形を見ると訓練データのピークをうまく再現できていないですね。閾値をきめるなら最大誤差でしょうか、、、
異常検知をするならもう少しデータを見る必要がありそうですね。

▼ id = 1
  Max Error   : 1.781561
  Min Error   : 0.000207
  Mean Error  : 0.411541
  Std  (σ)    : 0.299734
----------------------------------------
▼ id = 7
  Max Error   : 1.943274
  Min Error   : 0.000011
  Mean Error  : 0.407376
  Std  (σ)    : 0.300571
----------------------------------------

まとめと今後の展望

今回は、VAEを用いた異常検知の応用編として、NASAの実データ(SMAP衛星の電源系)に取り組みました。時系列に対してシンプルなVAEを適用し、再構成誤差を可視化することで異常区間の兆候を捉えるアプローチを試しました。
 今後は、異常検知、異常検知個所の特定まで出来るようにデータの分析とモデルのチューニングをしていこうと考えています!

Discussion