😃

【PyTorch+LSTM】仮想通貨の予測

2023/04/07に公開

はじめに

現在、大学でデータサイエンスを学んでいる大学生です。この記事ではPyTorch+LSTMを使用して、仮想通貨であるXRP(リップル)の予測をしていきたいと思います。

データの準備

今回はInvesting.comで2015年1月から現在(2023/04/06)まで公開されている8年間のXRPデータを使用します。こちらからcsvファイルをダウンロードしてください。

実行環境

  • Google Colaboratory
  • Python 3.8.16

ライブラリのインポート

import os
import datetime
import math
import numpy as np
import pandas as pd
import pandas.tseries.offsets as offsets
import matplotlib.pyplot as plt
%matplotlib inline
from tqdm import tqdm
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler

import torch
import torch.nn as nn
import torch.optim as optim
import warnings
warnings.simplefilter('ignore')
plt.style.use("ggplot")

前処理

  1. データの確認
    csv_data_pathにcsvファイルがあるパスを入力してください。
df = pd.read_csv(csv_data_path, engine="python", encoding="utf-8")
df

  1. 不要なカラムの削除
    今回は終値のみを用いて予測を行うので、他のカラムは削除します。
df = df.drop(["始値", "高値", "安値", "出来高", "変化率 %"], axis=1)
  1. カラム名を英語の変換
df = df.rename(columns={"終値": "Close", "日付け": "Date"})
  1. 日付の形式を変換
    プロットする時に日付がobject型だと上手く表示されないため、Dateをobjectからdatetime型に変換する。
df["Date"] = pd.to_datetime(df["Date"], format="%Y年%m月%d日")
  1. 日付をインデックスに変更
df.set_index("Date", inplace=True)
  1. 時系列データの古い順番でモデルに入れたいので逆順に並べる
df = df.sort_index()
  1. データをプロット
plt.figure(figsize=(16, 8))
plt.title("Close Price History")
plt.plot(df["Close"])
plt.xlabel("Date", fontsize=18)
plt.ylabel("Close Price USD ($)", fontsize=18)
plt.show()

  1. 正規化
    0から1の範囲に落とし込むように正規化を行います。今回は2015年1月22日から2020年12月31日までを訓練データ、2021年1月1日から2023年4月6日までをテストデータとして扱います。
train = df.loc[:"2020-12-31"].values
test = df.loc["2021-01-01"::].values
scaler = MinMaxScaler(feature_range=(0,1))
scaler = scaler.fit(train)
scaled_train_data = scaler.transform(train)
scaled_test_data = scaler.transform(test)

LSTMモデルに投入するためのデータの形成

  1. LSTMモデルに入れるためのデータを形成する関数作成
from typing import Tuple

def make_sequence_data(data: np.ndarray, sequence_size: int) -> Tuple[np.ndarray, np.ndarray]:
    """データをsequence_sizeに指定したサイズのシーケンスに分けてシーケンスとその答えをarrayで返す
    Args:
        data (np.ndarray): 入力データ
        sequence_size (int): シーケンスサイズ
    Returns:
        seq_arr: sequence_sizeに指定した数のシーケンスを格納するarray
        target_arr: シーケンスに対応する答えを格納するarray
    """

    num_data = len(data)
    seq_data = []
    target_data = []
    for i in range(num_data - sequence_size):
        seq_data.append(data[i:i+sequence_size])
        target_data.append(data[i+sequence_size:i+sequence_size+1])
    seq_arr = np.array(seq_data)
    target_arr = np.array(target_data)

    return seq_arr, target_arr
  1. シーケンスデータに変換
    今回は30日分のデータを投入して、次の日の値を予測するように学習します。
seq_length = 30
train_X, train_Y = make_sequence_data(scaled_train_data, seq_length)
test_X, test_Y = make_sequence_data(scaled_test_data, seq_length)
# テンソル変換してLSTMに入力するために軸を変更(シーケンス、バッチサイズ、入力次元)
tensor_train_X = torch.FloatTensor(train_X).permute(1, 0, 2)
tensor_train_Y = torch.FloatTensor(train_Y).permute(1, 0, 2)
tensor_test_X = torch.FloatTensor(test_X).permute(1, 0, 2)
# rmseを計算する時の形を合わせる
test_Y = test_Y.reshape(len(test_Y), 1)

モデルの定義

  1. LSTMモデル定義
class LSTM(nn.Module):
    def __init__(self, hidden_size=100):
        super().__init__()
        self.hidden_size = hidden_size
        # input_sizeは入力する次元数
        self.lstm = nn.LSTM(input_size=1, hidden_size=self.hidden_size) # batch_first=True
        self.linear = nn.Linear(self.hidden_size, 1)

    def forward(self, x):
        x, _ = self.lstm(x)
        x_last = x[-1]
        x = self.linear(x_last)

        return x
  1. 損失関数と最適化の定義
model = LSTM(100)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

学習

  1. 100エポックで学習
epochs = 100
losses = []
with tqdm(total=epochs, desc='Training', position=0) as pbar:
    for epoch in range(epochs):
        optimizer.zero_grad()
        output = model(tensor_train_X)
        loss = criterion(output, tensor_train_Y)
        loss.backward()
        losses.append(loss.item())
        optimizer.step()
        # if epoch % 10 == 0:
        #     pbar.write(f"epoch: {epoch}, loss: {loss.item()}")
        pbar.update()
  1. 損失の推移
plt.figure(figsize=(12, 8))
plt.title("model loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.plot(losses)
plt.legend(["Train"])
plt.show()

推論

  1. テストデータの予測
predictions = model(tensor_test_X).detach().numpy()
  1. RMSEの計算
rmse = mean_squared_error(test_Y, predictions, squared=False)
rmse

出力結果

0.02969738902851506

可視化

train = df.loc["2018-12-31":"2021-01-30"]
valid = df.loc["2021-01-31"::]
valid["Predictions"] = scaler.inverse_transform(predictions)
# 可視化
plt.figure(figsize=(16, 8))
plt.title('Model')
plt.xlabel('Date', fontsize=18)
plt.ylabel('Close Price USD ($)', fontsize=18)
plt.plot(train['Close'])
plt.plot(valid[['Close', 'Predictions']])
plt.legend(['Train', 'Val', 'Predictions'], loc='lower right')
plt.show()

未来の値を予測

  1. データ形成
close_data = np.array(df["Close"].values).reshape(len(df["Close"].values), 1)
scaler = MinMaxScaler(feature_range=(0,1))
scaler = scaler.fit(close_data)
scaled_train_data = scaler.transform(close_data)
train_X, train_Y = make_sequence_data(scaled_train_data, seq_length)
tensor_train_X = torch.FloatTensor(train_X).permute(1, 0, 2)
tensor_train_Y = torch.FloatTensor(train_Y).permute(1, 0, 2)
  1. モデルの初期化
model = LSTM(100)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
  1. 学習
epochs = 100
losses = []
with tqdm(total=epochs, desc='Training', position=0) as pbar:
    for epoch in range(epochs):
        optimizer.zero_grad()
        output = model(tensor_train_X)
        loss = criterion(output, tensor_train_Y)
        loss.backward()
        losses.append(loss.item())
        optimizer.step()
        pbar.update()
  1. 未来の値を予測する関数定義
def get_future_data(model: LSTM, scaled_data: np.ndarray, seq_size: int, num_predict_data: int) -> np.ndarray:
    """指定した日数分未来の値を予測する関数
    Args:
        model (LSTM): モデル
        scaled_data (np.ndarray): スケーリングされたデータ
        seq_size (int): シーケンスサイズ
        num_predict_data(int): 予測する未来の件数
    Returns:
        predicted_data: 予測値が格納されたデータ
    """

    for num in range(num_predict_data):
        seq = torch.FloatTensor(scaled_data[-seq_size:])
        if num == 0:
            seq = seq.unsqueeze(dim=-1)
        else:
            seq = seq.unsqueeze(dim=-1)
            seq = seq.unsqueeze(dim=-1)
        prediction = model(seq)
        scaled_data = np.append(scaled_data, prediction.detach().numpy())
        predicted_data = scaler.inverse_transform(prediction.detach().numpy())

    predicted_data = scaler.inverse_transform(scaled_data[-num_predict_data:].reshape(1, -1))

    return predicted_data
  1. 未来の予測
predicted_data = get_future_data(model, scaled_train_data, 30, 30)
predicted_data

出力結果

array([[0.50087942, 0.501834  , 0.5027307 , 0.50359331, 0.50444781,
        0.50529098, 0.50613402, 0.50697921, 0.50783954, 0.50869591,
        0.50954048, 0.51039241, 0.51123517, 0.51208934, 0.51286068,
        0.51367861, 0.51447767, 0.51529411, 0.51609495, 0.51689136,
        0.51766031, 0.51839959, 0.51911665, 0.51984554, 0.52057096,
        0.52132253, 0.52206665, 0.52283084, 0.52358869, 0.52434456]])

可視化

おわりに

今回はPyTorch+LSTMでXRPデータを活用しながら、仮想通貨の未来の値を予測してみました。予測結果は今後上がっていく方向になりました。備忘録も兼ねて書いてるため、もっとこうしたらいいよ〜、とか、こっちの方がおすすめだよ〜、とかあればコメントいただけると幸いです。

Discussion