🐡

MixPR

2025/02/14に公開

https://arxiv.org/pdf/2412.06078

src/MixPR/mixpr.py
import numpy as np

class MixPR:
    """
    Personalized PageRankを用いて文書をランキングするクラス。
    ベクトライズのロジック(TF-IDF or BERT など)はコンストラクタ引数の `vectorizer` で差し替える。
    """
    def __init__(self, vectorizer, alpha=0.85, max_iter=100, tol=1e-6):
        """
        Parameters:
        - vectorizer : BaseVectorizer を継承したベクトライズクラスのインスタンス
        - alpha: float, damping factor for Personalized PageRank.
        - max_iter: int, maximum number of iterations.
        - tol: float, tolerance for convergence.
        """
        self.vectorizer = vectorizer
        self.alpha = alpha
        self.max_iter = max_iter
        self.tol = tol
        
        self.corpus_embeddings = None  # コーパス埋め込みをキャッシュする

    def _normalize_matrix(self, matrix: np.ndarray) -> np.ndarray:
        """
        列方向に正規化し、各列の和が1になるようにする。
        """
        column_sums = matrix.sum(axis=0)
        column_sums[column_sums == 0] = 1  # Avoid division by zero
        return matrix / column_sums

    def _create_adjacency_matrix(self, embeddings: np.ndarray) -> np.ndarray:
        """
        コサイン類似度に近い形を想定し、行列を作成して列方向に正規化する。
        埋め込みデータから正規化隣接行列を作成する。
        """
        similarity_matrix = np.dot(embeddings, embeddings.T)
        adjacency_matrix = self._normalize_matrix(similarity_matrix)
        return adjacency_matrix

    def _compute_ppr(self, adjacency_matrix: np.ndarray, personalization_vector: np.ndarray) -> np.ndarray:
        """
        Personalized PageRank を反復計算してスコアを得る。
        """
        n = adjacency_matrix.shape[0]
        # テレポートベクトルを確率分布になるよう正規化
        teleport = personalization_vector / (personalization_vector.sum() + 1e-10)

        scores = np.ones(n) / n
        for _ in range(self.max_iter):
            new_scores = (
                self.alpha * adjacency_matrix @ scores + (1 - self.alpha) * teleport
            )
            if np.linalg.norm(new_scores - scores, 1) < self.tol:
                break
            scores = new_scores
        return scores

    def embedding_corpus(self, corpus: list[str]) -> np.ndarray:
        """
        コーパス全体をベクトル化し、内部にキャッシュする。
        """
        self.corpus_embeddings = self.vectorizer.fit_transform(corpus)
        # 必要であれば埋め込みの L2 正規化など行う:
        norms = np.linalg.norm(self.corpus_embeddings, axis=1, keepdims=True)
        norms[norms == 0] = 1
        self.corpus_embeddings = self.corpus_embeddings / norms
        return self.corpus_embeddings

    def rank_documents(self, query: str, top_k: int = 10) -> tuple[np.ndarray, np.ndarray]:
        """
        Personalized PageRankを用いて文書をスコアリングし、関連度の高い順に並べたインデックスとスコアを返す。
        
        Parameters:
            query: str, クエリ文字列
        
        Returns:
            ranked_indices: np.ndarray, スコアが高い順にソートされた文書のインデックス
            scores: np.ndarray, 各文書のスコア
        """
        if self.corpus_embeddings is None:
            raise ValueError("Corpus embeddings not found. Call embedding_corpus(corpus) first.")

        # クエリをベクトル化し、L2正規化
        query_vec = self.vectorizer.transform([query])[0]
        query_norm = np.linalg.norm(query_vec)
        if query_norm > 0:
            query_vec = query_vec / query_norm

        # 隣接行列(正規化済み)を作成
        adjacency_matrix = self._create_adjacency_matrix(self.corpus_embeddings)

        # パーソナライズベクトル(文書とクエリの類似度)を計算
        similarities = np.dot(self.corpus_embeddings, query_vec)
        personalization_vector = np.maximum(similarities, 0)  # ネガティブは0に

        # 全部0にならないように微小値を足す
        if personalization_vector.sum() == 0:
            personalization_vector += 1e-10

        # PPRスコアを計算
        scores = self._compute_ppr(adjacency_matrix, personalization_vector)

        # スコアの高い順に文書インデックスをソート
        ranked_indices = np.argsort(-scores)
        return (
            ranked_indices[:top_k], 
            scores[ranked_indices[:top_k]]
        )
src/MixPR/vectorizer/base.py
from abc import ABC, abstractmethod
from typing import List
import numpy as np

class BaseVectorizer(ABC):
    """
    ベクトライズのインターフェースクラス。
    すべてのベクトライズクラスは、この抽象メソッドを実装します。
    """
    
    @abstractmethod
    def fit_transform(self, corpus: List[str]) -> np.ndarray:
        """
        コーパス(複数文書)を受け取り、学習(必要に応じて)した上でベクトル変換し、配列を返す。
        Parameters:
            corpus: List[str], 文書のリスト
        Returns:
            embeddings: np.ndarray, shape = (n_docs, embedding_dim)
        """
        pass
    
    @abstractmethod
    def transform(self, texts: List[str]) -> np.ndarray:
        """
        fit が完了した状態をもとに、text をベクトルに変換して返す。
        
        Parameters:
            texts: List[str], 文書またはクエリのリスト
        Returns:
            embeddings: np.ndarray, shape = (n_texts, embedding_dim)
        """
        pass

src/MixPR/vectorizer/bert.py
import numpy as np
from sentence_transformers import SentenceTransformer
import transformers
from typing import List, Sequence

from .base import BaseVectorizer


class SentenceBERT:

    MODEL_NAME = 'sonoisa/sentence-bert-base-ja-mean-tokens-v2'

    class VectorType():
        """出力ベクトルの種類を指定するためのクラス。"""
        NORMAL = 'normal'
        """入力の先頭から512tokenのみ利用して文章ベクトルを作成する。"""
        AVERAGE = 'average'
        """分割した文ごとのベクトル値を平均して文章ベクトルを作成する。"""
        MAX = 'max'
        """分割した文ごとのベクトル値の各要素のMAXをとって文章ベクトルを作成する。"""

    def __init__(self, model_name: str = MODEL_NAME) -> None:
        self.model = SentenceTransformer(model_name)

    def vectorize(
            self, corpus: List[str],
            vector_type: str = VectorType.NORMAL
        ) -> Sequence:
        if vector_type == self.VectorType.AVERAGE:
            embeddings = []
            for text in corpus:
                split_text = self._split_sentence(text)
                if len(split_text) == 0:
                    # たとえば 768 次元のゼロベクトルにする
                    # (モデルによっては 768 ではなく 1024 の場合もあるので要確認)
                    zero_vec = np.zeros(768, dtype=np.float32)
                    embeddings.append(zero_vec)
                    continue

                # print(split_text)
                embedding = np.mean(self.model.encode(split_text), axis=0)
                embeddings.append(embedding)
            vector = np.array(embeddings)
        elif vector_type == self.VectorType.MAX:
            embeddings = []
            for text in corpus:
                split_text = self._split_sentence(text)
                if len(split_text) == 0:
                    # たとえば 768 次元のゼロベクトルにする
                    # (モデルによっては 768 ではなく 1024 の場合もあるので要確認)
                    zero_vec = np.zeros(768, dtype=np.float32)
                    embeddings.append(zero_vec)
                    continue

                embedding = np.amax(self.model.encode(split_text), axis=0)
                embeddings.append(embedding)
            vector = np.array(embeddings)
        else:
            vector = self.model.encode(corpus)
        return vector

    def _split_sentence(self, text: str, max_length: int = 300) -> List[str]:
        """指定文字数でテキストを分割する関数"""
        line_list = []
        temp_line = ''
        for line in text.splitlines():
            if len(line) + len(temp_line) > max_length:
                line_list.append(temp_line)
                temp_line = line
            else:
                temp_line = f'{temp_line}\n{line}'
        if temp_line:
            line_list.append(temp_line)
        return line_list


# 日本語BERTトークナイザをセット
transformers.BertTokenizer = transformers.BertJapaneseTokenizer

class SentenceBERTVectorizerWrapper(BaseVectorizer):
    """
    日本語 Sentence-BERT を使って文章をベクトル化するクラス。
    """

    def __init__(self, vector_type: str = SentenceBERT.VectorType.NORMAL) -> None:
        self.bert = SentenceBERT()
        self.fitted = False
        self.vector_type = vector_type

    def fit_transform(self, corpus: List[str]) -> np.ndarray:
        """ コーパスの埋め込み """
        embeddings = self.bert.vectorize(corpus, vector_type=self.vector_type)
        self.fitted = True
        return np.array(embeddings)

    def transform(self, texts: List[str]) -> np.ndarray:
        """ query の埋め込み """
        if not self.fitted:
            raise ValueError("Vectorizer is not fitted. Call fit_transform first.")
        embeddings = self.bert.vectorize(texts, vector_type=self.vector_type)
        return np.array(embeddings)


src/MixPR/vectorizer/tfidf.py
from janome.tokenizer import Tokenizer
from sklearn.feature_extraction.text import TfidfVectorizer
from typing import List
import numpy as np
from .base import BaseVectorizer

class TfidfVectorizerWrapper(BaseVectorizer):
    """
    Janome を使ってトークナイズし、スキルで TF-IDF を計算するラッパークラス。
    """
    def __init__(self):
        self.tokenizer = Tokenizer()
        self.vectorizer = TfidfVectorizer()
        self.fitted = False  # fit済みかどうか判定用

    def _tokenize(self, text: str) -> str:
        tokens = [token.surface for token in self.tokenizer.tokenize(text)]
        return " ".join(tokens)

    def fit_transform(self, corpus: List[str]) -> np.ndarray:
        """ コーパスの埋め込み """
        tokenized_corpus = [self._tokenize(doc) for doc in corpus]
        tfidf_matrix = self.vectorizer.fit_transform(tokenized_corpus).toarray()
        self.fitted = True
        return tfidf_matrix

    def transform(self, texts: List[str]) -> np.ndarray:
        """ query の埋め込み """
        if not self.fitted:
            raise ValueError("Vectorizer is not fitted. Call fit_transform first.")
        tokenized_texts = [self._tokenize(doc) for doc in texts]
        return self.vectorizer.transform(tokenized_texts).toarray()

Discussion