🐈

Act 36. LSTMで為替相場を予測する(リベンジ - その①)

2025/01/06に公開

はじめに

以前の記事でLSTMを用いたFXの相場予測を行ってみた。

結果は惨敗。
全く持って予測出来ていない。

ということで、リベンジマッチと行こうではないか!
今回は特定の時間のみを予測するモデルを作ってみた。

特定の時間とは、例えば18時から21時までというイメージ。
実際のFX相場では、日本時間やロンドン時間など、時間によって市場が異なるため、相場には時間による影響が存在する。

今回はニューヨーク時間で経済指標が発表される前の18~21時をターゲットに予測を行ってみようと思う。

結論

青色の線が実際の値。
オレンジ色の線が予測した値。

※定期的に出現する垂直の線は、日付が変わっている個所(次の日の18時の始値がずれるため)

あれ…。
意外と悪くない…?

と思ったが、世の中そんなに甘くない。
以下の画像は先ほどの内容の一部分を拡大したもの。

気づいた方もいると思うが、実際の値から少し遅れて予測した値も変動している。

例えば、10個目あたり。
実際のデータが上がった後に、予測したデータも上がっている。

そして15個目あたりの実際のデータが下がりきったタイミングで、予測したデータが下がり始めている。
これでは実戦では使えなさそう。

実装

上手くは行かなかったが、一応コードは載せておく。
jupyter notebookでコードを書いていたため、セルごとに内容を記載していこうと思う。

まずは必要なライブラリのインポート。
utils.indicatorは、以前の記事で作ったインジケータ用の関数をまとめた、オリジナルのライブラリ。
気になる方はコチラをご覧あれ。

import numpy as np
import pandas as pd
import requests
import time
import os

from keras.models import Sequential
from keras.layers import Input, LSTM, Dropout, Dense
from sklearn.preprocessing import MinMaxScaler
from utils.indicator import (
  calculate_adx, calculate_bb, calculate_macd, calculate_rsi
)
from sklearn.metrics import r2_score

import matplotlib.pyplot as plt
import japanize_matplotlib

次にデータの準備を行う。
今回はUSD/JPYの2024年1月~11月の5分足データ。

GMOコイン様のAPIを叩いてデータを取得している。

# ファイルが存在する場合は読み込み、存在しない場合はAPI実行 + ファイル出力
file_path = "./USDJPY_5min_2024.csv"
if os.path.exists(file_path):
  # ファイルの読み込み
  df = pd.read_csv(file_path, index_col="datetime", parse_dates=True)

else:
  # データの取得
  get_data = []

  for month in range(1, 12):
    for day in range(1, 32):
      # 一桁の場合は0を設定
      month_str = str(month).zfill(2)
      day_str = str(day).zfill(2)

      # URLの設定
      endPoint = "https://forex-api.coin.z.com/public"
      path = f"/v1/klines?symbol=USD_JPY&priceType=ASK&interval=5min&date=2024{month_str}{day_str}"

      # 5分足データ取得
      response = requests.get(endPoint + path)

      # 取得したデータの確認
      if "status_code" in response.json():
        continue
      if response.json()["data"] == []:
        continue
      
      get_data.extend(response.json()["data"])
      time.sleep(0.2)
      
  # DataFrameに変換
  df = pd.DataFrame(get_data)

  # IndexをopenTimeに変更し、Index名をdatetimeに変更
  df.set_index("openTime", inplace=True)
  df.index.name = "datetime"

  # Indexの形式を変換
  df.index = pd.to_datetime(df.index.astype(int), unit="ms")

  # CSVファイルに出力
  df.to_csv(file_path)

取得したデータを見てみる。

# DataFrameの形状を確認
print(df.shape)

# データ確認
print(df.head(3))
print(df.tail(3))

出力は以下の通り。

(68256, 4)
                        open     high      low    close
datetime                                               
2024-01-01 22:00:00  141.039  141.039  141.023  141.023
2024-01-01 22:05:00  141.023  141.028  141.023  141.025
2024-01-01 22:10:00  141.025  141.036  141.025  141.033
                        open     high      low    close
datetime                                               
2024-11-29 20:45:00  149.672  149.696  149.658  149.688
2024-11-29 20:50:00  149.688  149.731  149.680  149.697
2024-11-29 20:55:00  149.697  149.762  149.684  149.727

インジケータの追加を行う。

# インジケータの追加
# ボリンジャーバンド
df[["Upper 2 sigma", "Lower 2 sigma"]] = calculate_bb(data=df, sigma=1.9, period=10)

# RSI
df["RSI"] = calculate_rsi(data=df, period=14)

# MACD
df[["MACD", "Signal", "MACD_Histgram"]] = calculate_macd(data=df)

# ADX
df[["ADX", "+DI", "-DI"]] = calculate_adx(data=df, period=14)

# 曜日の設定 月曜: 0、火曜: 1 etc...
df["Weekday"] = df.index.weekday

予測する値は次の値としたいので、その値をターゲットとして取得する。
また、"2024-01-02"と"2024-11-28"は、中途半端な時間までしかデータが存在しないため削除。

# ターゲット値の取得
df["Target"] = df["close"].shift(-1)

# 不要な列の削除
df = df.drop(["open", "high", "low"], axis=1)

# 初日と最終日を削除(中途半端な時間のデータとなっているため)
df = df.loc["2024-01-02":"2024-11-28"]

分析に不要な時間を削除

# 18:00~19:55の予測を行うため、特定の時間のみを抽出(ステップタイムを考慮して17時からのデータを取得)
df_20h = df.between_time("17:00:00", "20:55:00")

データから目的変数と説明変数を取得。

# 目的変数
y = pd.DataFrame({"Target": df_20h["Target"]}, index=df_20h.index)

# 説明変数
X = df_20h.drop(["Target"], axis=1)

全ての列の型を変更し、データを正規化する。

# 全てのカラムをfloat64型に変更する
X = X.astype("float64")
y = y.astype("float64")

# データを列ごとに正規化
X_scaler = MinMaxScaler(feature_range=(0, 1))
y_scaler = MinMaxScaler(feature_range=(0, 1))
X[:] = X_scaler.fit_transform(X)
y[:] = y_scaler.fit_transform(y)

テストデータと訓練データに分割する関数を作成。
data_sizeは訓練データの割合で、window_sizeはステップタイム(12時点前から現在までのデータを使って、次の値を予測するイメージ)

def train_test_split(X, y, data_size=0.8, window_size=12):
    """
    説明変数と目的変数を受け取り、指定されたサイズで訓練データとテストデータを作成する関数。
    重複を避けるため、テストデータの開始日は訓練データの最終日翌営業日から開始する。
    """
    # 訓練データのサイズ
    data_len = int(len(X) * data_size)
    last_index = X.iloc[:data_len].index[-1]
    year = last_index.year
    month = last_index.month
    day = last_index.day

    # 最後のインデックスの次の営業日を計算
    next_business_day = last_index + pd.offsets.BDay(1)
    next_year = next_business_day.year
    next_month = next_business_day.month
    next_day = next_business_day.day

    # 訓練データとテストデータ
    X_train_data = X.loc[:f"{year}-{month}-{day}"]
    X_test_data = X.loc[f"{next_year}-{next_month}-{next_day}":]
    y_train_data = y.loc[:f"{year}-{month}-{day}"]
    y_test_data = y.loc[f"{next_year}-{next_month}-{next_day}":]

    X_train, y_train = [], []
    X_test, y_test = [], []
    # 訓練データの形式変更(window_step分の予測期間を持たせる)
    for i in range(window_size, len(X_train_data)):
        step_data = X_train_data.iloc[i - window_size:i]
        # 期間のデータが同じ日付の場合のみ処理を行う
        if step_data.index[0].day == step_data.index[-1].day:
            X_train.append(step_data)
            y_train.append(y_train_data.iloc[i])

    # テストデータの形式変更(window_step分の予測期間を持たせる)
    for i in range(window_size, len(X_test_data)):
        step_data = X_test_data.iloc[i - window_size:i]
        # 期間のデータが同じ日付の場合のみ処理を行う
        if step_data.index[0].day == step_data.index[-1].day:
            X_test.append(step_data)
            y_test.append(y_test_data.iloc[i])
    
    return np.array(X_train), np.array(y_train), np.array(X_test), np.array(y_test)

関数を呼び出して、データの形状を確認する。

# 訓練データとテストデータを作成
X_train, y_train, X_test, y_test = train_test_split(X, y)

# 形式の確認
print("元のデータの形式")
print(X.shape)
print(y.shape)
print("分割後のデータの形式")
print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

出力は以下の通り。

元のデータの形式
(11424, 11)
(11424, 1)
分割後のデータの形式
(7066, 12, 11)
(7066, 1)
(1738, 12, 11)
(1738, 1)

モデルの定義。

# モデルの定義
model = Sequential()
model.add(Input(shape=(X_train.shape[1], X_train.shape[2])))
model.add(LSTM(units=50, return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(units=50))
model.add(Dropout(0.2))
model.add(Dense(units=1))

モデルの学習

# モデルのコンパイルと学習
model.compile(optimizer="adam", loss="mean_squared_error")
model.fit(X_train, y_train, batch_size=32, epochs=50, verbose=1)

今回は問題なく学習に成功。

Epoch 1/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 3s 8ms/step - loss: 0.0027
Epoch 2/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 0.0019
Epoch 3/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 1s 7ms/step - loss: 0.0017
Epoch 4/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 0.0015
Epoch 5/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 0.0015
Epoch 6/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 1s 7ms/step - loss: 0.0013
Epoch 7/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 0.0011
Epoch 8/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 0.0010
Epoch 9/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 1s 7ms/step - loss: 0.0010
Epoch 10/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 1s 7ms/step - loss: 8.9805e-04
Epoch 11/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 8.4718e-04
Epoch 12/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 1s 7ms/step - loss: 8.1976e-04
Epoch 13/50
...
Epoch 49/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 5.1270e-04
Epoch 50/50
221/221 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 5.2698e-04
Output is truncated. View as a scrollable element or open in a text editor. Adjust cell output settings...
<keras.src.callbacks.history.History at 0x7f0cd66bc620>

一応サマリを確認しておく。

model.summary()

出力は以下の通り。

Model: "sequential_1"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Layer (type)                    ┃ Output Shape           ┃       Param # ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ lstm_2 (LSTM)(None, 12, 50)12,400 │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ dropout_2 (Dropout)(None, 12, 50)0 │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ lstm_3 (LSTM)(None, 50)20,200 │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ dropout_3 (Dropout)(None, 50)0 │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ dense_1 (Dense)(None, 1)51 │
└─────────────────────────────────┴────────────────────────┴───────────────┘
 Total params: 97,955 (382.64 KB)
 Trainable params: 32,651 (127.54 KB)
 Non-trainable params: 0 (0.00 B)
 Optimizer params: 65,304 (255.10 KB)

念のためモデルを保存しておく。

# モデルの保存
model.save("20250106_model_20h_1.keras")

予測を行い、正規化していた値を元に戻す。

# 予測
y_pred = model.predict(X_test)

# 正規化した値を元に戻す
y_pred = y_scaler.inverse_transform(y_pred)
y_test = y_scaler.inverse_transform(y_test)

モデルの評価を行う。

# 二乗平均平方根誤差(RMSE): 0に近いほど良い
rmse = np.sqrt(np.mean(((y_pred - y_test) ** 2)))
print(f"RMSE: {rmse}")

# 決定係数(r2) : 1に近いほど良い
r2s = r2_score(y_true=y_test, y_pred=y_pred)
print(f"R2 Score: {r2s}")

出力は以下の通り。

RMSE: 0.25643209785000065
R2 Score: 0.9944436448633183

プロットを行う。

fig, ax = plt.subplots(figsize=(16, 8))
ax.plot(y_test, label="実値")
ax.plot(y_pred, label="予測値")
ax.legend()

出力は以下の通り。

さいごに

以前よりは多少コードがスラスラ書けるようになり、何をしているのかも何となくは理解出来てきた。
後は予測の精度を上げる必要があるため、そこら辺を対応していこうと思う。

今後は、トレンド除去やラグを特徴量に含めてみようと思う。
もっと精度を上げられるように頑張るぞー!

ではまた。

Discussion