🤖

Prophetをゼロから実装する(Python)

2023/11/28に公開

この記事では、下記の論文と実装を参考にPrphetをゼロから実装していきたい。

https://www.tandfonline.com/doi/abs/10.1080/00031305.2017.1380080

https://github.com/facebook/prophet

使用するライブラリとしては、下記のものがある。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
from scipy.optimize import minimize

1. Prophetについて

Prophetは、データサイエンスの知識がない人でも様々なビジネス上の時系列予測タスクに対して、扱えるように開発された。

we present a time series model which is flexible enough for a wide range of business time series, yet configurable by non-experts who may have domain knowledge about the data generating process but little knowledge about time series models and methods.(Taylor and Letham, 2014)

Prophetモデルは下記のように、成長関数g(t)、季節関数s(t)、休日関数h(t)、誤差項εから構成される。

y(t)=g(t)+s(t)+h(t)+ε_t

時系列分析といえば、過去のデータを使って未来の値を予測する自己回帰モデルが有名である。しかし、Prophetはこの自己回帰成分を思い切って捨て去ることで、大胆にも時系列分析を曲線フィッティングの問題に置き換えている。そのおかげ欠損値があるから未来の値を予測できないといった問題も回避している。

では、ここで実際にProphetを動かしてみよう。

from sklearn.metrics import mean_squared_error
from prophet import Prophet

df = pd.read_csv('https://raw.githubusercontent.com/facebook/prophet/main/examples/example_wp_log_peyton_manning.csv')

model = Prophet()
model.fit(df)
future = model.make_future_dataframe(periods=0)
forecast = model.predict(future)
fig = model.plot(forecast)
plt.show()

データはWikipediaにおけるPeyton Manningの日次ログページビューである。本記事では、本家のProphetの平均二乗誤差にどれだけ近づけるかを一つの指標としたい。

image.png

mse = mean_squared_error(df['y'], forecast['yhat'])
print(f"MSE: {mse:.5f}")

本家Prophetの平均二乗誤差はMSE: 0.23396であった。

2.フラットトレンド

まずは下記のように単純なフラットトレンドモデルを実装してみよう。モデルは次のように成る。

y(t)= mean(y) +ε

まず、myProphetクラスを定義する。

class myProphet:
    def __init__(self):
    #コードはこれから追加
    def fit(self, df):
    #コードはこれから追加
    def plot(self):
    #コードはこれから追加

次に、コンストラクタにおいて、データフレームdf、切片a、予測値y_hatを定義する。

    def __init__(self):
        self.df = None
        self.a = None
        self.y_hat = None

続いて、フラットトレンドのコードを実装していく。

    def fit(self, df):
        df['ds'] = pd.to_datetime(df['ds'])
        self.df = df
        self.a = df['y'].mean()
        self.y_hat = np.full(len(df), self.a)

最後に、可視化のための関数を定義する。plot関数は本家のProphetの図っぽくなるようにいろいろ設定しておく。

    def plot(self):
        plt.figure(figsize=(10, 6))
        plt.scatter(self.df['ds'], self.df['y'], color='black', s=10)
        plt.plot(self.df['ds'], self.y_hat, color='#0072B2')
        plt.grid(True, color='#cccccc')
        plt.legend()
        plt.show()

plot関数はこれで完成。とりえあえずこれ以降変更はしない。
それでは、実際に使ってみよう。

model = myProphet()
model.fit(df)
model.plot()
mse = mean_squared_error(df['y'], model.y_hat)
print(f"MSE: {mse:.5f}")

平均二乗誤差はMSE: 0.71540であった。本家が0.23396であることを考えるとだいぶ差があると言える。

3. 季節関数(年次レベル)

さて、ここから本家のProphetに近づけていく作業に取り掛かる。まずは季節関数を実装する。
季節関数は論文では下記のように定義されている。

s(t) = \sum_{n=1}^{N}(a_n cos(\frac{2πnt}{P}) + b_n sin(\frac{2πnt}{P}))

そのため、たとえば、N=10のときの年次レベルの季節効果は下記のようになる。論文によれば、年次レベルはn=10が週次レベルでは3が最もうまくいいくことが知られている。

X(t) = [cos(\frac{2π(1)t}{365.25}), ..., sin(\frac{2π(10)t}{365.25})]\\ \beta \sim \text{Normal}(0, \sigma^2)\\ s(t) = X(t)\beta\\

今回、myProphetクラスで変更する箇所は下記のようになっている。

class myProphet:
    def __init__(self, N_annual=10, P_annual=365.25):
    #コードを置換
    def seasonal_function(self, x, params, N, P):
    #新しく実装
    def perform_seasonal_regression(self, df, N, P):
    #新しく実装
    def fit(self, df):
    #コードを置換
    def plot(self):
    #前回と同じ

まずはコンストラクタから見ていこう。新しくパラメータを追加する。
論文通り、年次レベルの季節性のNとして10を選ぶ。

class myProphet:
    #ハイパーパラメータの初期値を追加
    def __init__(self, N=10, P=365.25):
	#省略
        #self.y_hat = None以降に下記のコードを追加
        self.N_annual = N_annual
        self.P_annual = P_annual
        self.seasonal_params_annual = None

続いて季節関数を実装する。

    def seasonal_function(self, x, params, N, P):
        seasonal = np.zeros_like(x)
        for n in range(1, N + 1):
            seasonal += params[2 * n - 2] * np.cos(2 * np.pi * n * x / P) + params[2 * n - 1] * np.sin(2 * np.pi * n * x / P)
        return seasonal

    def perform_seasonal_regression(self, df, N, P):
        initial_params = np.random.randn(2 * N)
        result = minimize(lambda params: np.mean((df['y'] - self.a  - self.seasonal_function(df['x'], params, N, P))**2), initial_params)

        if not result.success:
            raise Exception("最適化の失敗: " + result.message)

        return result.x

そして、fit関数を下記のように書き換える

    def fit(self, df):
	#省略
        self.seasonal_params_annual = self.perform_seasonal_regression(df, self.N_annual, self.P_annual)
        self.y_hat = self.a + self.seasonal_function(df['x'], self.seasonal_params_annual, self.N_annual, self.P_annual)

では、実際に実行してみよう。

model = myProphet()
model.fit(df)
model.plot()
mse = mean_squared_error(df['y'], model.y_hat)
print(f"MSE: {mse:.5f}")

MSE: 0.38493となっており、大幅な改善が見込めている。

4. 季節関数(週次レベル)

さらに、週次レベルの季節性についても実装してみよう。これができるとだいぶ本家のProphetに近づく。

論文通り、週次レベルの季節性のNとして3を選ぶ。

class myProphet:
        #ハイパーパラメータを追加
    def __init__(self, N_annual=10, P_annual=365.25, N_weekly=3, P_weekly=7):
	#省略
        #self.P_annual = P_annual以降に下記のコードで置換
        self.N_weekly = N_weekly
        self.P_weekly = P_weekly
        self.seasonal_params_annual = None
        self.seasonal_params_weekly = None

そして、fit関数も修正する。

    def fit(self, df):
	#省略
        #self.seasonal_params_annual = self.perform_seasonal_regression(df, self.N_annual, self.P_annual)以降を下記のコードで追加
        self.seasonal_params_weekly = self.perform_seasonal_regression(df, self.N_weekly, self.P_weekly)
        self.y_hat = self.a + self.seasonal_function(df['x'], self.seasonal_params_annual, self.N_annual, self.P_annual) + self.seasonal_function(df['x'], self.seasonal_params_weekly, self.N_weekly, self.P_weekly)

では、実際に実行してみよう。

model = myProphet()
model.fit(df)
model.plot()
mse = mean_squared_error(df['y'], model.y_hat)
print(f"MSE: {mse:.5f}")

結果はだいぶ本家に近づいて来ているように見える。
MSE: 0.34873となっており、約0.04の改善が見込めた。

5. 休日関数

ここで、さらに休日関数も実装してみよう。ここでは単純化のため土日だけを扱う。

Z(t) = [1(t∈D_1),...,1(t∈D_L)]\\ κ ~ Normal(0, v)\\ h(t) = Z(t)^κ
class myProphet:
    def __init__(self, N_annual=10, P_annual=365.25, N_weekly=3, P_weekly=7):
            #省略
        #self.seasonal_params_weekly = None以降に追加
        self.seasonal_params_holidays = None

さて休日関数を実装する。Series.dt.dayofweekでは月曜日から日曜日まで0から6までの値が割り当てられる。そのため、5以上の値であれば自動的に休日ということになる。

perform_seasonal_regressionメソッドの以降に下記のコードを追加

    def detect_holidays(self, df):
        df['is_holiday'] = df['ds'].dt.dayofweek >= 5
        return df

    def holiday_function(self, is_holiday, params):
        return np.where(is_holiday, np.exp(params), 1)

    def perform_holiday_regression(self, df):
        initial_params = np.random.normal(0, 1, size=1)  
        result = minimize(
            lambda params: np.mean((df['y'] - self.y_hat * self.holiday_function(df['is_holiday'], params))**2), 
            initial_params)
        if not result.success:
            raise Exception("最適化の失敗: " + result.message)
        return result.x

さらに、fit関数を修正する。

    def fit(self, df):
        #self.y_hat = self.a + self.seasonal_function(df['x'], self.seasonal_params_annual, self.N_annual, self.P_annual) + self.seasonal_function(df['x'], self.seasonal_params_weekly, self.N_weekly, self.P_weekly)以降にコードを追加
        self.holiday_params = self.perform_holiday_regression(self.detect_holidays(df))
        self.y_hat *= self.holiday_function(df['is_holiday'], self.holiday_params)

MSE: 0.34872となっており、微々たる改善ができた。

一応ここまでで、成長関数をflatに指定したProphetとほぼ同じものができた。

model = Prophet(growth='flat')
model.fit(df)
future = model.make_future_dataframe(periods=0)
forecast = model.predict(future)
fig = model.plot(forecast)
plt.show()

mse = mean_squared_error(df['y'], forecast['yhat'])
print(f"MSE: {mse:.5f}")

flat成長関数を指定した本家Prophetでは、MSE: 0.34861となっている。
本家Prophetの方が精度がいいのは、休日関数の実装に祝日なども考慮されているからだろう。さらなる改善として、区分線形トレンドの実装もあるのだが、これは時間のあるときに実装してみたい。

付録 最終的なコード

class myProphet:
    def __init__(self, N_annual=10, P_annual=365.25, N_weekly=3, P_weekly=7):
        self.df = None
        self.a = None
        self.y_hat = None
        self.N_annual = N_annual
        self.P_annual = P_annual
        self.N_weekly = N_weekly
        self.P_weekly = P_weekly
        self.seasonal_params_annual = None
        self.seasonal_params_weekly = None
        self.seasonal_params_holidays = None

    def seasonal_function(self, x, params, N, P):
        seasonal = np.zeros_like(x)
        for n in range(1, N + 1):
            seasonal += params[2 * n - 2] * np.cos(2 * np.pi * n * x / P) + params[2 * n - 1] * np.sin(2 * np.pi * n * x / P)
        return seasonal

    def perform_seasonal_regression(self, df, N, P):
        initial_params = np.random.randn(2 * N)
        result = minimize(lambda params: np.mean((df['y'] - self.a - self.seasonal_function(df['x'], params, N, P))**2), initial_params)

        if not result.success:
            raise Exception("最適化の失敗: " + result.message)

        return result.x

    def detect_holidays(self, df):
        df['is_holiday'] = df['ds'].dt.dayofweek >= 5
        return df

    def holiday_function(self, is_holiday, params):
        return np.where(is_holiday, np.exp(params), 1)

    def perform_holiday_regression(self, df):
        initial_params = np.random.normal(0, 1, size=1)
        result = minimize(
            lambda params: np.mean((df['y'] - self.y_hat * self.holiday_function(df['is_holiday'], params))**2),
            initial_params)
        if not result.success:
            raise Exception("最適化の失敗: " + result.message)
        return result.x

    def fit(self, df):
        df['ds'] = pd.to_datetime(df['ds'])
        df['x'] = (df['ds'] - df['ds'].min()) / np.timedelta64(1, 'D')
        self.df = df
        self.a = df['y'].mean()

        self.seasonal_params_annual = self.perform_seasonal_regression(df, self.N_annual, self.P_annual)
        self.seasonal_params_weekly = self.perform_seasonal_regression(df, self.N_weekly, self.P_weekly)
        self.y_hat = self.a + self.seasonal_function(df['x'], self.seasonal_params_annual, self.N_annual, self.P_annual) + self.seasonal_function(df['x'], self.seasonal_params_weekly, self.N_weekly, self.P_weekly)
        self.holiday_params = self.perform_holiday_regression(self.detect_holidays(df))
        self.y_hat *= self.holiday_function(df['is_holiday'], self.holiday_params)

    def plot(self):
        plt.figure(figsize=(10, 6))
        plt.scatter(self.df['ds'], self.df['y'], color='black', s=10)
        plt.plot(self.df['ds'], self.y_hat, color='#0072B2')
        plt.grid(True, color='#cccccc')
        plt.legend()
        plt.show()

model = myProphet()
model.fit(df)
model.plot()

Discussion