👅

価格変動パターンの類似度を使用したデータバリデーション

2023/12/11に公開

当記事は、マケデコ Advent Calendar 2023 10日目の記事になります。
https://qiita.com/advent-calendar/2023/market-api

はじめに

みなさんデータのバリデーションはどの様にしていますか?
バリデーションの方法は様々あり、その詳細に関しては私が書くまでもなく、世の中には情報があふれています。
株などの価格時系列データをバリデーションする際にはどのようにしたらいいのでしょうか。
こないだちょっとその事について思いついた方法があり、この記事を執筆するのに合わせて実装してみました。有用かどうかの検証は済ませていませんので、気になった方はご自分で行ってみてください。

何をするのか?

よく予測モデルを作成していて出会うのは、ある特定の時期は非常によく稼いでくれているが、その後予測力が死んでしまっているというものです。単純に損益の合計を見た時に「これはいいかも」とグラフを作成すると、直近では戦略が機能していないという感じのものはよく見ます(私が弱いだけです)。
予測に使用する特徴量や執行戦略も重要だと思いますが、今回はデータのバリデーションに焦点を当てて記事を執筆していきます。

どの様にデータを分割するのか

日経先物やオプションなどでは毎月第2金曜日に清算日があります。今回はその日付を使用してデータを分割してみます。
SQを分水峰にトレンドが変わる的な場合もあり、記憶に新しい所だとコロナショックなどが思い浮かびます。3か月に1度のメジャーSQになりますが、2020年3月のSQを境に暴落していた日経の価格トレンドが下降トレンドから上昇トレンドに切り替わりました。下図は当時の価格をSQで色分けしたグラフです。

以下は生成AIに聞いた回答のコピペです。

SQの概要

SQとは、日経225先物などの株価指数先物取引、または株価指数のオプション取引などを、最終的な決済期日前で決済するための「特別な価格」のことを指します¹。算出された価格は特別清算指数、最終清算指数、あるいはただ単にSQ値などと呼ばれます¹。SQ値は、指数算出日における各指数構成銘柄の始値に基づいて算出され、大引け後大阪取引所より発表されます¹。SQ算出日は各限月の第2金曜日となり、祝日にあたる場合は前倒しされます¹。

¹: SBIネオトレード証券

(1) SQ値(速報値・確定値を掲載)|SBIネオトレード証券. https://www.sbineotrade.jp/225/outline/sq.htm.
(2) SQとは|株式投資をする上で知っておきたい基礎知識 | 投資の教科書. https://toushi-kyokasho.com/what-is-sq/.
(3) メジャーSQ通過後の日経平均株価の動きについて | 三井住友DS .... https://www.smd-am.co.jp/market/ichikawa/2021/09/irepo210910/.
(4) 最終清算数値・最終決済価格 | 日本取引所グループ. https://www.jpx.co.jp/markets/derivatives/special-quotation/.

SQ値が決まる日時

SQ値は、指数算出日における各指数構成銘柄の始値に基づいて算出されます。SQ日の225銘柄の始値から算出されるため、SQ値は毎回異なります¹。SQ算出日は各限月の第2金曜日となり、祝日にあたる場合は前倒しされます¹。詳細については、SBIネオトレード証券を参照してください。

(1) SQとは?SQ前後の株価・チャートの値動きを利用した有効 .... https://openeducation.co.jp/media/sq/.
(2) SQとは?SQ前後の株価・チャートの値動きを利用した有効 .... https://bing.com/search?q=SQ値の決め方.
(3) SQ決済時の計算方法を教えてください。 - auカブコム証券. https://faq.kabu.com/s/article/k000539.
(4) SQ(オプションSQ・メジャーSQ・幻のSQ・米国のSQ)の解説. https://www.kabusoba.jp/kabushiki851sq.html.
(5) SQ値(速報値・確定値を掲載)|SBIネオトレード証券. https://www.sbineotrade.jp/225/outline/sq.htm.

メジャーSQとマイナーSQの違い

メジャーSQとマイナーSQの違いは、以下の通りです¹:

  • メジャーSQ:3月・6月・9月・12月の第2金曜日
  • マイナーSQ:それ以外の月の第2金曜日

先物取引はメジャーSQのみ、オプション取引は毎月SQ日があるため、メジャーSQに該当する場合もあればマイナーSQに該当する場合もあります¹。

¹: SBIネオトレード証券

(1) メジャーSQ、マイナーSQとは | ファイナンシャルプランナー講座 .... https://www.foresight.jp/blog/fp/archives/7392.
(2) SQとは? わかりやすく教えてください。 | いま聞きたいQ&A .... https://manabow.com/qa/sq.html.
(3) メジャーSQ前は相場が荒れる・SQで相場が転換すると言われる理由. https://foxorz.com/major-sq-condition/.
(4) マーケット|SBI証券. https://www.sbisec.co.jp/ETGate/?OutSide=on&_ControlID=WPLETmgR001Control&_PageID=WPLETmgR001Mdtl20&_DataStoreID=DSWPLETmgR001Control&_ActionID=DefaultAID&getFlg=on&burl=search_market&cat1=market&cat2=report&dir=report&file=market_report_ew_150601.html.
(5) メジャーSQとマイナーSQの日程 | 日経平均株価に連動する .... https://nikkeiheikinnkabusyoukenn.com/newpage47.html.

分割したデータを学習用データと評価用データに振り分ける

SQ日毎に分割したデータを適当に振り分けてもいいのですが、それだと学習する際に偏りが出るかもしれません。今回は価格変動パターンの類似度を測り3つのクラスに分類し、偏りが無い様に学習用データと評価用データに分割していきます。
類似度の測定はDTW(Dynamic Time Warping)とK-meansを使用します。

実装

外部ライブラリのインストール

今回はGoogleColabを使用する前提で書いていきます。

# Google Colabに必要なパッケージをインストール
!pip install yfinance
!pip install jpholiday
!pip install tslearn

ライブラリのインポートと設定

記事の中ではDataFrameを扱うライブラリとしてpandasではなくpolarsを使用しています。書き方は似ているので普段pandasを使用している方も読めると思います。

import calendar
import datetime
import random
from typing import Dict, Iterable, List

import jpholiday
from matplotlib import pyplot as plt
import numpy as np
import plotly.express as px
import polars as pl
from tslearn.utils import to_time_series_dataset
from tslearn.clustering import TimeSeriesKMeans
import yfinance

pl.Config.set_tbl_rows(100)
plt.style.use('seaborn-whitegrid')


global DATETIME_COL
global DATETIME_TYPE
global OPEN_COL
global CLOSE_COL
global SQ_ID_COL
global CLUSTER_ID_COL
global PRICE_COL
DATETIME_COL = 'datetime'
DATETIME_TYPE = 'ms'
OPEN_COL = 'op'
CLOSE_COL = 'cl'
SQ_ID_COL = 'sq_id'
CLUSTER_ID_COL = 'cluster_id'
PRICE_COL = 'pri'

データの取得

今回は誰でも試せるように、yahoo financeからデータを取得します。日経平均株価の日足を使用しますが、データ数を増やす為に夜間取引のデータも追加していきます(今回は予測精度に関する有効性まで検証できなかったので、本当は必要ありません)

データ取得と夜間取引追加の関数
def request_yfinance(
    tic_code: str='^N225',
    start: str='2000-01-01',
    end: str=datetime.datetime.now().date().strftime('%Y-%m-%d'),
    opening: datetime.time=datetime.time(9, 0, 0),
) -> pl.DataFrame:
    # 今回はYahoo financeから日経平均株価を取得する
    ticker = yfinance.Ticker(tic_code)
    history = (
        # 個人的にpolarsの方が使いやすいので変換する
        pl.DataFrame(
            # yahoo financeからデータを取得する
            ticker
            .history(start=start, end=end)
            .reset_index()
        )
        .with_columns([
            pl.col('Date')
                .dt.date()
                .dt.combine(opening)
                .cast(pl.Datetime('ms'))
        ])
        .select(['Date', 'Open', 'Close'])
    )
    history.columns = [DATETIME_COL, OPEN_COL, CLOSE_COL]
    return history


def generate_night_session(
    dataframe: pl.DataFrame,
    opening: datetime.time=datetime.time(16, 30, 0)
) -> pl.DataFrame:
    night_session = (
        dataframe
        .select([DATETIME_COL, CLOSE_COL, OPEN_COL])
        .with_columns([
            pl.col(DATETIME_COL)
                .dt.date()
                .dt.combine(opening)
                .cast(pl.Datetime('ms')),
            pl.col(OPEN_COL).shift(-1),
        ])
    )
    night_session.columns = [DATETIME_COL, OPEN_COL, CLOSE_COL]
    return night_session
day_session = request_yfinance()
night_session = generate_night_session(day_session)
data = (
    pl.concat([day_session, night_session])
    .sort(DATETIME_COL)
)
print(data.head())
OUTPUT
┌─────────────────────┬──────────────┬──────────────┐
│ datetime            ┆ op           ┆ cl           │
│ ---------          │
│ datetime[ms]        ┆ f64          ┆ f64          │
╞═════════════════════╪══════════════╪══════════════╡
│ 2000-01-04 09:00:0018937.44921919002.859375 │
│ 2000-01-04 16:30:0019002.85937519003.509766 │
│ 2000-01-05 09:00:0019003.50976618542.550781 │
│ 2000-01-05 16:30:0018542.55078118574.009766 │
│ 2000-01-06 09:00:0018574.00976618168.269531 │
└─────────────────────┴──────────────┴──────────────┘

SQ日を特定する

SQ日でデータを分割するのでSQ日を特定していきます。SQ日とSQ別IDが入力されたDataFrameを作成し、元の価格データに結合して全ての行にSQ別IDを割り当てます。

SQ関連の関数
def second_friday(
    year: int,
    month: int
) -> datetime.datetime:
    '''
    指定した月の第2金曜日を取得します。
    '''
    first_day = datetime.datetime(year, month, 1, 9, 0, 0)
    day_of_week = first_day.weekday()
    days_until_friday  = (calendar.FRIDAY - day_of_week) % 7
    second_friday = first_day + datetime.timedelta(days_until_friday + 7)
    return second_friday


def convert_from_holiday(
    dt_obj: datetime.datetime
) -> datetime.datetime:
    '''
    引数として渡された日付が祝日ならば、前日の日付を返します
    '''
    while True:
        if jpholiday.is_holiday(dt_obj):
            dt_obj = dt_obj - datetime.timedelta(days=1)
        else:
            return dt_obj


def last_trading_datetime(
    latest_dt: datetime.datetime
) -> datetime.datetime:
    """
    指定した日付が祝日だったならば前営業日を返します
    """
    sf = second_friday(latest_dt.year, latest_dt.month)
    last_trade = convert_from_holiday(sf)
    while True:
        if jpholiday.is_holiday(last_trade):
            last_trade = convert_from_holiday(last_trade)
        else:
            return last_trade


def select_major_sq(df) -> pl.DataFrame:
    '''
    3, 6, 9, 12のメジャーSQだけに絞り込む
    '''
    df = (
        df\
        .with_columns([
            pl.when(pl.col(DATETIME_COL).dt.month() % 3 == 0)
                .then(True)
                .otherwise(False)
                .alias('query')
        ])\
        .filter(pl.col('query') == True)\
        .drop('query')
    )
    ids = pl.Series(SQ_ID_COL, list(range(df.shape[0])))
    df = df.with_columns([ids])
    return df


def generate_sq_calendar(
    start_year: int,
    end_year: int,
    major: bool=False
) -> pl.DataFrame:
    '''
    SQカレンダーを作成する。
    '''
    sqs = []
    for year in range(start_year, end_year + 2):
        for month in range(1, 13):
            dt = datetime.datetime(year, month, 1)
            sq = last_trading_datetime(dt)
            sqs.append(sq)
    # SQごとにIDを割り当てたDataFrameを作成する
    df = (
        pl\
        .DataFrame({DATETIME_COL: sqs})\
        .with_columns([
            pl.col(DATETIME_COL).cast(pl.Datetime(DATETIME_TYPE)),
            pl.Series(SQ_ID_COL, np.arange(0, len(sqs)))
        ])
    )

    if major:
        # メジャーSQだけに絞り込む
        df = select_major_sq(df)

    return df

データ期間内の全SQ日を取得する

years = data[DATETIME_COL].dt.year().unique()
sqs = generate_sq_calendar(years.min(), years.max(), True)

print('データ内のSQの数:', sqs[SQ_ID_COL].unique().shape[0])
print(sqs.head())
OUTPUT
データ内のSQの数: 300
┌─────────────────────┬───────┐
│ datetime            ┆ sq_id │
│ ------   │
│ datetime[ms]        ┆ i64   │
╞═════════════════════╪═══════╡
│ 2000-01-14 09:00:000     │
│ 2000-02-10 09:00:001     │
│ 2000-03-10 09:00:002     │
│ 2000-04-14 09:00:003     │
│ 2000-05-12 09:00:004     │
└─────────────────────┴───────┘

SQ日の入力されたDataFrameを価格のDataFrameに結合します。

# SQ日のデータを横結合し、全ての日付にSQ日のIDを付与します
df = (
    data
    .join(sqs, on=DATETIME_COL, how='outer')
    .sort(DATETIME_COL)
    .with_columns([
        pl.col(SQ_ID_COL).shift(-1).fill_null(strategy='backward')
    ])
    .drop_nulls(CLOSE_COL)
)
print(df.shape)
print(df.head(20))
OUTPUT
(11729, 4)
┌─────────────────────┬──────────────┬──────────────┬───────┐
│ datetime            ┆ op           ┆ cl           ┆ sq_id │
│ ------------   │
│ datetime[ms]        ┆ f64          ┆ f64          ┆ i64   │
╞═════════════════════╪══════════════╪══════════════╪═══════╡
│ 2000-01-04 09:00:0018937.44921919002.8593750     │
│ 2000-01-04 16:30:0019002.85937519003.5097660     │
│ 2000-01-05 09:00:0019003.50976618542.5507810     │
│ 2000-01-05 16:30:0018542.55078118574.0097660     │
│ 2000-01-06 09:00:0018574.00976618168.2695310

価格時系列の指数化

SQ日の始値を基準に指数化を行います。

df = (
    df
    .with_columns([
        (
            pl.col(CLOSE_COL) /
            pl.when(pl.col(SQ_ID_COL) != pl.col(SQ_ID_COL).shift())
                .then(pl.col(CLOSE_COL))
                .otherwise(None)
                .fill_null(strategy='forward')
        ).alias(PRICE_COL)

    ])
)

print(df.head())

OUTPUT
┌─────────────────────┬──────────────┬──────────────┬───────┬──────────┐
│ datetime            ┆ op           ┆ cl           ┆ sq_id ┆ pri      │
│ ---------------      │
│ datetime[ms]        ┆ f64          ┆ f64          ┆ i64   ┆ f64      │
╞═════════════════════╪══════════════╪══════════════╪═══════╪══════════╡
│ 2000-01-04 09:00:0018937.44921919002.85937501.0      │
│ 2000-01-04 16:30:0019002.85937519003.50976601.000034 │
│ 2000-01-05 09:00:0019003.50976618542.55078100.975777 │
│ 2000-01-05 16:30:0018542.55078118574.00976600.977432 │
│ 2000-01-06 09:00:0018574.00976618168.26953100.956081 │
└─────────────────────┴──────────────┴──────────────┴───────┴──────────┘

SQ毎の指数化した価格データを取り出します。

# SQごとの時系列データを取得
arys = []
for i in df['sq_id'].unique():
    ary = (
        df
        .filter(pl.col('sq_id') == i)
        [PRICE_COL]
        .to_numpy()
    )
    arys.append(ary)
可視化のコード
# 適当な分を可視化してみる
fig, ax = plt.subplots(figsize=(10, 5))
for ary in arys[: 50]:
    ax.plot(ary, c='gray', lw=1)

plt.show()

価格変動パターンの分類

便利なライブラリがあるので今回は "tslearn" の "tslearn.clustering.TimeSeriesKMeans" を使用していきます。
https://tslearn.readthedocs.io/en/stable/gen_modules/clustering/tslearn.clustering.TimeSeriesKMeans.html
ただ上記のライブラリを使用すると、分類後のデータが多い順に番号が割り当てられてしまうので、関数を作成し「 上昇:0, そのまま:1, 下降:2 」になるように番号を振り直します。

クラスターIDの振り直し関数
def reclass(
    cluster: Iterable[int],
    arys: Iterable[Iterable[float]],
    n_cluster: int=3,
):
    # クラス内のデータ量が多い順にcluster_idが割り振られてしまうので修正する
    # 上昇傾向: 0,  平行: 1,下降傾向: 2
    temp = {key: [] for key in list(set(cluster))}
    for clus, ary in zip(cluster, arys):
        temp[clus].append(np.mean(ary))

    # 適当に平均値を使用して割り当てを考える
    means = {key: np.mean(lst) for key, lst in temp.items()}
    _sorted = sorted(means.items(),key=lambda x:x[1], reverse=True)
    _cmaps = list(range(3))

    # reclassed = {現在のcluster_id: 新しいcluster_id}
    reclassed = {tup[0]: cmap for tup, cmap in zip(_sorted, _cmaps)}
    # cluster_idの降り直し
    recluster_ids = [reclassed.get(clus) for clus in cluster]
    return recluster_ids
# ちょっと試してみる
km = TimeSeriesKMeans(
    n_clusters=3,
    metric="dtw",
    random_state=9
)

# ----- ここから下は試しに可視化する際に使用するだけ ----- #
test_clus = km.fit_predict(to_time_series_dataset(arys[: 50]))
test_clus = reclass(test_clus, arys)

# 適当に3色作成
cmaps = ['red', 'green','blue']
# 最も長い時間を取得
max_idx = np.max([len(ary) for ary in arys])

fig, ax = plt.subplots(figsize=(10, 5))
for clus, ary in zip(test_clus, arys[: 50]):
    ax.plot(ary, color=cmaps[clus], lw=1)
    # ただのラインだと見づらいので最終取引のポイントを強調してみる
    ax.scatter(x=max_idx + clus, y=ary[-1], color=cmaps[clus], s=13)

plt.show()

実際に全体を分類してみましょう。

# 全体を分けてみる
clusters = km.fit_predict(to_time_series_dataset(arys))
clusters = reclass(clusters, arys)


cluster_ids = (
    pl.DataFrame({
        SQ_ID_COL: list(range(len(clusters))),
        CLUSTER_ID_COL: clusters
    })
)

joined = (
    df
    .join(
        cluster_ids,
        on=SQ_ID_COL,
        how='outer'
    )
)
print(joined.head())
OUTPUT
┌─────────────────────┬──────────────┬──────────────┬───────┬──────────┬────────────┐
│ datetime            ┆ op           ┆ cl           ┆ sq_id ┆ pri      ┆ cluster_id │
│ ------------------        │
│ datetime[ms]        ┆ f64          ┆ f64          ┆ i64   ┆ f64      ┆ i64        │
╞═════════════════════╪══════════════╪══════════════╪═══════╪══════════╪════════════╡
│ 2000-01-04 09:00:0018937.44921919002.85937501.01          │
│ 2000-01-04 16:30:0019002.85937519003.50976601.0000341          │
│ 2000-01-05 09:00:0019003.50976618542.55078100.9757771          │
│ 2000-01-05 16:30:0018542.55078118574.00976600.9774321          │
│ 2000-01-06 09:00:0018574.00976618168.26953100.9560811          │
└─────────────────────┴──────────────┴──────────────┴───────┴──────────┴────────────┘

学習用データと評価用データにIDを分ける

学習用と評価用にデータを分ける為に、それぞれのcluster_idを取得する。

学習と評価にIDを分ける
def splits_cluster_id(
    cluster_ids: List[int],
    train_size: float=0.7,
    eval_size: float=0.3
) -> Dict[str, List[int]]:
    cluster_dict = {}
    for unique in list(set(cluster_ids)):
        sq_ids = []
        for sq_id, clus in enumerate(cluster_ids):
            # cluster_idsのindexはsq_idと同じ
            if clus == unique:
                sq_ids.append(sq_id)
        cluster_dict[unique] = sq_ids

    # 学習用と評価用にsq_idを振り分ける
    train_ids = []
    eval_ids = []
    for sq_ids in cluster_dict.values():
        length = len(sq_ids)
        tlen = int(length * train_size)
        elen = length - tlen

        for _ in range(tlen):
            idx = random.randint(0, len(sq_ids) - 1)
            sq_id = sq_ids.pop(idx)
            train_ids.append(sq_id)

        for _ in range(elen):
            idx = random.randint(0, len(sq_ids) - 1)
            sq_id = sq_ids.pop(idx)
            eval_ids.append(sq_id)

    train_ids.sort()
    eval_ids.sort()
    return dict(train_ids=train_ids, eval_ids=eval_ids)
split_ids = splits_cluster_id(clusters)

train_df = (
    joined
    .filter(
        pl.col(SQ_ID_COL)
        .is_in(split_ids.get('train_ids'))
    )
    .sort(DATETIME_COL)
)

eval_df = (
    joined
    .filter(
        pl.col(SQ_ID_COL)
        .is_in(split_ids.get('eval_ids'))
    )
    .sort(DATETIME_COL)
)
print('train:', train_df.shape)
print('eval :', eval_df.shape)
OUTPUT
train: (8133, 6)
eval : (3598, 6)

上手く分類できているかを確認する。

# 最新のpolarsでは.groupby -> .group_byに変わっています。
view_df = (
    pl.concat([
        train_df.with_columns([pl.lit('train').alias('data')]),
        eval_df.with_columns([pl.lit('eval').alias('data')])
    ])
    .sort(DATETIME_COL)
    .groupby('data', CLUSTER_ID_COL)
        .agg([
            pl.col(SQ_ID_COL).count()
        ])
    .sort('data', CLUSTER_ID_COL)
    .with_columns([
        pl.when(pl.col('data') == 'train')
            .then(pl.col(SQ_ID_COL) / len(train_df))
            .otherwise(pl.col(SQ_ID_COL) / len(eval_df))
            .alias('ratio')
    ])
)

fig = px.bar(
    view_df,
    x='ratio',
    y='data',
    color=CLUSTER_ID_COL,
    color_continuous_scale=cmaps
)
fig.update_layout(width=1200, template='plotly_white')
fig.show()

リークを防ぐ為に隙間をあける

# リークを防ぐ為にデータに隙間を開ける場合
distance = 2

splits_df = (
    pl.concat([
        train_df.with_columns([pl.lit('train').alias('data')]),
        eval_df.with_columns([pl.lit('eval').alias('data')])
    ])
    .sort('datetime')
    # 隙間を開けてNull値にしてしまう
    .with_columns([
        pl.when(pl.col('data') != pl.col('data').shift(distance))
            .then(pl.lit(None))
            .otherwise(pl.col('data'))
            .alias('data')
    ])
    .with_columns([
        pl.when(pl.col('data') != pl.col('data').shift(-distance))
            .then(pl.lit(None))
            .otherwise(pl.col('data'))
            .alias('data')
    ])
)

print(splits_df[88: 96].select(['datetime', 'cluster_id', 'data']))
OUTPUT
┌─────────────────────┬────────────┬───────┐
│ datetime            ┆ cluster_id ┆ data  │
│ ---------   │
│ datetime[ms]        ┆ i64        ┆ str   │
╞═════════════════════╪════════════╪═══════╡
│ 2000-03-08 09:00:001eval  │
│ 2000-03-08 16:30:001eval  │
│ 2000-03-09 09:00:001          ┆ null  │
│ 2000-03-09 16:30:001          ┆ null  │
│ 2000-03-10 09:00:000          ┆ null  │
│ 2000-03-10 16:30:000          ┆ null  │
│ 2000-03-13 09:00:000          ┆ train │
│ 2000-03-13 16:30:000          ┆ train │
└─────────────────────┴────────────┴───────┘

最後にNullが入力されている行を削除して、余分な列を削除します。

splits_df = (
    splits_df
    .drop_nulls('data')
    .select(['datetime', 'data'])
)

# 特徴量の生成を終えたDataFrameに結合すれば終了です
"""
features = ...
features = (
    features
    .join(splits_df, on='datetime', how='outer')
    .sort('datetime')
)
"""

まとめ

思いつきで実装しましたが、有効かどうかの検証はしていません。今回は日経先物などで使用されるSQ日を期間の区切りに使用しましたが、流用は簡単だと思います。

参考

https://www.ai-gakkai.or.jp/jsai2017/webprogram/2017/pdf/367.pdf
https://zenn.dev/bee2/articles/e8623a603752ff

Discussion