🐡
MixPR
src/MixPR/mixpr.py
import numpy as np
class MixPR:
"""
Personalized PageRankを用いて文書をランキングするクラス。
ベクトライズのロジック(TF-IDF or BERT など)はコンストラクタ引数の `vectorizer` で差し替える。
"""
def __init__(self, vectorizer, alpha=0.85, max_iter=100, tol=1e-6):
"""
Parameters:
- vectorizer : BaseVectorizer を継承したベクトライズクラスのインスタンス
- alpha: float, damping factor for Personalized PageRank.
- max_iter: int, maximum number of iterations.
- tol: float, tolerance for convergence.
"""
self.vectorizer = vectorizer
self.alpha = alpha
self.max_iter = max_iter
self.tol = tol
self.corpus_embeddings = None # コーパス埋め込みをキャッシュする
def _normalize_matrix(self, matrix: np.ndarray) -> np.ndarray:
"""
列方向に正規化し、各列の和が1になるようにする。
"""
column_sums = matrix.sum(axis=0)
column_sums[column_sums == 0] = 1 # Avoid division by zero
return matrix / column_sums
def _create_adjacency_matrix(self, embeddings: np.ndarray) -> np.ndarray:
"""
コサイン類似度に近い形を想定し、行列を作成して列方向に正規化する。
埋め込みデータから正規化隣接行列を作成する。
"""
similarity_matrix = np.dot(embeddings, embeddings.T)
adjacency_matrix = self._normalize_matrix(similarity_matrix)
return adjacency_matrix
def _compute_ppr(self, adjacency_matrix: np.ndarray, personalization_vector: np.ndarray) -> np.ndarray:
"""
Personalized PageRank を反復計算してスコアを得る。
"""
n = adjacency_matrix.shape[0]
# テレポートベクトルを確率分布になるよう正規化
teleport = personalization_vector / (personalization_vector.sum() + 1e-10)
scores = np.ones(n) / n
for _ in range(self.max_iter):
new_scores = (
self.alpha * adjacency_matrix @ scores + (1 - self.alpha) * teleport
)
if np.linalg.norm(new_scores - scores, 1) < self.tol:
break
scores = new_scores
return scores
def embedding_corpus(self, corpus: list[str]) -> np.ndarray:
"""
コーパス全体をベクトル化し、内部にキャッシュする。
"""
self.corpus_embeddings = self.vectorizer.fit_transform(corpus)
# 必要であれば埋め込みの L2 正規化など行う:
norms = np.linalg.norm(self.corpus_embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1
self.corpus_embeddings = self.corpus_embeddings / norms
return self.corpus_embeddings
def rank_documents(self, query: str, top_k: int = 10) -> tuple[np.ndarray, np.ndarray]:
"""
Personalized PageRankを用いて文書をスコアリングし、関連度の高い順に並べたインデックスとスコアを返す。
Parameters:
query: str, クエリ文字列
Returns:
ranked_indices: np.ndarray, スコアが高い順にソートされた文書のインデックス
scores: np.ndarray, 各文書のスコア
"""
if self.corpus_embeddings is None:
raise ValueError("Corpus embeddings not found. Call embedding_corpus(corpus) first.")
# クエリをベクトル化し、L2正規化
query_vec = self.vectorizer.transform([query])[0]
query_norm = np.linalg.norm(query_vec)
if query_norm > 0:
query_vec = query_vec / query_norm
# 隣接行列(正規化済み)を作成
adjacency_matrix = self._create_adjacency_matrix(self.corpus_embeddings)
# パーソナライズベクトル(文書とクエリの類似度)を計算
similarities = np.dot(self.corpus_embeddings, query_vec)
personalization_vector = np.maximum(similarities, 0) # ネガティブは0に
# 全部0にならないように微小値を足す
if personalization_vector.sum() == 0:
personalization_vector += 1e-10
# PPRスコアを計算
scores = self._compute_ppr(adjacency_matrix, personalization_vector)
# スコアの高い順に文書インデックスをソート
ranked_indices = np.argsort(-scores)
return (
ranked_indices[:top_k],
scores[ranked_indices[:top_k]]
)
src/MixPR/vectorizer/base.py
from abc import ABC, abstractmethod
from typing import List
import numpy as np
class BaseVectorizer(ABC):
"""
ベクトライズのインターフェースクラス。
すべてのベクトライズクラスは、この抽象メソッドを実装します。
"""
@abstractmethod
def fit_transform(self, corpus: List[str]) -> np.ndarray:
"""
コーパス(複数文書)を受け取り、学習(必要に応じて)した上でベクトル変換し、配列を返す。
Parameters:
corpus: List[str], 文書のリスト
Returns:
embeddings: np.ndarray, shape = (n_docs, embedding_dim)
"""
pass
@abstractmethod
def transform(self, texts: List[str]) -> np.ndarray:
"""
fit が完了した状態をもとに、text をベクトルに変換して返す。
Parameters:
texts: List[str], 文書またはクエリのリスト
Returns:
embeddings: np.ndarray, shape = (n_texts, embedding_dim)
"""
pass
src/MixPR/vectorizer/bert.py
import numpy as np
from sentence_transformers import SentenceTransformer
import transformers
from typing import List, Sequence
from .base import BaseVectorizer
class SentenceBERT:
MODEL_NAME = 'sonoisa/sentence-bert-base-ja-mean-tokens-v2'
class VectorType():
"""出力ベクトルの種類を指定するためのクラス。"""
NORMAL = 'normal'
"""入力の先頭から512tokenのみ利用して文章ベクトルを作成する。"""
AVERAGE = 'average'
"""分割した文ごとのベクトル値を平均して文章ベクトルを作成する。"""
MAX = 'max'
"""分割した文ごとのベクトル値の各要素のMAXをとって文章ベクトルを作成する。"""
def __init__(self, model_name: str = MODEL_NAME) -> None:
self.model = SentenceTransformer(model_name)
def vectorize(
self, corpus: List[str],
vector_type: str = VectorType.NORMAL
) -> Sequence:
if vector_type == self.VectorType.AVERAGE:
embeddings = []
for text in corpus:
split_text = self._split_sentence(text)
if len(split_text) == 0:
# たとえば 768 次元のゼロベクトルにする
# (モデルによっては 768 ではなく 1024 の場合もあるので要確認)
zero_vec = np.zeros(768, dtype=np.float32)
embeddings.append(zero_vec)
continue
# print(split_text)
embedding = np.mean(self.model.encode(split_text), axis=0)
embeddings.append(embedding)
vector = np.array(embeddings)
elif vector_type == self.VectorType.MAX:
embeddings = []
for text in corpus:
split_text = self._split_sentence(text)
if len(split_text) == 0:
# たとえば 768 次元のゼロベクトルにする
# (モデルによっては 768 ではなく 1024 の場合もあるので要確認)
zero_vec = np.zeros(768, dtype=np.float32)
embeddings.append(zero_vec)
continue
embedding = np.amax(self.model.encode(split_text), axis=0)
embeddings.append(embedding)
vector = np.array(embeddings)
else:
vector = self.model.encode(corpus)
return vector
def _split_sentence(self, text: str, max_length: int = 300) -> List[str]:
"""指定文字数でテキストを分割する関数"""
line_list = []
temp_line = ''
for line in text.splitlines():
if len(line) + len(temp_line) > max_length:
line_list.append(temp_line)
temp_line = line
else:
temp_line = f'{temp_line}\n{line}'
if temp_line:
line_list.append(temp_line)
return line_list
# 日本語BERTトークナイザをセット
transformers.BertTokenizer = transformers.BertJapaneseTokenizer
class SentenceBERTVectorizerWrapper(BaseVectorizer):
"""
日本語 Sentence-BERT を使って文章をベクトル化するクラス。
"""
def __init__(self, vector_type: str = SentenceBERT.VectorType.NORMAL) -> None:
self.bert = SentenceBERT()
self.fitted = False
self.vector_type = vector_type
def fit_transform(self, corpus: List[str]) -> np.ndarray:
""" コーパスの埋め込み """
embeddings = self.bert.vectorize(corpus, vector_type=self.vector_type)
self.fitted = True
return np.array(embeddings)
def transform(self, texts: List[str]) -> np.ndarray:
""" query の埋め込み """
if not self.fitted:
raise ValueError("Vectorizer is not fitted. Call fit_transform first.")
embeddings = self.bert.vectorize(texts, vector_type=self.vector_type)
return np.array(embeddings)
src/MixPR/vectorizer/tfidf.py
from janome.tokenizer import Tokenizer
from sklearn.feature_extraction.text import TfidfVectorizer
from typing import List
import numpy as np
from .base import BaseVectorizer
class TfidfVectorizerWrapper(BaseVectorizer):
"""
Janome を使ってトークナイズし、スキルで TF-IDF を計算するラッパークラス。
"""
def __init__(self):
self.tokenizer = Tokenizer()
self.vectorizer = TfidfVectorizer()
self.fitted = False # fit済みかどうか判定用
def _tokenize(self, text: str) -> str:
tokens = [token.surface for token in self.tokenizer.tokenize(text)]
return " ".join(tokens)
def fit_transform(self, corpus: List[str]) -> np.ndarray:
""" コーパスの埋め込み """
tokenized_corpus = [self._tokenize(doc) for doc in corpus]
tfidf_matrix = self.vectorizer.fit_transform(tokenized_corpus).toarray()
self.fitted = True
return tfidf_matrix
def transform(self, texts: List[str]) -> np.ndarray:
""" query の埋め込み """
if not self.fitted:
raise ValueError("Vectorizer is not fitted. Call fit_transform first.")
tokenized_texts = [self._tokenize(doc) for doc in texts]
return self.vectorizer.transform(tokenized_texts).toarray()
Discussion