Numerai Super Massive Dataを全部使ってKerasで学習する
この記事は、Numerai Advent Calendar 2021の1日目の記事です。
告知
日本初のNumeraiコミュニティイベント Numerai Meetup JAPAN 2021 が2021年12月18日(土)に開催予定です!
はじめに
Numeraiの新しいデータセットが9月にリリースされました。特徴量が1050個になり3倍、行数が週毎になり4倍に膨れ上がりました。
旧データセットではColabでtraining_data
とvalidation_data
を全て使って学習を回してもメモリに収まっていましたが、新データセットでは両方をロードするだけでRAM不足によりクラッシュしてしまう状況です。
フォーラムでは行を月毎に間引いたり、特徴量を減らしたりする方法がオススメされています。
でもやっぱり妥協せずに全部のデータを使って学習してみたい!ということで、無理やり実現してみました。
Generator
Kerasでは model.fit()
でtrain_x
とtrain_y
を渡すだけではなく、データを逐次生成するGeneratorを渡す方法が提供されています。
参考リンク: KerasのGeneratorを自作する
training_data
を分割してファイルに保存しておき、逐次読み出すGeneratorを作れば、メモリを節約できそうです。
まずは numerai_training_data.parquet
を分割します。era
をmod 16して分割します。
def divide_training_data(batch_size=1000, divnum=16):
training_data = load_file('numerai_training_data_int8.parquet')
feature_cols = [x for x in training_data.columns if x.startswith('feature')]
filename_list = []
training_data_len = len(training_data)
amari = training_data_len % batch_size
training_data_len = training_data_len - amari
for idx in range(divnum):
filename = f"numerai_training_data_{idx}.parquet"
print(f'Saving {filename}')
filename_list.append(filename)
if not os.path.exists(filename):
df = training_data[pd.to_numeric(training_data['era']) % divnum == idx].copy()
if idx == 0 and amari > 0:
# バッチサイズに合わせるために切り捨てる
df = df.head(len(df) - amari)
df[feature_cols] = df[feature_cols] / 4.0
df = dataframe_cast_to_float32(df, feature_cols)
df.to_parquet(filename)
return (training_data_len, filename_list, feature_cols)
分割したファイルを別スレッドでロードしてキューに積んでいきます。
ロードしたDataFrameはシャッフルしておきます。
class FileLoader(object):
def __init__(self, filename_list, shuffle=True):
super().__init__()
self._filename_list = filename_list
self._current_filename_list = []
self._shuffle = shuffle
self._file_queue = queue.Queue(1)
self._stop_flag = False
self._th = threading.Thread(target=self._load_thread, daemon=True)
self._th.start()
def _load_thread(self):
while not self._stop_flag:
self._file_queue.put(self._load_next())
def _load_next(self):
if len(self._current_filename_list) == 0:
if self._shuffle:
self._current_filename_list = random.sample(self._filename_list, len(self._filename_list))
else:
self._current_filename_list = self._filename_list
filename = self._current_filename_list[0]
self._current_filename_list = self._current_filename_list[1:]
df = load_file(filename)
if self._shuffle:
df = df.sample(len(df))
return df
def load_next(self):
return self._file_queue.get()
def stop(self):
while self._th.is_alive():
self._stop_flag = True
try:
self._file_queue.get_nowait()
except:
pass
self._th.join(timeout=0.1)
self._file_queue = None
DataFrameをバッチサイズに切り分けてジェネレーターで出力していきます。
class TrainingDataGenerator(object):
def __init__(self, training_data_len, training_filename_list, feature_columns, target_column, batch_size):
self._training_data_len = training_data_len
self._file_loader = FileLoader(training_filename_list, shuffle=True)
self._feature_columns = feature_columns
self._target_column = target_column
self._batch_size = batch_size
self.steps_per_epoch = self._training_data_len // self._batch_size
self._batch_queue = queue.Queue(3)
self._stop_flag = False
self._current_file_df = None
self._current_index = 0
self._change_file()
self._th = threading.Thread(target=self._load_batch_thread, daemon=True)
self._th.start()
def _change_file(self):
self._current_file_df = self._file_loader.load_next()
self._current_index = 0
def stop(self):
while self._th.is_alive():
self._stop_flag = True
try:
self._batch_queue.get_nowait()
except:
pass
self._th.join(timeout=0.1)
self._batch_queue = None
self._file_loader.stop()
def _next_df(self, length):
df = self._current_file_df[self._current_index : self._current_index+length]
self._current_index = self._current_index + length
if len(df) != length:
self._change_file()
df = pd.concat([df, self._next_df(length - len(df))])
return df
def _next_batch(self):
df = self._next_df(self._batch_size)
train_x = df[self._feature_columns].to_numpy()
train_y = df[self._target_column].to_numpy()
return train_x, train_y
def _load_batch_thread(self):
while not self._stop_flag:
self._batch_queue.put(self._next_batch())
def batch_generater(self):
while True:
yield self._batch_queue.get()
学習時にmodel.fit()
の引数x
にGeneratorを渡します。
model.fit_generator()
は廃止されたので注意してください。
model.fit(x=generator.batch_generater(),
validation_data=validation_data_pair,
steps_per_epoch=generator.steps_per_epoch,
epochs=epochs)
実際のNotebookはこちら
おわりに
これで無事にRAM拡張なしの13GBのColabで全てのデータを使って学習を回すことができました。
今後、12月中に特徴量が3000に拡大すると予告されています。そうなっても最初のファイル分割さえうまく行けば学習を回せそうですね。
Discussion