🐥

【機械学習入門】1つのモデルから複数の推論を得るには【マルチタスク学習】

に公開

【はじめに】

機械学習を利用して何らかの現実的な問題解決を目指す場合、複数の推論を行いたいシーンが多くあります。

分類タスク 回帰タスク
📷 物体検知 物体の分類、... 位置、サイズ、...
💃 動作認識 動作の分類、... 人物検知、骨格キーポイント、...
👤 顔認証 人物識別、表情分類、性別判定、... 顔キーポイント、視線方向、年齢、...

「物体検知」は「画像のどこに物があるか・それが何か」を同時に推定していると言えますが、このような複数のタスクの推論はどのようにして機械学習で実現されているのでしょうか。
本記事では、これを解決する手法の1つとして マルチタスク学習 を実際のコードと共に追っていきます。

シングルタスク学習からのステップアップや、モデル構造を自由に定義するためのフックとなれば幸いです。

【概要:マルチタスク学習】

マルチタスク学習(モデル) とは、1つのモデルで複数の関連タスクを解くように学習する手法(モデル構造)です。中間層によって入力から抽出された特徴量をタスクごとに用意された出力層が受け取り、それぞれの予測を実施します。
これに対し、1つのモデルが1つのタスクを解く基本的な手法(モデル構造)を シングルタスク学習(モデル) と呼びます。

今回は複数のタスクを1つのモデルで解くために利用しますが、メインタスクに対して補助的なタスクを設定することで予測精度を向上するための手法としても利用されます。

アンサンブル学習との違い
  • マルチタスク学習:複数の関連タスク間で共通する特徴量を取り出すことで精度向上を図る
  • アンサンブル学習:モデルを複数組み合わせることで精度向上を図る
転移学習との違い
  • マルチタスク学習:複数の関連タスク間で共通する特徴量を取り出すことで精度向上を図る
  • 転移学習:学習済モデルの出力層のみ変更し別タスクに転用することで精度向上を図る
マルチモーダルモデルとの違い
  • マルチタスクモデル:複数の異なるタスクを解くモデル構造
  • マルチモーダルモデル:複数の形式の異なるデータを利用するモデル構造

【タスクの選定】

今回マルチタスク学習をするにあたって使用するタスクの選定を実施します。

💼 タスク選択

マルチタスク学習とは前項の通り「1つのモデルで複数の関連タスクを解く」手法です。
マルチタスク学習による良い効果は「タスク間で共通の特徴量」を共有層で抽出することによってもたらされるため、関連の薄い・共通の特徴を持たないようなタスクを並列して解かせた場合は十分な効果を得られません。
では、同時に解くタスクはどのように決定するべきでしょうか。

基本的には仮説・検証の繰り返しとなりますが、精度向上に効果的なサブタスクやその効率的な選定に関する研究もあるため、自身のタスクに関連する論文を探すと確実性が高いでしょう。
https://proceedings-of-deim.github.io/DEIM2023/1a-3-1.pdf
https://arxiv.org/abs/2109.04617

📝 本記事で取り扱うタスク

複数のタスクを同時に解く例として本記事では「気象予測」を扱います。
前項も踏まえ、今回は「次の日の天気の予測」「次の日の気温の予測」「次の日の湿度の予測」をタスクとして設定しました。

分類タスク 回帰タスク
🌤️ 次の日の気象予測 天気 気温、湿度

天気・気温・湿度を予測するにあたって影響の強そうな気象データを考え、今回は以下を入出力としてモデルを検討します。

  • 入力: 14日分の [年, 月, 日, 平均気温, 平均湿度, 平均気圧, 日照時間, 降水量]
  • 出力: 次の日の [温度, 湿度, 天気ラベル]

📊 使用データの詳細

本記事では気象庁から提供されている気象データより「2020.01.01 ~ 2024.12.31 の期間における東京気象台の観測結果」について、全体の7割を訓練データ、3割を検証データとして使用します。
実際にダウンロードした項目は以下の6つとなります。

  • 日平均気温 [℃]
  • 日平均相対湿度 [%]
  • 平均現地気圧 [hPa]
  • 日照時間(時間)
  • 降水量の合計(mm)
  • 天気概況(昼:06時~18時)

https://www.data.jma.go.jp/risk/obsdl/

天気の分類ラベルについて

天気概況とは、
基本天気 + 天気変化用語 + 伴う現象」で表現されるその時間帯の天気の概要です。
天気概況内で使用される用語の詳細については 気象庁:天気概況について を参照してください。

天気概況には「曇時々雪、あられを伴う」など、パターンが無数に存在します。
本記事では簡単のため天気概況を分解し、基本天気をラベルとして分類を実施しています。

🌤️ 本記事で使用される天気ラベル
 「晴・快晴・曇・雨・薄曇・大雨・みぞれ・雪・霧雨」の9種類[1]

【マルチタスク学習モデルの構造】

📖 基本構造

マルチタスクモデルは、タスク間で共有される層と各タスクで固有の層を持ちます。
共有層によって各タスクに共通の特徴量を効率よく抽出し、固有層でタスクごとの最適化を行なうことで、関連の強いタスク同士を精度良く・高速に学習します。


マルチタスクモデルの例

共有層には共通の特徴量を表現するのに十分な規模のハイパーパラメータを設定してください。
固有層は任意の構造の層を任意の数用意することが可能です。並列で解きたいタスクに合わせて適切な構造の層を用意・接続してください。

💻 実装例

今回の「気象予測」では中間層に LSTM を採用し、LSTMによって入力データから取り出した特徴量をタスク間で共有します。タスクごとの固有層には線型結合層を1層を用意しました。
これらを組み合わせ、共有層による特徴量を「気温予測用出力層」「湿度予測用出力層」「天気予測用出力層」に渡すモデルの構造は以下のようになります。


この構造を PyTorch で記述すると以下のようになります。
ノード数や層数、ドロップアウト率などモデルのハイパーパラメータはタスクに必要な表現力に合わせて適宜調整してください。

class LSTMModel(nn.Module):
    def __init__(self, input_size, num_weather_classes, hidden_layer_size, num_layers, dropout):
        """各層の定義"""
        super(LSTMModel, self).__init__()
        # 中間層(LSTM)の定義
        self.lstm = nn.LSTM(
            input_size,                 # 入力データの次元数
            hidden_layer_size,          # ノード数
            num_layers=num_layers,      # 中間層の層数
            dropout=dropout,            # ドロップアウト率
            batch_first=True
        )
        # ✅ 出力層の定義
        self.temp_linear = nn.Linear(hidden_layer_size, 1)  # 温度回帰予測用
        self.rh_linear = nn.Linear(hidden_layer_size, 1)    # 湿度回帰予測用
        self.weather_linear = nn.Linear(hidden_layer_size, num_weather_classes) # 天気予測用

    def forward(self, x):
        """定義した層の接続、処理"""
        # 入力データxをLSTMへ渡し特徴量を得る
        lstm_out, _ = self.lstm(x)
        x = lstm_out[:, -1, :]
        # ✅ LSTMの出力を各タスク向けの出力層へ渡し予測を得る
        pred_temp = self.temp_linear(x)
        pred_rh = self.rh_linear(x)
        pred_weather = self.weather_linear(x)
        return pred_temp, pred_rh, pred_weather

【マルチタスク学習における学習】

機械学習における学習とは「モデルの予測と正解の差(損失)を算出、これをより小さくするようにモデルのパラメータを更新すること」です。
タスクの数だけ損失が存在する今回の場合、各タスクごとに計算した損失を合成したものをモデルへのフィードバックに利用します。

📖 損失の設計

  1. 各タスクに適切な損失関数を用意
    シングルタスク学習と同様に、目的変数の特性に合わせて選択してください。
    今回使用した損失関数は以下の通りです。

    • 気温予測(回帰):平均2乗誤差(MSELoss)
    • 湿度予測(回帰):平均2乗誤差(MSELoss)
    • 天気予測(分類):交差エントロピー誤差(CrossEntropyLoss)
  2. 各タスクの損失の計算
    タスクの予測結果・正解値・損失関数から各タスク個別に損失を算出します。

  3. 損失の合成
    各タスクの損失より、モデルへのフィードバックに利用する「全体の損失」を合成します。
    ひとまず、最も単純な合成方法として総和を採用します。

Loss = Loss_{task1} + Loss_{task2} + Loss_{task3} + ...

タスクごとの難易度や損失のスケール差に応じて重みづけや合成の方法の変更を行うことで、より適切に学習を進めることができます。

💻 実装例(最小構成)

学習曲線のプロットや早期終了などを省略し、学習部分のみにフォーカスして記述します。
タスクごとに損失関数を設定し、合成損失 loss によってモデルを更新しています。

class Trainer:
    def __init__(self, model, train_loader, val_loader, output_dir, device="cpu", lr=1e-3, epochs=100):
        self.model = model.to(device)       # 学習モデル
        self.train_loader = train_loader    # 訓練データのDataLoader
        self.val_loader = val_loader        # 検証データのDataLoader
        self.output_dir = output_dir        # モデルの保存先ディレクトリのパス
        self.device = device                # 計算デバイス(CPU/GPU)
        # モデルの学習パラメータ
        self.lr = lr            # 学習率
        self.epochs = epochs    # エポック数
        # オプティマイザ
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
        # ✅ 損失関数
        self.temp_criterion = nn.MSELoss()
        self.rh_criterion = nn.MSELoss()
        self.weather_criterion = nn.CrossEntropyLoss()

    def train(self):
        """学習工程の実行"""
        for epoch in range(self.epochs):
            # 学習
            train_loss, train_temp_loss, train_rh_loss, train_weather_loss = (self._train_one_epoch())
            # 検証
            val_loss, val_temp_loss, val_rh_loss, val_weather_loss = (self._validate_one_epoch())
            # 確認のため結果を標準出力
            print(f"Epoch {epoch}/{self.epochs}, [Train Loss] {train_loss:.4f}, [Validation Loss] {val_loss:.4f}")
        # モデルの保存
        torch.save(self.model.state_dict(), f"{self.output_dir}/final_model.pth")
        return self.model

    def _train_one_epoch(self):
        """1エポック分の学習を行い、合成/温度予測/湿度予測/天気予測lossを返す"""
        self.model.train()
        total_loss = total_temp = total_rh = total_weather = 0
        for X, y in self.train_loader:
            # 入力・正解の用意
            X, y = X.to(self.device), y.to(self.device)
            target_temp = y[:, 0].unsqueeze(1)
            target_rh = y[:, 1].unsqueeze(1)
            target_weather = y[:, 2].long()
            # 予測
            pred_temp, pred_rh, pred_weather = self.model(X.float())
            # ✅ 損失の計算
            loss_temp = self.temp_criterion(pred_temp, target_temp)
            loss_rh = self.rh_criterion(pred_rh, target_rh)
            loss_weather = self.weather_criterion(pred_weather, target_weather)
            # ✅ 損失の合成
            loss = loss_temp + loss_rh + loss_weather
            # モデルの更新
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            # 損失の集計
            total_loss += loss.item()
            total_temp += loss_temp.item()
            total_rh += loss_rh.item()
            total_weather += loss_weather.item()
        n = len(self.train_loader)
        return total_loss / n, total_temp / n, total_rh / n, total_weather / n

    def _validate_one_epoch(self):
        """1エポック分の検証を行い、合成/温度予測/湿度予測/天気予測lossを返す"""
        self.model.eval()
        total_loss = total_temp = total_rh = total_weather = 0
        with torch.no_grad():
            for X, y in self.val_loader:
                # 入力・正解の用意
                X, y = X.to(self.device), y.to(self.device)
                target_temp = y[:, 0].unsqueeze(1)
                target_rh = y[:, 1].unsqueeze(1)
                target_weather = y[:, 2].long()
                # 予測
                pred_temp, pred_rh, pred_weather = self.model(X.float())
                # 損失の計算
                loss_temp = self.temp_criterion(pred_temp, target_temp)
                loss_rh = self.rh_criterion(pred_rh, target_rh)
                loss_weather = self.weather_criterion(pred_weather, target_weather)
                # 損失の合成
                loss = loss_temp + loss_rh + loss_weather
                # 損失の集計
                total_loss += loss.item()
                total_temp += loss_temp.item()
                total_rh += loss_rh.item()
                total_weather += loss_weather.item()
        n = len(self.val_loader)
        return total_loss / n, total_temp / n, total_rh / n, total_weather / n

【全体の実行と評価】

ここまでを踏まえ、2週間分の気象データから次の日の天気を予測してみましょう。

💻 コード

気象データ.csv からデータセットを作成し、LSTM を利用して3つのタスクについて予測するモデルを訓練します。学習させたモデルの精度評価の一環として、検証データに対する予測値と正解の比較も出力します。

コード全体

import re
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, Dataset
plt.rcParams["font.family"] = "Hiragino Sans"

csv_path = ""       # 気象データ.csv のパス 
output_dir = ""     # 訓練重み、損失プロットなど出力物の保存先

# データパラメータ
train_rate = 0.7        # 訓練データの割合
observation_days = 14   # 観測日数
predict_days = 1        # 予測日数
# 学習パラメータ
lr = 1e-3           # 学習率
epochs = 100        # エポック数
batch_size = 32     # バッチサイズ
patience = 10       # 早期終了:検証損失が改善しないのを何epoch待つか

# 気象データのカラム名
df_columns = [
    "year",             # 年
    "month",            # 月
    "day",              # 日
    "temp",             # 気温
    "rh",               # 湿度
    "atm",              # 気圧
    "sunlight",         # 日照時間
    "precipitation",    # 降水量の合計
    "weather",          # 天気概況
]
# 気象データのカラムのうち、入力に利用するもの
input_columns = [
    "year",
    "month_sin",    # 日付を三角関数によって周期に変換したもの。
    "month_cos",    # mmdd2cycle() にて month, day を
    "day_sin",      # 三角関数を利用した周期表現に
    "day_cos",      # エンコードしている。
    "temp",
    "rh",
    "atm",
    "sunlight",
    "precipitation",
]
# 気象データのカラムのうち、出力に利用するもの
target_columns = ["temp", "rh", "weather"]

def main():
    # 計算機の確認・設定
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"device: {device}")

    # データ読み込み
    df = pd.read_csv(csv_path, encoding="shift-jis", header=0)
    # データセット作成
    train_loader, val_loader, weather_to_num, scaler_dict = make_dataset(df)
    # モデル用意
    model = LSTMModel(
        input_size=len(input_columns),
        num_weather_classes=len(weather_to_num),
    )
    # 学習・デモ
    trainer = Trainer(model, train_loader, val_loader, weather_to_num, output_dir, device, lr, epochs, patience)
    trainer.train()
    trainer.demo(scaler_dict)


def make_dataset(df):
    """dfを整形し、data_loaderを作成"""

    def mmdd2cycle(df):
        """月日を周期に変換"""
        df = df.copy()
        df["month_sin"] = np.sin(2 * np.pi * df["month"] / 12)
        df["month_cos"] = np.cos(2 * np.pi * df["month"] / 12)
        df["day_sin"] = np.sin(2 * np.pi * df["day"] / 31)
        df["day_cos"] = np.cos(2 * np.pi * df["day"] / 31)
        return df

    def df2input_and_target(df):
        """dfを入力とターゲットに分割"""
        # 初期化
        data_list = []
        target_list = []
        # 入力とターゲットのセットの作成
        for num in range(len(df) - observation_days - predict_days):
            data = df.iloc[num : num + observation_days,][input_columns].values
            label = df.iloc[num + observation_days][target_columns].values
            data_list.append(data)
            target_list.append(label)
        return np.array(data_list), np.array(target_list)

    # カラム名を整理
    df.columns = df_columns
    # 月日を周期に変換
    df = mmdd2cycle(df)

    # 天気の名称が☀️時々☁️のように複数あるため、先頭の天気を抽出
    weather_list = df["weather"].unique()
    df["weather"] = df["weather"].apply(lambda x: re.split("時々|一時|後|後一時|後時々|、", str(x))[0])
    weather_list = df["weather"].unique()
    weather_to_num = {weather: i for i, weather in enumerate(weather_list)}
    # 天気をカテゴリ変数に変換
    df["weather"] = df["weather"].map(weather_to_num)

    # データの分割: 時系列データのためシャッフルしない
    sprit_index = int(len(df) * train_rate)
    train_df = df.iloc[:sprit_index].copy()
    val_df = df.iloc[sprit_index:].copy()
    print("data status:")
    print(f"    total size: {len(df)}")
    print(f"    train size: {len(train_df)}, val size: {len(val_df)}")
    print(f"    weather classes: {weather_to_num}")
    print("    classes num: ")
    for weather, num in weather_to_num.items():
        print(f"        {weather}: {len(df[df['weather'] == num])}")

    df = df.astype(np.float32)

    # データの正規化
    scaler_dict = {
        "year": StandardScaler(),
        "temp": StandardScaler(),
        "rh": StandardScaler(),
        "atm": StandardScaler(),
        "sunlight": StandardScaler(),
        "precipitation": StandardScaler(),
    }
    # 訓練データでfit_transform、検証データでtransform
    for col, scaler in scaler_dict.items():
        if col in train_df.columns:
            train_df[[col]] = scaler.fit_transform(train_df[[col]])
            val_df[[col]] = scaler.transform(val_df[[col]])

    # データの作成
    train_input, train_target = df2input_and_target(train_df)
    val_input, val_target = df2input_and_target(val_df)

    # data loader化
    train_dataset = NextDayWeatherDataset(train_input, train_target)
    val_dataset = NextDayWeatherDataset(val_input, val_target)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

    return train_loader, val_loader, weather_to_num, scaler_dict


class Trainer:
    def __init__(
        self,
        model,              # 学習モデル
        train_loader,       # 訓練データのDataLoader
        val_loader,         # 検証データのDataLoader
        class_dict,         # クラス辞書
        output_dir,         # モデルの重み、プロットなどの出力ディレクトリ
        device,             # 計算デバイス(CPU/GPU)
        lr=1e-3,            # 学習率
        epochs=50,          # エポック数
        patience=10,        # 早期終了:検証損失が改善しないのを何epoch待つか
    ):
        self.model = model.to(device)       
        self.train_loader = train_loader    
        self.val_loader = val_loader        
        self.class_dict = class_dict        
        self.output_dir = output_dir  
        self.device = device  
        # モデルの学習パラメータ
        self.lr = lr  
        self.epochs = epochs 
        # オプティマイザ
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
        # オプション
        self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=5, gamma=0.1)
        self.early_stopping = EarlyStopping(patience=patience)
        # 損失関数
        self.temp_criterion = nn.MSELoss()  # 温度予測を評価する損失関数
        self.rh_criterion = nn.MSELoss()    # 湿度予測を評価する損失関数
        self.weather_criterion = nn.CrossEntropyLoss()  # 天気予測を評価する損失関数
        self.temp_rate = 1.0        # 各タスクの損失の重み(今回は総和のため全て1.0)
        self.rh_rate = 1.0
        self.weather_rate = 1.0
        # 学習履歴の保管先
        self.loss_history = {
            "train": [],
            "val": [],
            "train_temp": [],
            "val_temp": [],
            "train_rh": [],
            "val_rh": [],
            "train_weather": [],
            "val_weather": [],
        }

    def train(self):
        """モデルを学習させる"""
        print("Training...")
        best_val_loss = float("inf")
        for epoch in range(self.epochs):
            # 学習
            train_loss, train_temp_loss, train_rh_loss, train_weather_loss = self._train_one_epoch()
            # 検証
            val_loss, val_temp_loss, val_rh_loss, val_weather_loss = self._validate_one_epoch()
            # 損失の保管
            self.loss_history["train"].append(train_loss)
            self.loss_history["val"].append(val_loss)
            self.loss_history["train_temp"].append(train_temp_loss)
            self.loss_history["val_temp"].append(val_temp_loss)
            self.loss_history["train_rh"].append(train_rh_loss)
            self.loss_history["val_rh"].append(val_rh_loss)
            self.loss_history["train_weather"].append(train_weather_loss)
            self.loss_history["val_weather"].append(val_weather_loss)
            print(f"Epoch {epoch}/{self.epochs}, [Train Loss] {train_loss:.4f}, [Validation Loss] {val_loss:.4f}")
            # 学習率の更新
            self.scheduler.step()
            # 最良であればモデルを保存
            if val_loss < best_val_loss:
                torch.save(self.model.state_dict(), f"{self.output_dir}/best_model.pth")
                print("   Best model saved")
            # 早期終了
            self.early_stopping(val_loss, self.model)
            if self.early_stopping.early_stop:
                print("   Early stopping")
                break
        # 各損失の推移をプロット
        self._plot_losses(self.loss_history["train"], self.loss_history["val"], name="")
        self._plot_losses(self.loss_history["train_temp"], self.loss_history["val_temp"], name="temp_")
        self._plot_losses(self.loss_history["train_rh"], self.loss_history["val_rh"], name="rh_")
        self._plot_losses(self.loss_history["train_weather"],self.loss_history["val_weather"],name="weather_")

        # 最良のモデルをロードして返す
        self.model.load_state_dict(torch.load(f"{self.output_dir}/best_model.pth"))
        return self.model

    def _train_one_epoch(self):
        """1回分の学習"""
        self.model.train()
        total_loss = total_temp = total_rh = total_weather = 0
        for X, y in self.train_loader:
            # inputの用意
            X, y = X.to(self.device), y.to(self.device)
            target_temp = y[:, 0].unsqueeze(1)
            target_rh = y[:, 1].unsqueeze(1)
            target_weather = y[:, 2].long()
            # 予測
            pred_temp, pred_rh, pred_weather = self.model(X.float())
            # 損失の計算
            loss_temp = self.temp_criterion(pred_temp, target_temp)
            loss_rh = self.rh_criterion(pred_rh, target_rh)
            loss_weather = self.weather_criterion(pred_weather, target_weather)
            # 損失の合成
            loss = loss_temp * self.temp_rate + loss_rh * self.rh_rate + loss_weather * self.weather_rate
            # モデルの更新
            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            # 損失の集計
            total_loss += loss.item()
            total_temp += loss_temp.item()
            total_rh += loss_rh.item()
            total_weather += loss_weather.item()
        n = len(self.train_loader)
        return total_loss / n, total_temp / n, total_rh / n, total_weather / n

    def _validate_one_epoch(self):
        """1回分の検証"""
        self.model.eval()
        total_loss = total_temp = total_rh = total_weather = 0
        with torch.no_grad():
            for X, y in self.val_loader:
                if torch.isnan(X).any() or torch.isnan(y).any():
                    print("nan in validation input or target")
                if torch.isinf(X).any() or torch.isinf(y).any():
                    print("inf in validation input or target")  # inputの用意
                X, y = X.to(self.device), y.to(self.device)
                target_temp = y[:, 0].unsqueeze(1)
                target_rh = y[:, 1].unsqueeze(1)
                target_weather = y[:, 2].long()
                # 予測
                pred_temp, pred_rh, pred_weather = self.model(X.float())
                # 損失の計算
                loss_temp = self.temp_criterion(pred_temp, target_temp)
                loss_rh = self.rh_criterion(pred_rh, target_rh)
                loss_weather = self.weather_criterion(pred_weather, target_weather)
                # 損失の合成
                loss = loss_temp * self.temp_rate + loss_rh * self.rh_rate + loss_weather * self.weather_rate
                # 損失の集計
                total_loss += loss.item()
                total_temp += loss_temp.item()
                total_rh += loss_rh.item()
                total_weather += loss_weather.item()
        n = len(self.val_loader)
        return total_loss / n, total_temp / n, total_rh / n, total_weather / n

    def _plot_losses(self, train_loss_list, val_loss_list, name=""):
        """損失のプロット"""
        plt.plot(range(len(train_loss_list)), train_loss_list, label="train_loss")
        plt.plot(range(len(val_loss_list)), val_loss_list, label="val_loss")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.title(f"{name} Loss")
        plt.legend()
        plt.savefig(f"{self.output_dir}/{name}loss_plot.png")
        plt.close()

    def demo(self, scaler_dict):
        """学習済みモデルに検証データを与え、正解と予測を比較"""
        self.model.eval()
        result_dict = {
            "predicted_temp": [],
            "predicted_rh": [],
            "predicted_weather": [],
            "true_temp": [],
            "true_rh": [],
            "true_weather": [],
        }
        
        with torch.no_grad():
            for X, y in self.val_loader:
                X, y = X.to(self.device), y.to(self.device)
                pred_temp, pred_rh, pred_weather = self.model(X.float())
                # 出力を元のスケール/ラベルに戻す
                pred_temp = scaler_dict["temp"].inverse_transform(pred_temp.cpu().numpy().reshape(-1, 1))
                pred_rh = scaler_dict["rh"].inverse_transform(pred_rh.cpu().numpy().reshape(-1, 1))
                pred_weather_cls = (torch.argmax(torch.softmax(pred_weather, dim=1), dim=1).cpu().numpy())
                # 予測結果を辞書に追加
                result_dict["predicted_temp"].extend(pred_temp)
                result_dict["predicted_rh"].extend(pred_rh)
                result_dict["predicted_weather"].extend(pred_weather_cls)
                # 正解値を元のスケール/ラベルに戻して辞書に追加
                result_dict["true_temp"].extend(scaler_dict["temp"].inverse_transform(y[:, 0].cpu().numpy().reshape(-1, 1)))
                result_dict["true_rh"].extend(scaler_dict["rh"].inverse_transform(y[:, 1].cpu().numpy().reshape(-1, 1)))
                result_dict["true_weather"].extend(y[:, 2].cpu().numpy())

        # 温湿度プロット
        item_dict = {
            "tmp": [result_dict["predicted_temp"], result_dict["true_temp"]],
            "rhs": [result_dict["predicted_rh"], result_dict["true_rh"]],
        }
        item_num = len(item_dict)
        fig, axes = plt.subplots(item_num, 1, figsize=(item_num * 4, item_num * 2))

        def plot(axes, item_dict):
            for i, (item, data) in enumerate(item_dict.items()):
                axes[i].plot(data[1], label=f"True {item}", color="orange")
                axes[i].plot(data[0], label=f"Predicted {item}", color="blue")
                error_data = np.abs(np.array(data[0]).reshape(-1) - np.array(data[1]).reshape(-1))
                mae = np.mean(error_data)
                axes[i].set_xlabel("Time Step")
                axes[i].set_ylabel(item)
                axes[i].set_title(f"{item} Prediction vs True Values (MAE: {mae:.2f})")
                axes[i].legend()
            return axes

        axes = plot(axes, item_dict)
        plt.tight_layout()
        plt.savefig(f"{self.output_dir}/prediction_plot.png")
        plt.close()

        # 混同行列プロット
        weather_labels = list(self.class_dict.keys())
        cm = confusion_matrix(
            result_dict["true_weather"],
            result_dict["predicted_weather"],
            labels=range(len(weather_labels)),
        )
        accuracy = np.trace(cm) / np.sum(cm)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=weather_labels)
        fig, ax = plt.subplots(figsize=(8, 8))
        disp.plot(ax=ax, cmap="Blues", xticks_rotation=45, colorbar=False)
        plt.title(f"Weather Confusion Matrix (accuracy: {accuracy:.2f})")
        plt.tight_layout()
        plt.savefig(f"{self.output_dir}/weather_confusion_matrix.png")
        plt.close()
        return 0


class LSTMModel(nn.Module):
    def __init__(self, input_size, num_weather_classes, hidden_layer_size=256, num_layers=3, dropout=0.3):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(
            input_size,                             # 入力次元数
            hidden_layer_size=hidden_layer_size,    # 隠れ層のノード数 
            num_layers=num_layers,                  # 隠れ層の層数(LSTMブロックの数) 
            dropout=dropout,                        # ドロップアウト率
            batch_first=True,
        )
        self.temp_linear = nn.Linear(hidden_layer_size, 1)  # 温度予測用
        self.rh_linear = nn.Linear(hidden_layer_size, 1)  # 湿度予測用
        self.weather_linear = nn.Linear(hidden_layer_size, num_weather_classes)  # 天気予測用

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        x = lstm_out[:, -1, :]  # 最後のLSTM出力を使用
        pred_temp = self.temp_linear(x)
        pred_rh = self.rh_linear(x)
        pred_weather = self.weather_linear(x)

        return pred_temp, pred_rh, pred_weather


class NextDayWeatherDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        X = torch.tensor(self.X[idx], dtype=torch.float32)
        y = torch.tensor(self.y[idx], dtype=torch.float32)
        return X, y


class EarlyStopping:
    """早期終了:検証損失が改善しない・増加傾向(過学習)に転じた場合、学習を停止する"""
    def __init__(self, patience=5):
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf

    def __call__(self, val_loss, model):
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
        elif score < self.best_score:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.counter = 0


if __name__ == "__main__":
    main()

📊 学習の評価

まずは合成した損失の推移を観察し、学習が正しく進行したかを確認します。
学習に使用したデータでの損失(学習損失)が epoch の進行に合わせて減少していくのに対し、未知データでの損失(検証損失)は途中から増加傾向にあり、過学習 の傾向が見られます。 早期終了を設けているため、検証損失の改善が滞ってから一定のepoch後に学習が打ち切られています。

各 epoch における学習損失(合成)と検証損失(合成)

合成前の損失をプロットすると以下の通りです。

各 epoch における各タスクの損失(左から気温、湿度、天気予測)
各損失より、学習中に発生している問題がいくつかわかります。

  1. 天気予測タスクにて過学習が起きている
  2. 天気予測タスクの損失のスケールが大きく、合成後の値の半分以上を占めている

1. 天気予測タスクにて起きている過学習の原因

学習後のモデルに検証データを与えた場合の予測値と正解について混同行列でまとめ、モデルがどのような予測をしたか観察します。

天気予測の混同行列
予測先が「晴・曇」に集中していることがわかります。
「訓練データの総量」「入力に使用した項目の過不足」「ハイパーパラメータの設定」「モデル構造」など原因として考えられるものは多くありますが、今回まず着目すべきは「気象データが不均衡データである」ことです。
今回使用したデータ全体での各天気ラベルの出現数を以下に示します。

天気ラベル 快晴 薄曇 大雨 みぞれ 霧雨 (合計)
データ数 612 231 603 173 165 40 1 1 1 1827

極端なものでは出現数が1しかなく、それを除いてもクラスごとの出現数にばらつきがあります。このような不均衡データではモデルの予測結果が出現回数の多いものに偏りやすく、実際に今回のモデルが予測先にしやすいラベルもデータ数の多い「晴・曇」となっています。

不均衡データへの対策には「データセットを整理して出現数を揃える」「データの不均衡を考慮して損失を求める」などがあるため、適切なものを適用します。

今回のデータは「データ数の偏りが大きすぎる」「データ数に対するラベルの数が多すぎる」「データ数1のラベルが存在する」など分類を解く上での問題が多いため、抜本的な解決には用意するラベルの見直しなどが必要そうです。

(コラム)交差エントロピー損失に重みを追加することで不均衡データに対応する

交差エントロピー損失の算出に利用している nn.CrossEntropyLoss() では引数 weight を指定することで各クラスの損失に重みづけをすることが可能です。

https://docs.pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html

訓練データにおける「各クラスの出現数の逆数を正規化したもの」を重みとして交差エントロピー損失を求めます。

def make_dataset(df):
    #################
    # データの分割
    #################

    # 重みの作成
    weather_counts = train_df["weather"].value_counts().sort_index()
    class_weights = np.zeros((len(weather_to_num)), dtype=np.float32)
    for cls_idx, count in weather_counts.items():
        class_weights[int(cls_idx)] = 1.0 / (count + 1e-6)
    class_weights = class_weights / class_weights.sum() * len(weather_to_num)
    class_weights = torch.tensor(class_weights, dtype=torch.float32)

    #################
    # データの正規化
    #################

    # 省略

    # 戻り値に class_weight を追加
    return train_loader, val_loader, weather_to_num, scaler_dict, class_weights

class Trainer:
    def __init__(..., class_weights):
        # 省略
        # 重みを追加
        self.weather_criterion = nn.CrossEntropyLoss(weight=class_weights)
        # 省略


重みつき交差エントロピー損失を利用した場合の天気予測の混同行列

モデルが予測先として出力するラベルが広がりましたが、精度は悪化しています。データ数の偏りが大きすぎるために補正が極端に働き、悪影響が出ているようです。

また、データを訓練用・検証用に分割する際にデータ数が1の「霧雨」が検証データ側に割り振られ訓練データ内に無いことから、「訓練データでの出現数の重み」自体が適切に計算できないのも問題です。

(コラム)2つの回帰タスクの精度を比較する

損失の推移は問題のなさそうな気温・湿度の予測値と正解もプロットしてみます。


正解値と予測値の比較(上段:気温、下段:気圧)

気温・湿度ともに季節の周期性と日毎の変化を追って変動している様子が観察できます。
平均絶対誤差(MAE) より、今回のモデルの予測は気温で ±1.80℃、湿度で ±7.81% 平均的に誤差があると言えます。これが精度として「良い」かについては、モデルの用途として必要な精度によります。

ちなみに、この2つの回帰予測はどちらが精度として良いのでしょうか。
今回は平均絶対誤差を 正規化平均絶対誤差(NMAE) に変換して、スケールを合わせてから比較してみます。

NMAE = \frac{MAE}{(data_{max} - deta_{min})}
平均絶対誤差(MAE) 最小値 最大値 正規化平均絶対誤差(NMAE)
気温 [℃] 1.80 2.8 32.3 0.06
湿度 [%] 7.81 30 100 0.11

正規化平均絶対誤差の比較より、今回のモデルでは気温の方が精度が高いことがわかりました。
具体的には、予測誤差のスケールが湿度では気温の2倍近くあるようです。

(補足)重みやデータ整理を適切に行えば、今回の手法で天気予測は可能になるか

結論としては非常に厳しい取り組みです。

実際の天気予報では「項目・観測地点共に大量の観測データ」と「流体力学や熱力学といった物理法則」を組みあわせた 数値予報モデル が使用されています。
これを天気の予測に必要十分な情報とした場合、今回考慮している内容が少ないことがわかります。

2. 損失のスケール差よって発生する問題とその改善

モデルの学習は損失を小さくするように進行します。
すなわち、損失が突出して大きいタスクが存在する場合はその損失を減少させることに集中して学習が行われ、他のタスクの学習の進行が鈍くなる場合があります。

今回の場合、合成後の損失の半分以上は天気予測タスクの損失であり、天気予測タスクが学習に対し支配的に働いています。このタスクへの適応のために他2つの学習の進行を妨げている可能性があるため、損失の合成を調整します。

現在損失の合成時に使用している総和計算を「各タスクに合わせた重みによる重み付き総和」に変更することも手法の1つです。しかし、妥当な重みの手動探索は骨が折れるため、今回は以下の論文の手法による自動調整を試します。

https://arxiv.org/abs/1705.07115

この手法では各タスクの予測の不確実性を分散 \sigma_{n}^2 で表現し、その逆数を重みとして各タスクの損失 Loss_{n} にかけています。これにより不確実性の高いタスクほど損失が減衰され、学習の偏りを軽減します。
ここで、\sigma_{n}^2 はモデルの重みと同様に訓練中に学習される値です。確率モデルに従ってモデルの予測がばらつくと仮定し、予測に対して各確率モデルの尤度が最も大きくなるような \sigma_{n}^2 を推定しています。

仮定に使用する確率モデルの違いから、タスクごとの損失は以下のようになります。

確率モデル 重み付き損失
回帰タスク \sigma_{n}^2 のガウス分布 \frac{1}{2\sigma_{task_n}^2}Loss_{task_n}+ \log\sigma_{task_n}
分類タスク \sigma_{n}^2 でスケーリングされたsoftmax関数 \frac{1}{\sigma_{task_n}^2}Loss_{task_n}+ \log\sigma_{task_n}

ここで + \log\sigma_{task_n} は正規化項であり、\frac{1}{\sigma_{task_n}^2} が安易に小さくなることへのペナルティとして働いています。

以上を踏まえ、「気温の回帰」「湿度の回帰」「天気の分類」の3つのタスクの損失から合成される損失 Loss は以下のようになります。

Loss = \frac{1}{2\sigma_{temp}^2}Loss_{temp} + \frac{1}{2\sigma_{rh}^2}Loss_{rh}+ \frac{1}{\sigma_{weather}^2}Loss_{weather} + \log\sigma_{temp}\sigma_{rh}\sigma_{weather}
コード:変更後の Trainer クラス

実際の学習では \sigma_{n}^2 を直接回帰せず、対数分散 s:=\log\sigma_{n}^2 を回帰します。
\sigma_{n}^2 を直接回帰した場合、 \sigma_{n}^2 が0または0に近い値を取ることによるゼロ除算や \frac{1}{2\sigma_{n}^2} が無限大に発散する場合があるため、これを防ぐための処置です。

これを踏まえ、先ほどの式を書き換えて実装します。

Loss = \frac{1}{2\sigma_{temp}^2}Loss_{temp} + \frac{1}{2\sigma_{rh}^2}Loss_{rh}+ \frac{1}{\sigma_{weather}^2}Loss_{weather} + \log\sigma_{temp}\sigma_{rh}\sigma_{weather} \\ = \frac{1}{2}e^{-s_{temp}}Loss_{temp} + \frac{1}{2}e^{-s_{rh}}Loss_{rh} + e^{-s_{weather}}Loss_{weather} + \frac{1}{2}(s_{temp} + s_{rh} + s_{weather})
class Trainer:
    def __init__(self, model, train_loader, val_loader, class_dict, output_dir, device, lr=1e-6, epochs=50, patience=10):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.class_dict = class_dict
        self.output_dir = output_dir
        self.device = device
        self.lr = lr
        self.epochs = epochs
        # 損失関数
        self.temp_criterion = nn.MSELoss()
        self.rh_criterion = nn.MSELoss()
        self.weather_criterion = nn.CrossEntropyLoss()
        # ✅ 不確実性パラメータの用意
        self.log_sigma_temp = torch.nn.Parameter(torch.zeros(1, device=self.device))
        self.log_sigma_rh = torch.nn.Parameter(torch.zeros(1, device=self.device))
        self.log_sigma_weather = torch.nn.Parameter(torch.zeros(1, device=self.device))
        # ✅ オプティマイザの更新:不確実性パラメータを含める
        self.optimizer = optim.Adam(
            [
                {"params": self.model.parameters()},
                {"params": [self.log_sigma_temp, self.log_sigma_rh, self.log_sigma_weather]},
            ],
            lr=self.lr,
        )
        self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=5, gamma=0.1)
        self.patience = patience
        self.early_stopping = EarlyStopping(patience=self.patience)
        self.loss_history = {
            "train": [],
            "val": [],
            "train_temp": [],
            "val_temp": [],
            "train_rh": [],
            "val_rh": [],
            "train_weather": [],
            "val_weather": [],
        }

    def train(self):
        """モデルを学習させる"""
        print("Training...")
        best_val_loss = float("inf")
        for epoch in range(self.epochs):
            # 学習
            train_loss, train_temp_loss, train_rh_loss, train_weather_loss = self._train_one_epoch()
            # 検証
            val_loss, val_temp_loss, val_rh_loss, val_weather_loss = self._validate_one_epoch()
            # 損失の保管
            self.loss_history["train"].append(train_loss)
            self.loss_history["val"].append(val_loss)
            self.loss_history["train_temp"].append(train_temp_loss)
            self.loss_history["val_temp"].append(val_temp_loss)
            self.loss_history["train_rh"].append(train_rh_loss)
            self.loss_history["val_rh"].append(val_rh_loss)
            self.loss_history["train_weather"].append(train_weather_loss)
            self.loss_history["val_weather"].append(val_weather_loss)
            print(f"Epoch {epoch}/{self.epochs}, [Train Loss] {train_loss:.4f}, [Validation Loss] {val_loss:.4f}")
            # 学習率の更新
            self.scheduler.step()
            # 最良であればモデルを保存
            if val_loss < best_val_loss:
                torch.save(self.model.state_dict(), f"{self.output_dir}/best_model.pth")
                print("   Best model saved")
            # 早期終了
            self.early_stopping(val_loss, self.model)
            if self.early_stopping.early_stop:
                print("   Early stopping")
                break
        # 各損失の推移をプロット
        self._plot_losses(self.loss_history["train"], self.loss_history["val"], name="")
        self._plot_losses(self.loss_history["train_temp"], self.loss_history["val_temp"], name="temp_")
        self._plot_losses(self.loss_history["train_rh"], self.loss_history["val_rh"], name="rh_")
        self._plot_losses(self.loss_history["train_weather"],self.loss_history["val_weather"],name="weather_")

        # 最良のモデルをロードして返す
        self.model.load_state_dict(torch.load(f"{self.output_dir}/best_model.pth"))
        return self.model


    def _train_one_epoch(self):
        self.model.train()
        total_loss = total_temp = total_rh = total_weather = 0
        for X, y in self.train_loader:
            X, y = X.to(self.device), y.to(self.device)
            target_temp = y[:, 0].unsqueeze(1)
            target_rh = y[:, 1].unsqueeze(1)
            target_weather = y[:, 2].long()

            pred_temp, pred_rh, pred_weather = self.model(X.float())

            loss_temp = self.temp_criterion(pred_temp, target_temp)
            loss_rh = self.rh_criterion(pred_rh, target_rh)
            loss_weather = self.weather_criterion(pred_weather, target_weather)
            # ✅ 論文手法に基づいた損失の計算
            # 損失の合成
            loss = (
                0.5 * torch.exp(-self.log_sigma_temp) * loss_temp
                + 0.5 * torch.exp(-self.log_sigma_rh) * loss_rh
                + torch.exp(-self.log_sigma_weather) * loss_weather
                + 0.5 * (self.log_sigma_temp + self.log_sigma_rh + self.log_sigma_weather)
            )

            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            total_loss += loss.item()
            total_temp += loss_temp.item()
            total_rh += loss_rh.item()
            total_weather += loss_weather.item()

        n = len(self.train_loader)
        return total_loss / n, total_temp / n, total_rh / n, total_weather / n

    def _validate_one_epoch(self):
        self.model.eval()
        total_loss = total_temp = total_rh = total_weather = 0
        with torch.no_grad():
            for X, y in self.val_loader:
                X, y = X.to(self.device), y.to(self.device)
                target_temp = y[:, 0].unsqueeze(1)
                target_rh = y[:, 1].unsqueeze(1)
                target_weather = y[:, 2].long()

                pred_temp, pred_rh, pred_weather = self.model(X.float())

                loss_temp = self.temp_criterion(pred_temp, target_temp)
                loss_rh = self.rh_criterion(pred_rh, target_rh)
                loss_weather = self.weather_criterion(pred_weather, target_weather)
                # ✅ 論文手法に基づいた損失の計算
                loss = (
                    0.5 * torch.exp(-self.log_sigma_temp) * loss_temp
                    + 0.5 * torch.exp(-self.log_sigma_rh) * loss_rh
                    + torch.exp(-self.log_sigma_weather) * loss_weather
                    + 0.5 * (self.log_sigma_temp + self.log_sigma_rh + self.log_sigma_weather)
                )

                total_loss += loss.item()
                total_temp += loss_temp.item()
                total_rh += loss_rh.item()
                total_weather += loss_weather.item()

        n = len(self.val_loader)
        return total_loss / n, total_temp / n, total_rh / n, total_weather / n

    def _plot_losses(self, train_loss_list, val_loss_list, name=""):
        ### 省略

    def demo(self, scaler_dict):
        ### 省略

重みを適用した結果

結論として、重みの調整によって精度改善の結果を得ることはほとんどできませんでした。現時点では各タスクの損失スケールの差がモデルに及ぼしている影響が小さいと言えます。

マルチタスク学習を取り上げるためにスキップしてしまいましたが、「1. 天気予測タスクにて起きている過学習の原因」で触れたように、今回のモデルでは天気予測タスクを解くこと自体ができていません。
まずは各タスクが適切に解けるモデルを作成し、それから以下のようなマルチタスクで考慮すべき要素に取り組むと良いでしょう。

  • 損失合成の重み
    (今回紹介)
  • タスクの競合
    共有層はタスクで共通の特徴量を生成するため、競合している要素の情報を取り出すことができません。競合している場合はタスク選択を見直す必要があります。
  • モデルの表現力
    複数のタスクに適応するため、シングルタスク学習と比較してより高い表現力(多くのノード・層数)を必要とする場合があります。
(損失推移の変化)


重み調整実施前の各損失の推移

重み調整実施後の各損失の推移

(回帰予測の変化)


重み調整実施前の正解値と予測値の比較(上段:気温、下段:気圧)

重み調整実施後の正解値と予測値の比較(上段:気温、下段:気圧)

(分類の変化)


天気予測の混同行列(左:重み調整前、右:重み調整後)

【まとめ】

一部マルチタスク学習以外の問題を含んでいたため消化不良な箇所もありますが、マルチタスクモデルを実装する一連の流れを実際のコードと共に紹介しました。

  • マルチタスク学習の概要
  • マルチタスクモデルの構造の書き方
  • マルチタスクモデルに学習させる際の損失の扱い
  • 結果からパラメータを調整する

↗️ 次のステップ

以下は「【全体の実行と評価】>コード全体」内にて活用しているものの、記事中では触れていない項目です。馴染みなければ機械学習で使用するメソッドとしておすすめです。

  • データスケーリング(特に三角関数を利用した日付→周期性への変換)
  • 学習率スケジューラ
  • Early Stopping(早期終了)

▼ 転職を検討中の方、お気軽にご連絡ください!カジュアル面談から可能です。
ファーストループテクノロジー株式会社| 採用情報(コチラ
ファーストループテクノロジー株式会社(FLT)の採用情報です。エンジニア・デザイナー・プロダクトマネージャー、セールス、コーポレート等、未来を創る仲間を募集中です!
▼ 取材などメディア関係者の方は会社HPよりお問い合わせください
お問合せフォームはコチラ
▼ 当社のnoteもぜひご覧ください!
コチラ

脚注
  1. 2020.01.01 ~ 2024.12.31 の期間における東京気象台で観測された基本天気。このため、天気概況用語における全ての状態を網羅していない。 ↩︎

ファーストループテクノロジー株式会社

Discussion