😄

シンプルな推薦システムを作ってみよう:アイテムベース協調フィルタリング入門

に公開

はじめに

EC サイトや動画配信サービスなどで「あなたへのおすすめ」が表示されるのを見たことがあると思います。これは推薦システムと呼ばれる技術によって実現されており、ユーザー体験を向上させる重要な機能です。

この記事では、推薦システムのアルゴリズムの中でも比較的シンプルで理解しやすい「アイテムベース協調フィルタリング」の基本的な考え方と、Python を使った簡単な実装例を紹介します。

アイテムベース協調フィルタリングとは?

アイテムベース協調フィルタリングは、「あなたが過去に高く評価したアイテムと似ているアイテムを推薦する」というアプローチです。

ここで言う「似ている」とは、色や形が似ているということではなく、「多くのユーザーの評価パターンから見て、一緒に高く評価される傾向がある」という意味です。

よくある例えとしては、

  • 「商品 A を買った人は、商品 B もよく買っています」
  • 「映画 X を高く評価した人は、映画 Y も高く評価する傾向があります」

といった関係性を見つけ出し、それに基づいて推薦を行います。

実装してみよう

ここでは、ユーザーが商品につけた評価 (rating) データを使って、推薦エンジンを実装する例を見ていきます。

必要なライブラリ

計算のためにいくつかの Python ライブラリを使用します。

  • numpy: 行列計算など、数値計算の基本的なライブラリ。
  • scikit-learn: 機械学習ライブラリ。今回は商品間の類似度計算(コサイン類似度)に使います。
  • pydantic: データの構造を定義し、バリデーションを行うために使います(必須ではありませんが便利です)。
  • pickle: 学習済みの推薦エンジンオブジェクトをファイルに保存・読み込みするために使います。
# 必要なライブラリ (事前にインストールしておく)
# pip install numpy scikit-learn pydantic
import numpy as np
from numpy.typing import NDArray
from sklearn.metrics.pairwise import cosine_similarity
from pydantic import BaseModel
from typing import List, Dict, Optional
import pickle
import os

# SQLAlchemy 関連 (DBからデータを読む場合)
# from sqlalchemy.orm import Session
# from app.db.session import SessionLocal
# from app.models.purchase import Purchase

データ構造の定義
ユーザーの評価データと、推薦結果の形式を Pydantic モデルで定義しておくと便利です。

# ユーザーの評価データを表すクラス
class UserPreference(BaseModel):
    user_id: int      # ユーザーID
    product_id: int   # 商品ID
    rating: float     # 評価値 (例: 1.0 ~ 5.0)

# 推薦結果を表すクラス
class RecommendationResult(BaseModel):
    product_id: int   # 推薦された商品ID
    score: float      # 推薦スコア (どれくらいおすすめか)
推薦エンジンクラス (RecommendationEngine)
推薦の計算ロジックをまとめたクラスです。
class RecommendationEngine:
    def __init__(self) -> None:
        """エンジンの初期化"""
        self.user_item_matrix: Optional[NDArray] = None       # ユーザー×商品の評価行列
        self.item_similarity_matrix: Optional[NDArray] = None # 商品×商品の類似度行列
        self.product_ids: Optional[List[int]] = None          # 商品IDのリスト (行列の列に対応)
        self.user_idx: Dict[int, int] = {}                    # ユーザーIDを行列の行番号に変換する辞書

    def train(self, preferences: List[UserPreference]) -> None:
        """
        ユーザーの評価データ (preferences) を使ってモデルを学習(類似度行列などを作成)
        """
        if not preferences:
            print("Warning: No training data provided.")
            return # データがなければ学習しない

        print(f"Starting training with {len(preferences)} preferences...")

        # 1. ユーザー×商品の評価行列 (user_item_matrix) を作成
        user_ids = sorted(list(set(pref.user_id for pref in preferences)))
        product_ids = sorted(list(set(pref.product_id for pref in preferences)))

        self.user_item_matrix = np.zeros((len(user_ids), len(product_ids)))
        self.product_ids = product_ids

        # ユーザーID/商品IDを行列のインデックス番号に対応付ける辞書を作成
        user_id_to_index = {uid: idx for idx, uid in enumerate(user_ids)}
        product_id_to_index = {pid: idx for idx, pid in enumerate(product_ids)}
        self.user_idx = user_id_to_index # 後で推薦時に使うため保持

        # 行列に評価値を埋める
        for pref in preferences:
            if pref.user_id in user_id_to_index and pref.product_id in product_id_to_index:
                user_index = user_id_to_index[pref.user_id]
                product_index = product_id_to_index[pref.product_id]
                self.user_item_matrix[user_index, product_index] = pref.rating
            else:
                 print(f"Warning: Skipping preference for unknown user {pref.user_id} or product {pref.product_id}")


        # 2. 商品同士の類似度行列 (item_similarity_matrix) を計算
        #    評価行列の列(商品)同士の類似度をコサイン類似度で計算
        #    user_item_matrix を転置(.T)して、商品を行、ユーザーを列にしてから計算
        print("Calculating item similarity matrix...")
        # ユーザーが一人もいない、または商品が一つもない場合、類似度計算はできない
        if self.user_item_matrix.shape[0] == 0 or self.user_item_matrix.shape[1] == 0:
             print("Warning: Cannot calculate similarity with empty user-item matrix.")
             self.item_similarity_matrix = None
             return

        # アイテムが1つしかない場合も類似度は計算できない(または自明)
        if self.user_item_matrix.shape[1] < 2:
             print("Warning: Need at least two items to calculate similarity.")
             # 自分自身との類似度を1とする対角行列を便宜的に作成
             self.item_similarity_matrix = np.identity(self.user_item_matrix.shape[1])
             return

        self.item_similarity_matrix = cosine_similarity(self.user_item_matrix.T)
        print("Training complete.")


    def get_recommendations(
        self, user_id: int, n_recommendations: int = 5
    ) -> List[RecommendationResult]:
        """
        指定された user_id のユーザーにおすすめ商品を推薦する
        """
        print(f"Generating recommendations for user_id={user_id}...")
        if self.user_item_matrix is None or self.item_similarity_matrix is None or self.product_ids is None:
            print("Warning: Model not trained yet.")
            return [] # 学習が終わっていない場合は空リストを返す

        # 指定されたユーザーIDが学習データに存在するか確認
        if user_id not in self.user_idx:
            print(f"Warning: User ID {user_id} not found in training data.")
            return [] # ユーザーが存在しない場合は空リストを返す

        # 1. ユーザーの評価履歴を行列から取得
        user_index = self.user_idx[user_id]
        user_ratings = self.user_item_matrix[user_index] # そのユーザーの行を取得

        # 2. ユーザーがまだ評価していない商品のスコアを計算
        scores = []
        for product_index, product_id in enumerate(self.product_ids):
            # ユーザーがまだ評価/購入していない商品 (評価=0) のみ対象
            if user_ratings[product_index] == 0:
                # その商品の類似度ベクトルとユーザーの評価ベクトルを掛け合わせ、合計する
                # -> ユーザーが高く評価した商品と「似ている」度合いが高いほどスコアが高くなる
                weighted_sum = self.item_similarity_matrix[product_index] * user_ratings
                score = np.sum(weighted_sum)
                scores.append((product_id, score))

        # 3. スコアが高い順にソートして上位 N 件を取得
        recommendations = sorted(scores, key=lambda x: x[1], reverse=True)[:n_recommendations]
        print(f"Generated {len(recommendations)} recommendations.")

        # 4. 結果を整形して返す
        return [
            RecommendationResult(product_id=pid, score=score)
            for pid, score in recommendations
        ]
学習の実行とモデルの保存 (train_and_save_model)
実際にデータベースなどからデータを読み込み、RecommendationEngine を学習させ、結果をファイルに保存する関数です。この関数を定期的に(例: 11回)実行することで、推薦モデルを最新の状態に保ちます。

def train_and_save_model():
    print("Starting train_and_save_model process...")
    engine = RecommendationEngine()

    # --- データの準備 ---
    # ここでは例としてDBから読み込む想定
    # 実際にはCSVファイルから読むなど、状況に合わせて変更可能
    db = SessionLocal() # DBセッション開始
    try:
        print("Loading purchase data from database...")
        purchases = db.query(Purchase).all()
        if not purchases:
             print("No purchase data found in database. Skipping training.")
             return # データがなければ終了

        print(f"Loaded {len(purchases)} purchase records.")
        preferences = [
            UserPreference(
                user_id=p.user_id,
                product_id=p.product_id,
                rating=p.rating  # Purchase モデルに rating がある想定
            )
            for p in purchases if p.rating is not None # rating が無いデータは除外など
        ]
        print(f"Converted to {len(preferences)} user preferences.")

        # --- 学習の実行 ---
        engine.train(preferences)

        # --- モデルの保存 ---
        # 環境変数 MODEL_PATH があればそれを使用、なければデフォルトパス
        model_path = os.getenv('MODEL_PATH', '/app/models/recommender.pkl')
        model_dir = os.path.dirname(model_path)
        if not os.path.exists(model_dir):
            print(f"Creating directory: {model_dir}")
            os.makedirs(model_dir)

        print(f"Saving trained model to: {model_path}")
        with open(model_path, 'wb') as f:
            # 学習済みエンジンオブジェクトを pickle でファイルに保存
            pickle.dump(engine, f)
        print("Model saved successfully.")

    except Exception as e:
        print(f"An error occurred during training or saving: {e}")
        import traceback
        traceback.print_exc() # エラー詳細を出力
    finally:
        print("Closing database session.")
        db.close() # DBセッションを閉じる

# スクリプトとして直接実行された場合に訓練を開始
if __name__ == "__main__":
    train_and_save_model()
モデルのロード (get_recommendation_engine)
API サーバーなどが推薦を行う際に、ファイルに保存された学習済みモデルを読み込むための関数です。

# グローバル変数でロード済みエンジンをキャッシュ(簡易的な方法)
_engine: Optional[RecommendationEngine] = None

def get_recommendation_engine() -> RecommendationEngine:
    """
    保存された学習済みモデルをファイルからロードする。
    一度ロードしたらメモリ上にキャッシュして使い回す。
    """
    global _engine
    if _engine is None: # メモリ上にまだなければロード試行
        model_path = os.getenv('MODEL_PATH', '/app/models/recommender.pkl')
        print(f"Attempting to load model from: {model_path}")
        try:
            with open(model_path, 'rb') as f:
                # pickle ファイルからエンジンオブジェクトを復元
                _engine = pickle.load(f)
            print("Model loaded successfully from file.")
            # ロードしたモデルが訓練済みか簡易チェック
            if _engine is not None and _engine.user_item_matrix is None:
                 print("Warning: Loaded model appears to be untrained.")

        except FileNotFoundError:
            print(f"Model file not found at {model_path}. Returning new untrained engine.")
            # ファイルが見つからない場合は、空のエンジンを生成して返す
            _engine = RecommendationEngine()
        except Exception as e:
            print(f"Error loading model from {model_path}: {e}")
            print("Returning new untrained engine due to loading error.")
             # ロード中に他のエラーが発生した場合も空のエンジンを返す
            _engine = RecommendationEngine()

    # メモリ上のエンジン(ロード済or新規)を返す
    return _engine

まとめ

この記事では、アイテムベース協調フィルタリングの基本的な考え方と Python での実装例を紹介しました。

流れ:

全ユーザーの評価データを集める。
データから「ユーザー×商品」の評価表を作る。
評価表から「商品×商品」の類似度表を計算する(ここまでが train)。
学習結果(主に類似度表)をファイルに保存する (train_and_save_model)。
推薦リクエストが来たら、保存した学習結果をファイルから読み込む (get_recommendation_engine)。
ユーザーの評価履歴と商品の類似度を使って、未評価商品のスコアを計算し、上位を推薦する (get_recommendations)。

限界と発展:

この実装はシンプルですが、データ量が多くなると計算量(特に類似度計算)が増大し、パフォーマンスが問題になる可能性があります(その場合は、numpy の密行列ではなく scipy.sparse の疎行列を使うなどの工夫が必要です)。
新規ユーザーや新規アイテムに対する推薦が難しい(コールドスタート問題)。
評価データが少ないと精度が出にくい(スパース性問題)。
より高度な推薦システムでは、行列分解やディープラーニングを用いたモデルベースの手法、コンテンツ情報との組み合わせ(ハイブリッド)、リアルタイム性の考慮など、様々な技術が利用されています。

まずはこの基本的な実装を動かし、推薦システムの仕組みを理解する第一歩としてみてください。

Discussion