🏋

Numerai Super Massive Dataを全部使ってKerasで学習する

2021/12/01に公開

この記事は、Numerai Advent Calendar 2021の1日目の記事です。

告知

日本初のNumeraiコミュニティイベント Numerai Meetup JAPAN 2021 が2021年12月18日(土)に開催予定です!

はじめに

Numeraiの新しいデータセットが9月にリリースされました。特徴量が1050個になり3倍、行数が週毎になり4倍に膨れ上がりました。

旧データセットではColabでtraining_datavalidation_dataを全て使って学習を回してもメモリに収まっていましたが、新データセットでは両方をロードするだけでRAM不足によりクラッシュしてしまう状況です。

フォーラムでは行を月毎に間引いたり、特徴量を減らしたりする方法がオススメされています。

でもやっぱり妥協せずに全部のデータを使って学習してみたい!ということで、無理やり実現してみました。

Generator

Kerasでは model.fit()train_xtrain_yを渡すだけではなく、データを逐次生成するGeneratorを渡す方法が提供されています。

参考リンク: KerasのGeneratorを自作する

training_dataを分割してファイルに保存しておき、逐次読み出すGeneratorを作れば、メモリを節約できそうです。

まずは numerai_training_data.parquet を分割します。eraをmod 16して分割します。

def divide_training_data(batch_size=1000, divnum=16):
  training_data = load_file('numerai_training_data_int8.parquet')
  feature_cols = [x for x in training_data.columns if x.startswith('feature')]
  filename_list = []
  training_data_len = len(training_data)
  amari = training_data_len % batch_size
  training_data_len = training_data_len - amari
  for idx in range(divnum):
    filename = f"numerai_training_data_{idx}.parquet"
    print(f'Saving {filename}')
    filename_list.append(filename)
    if not os.path.exists(filename):
      df = training_data[pd.to_numeric(training_data['era']) % divnum == idx].copy()
      if idx == 0 and amari > 0:
        # バッチサイズに合わせるために切り捨てる
        df = df.head(len(df) - amari)
      df[feature_cols] = df[feature_cols] / 4.0
      df = dataframe_cast_to_float32(df, feature_cols)
      df.to_parquet(filename)
  return (training_data_len, filename_list, feature_cols)

分割したファイルを別スレッドでロードしてキューに積んでいきます。
ロードしたDataFrameはシャッフルしておきます。

class FileLoader(object):

  def __init__(self, filename_list, shuffle=True):
    super().__init__()
    self._filename_list = filename_list
    self._current_filename_list = []
    self._shuffle = shuffle
    self._file_queue = queue.Queue(1)
    self._stop_flag = False
    self._th = threading.Thread(target=self._load_thread, daemon=True)
    self._th.start()

  def _load_thread(self):
    while not self._stop_flag:
      self._file_queue.put(self._load_next())

  def _load_next(self):
    if len(self._current_filename_list) == 0:
      if self._shuffle:
        self._current_filename_list = random.sample(self._filename_list, len(self._filename_list))
      else:
        self._current_filename_list = self._filename_list

    filename = self._current_filename_list[0]
    self._current_filename_list = self._current_filename_list[1:]
    df = load_file(filename)
    if self._shuffle:
      df = df.sample(len(df))
    return df

  def load_next(self):
    return self._file_queue.get()

  def stop(self):
    while self._th.is_alive():
      self._stop_flag = True
      try:
        self._file_queue.get_nowait()
      except:
        pass
      self._th.join(timeout=0.1)
    self._file_queue = None

DataFrameをバッチサイズに切り分けてジェネレーターで出力していきます。

class TrainingDataGenerator(object):

  def __init__(self, training_data_len, training_filename_list, feature_columns, target_column, batch_size):
    self._training_data_len = training_data_len
    self._file_loader = FileLoader(training_filename_list, shuffle=True)
    self._feature_columns = feature_columns
    self._target_column = target_column
    self._batch_size = batch_size
    self.steps_per_epoch = self._training_data_len // self._batch_size
    self._batch_queue = queue.Queue(3)
    self._stop_flag = False

    self._current_file_df = None
    self._current_index = 0
    self._change_file()

    self._th = threading.Thread(target=self._load_batch_thread, daemon=True)
    self._th.start()

  def _change_file(self):
    self._current_file_df = self._file_loader.load_next()
    self._current_index = 0

  def stop(self):
    while self._th.is_alive():
      self._stop_flag = True
      try:
        self._batch_queue.get_nowait()
      except:
        pass
      self._th.join(timeout=0.1)
    self._batch_queue = None
    self._file_loader.stop()

  def _next_df(self, length):
    df = self._current_file_df[self._current_index : self._current_index+length]
    self._current_index = self._current_index + length
    if len(df) != length:
      self._change_file()
      df = pd.concat([df, self._next_df(length - len(df))])
    return df

  def _next_batch(self):
    df = self._next_df(self._batch_size)
    train_x = df[self._feature_columns].to_numpy()
    train_y = df[self._target_column].to_numpy()
    return train_x, train_y

  def _load_batch_thread(self):
    while not self._stop_flag:
      self._batch_queue.put(self._next_batch())

  def batch_generater(self):
    while True:
      yield self._batch_queue.get()

学習時にmodel.fit()の引数xにGeneratorを渡します。
model.fit_generator()は廃止されたので注意してください。

model.fit(x=generator.batch_generater(),
          validation_data=validation_data_pair,
          steps_per_epoch=generator.steps_per_epoch,
          epochs=epochs)

実際のNotebookはこちら

おわりに

これで無事にRAM拡張なしの13GBのColabで全てのデータを使って学習を回すことができました。

今後、12月中に特徴量が3000に拡大すると予告されています。そうなっても最初のファイル分割さえうまく行けば学習を回せそうですね。

Discussion