🐿️

論文実装:厚生を最適化したレコメンデーション

に公開

論文実装:厚生を最適化したレコメンデーション

今回の論文

Welfare-Optimized Recommender Systemsという論文を読みました。

まず、論文の内容ですが、ざっくり次のような感じです。(英語があまり得意ではないので、違っていたらご指摘ください。)

概要

本論文は、従来のレコメンダシステムが個々のユーザーの満足度やクリック率(CTR)最適化に偏っている点を問題視し、社会全体の「厚生(welfare)」を最大化することを目的とした新しい推薦枠組みを提案するものである。
提案モデルは、ユーザーとアイテムの効用(utility)を考慮し、プラットフォーム・生産者・消費者間のバランスを取るように設計されている。これにより、短期的な最適化ではなく、長期的なユーザー満足度・公平性・社会的効率性を同時に達成することを目指す。

目的・背景

従来のレコメンダシステム(RS)は、クリック率や購入率といった即時的な利益指標を最大化するよう設計されてきた。しかし、この手法は次のような課題をもつ:

  • プラットフォーム全体の厚生を考慮せず、人気商品の過剰露出を招く。

  • ユーザー間での推薦の偏りが生じ、不公平を引き起こす。

  • 長期的には、ユーザーの満足度や多様性が低下し、エコーチェンバーや情報バブルを強化する。

本研究は、経済学・社会選択理論(Social Choice Theory)の概念を導入し、「厚生経済学的な視点」からレコメンダシステムの最適化を再定義することを目的とする。

調査・実験・検証内容

本研究では、推薦システムを 社会的厚生最大化問題(Welfare Maximization Problem) として定式化し、
個々のユーザーの効用 ( u_i(j) ) と、推薦ポリシー ( π(j|i) ) を用いて以下の目的関数を構築する:

Welfare=∑iwi⋅Ui(π)\text{Welfare} = \sum_i w_i \cdot U_i(\pi)Welfare=i∑​wi​⋅Ui​(π)

ここで wiw_iwi​ はユーザーの重み(公平性の考慮)を表す。
最適化は以下の3つの側面を考慮して行われる:

  1. 個人効用の最大化 — 各ユーザーに対して最も満足度の高い推薦を選択

  2. 公平性の保証 — 高効用ユーザーに偏らない分配

  3. プラットフォーム全体の厚生 — システム全体の満足度の総和を最適化

実験設計

  • 使用データ:MovieLens 1M Dataset

  • 比較手法:

    • Standard RS(個人最適化型)

    • FairRec(公平性重視)

    • Proposed WRS(Welfare-Optimized RS)

  • 評価指標:

    • 総厚生(aggregate welfare)

    • 公平性指標(Gini coefficient)

    • 多様性指標(intra-list diversity)

結果

  • WRSは、他の手法と比較して全体の厚生を15〜25%改善

  • 一部の高効用ユーザーの満足度は若干低下したが、全体的な公平性と多様性が顕著に向上

  • Gini係数が減少し、推薦のバランスが改善。

  • 長期シミュレーションでは、ユーザー離脱率の低下が確認された。

実施内容

以下では、ノートブック構成をかいつまんで紹介し、重要なコードを抜粋して説明します。
データは、論文の通りMovieLensを用いています。

データ取り込みと前処理

ML_URL = "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
ZIP_PATH = "/content/ml-latest-small.zip"
DST_DIR = "/content/ml-latest-small"

if not os.path.exists(DST_DIR):
    print("Downloading MovieLens ...")
    r = requests.get(ML_URL)
    r.raise_for_status()
    with open(ZIP_PATH, "wb") as f:
        f.write(r.content)
    with zipfile.ZipFile(ZIP_PATH) as zf:
        zf.extractall("/content/")
print("Data ready:", os.listdir(DST_DIR))

ratings = pd.read_csv(os.path.join(DST_DIR, "ratings.csv"))
movies  = pd.read_csv(os.path.join(DST_DIR, "movies.csv"))

ratings = ratings.sort_values("timestamp")
def train_test_split_by_user(df):
    train_idx, test_idx = [], []
    for uid, g in df.groupby("userId", sort=False):
        if len(g) == 1:
            train_idx.extend(g.index.tolist())  # no test for singletons
        else:
            test_idx.append(g.index.tolist()[-1])
            train_idx.extend(g.index.tolist()[:-1])
    return df.loc[train_idx].copy(), df.loc[test_idx].copy()

train, test = train_test_split_by_user(ratings)

モデル(PyTorch MF model)

uids = sorted(ratings.userId.unique())
iids = sorted(ratings.movieId.unique())
uid2idx = {u:i for i,u in enumerate(uids)}
iid2idx = {i:j for j,i in enumerate(iids)}
idx2uid = {i:u for u,i in uid2idx.items()}
idx2iid = {j:i for i,j in iid2idx.items()}

train["u_idx"] = train["userId"].map(uid2idx)
train["i_idx"] = train["movieId"].map(iid2idx)
test["u_idx"]  = test["userId"].map(uid2idx)
test["i_idx"]  = test["movieId"].map(iid2idx)

n_users = len(uids); n_items = len(iids)
r_min, r_max = 0.5, 5.0

class MF(nn.Module):
    def __init__(self, n_users, n_items, n_factors=64):
        super().__init__()
        self.P = nn.Embedding(n_users, n_factors)
        self.Q = nn.Embedding(n_items, n_factors)
        self.bu = nn.Embedding(n_users, 1)
        self.bi = nn.Embedding(n_items, 1)
        self.mu = nn.Parameter(torch.tensor(3.5))  # global mean
        nn.init.normal_(self.P.weight, std=0.05)
        nn.init.normal_(self.Q.weight, std=0.05)
        nn.init.zeros_(self.bu.weight)
        nn.init.zeros_(self.bi.weight)

    def forward(self, u_idx, i_idx):
        pu = self.P(u_idx)
        qi = self.Q(i_idx)
        dot = (pu * qi).sum(dim=1, keepdim=True)
        pred = self.mu + self.bu(u_idx) + self.bi(i_idx) + dot
        return pred.squeeze(1)

model = MF(n_users, n_items, n_factors=64).to(DEVICE)
opt = optim.Adam(model.parameters(), lr=5e-3, weight_decay=1e-5)
loss_fn = nn.MSELoss()

u_t = torch.tensor(train["u_idx"].values, dtype=torch.long, device=DEVICE)
i_t = torch.tensor(train["i_idx"].values, dtype=torch.long, device=DEVICE)
y_t = torch.tensor(train["rating"].values, dtype=torch.float32, device=DEVICE)

Candidate scoring

ALL_USERS = uids
ALL_ITEMS = iids

train_ui = set(zip(train.userId, train.movieId))
user_to_seen = defaultdict(set)
for u, i in train_ui:
    user_to_seen[u].add(i)

TOPN_CANDIDATES = 200  # per-user candidate pool
model.eval()
with torch.no_grad():
    P = model.P.weight.detach().cpu().numpy()
    Q = model.Q.weight.detach().cpu().numpy()
    bu = model.bu.weight.detach().cpu().numpy().squeeze()
    bi = model.bi.weight.detach().cpu().numpy().squeeze()
    mu = float(model.mu.detach().cpu())

user_item_scores = {}
for u in tqdm(ALL_USERS, desc="Scoring"):
    u_idx = uid2idx[u]
    # score all items vectorized
    scores = mu + bu[u_idx] + bi + (P[u_idx] @ Q.T)
    # mask seen
    mask = np.ones(n_items, dtype=bool)
    if len(user_to_seen[u]) > 0:
        seen_idx = [iid2idx[i] for i in user_to_seen[u]]
        mask[seen_idx] = False  # we'll filter after mapping
    pairs = []
    for j in range(n_items):
        item_raw = idx2iid[j]
        if item_raw in user_to_seen[u]:
            continue
        pairs.append((item_raw, float(scores[j])))
    pairs.sort(key=lambda x: x[1], reverse=True)
    user_item_scores[u] = pairs[:TOPN_CANDIDATES]

# For ILDI: item factors (Q) in raw id space
item_factors = { idx2iid[j]: Q[j] for j in range(n_items) }

評価(PICP・Sharpness・Pinball)

学習後に以下の指標を算出しています:

#指標名	目的
#① Utilitarian Welfare(総効用)	全ユーザーの満足度の合計を測る
#② Nash Welfare(ナッシュ厚生)	公平性を考慮した全体効用を測る
#③ Gini Coefficient(ジニ係数)	ユーザー間の不平等度を測る(低いほどいい)
#④ Coverage(被推薦率)	推薦がどれだけ多くのアイテムをカバーしているか
#⑤ ILDI(Intra-List Diversity)	各ユーザーのリスト内でどれだけ多様なアイテムを提示できているか

def pred_score(u_raw, i_raw):
    u = uid2idx[u_raw]; j = iid2idx[i_raw]
    return mu + bu[u] + bi[j] + (P[u] @ Q[j])

def utilitarian_welfare(assignments, use_transform=True):
    total = 0.0
    for u, items in assignments.items():
        for i in items:
            s = pred_score(u, i)
            total += f_utility(s) if use_transform else s
    return total

def nash_welfare(assignments):
    eps = 1e-9
    total = 0.0
    for u, items in assignments.items():
        u_util = 0.0
        for i in items:
            s = pred_score(u, i)
            u_util += f_utility(s)
        total += math.log(u_util + eps)
    return total

def gini(values):
    arr = np.array(values, dtype=float)
    if np.amin(arr) < 0:
        arr -= np.amin(arr)
    mean = np.mean(arr)
    if mean == 0: return 0.0
    diff_sum = np.sum(np.abs(arr.reshape(-1,1) - arr.reshape(1,-1)))
    return diff_sum / (2 * len(arr) * np.sum(arr))

def per_user_utility_list(assignments):
    res = []
    for u, items in assignments.items():
        s = sum(f_utility(pred_score(u, i)) for i in items)
        res.append(s)
    return res

def coverage(assignments):
    items = set()
    for u, lst in assignments.items():
        items.update(lst)
    return len(items) / len(ALL_ITEMS)

def ildi(assignments):
    ild_vals = []
    for u, lst in assignments.items():
        vecs = [item_factors[i] for i in lst if i in item_factors]
        if len(vecs) <= 1:
            continue
        M = cosine_similarity(vecs)
        n = len(vecs)
        sims = []
        for a in range(n):
            for b in range(a+1, n):
                sims.append(1.0 - M[a,b])
        if sims:
            ild_vals.append(float(np.mean(sims)))
    return float(np.mean(ild_vals)) if ild_vals else np.nan

def evaluate(assignments, name):
    util_w = utilitarian_welfare(assignments)
    nash_w = nash_welfare(assignments)
    g = gini(per_user_utility_list(assignments))
    cov = coverage(assignments)
    d = ildi(assignments)
    print(f"[{name}] Utilitarian(sum f): {util_w:,.2f} | Nash(sum log): {nash_w:,.2f} | "
          f"Gini(↓ good): {g:.4f} | Coverage: {cov*100:.2f}% | ILDI: {d:.4f}")

print("\n=== Evaluation ===")
evaluate(topk_baseline, "Per-User TopK (MF)")
evaluate(pop_baseline,  "Popularity")
evaluate(welfare_rec,   f"Welfare-aware (alpha={alpha:.1f}, cap={EXPO_CAP})")

実施結果

③ Welfare-aware (α=0.7, cap=100)に対して、比較として①Per-User TopK、②Popularityのモデルを出しています。

Utilitarian(総効用)や、Nash(公平性) は、③のモデルが良かったのですが、Coverage、ILDI(多様性)では、①Per-User TopKのモデルに負けていました。

モデル Utilitarian(総効用) Nash(公平性) Gini(↓良) Coverage ILDI(多様性)
① Per-User TopK (MF) 9,378.19 1,658.95 0.0900 11.07% 0.8019
② Popularity 6,068.81 1,390.06 0.1105 1.24% 0.7882
③ Welfare-aware (α=0.7, cap=100) 🟢 10,708.18 🟢 1,737.54 0.1015 ⚠️ 3.21% 0.0097

モデル改善

そこで改善として、多様性を確保するようにモデルを一部変更します。

# 推薦スレート内の“平均コサイン類似度”をペナルティとして使う
def slate_similarity_penalty(user_items, candidate_i, item_factors):
    if not user_items:
        return 0.0
    v = item_factors.get(candidate_i)
    if v is None:
        return 0.0
    v_norm = np.linalg.norm(v) + 1e-9
    sims = []
    for j in user_items:
        w = item_factors.get(j)
        if w is None:
            continue
        s = float(np.dot(v, w) / ((np.linalg.norm(w) + 1e-9) * v_norm))
        sims.append(s)
    return float(np.mean(sims)) if sims else 0.0

def welfare_rerank_diverse(
    alpha=0.7,
    expo_cap=30,          # ★ 多様性確保のため露出上限を強めに下げる(例: 100→30)
    K=10,
    lambda_div=0.2        # ★ 多様性重み(0.1〜0.5あたりをチューニング)
):
    assigned = {u: [] for u in ALL_USERS}
    per_user = defaultdict(float)
    item_exp = Counter()
    rng = np.random.default_rng(SEED)
    user_order = list(ALL_USERS)

    for r in range(K):
        rng.shuffle(user_order)
        for u in user_order:
            if len(assigned[u]) >= K:
                continue
            best_i, best_gain = None, -1e9
            base_u = per_user[u]
            # そのユーザーの候補から最も増分利益が大きいアイテムを選ぶ
            for (i, score, util0) in user_candidates[u]:
                if i in user_to_seen[u]:
                    continue
                if item_exp[i] >= expo_cap:
                    continue
                util = util0  # f_utility(score) を事前計算済み
                # Nashの限界増分
                gain_nash = math.log((base_u + util + 1e-9)) - math.log((base_u + 1e-9))
                gain_util = util
                # スレート類似ペナルティ(平均コサイン類似度)
                pen = slate_similarity_penalty(assigned[u], i, item_factors)
                gain = alpha * gain_nash + (1 - alpha) * gain_util - lambda_div * pen
                if gain > best_gain:
                    best_gain = gain
                    best_i = (i, util)
            if best_i is not None:
                i, util = best_i
                assigned[u].append(i)
                per_user[u] += util
                item_exp[i] += 1
    return assigned

# 実行:多様性付き再ランキング
alpha_new = 0.7
EXPO_CAP_NEW = 30
LAMBDA_DIV = 0.2

welfare_rec_div = welfare_rerank_diverse(
    alpha=alpha_new, expo_cap=EXPO_CAP_NEW, K=K, lambda_div=LAMBDA_DIV
)

print("=== Evaluation (diversity-aware) ===")
evaluate(topk_baseline, "Per-User TopK (MF)")
evaluate(pop_baseline,  "Popularity")
evaluate(welfare_rec,   f"Welfare-aware (old, alpha={alpha:.1f}, cap={EXPO_CAP})")
evaluate(welfare_rec_div, f"Welfare-aware + Diversity (alpha={alpha_new:.1f}, cap={EXPO_CAP_NEW}, λ_div={LAMBDA_DIV})")

その結果は下記の通り、若干Utilitarian(総効用)やNash(公平性)の値は下がりましたが、①、②に比べて高い値を保ちつつ、Coverage, ILDI(多様性)については①ほどではないですが、大きく改善することができました。

モデル Utilitarian(総効用) Nash(公平性) Gini(↓良) Coverage ILDI(多様性)
① Per-User TopK (MF) 9,378.19 1,658.95 0.0900 11.07% 0.8019
② Popularity 6,068.81 1,390.06 ❌ 0.1105 1.24% 0.7882
③ Welfare-aware (old) 🟢 10,708.18 🟢 1,737.54 0.1015 3.21% 0.0097
④ Welfare-aware + Diversity (λ_div=0.2) 10,189.56 1,706.94 0.1035 🟢 7.77% 🟢 0.6470

サマリ

本実装では、厚生最適化(Nash + 総効用)に、露出上限と多様性ペナルティを組み合わせることで、**「強い効用」と「バランスの良い露出・多様性」**のトレードオフを実用的に制御できることを確認しました。

  • 得られた成果
    • 露出集中を抑えつつ、効用と公平性を高水準に維持できました。
    • 多様性ペナルティで Coverage/ILDI を大幅改善できました。

Discussion