🕌

LinUCB

2024/01/01に公開

LinUCBを用いて、ユーザーが次に見る映画のジャンルの探索を行う。

UCB

\text{UCB} = \bar{x}_j + \sqrt{\frac{2 \ln n}{n_j}}
変数
  • \bar{x}_j: アーム j の平均報酬
  • n: これまでの試行回数
  • n_j: アーム j がこれまでに選ばれた回数
  • \ln: 自然対数

この式は、アームの平均報酬 \bar{x}_j と試行回数 n を基に、そのアームを選択する際の信頼上限 (Upper Confidence Bound) を計算する。

LinUCB

原著論文は下記である。
A Contextual-Bandit Approach to Personalized News Article Recommendation

\text{LinUCB}_j = \boldsymbol{\theta}_j^\top \boldsymbol{x}_t + \alpha \sqrt{\boldsymbol{x}_t^\top \boldsymbol{A}_j^{-1} \boldsymbol{x}_t}
変数
  • \boldsymbol{\theta}_j: アーム j のパラメータ(ベクトル)
  • \boldsymbol{x}_t: 時刻 t におけるコンテキスト(ベクトル)
  • \boldsymbol{A}_j: アーム j に関連するデータの共分散行列(行列)
  • \alpha: 探索パラメータ

articles as a contextual bandit problem a principled approach in which a learning algorithm sequentially selects articles to serve users based on contextual information about the users and articles while simultaneously adapting its article-selection strategy based on user-click feedback to maximize total user clicks.

ユーザーと記事に関するコンテクスト情報に基づいて記事を選択し、同時にユーザーのクリックフィードバックに基づいて記事選択戦略を修正し、クリック数を最大化させる。

実装

class LinUCB:
    def __init__(self, alpha, num_users, num_genres, num_items):
        self.alpha = alpha
        self.num_users = num_users
        self.num_genres = num_genres
        self.num_items = num_items
        self.d = num_genres + num_items
        self.A = np.repeat(np.identity(self.d, dtype=np.float32)[np.newaxis, :, :], num_genres, axis=0)
        self.b = np.zeros((num_genres, self.d), dtype=np.float32)

    def fit(self, user_genre_matrix, user_movie_matrix, num_epochs, batch_size=50):
        avg_rewards = []
        for epoch in tqdm(range(num_epochs)):
            rewards = []
            A_a_inv = np.array([np.linalg.inv(self.A[a]) for a in range(self.num_genres)])

            for batch_start in range(0, self.num_users, batch_size):
                batch_end = min(batch_start + batch_size, self.num_users)
                batch_user_features = np.concatenate((user_genre_matrix[batch_start:batch_end], 
                                                      user_movie_matrix[batch_start:batch_end]), axis=1)
                
                for a in range(self.num_genres):
                    theta_a = A_a_inv[a].dot(self.b[a])
                    p_t_batch = batch_user_features.dot(theta_a) + \
                                self.alpha * np.sqrt(np.sum(batch_user_features.dot(A_a_inv[a]) * batch_user_features, axis=1))
                    
                    for i, user_id in enumerate(range(batch_start, batch_end)):
                        a_t = a if p_t_batch[i] == max(p_t_batch) else None
                        if a_t is not None:
                            r_t = 1 if user_genre_matrix[user_id, a_t] == 1 else 0
                            rewards.append(r_t)

                            x_t_at = batch_user_features[i].reshape(-1, 1)
                            self.A[a_t] = self.A[a_t] + x_t_at.dot(x_t_at.T)
                            self.b[a_t] = self.b[a_t] + r_t * x_t_at.flatten()

                            A_a_inv[a_t] = np.linalg.inv(self.A[a_t])

            avg_rewards.append(np.mean(rewards))
            print(np.mean(rewards))

        return avg_rewards

Discussion