CPCV (Combinatorial Purged Cross-Validation) を理解したい
CPCV (Combinatorial Purged Cross-Validation) を理解したい実装する記事です。
CPCVは時系列データに用いられる交差検証法のひとつです。良く用いられる交差検証法としてK-Fold CVやWalk Forward法等複数種類挙げられますが、これらと異なり疑似的に複数のバックテストパスを生成・評価できることが特徴です。
CPCVの理論の詳細は既にまとめている方が居るのでそちらを参考にするとよいでしょう。
また、ファイナンス機械学習[1]にも同様の解説があります。というわけで、細かい理論の解説は書籍や上記記事に譲るとしてここではCPCVをスクラッチ実装していきます。
実装に入る前に
実はわざわざ自分で実装しなくても、既に実装されたものがあります。
[Janestreet]CV method - CombPurgedKFoldCV
ただ、個人的に実装が気に入らなかったので改めて書き直そうというのが本記事の趣旨になります。上記記事で十分であれば、ここから先を読む必要はありません。
実装
本記事の最後に、コードをまとめたものを載せておきます。ここでは読みやすさのためにクラス定義とそのメソッドについて切り分けて書いていきます。
ライブラリのインポート
import math
from dataclasses import dataclass
from functools import cached_property
from itertools import combinations
from typing import Iterable, List, Optional, Union
import numpy as np
import pandas as pd
from numpy.typing import NDArray
以降の実装で使うライブラリです。以降では暗黙的にインポートされたものとして使用していきます。
クラスの定義
@dataclass
class CombinatorialPurgedCrossValidation:
"""Combinatorial purged cross-validation."""
n_splits: int = 5
n_tests: int = 2
purge_gap: Union[int, pd.Timedelta] = 0
embargo_gap: Union[int, pd.Timedelta] = 0
def __post_init__(self) -> None:
"""Post process."""
if self.n_tests >= self.n_splits:
raise ValueError("n_tests must be greater than n_splits.")
CPCVに必要なパラメータをここで定義しています。順に、データの分割数、その内テストに使用する数、パージとエンバーゴの大きさになります。パージとエンバーゴに関してはint型の場合は落とすデータ数、pd.Timedelta型の場合は落とす時間を指定します。このような実装にしているのは、為替や株のOHLCVとtickデータを想定しているためです。OHLCVの場合は落とすバーの数をintで指定すれば十分ですが、tickの場合は時間指定できた方が使いやすいだろうと考えたためです。
メソッドの実装
トレーニングとテスト用のグループIDを作成する
@cached_property
def _test_sets(self) -> List[List[int]]:
"""Return sets of group index for testing."""
test_sets = []
for comb in combinations(range(self.n_splits), self.n_tests):
test_sets.append(list(comb))
return test_sets
@cached_property
def _train_sets(self) -> List[List[int]]:
"""Return sets of group index for training."""
train_sets = []
for test_set in self._test_sets:
train_sets.append(np.setdiff1d(np.arange(self.n_splits), test_set).tolist())
return train_sets
ここでいうグループIDは、各分割に付けられるIDのことを言います。例えばデータを5分割してCPCVを行う場合、各分割には1~5のIDが割りつけられます。_test_sets
と_train_sets
では、CPCVの各試行で用いられるグループIDをリスト形式で返します。以下が例です。
>> cpcv = CombinatorialPurgedCrossValidation(n_splits=3, n_tests=2)
>> cpcv._test_sets
[[0, 1], [0, 2], [1, 2]]
>> cpcv._train_sets
[[2], [1], [0]]
パージとエンバーゴ
def purge(self, indices: pd.Index) -> pd.Index:
if isinstance(self.purge_gap, int):
return indices[: -self.purge_gap]
flags = indices <= (indices.max() - self.purge_gap)
return indices[flags]
def embargo(self, indices: pd.Index) -> pd.Index:
if isinstance(self.embargo_gap, int):
return indices[self.embargo_gap :]
flags = indices >= (indices.min() + self.embargo_gap)
return indices[flags]
時系列データの交差検証では、テストセットにトレーニング時の情報がリークしないように注意する必要があります。トレーニングセットの最後とテストセットの最初に情報のオーバーラップがある場合、トレーニングセットの最後から十分な量のデータを捨てる処理をパージと言います。これとは逆に、テストセットの最後とトレーニングセットの最初に情報のオーバーラップがある場合、トレーニングセットの最初から十分な量のデータを捨てる処理をエンバーゴと呼びます。
上記実装では、データのインデックスのみを引数としてパージとエンバーゴを適用しています。インデックスのみを用いて計算しているのは、いちいちデータを分割すると余分なメモリを食う&処理が遅くなるからです。また、パージとエンバーゴがint型で指定されたかどうかで処理を分けています。暗黙的にmin
やmax
を使用していたりしますが、これらが定義されない(順序が無い)インデックスを持つデータはそもそも時系列ではないと考えてこのようにしています。
分割の生成
交差検証用の分割データを作ります。少し長いコードなので、コードに直接コメントを付与して説明します。
def split(
self,
X: Union[NDArray[np.floating], pd.DataFrame],
y: Optional[Union[NDArray[np.floating], pd.DataFrame, pd.Series]] = None,
return_backtest_labels: bool = False,
) -> Iterable:
"""Split data.
Parameters
----------
X : (N, M) Union[NDArray[np.floating], pd.DataFrame]
Explanatory variables to split, where N is number of data,
M is number of features.
y : (N,) Union[NDArray[np.floating], pd.DataFrame, pd.Series]
Objective variables to split, where N is number of data.
return_backtest_labels : bool, by default False.
If True, return labels test set on backtest path.
Returns
-------
Iterable that generate (Xtrain, ytrain, Xtest, ytest[, labels]) if y was given.
If y wasn't given, this generates (Xtrain, Xtest[, labels]).
"""
if isinstance(X, np.ndarray):
X = pd.DataFrame(X)
if isinstance(y, np.ndarray):
y = pd.DataFrame(y)
# ユニークなインデックスを取得し、分割します。 こうすることで、
# パージとエンバーゴがpd.Timedeltaで指定された場合でも対応可能になります。
inds_unique = X.index.unique()
inds_unique_splitted = np.array_split(inds_unique, self.n_splits)
# メインループ
# 試行ごとのトレーニングセット、テストセットを返します(ジェネレータとして実装)。
# train_gids, test_gidsはトレーニング、テストに用いる分割のIDが
# リストとして格納されています( 例えば[0, 1]など).
# なお、gidはgroup IDの略です。
# test_set_labelsは後述するのでここでは無視してください。
for train_gids, test_gids, labels in zip(
self._train_sets, self._test_sets, self.test_set_labels
):
# パージ・エンバーゴすべき分割IDを求めています。
# 要はテスト前後でパージとエンバーゴが行われるので、
# テストセットのIDに1を加減算しています。
# なお、存在しないIDは単純に無視するので
# 負のID等も特に後処理せずに進めています。
inds_to_purge = np.array(test_gids) - 1
inds_to_embargo = np.array(test_gids) + 1
# テストセットの分割されたインデックスのリストを取得しています
# 長さn_testsのリストになります。
test_inds_list = [inds_unique_splitted[gid] for gid in test_gids]
# トレーニングセットの分割されたインデックスのリストを取得します。
train_inds_list = []
# 分割IDごとにループを回します。gidはintです。
for gid in train_gids:
inds = inds_unique_splitted[gid]
# 先ほど用意したパージすべき分割IDにgidが含まれいたらパージします。
if gid in inds_to_purge:
inds = self.purge(inds)
# 先ほど用意したエンバーゴすべき分割IDにgidが含まれいたらエンバーゴします。
if gid in inds_to_embargo:
inds = self.embargo(inds)
train_inds_list.append(inds)
# トレーニングセットの分割されたインデックスのリストをひとつの配列へ統合します。
# トレーニングセットはあえて分割されたまま扱う理由が無いのでこのようにしています。
train_inds = np.concatenate(train_inds_list).ravel()
# 目的変数yが与えられているかどうかで処理を分けています。
# yが与えられていない場合(Xtrian, Xtest)のような値を返します。
# yが与えられている場合(Xtrian, ytrain, Xtest, ytest)のような値を返します。
# labelsに関しては後述するのでここではスルーしてください。
if y is None:
if return_backtest_labels:
yield (
X.loc[train_inds],
[X.loc[inds] for inds in test_inds_list],
labels,
)
else:
yield X.loc[train_inds], [X.loc[inds] for inds in test_inds_list]
else:
if return_backtest_labels:
yield (
X.loc[train_inds],
y.loc[train_inds],
[X.loc[inds] for inds in test_inds_list],
[y.loc[inds] for inds in test_inds_list],
labels,
)
else:
yield (
X.loc[train_inds],
y.loc[train_inds],
[X.loc[inds] for inds in test_inds_list],
[y.loc[inds] for inds in test_inds_list],
)
では動作確認をしてみましょう。以下では(100, 3)のデータを生成し、日付をインデックスとしたデータフレームを生成して動作確認をしています。
>> arr = np.random.rand(100, 3)
>> start = datetime(2023, 1, 1, 0, 0, 0, 0) # 2023-01-01 00:00:00.000000
>> inds = pd.DatetimeIndex([start + timedelta(days=i) for i in range(100)])
>> df = pd.DataFrame(arr, index=inds)
>> cpcv = CombinatorialPurgedCrossValidation(n_splits=5, n_tests=2, purge_gap=10, embargo_gap=10)
>> for Xtrain, Xtest in cpcv.split(df):
print(Xtrain.shape[0], Xtest[0].shape[0], Xtest[1].shape[0])
50 20 20
30 20 20
30 20 20
40 20 20
40 20 20
20 20 20
30 20 20
40 20 20
30 20 20
50 20 20
出力された一番左の列がトレーニングセットのデータ数、中央と右がテストセットのそれぞれの分割のデータ数です。トレーニングセットのデータ数がまちまちなのは、隣接するテストデータによってパージとエンバーゴが適用された為です。
上記例の条件では、CPCVの各試行とテストセットの分割の関係は以下のようになっています。行は各試行、列は分割IDです。xは当該試行におけるテストセット、oはトレーニングセットを表しています。下記表を参考に、上記出力が正しいか確かめてみると良いでしょう。
1 | 2 | 3 | 4 | 5 | |
---|---|---|---|---|---|
1 | x | x | o | o | o |
2 | x | o | x | o | o |
3 | x | o | o | x | o |
4 | x | o | o | o | x |
5 | o | x | x | o | o |
6 | o | x | o | x | o |
7 | o | x | o | o | x |
8 | o | o | x | x | o |
9 | o | o | x | o | x |
10 | o | o | o | x | x |
補足
return_backtest_labelsについて
splitメソッドの引数として前触れなく現れたreturn_backtest_labels
ですが、これはバックテストパスのラベルを返すかどうかを指定する引数です。というのも、トレーニングセットとテストセットを分割・生成するだけでは、バックテストパスを再現できないためです。というわけで、バックテストパスを再現するプロパティを実装します。
@cached_property
def pathway_labeled(self) -> NDArray[np.integer]:
"""Labeled backtest pathways."""
n_combs = math.comb(self.n_splits, self.n_tests)
pathway_flags = np.zeros((n_combs, self.n_splits), bool)
for i, comb in enumerate(combinations(range(self.n_splits), self.n_tests)):
pathway_flags[i, comb] = True
pathway_labeled = pathway_flags.cumsum(axis=0)
pathway_labeled[~pathway_flags] = 0
return pathway_labeled
@cached_property
def test_set_labels(self) -> List[List[int]]:
"""Return labels of test sets."""
return [labels[labels > 0].tolist() for labels in self.pathway_labeled]
pathway_labeled
で、バックテストパスを生成しています。各パスごとに固有のラベルを連番で付与しています。以下が例です。
>> cpcv = CombinatorialPurgedCrossValidation(n_splits=5, n_tests=2)
>> cpcv.pathway_labeled
array([[1, 1, 0, 0, 0, 0],
[2, 0, 1, 0, 0, 0],
[3, 0, 0, 1, 0, 0],
[4, 0, 0, 0, 1, 0],
[5, 0, 0, 0, 0, 1],
[0, 2, 2, 0, 0, 0],
[0, 3, 0, 2, 0, 0],
[0, 4, 0, 0, 2, 0],
[0, 5, 0, 0, 0, 2],
[0, 0, 3, 3, 0, 0],
[0, 0, 4, 0, 3, 0],
[0, 0, 5, 0, 0, 3],
[0, 0, 0, 4, 4, 0],
[0, 0, 0, 5, 0, 4],
[0, 0, 0, 0, 5, 5]])
>> cpcv.test_set_labels
[[1, 1],
[2, 1],
[3, 1],
[4, 1],
[5, 1],
[2, 2],
[3, 2],
[4, 2],
[5, 2],
[3, 3],
[4, 3],
[5, 3],
[4, 4],
[5, 4],
[5, 5]]
pathway_labeled
は、先述したxoの分割表にラベルを付与したものが出力されています。ラベルごとに結果を集めれば1つのバックテストパスを再現できるというわけです。しかしこのままでは少々使いにくいので、さらに加工したものがtest_set_labels
です。これは、CPCVの各試行におけるテストセットのバックテストパスのラベルを返しています。各試行ごとにラベルと一緒に評価結果を保存しておけばバックテストパスの再現が容易になる寸法です。
全コードまとめ
上記を全てまとめたものが以下の実装です。正しく動作するように上記で説明しなかった細かな関数等も含まれます。また、もう少しdocstringsを充実させたりする予定ですが、現段階のものを貼っておきます。バグやもっと良い実装方法があればコメント頂けると嬉しいです。
import math
from dataclasses import dataclass
from functools import cached_property
from itertools import combinations
from typing import Iterable, List, Optional, Union
import numpy as np
import pandas as pd
from numpy.typing import NDArray
@dataclass
class CombinatorialPurgedCrossValidation:
"""Combinatorial purged cross-validation."""
n_splits: int = 5
n_tests: int = 2
purge_gap: Union[int, pd.Timedelta] = 0
embargo_gap: Union[int, pd.Timedelta] = 0
def __post_init__(self) -> None:
"""Post process."""
if self.n_tests >= self.n_splits:
raise ValueError("n_tests must be greater than n_splits.")
@cached_property
def _test_sets(self) -> List[List[int]]:
"""Return sets of group index for testing."""
test_sets = []
for comb in combinations(range(self.n_splits), self.n_tests):
test_sets.append(list(comb))
return test_sets
@cached_property
def _train_sets(self) -> List[List[int]]:
"""Return sets of group index for training."""
train_sets = []
for test_set in self._test_sets:
train_sets.append(np.setdiff1d(np.arange(self.n_splits), test_set).tolist())
return train_sets
@cached_property
def pathway_labeled(self) -> NDArray[np.integer]:
"""Labeled backtest pathways."""
n_combs = math.comb(self.n_splits, self.n_tests)
pathway_flags = np.zeros((n_combs, self.n_splits), bool)
for i, comb in enumerate(combinations(range(self.n_splits), self.n_tests)):
pathway_flags[i, comb] = True
pathway_labeled = pathway_flags.cumsum(axis=0)
pathway_labeled[~pathway_flags] = 0
return pathway_labeled
@cached_property
def test_set_labels(self) -> List[List[int]]:
"""Return labels of test sets."""
return [labels[labels > 0].tolist() for labels in self.pathway_labeled]
def _is_valid_shape(
self,
X: Union[NDArray[np.floating], pd.DataFrame],
) -> None:
if X.ndim != 2:
raise ValueError("X.ndim must be 2.")
def _is_valid(
self,
X: Union[NDArray[np.floating], pd.DataFrame],
) -> None:
if X.ndim != 2:
raise ValueError("X.ndim must be 2.")
def _is_valid_gap_purge(self, X: pd.DataFrame) -> None:
if isinstance(self.purge_gap, int):
return
if not isinstance(self.purge_gap, type(X.index[1] - X.index[0])):
raise ValueError(
"The type of purge_gap and the type of difference "
"of index in X must be same."
)
def _is_valid_gap_embargo(self, X: pd.DataFrame) -> None:
if isinstance(self.embargo_gap, int):
return
if not isinstance(self.embargo_gap, type(X.index[1] - X.index[0])):
raise ValueError(
"The type of embargo_gap and the type of difference "
"of index in X must be same."
)
def purge(self, indices: pd.Index) -> pd.Index:
if isinstance(self.purge_gap, int):
return indices[: -self.purge_gap]
flags = indices <= (indices.max() - self.purge_gap)
return indices[flags]
def embargo(self, indices: pd.Index) -> pd.Index:
if isinstance(self.embargo_gap, int):
return indices[self.embargo_gap :]
flags = indices >= (indices.min() + self.embargo_gap)
return indices[flags]
def split(
self,
X: Union[NDArray[np.floating], pd.DataFrame],
y: Optional[Union[NDArray[np.floating], pd.DataFrame, pd.Series]] = None,
return_backtest_labels: bool = False,
) -> Iterable:
"""Split data.
Parameters
----------
X : (N, M) Union[NDArray[np.floating], pd.DataFrame]
Explanatory variables to split, where N is number of data,
M is number of features.
y : (N,) Union[NDArray[np.floating], pd.DataFrame, pd.Series]
Objective variables to split, where N is number of data.
return_backtest_labels : bool, by default False.
If True, return labels test set on backtest path.
Returns
-------
Iterable that generate (Xtrain, ytrain, Xtest, ytest[, labels]) if y was given.
If y wasn't given, this generates (Xtrain, Xtest[, labels]).
"""
if isinstance(X, np.ndarray):
X = pd.DataFrame(X)
if isinstance(y, np.ndarray):
y = pd.DataFrame(y)
self._is_valid_shape(X)
self._is_valid_gap_purge(X)
self._is_valid_gap_embargo(X)
inds_unique = X.index.unique()
inds_unique_splitted = np.array_split(inds_unique, self.n_splits)
for train_gids, test_gids, labels in zip(
self._train_sets, self._test_sets, self.test_set_labels
):
inds_to_purge = np.array(test_gids) - 1
inds_to_embargo = np.array(test_gids) + 1
test_inds_list = [inds_unique_splitted[gid] for gid in test_gids]
train_inds_list = []
for gid in train_gids:
inds = inds_unique_splitted[gid]
if gid in inds_to_purge:
inds = self.purge(inds)
if gid in inds_to_embargo:
inds = self.embargo(inds)
train_inds_list.append(inds)
train_inds = np.concatenate(train_inds_list).ravel()
if y is None:
if return_backtest_labels:
yield (
X.loc[train_inds],
[X.loc[inds] for inds in test_inds_list],
labels,
)
else:
yield X.loc[train_inds], [X.loc[inds] for inds in test_inds_list]
else:
if return_backtest_labels:
yield (
X.loc[train_inds],
y.loc[train_inds],
[X.loc[inds] for inds in test_inds_list],
[y.loc[inds] for inds in test_inds_list],
labels,
)
else:
yield (
X.loc[train_inds],
y.loc[train_inds],
[X.loc[inds] for inds in test_inds_list],
[y.loc[inds] for inds in test_inds_list],
)
Discussion