🦆

推薦システム入門:(1) データの前処理方法

に公開

前処理の為の使用ライブラリ比較

推薦システムのためのデータ前処理方法を検討するために、MovieLens データセットの前処理をいろんなライブラリで行って速度を比較してみます。

対象は下記の2つの MovieLens データセットです。それぞれデータの行数は 1M 行と 10M 行になります。

今回は下記の指示を出して前処理コードを自動生成させてみました(が、結構エラーが出たので人手で修正しています)。

  • 入力データの ratings.csv のデータ形式は userId::movieId::rating::timestamp というCSV形式になっている。
  • rating は 0-5 の数字で与えられる。rating 4以上のデータを positive sample として、implicit feedback のデータを作成したい。
  • 出力データも同じCSV形式で、ただし implicit feedback データとするため、rating は positive sample は 1、negative sample は 0 とする。
  • 出力データは userid, movieid, timestamp 順に並ばないようにランダムシャッフルして欲しい。
  • negative sample は、全映画の中からそのユーザが rating 4以上と評価していない映画からランダムに抽出して作成したい(4未満と評価した映画を含んでも良い)。その個数は positive sample の個数に NEGATIVE_RATIO をかけたものとしたい。
  • データ処理には、numpy, pandas, SQLite, duckdb, polars を用いたコードをそれぞれ書いて欲しい。処理速度を比較するためです。

実行は手持ちの MacBook Air (M2, 24 GBメモリ) の上で、Jupyter Notebook 上で実行しました。

比較結果はこうなりました。

ライブラリ 1M data 10M data
pandas 15.85 529.25
numpy 32.98 2705.48
SQLite 211.91 中断
DuckDB 13.36 172.94
Polars 10.92 311.23
DuckDB + Polars 11.06 160.46

Polars の方が速いかなと思ったのですが、DuckDB が予想以上に速かったです。
また、DuckDB のコードで使ってる pandas を Polars に変更してもさほどの高速化にはなりませんでした。
Polars は Int32 や Int64 などのデータ型に厳密な感じで前処理にはちょっと使いにくい感じもしました。
という訳で、この程度のデータサイズであれば DuckDB をメモリ上で使うのが良さそうです。

DuckDB の高速化の原因としては、

  • データ読み込みの効率: DuckDB の読み込みは Pandas より高速(特に大規模データ)。
  • positive sample の抽出: DuckDB の列指向処理は Pandas の行ベース操作より高速。特に、条件フィルタリングが多い場合に有利。
  • 全ユーザーと映画の取得: DuckDB の DISTINCT 処理は Pandas の unique より効率的。データサイズが大きいほど差が顕著。
  • negative sample の生成: DuckDB の SQL クエリは Pandas のフィルタリングより高速。

と考えられます。

高速化の余地としては、区切り文字が :: でなければ、pandas の CSV ファイル読み込みで engine="c" を指定して多少速くなる可能性があります。

S3 への保存

また DuckDB は AWS S3 とも親和性が高そうです(ここはサンプルコードのみで実際には動作確認してない)。S3 との連携のためには、S3 拡張機能をロードする必要があります。

# DuckDB データベース作成
con = duckdb.connect(":memory:")

# S3 拡張機能をロード(S3 保存する場合)
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")
if AWS_ACCESS_KEY and AWS_SECRET_KEY:
    con.execute(f"SET s3_access_key_id = '{AWS_ACCESS_KEY}'")
    con.execute(f"SET s3_secret_access_key = '{AWS_SECRET_KEY}'")

保存は、こんな感じ。

# 保存:1. CSV 形式(ローカル)
con.execute("""
COPY result_shuffled TO 'implicit_feedback.csv' (DELIMITER ',', HEADER)
""")

# 保存:2. Parquet 形式(ローカル)
con.execute("""
COPY result_shuffled TO 'implicit_feedback.parquet' (FORMAT PARQUET)
""")

# 保存:3. DuckDB 形式(ローカル)
con.execute("""
COPY result_shuffled TO 'implicit_feedback.duckdb' (FORMAT DUCKDB)
""")

# 保存:4. S3 に CSV 形式で保存(オプション)
if AWS_ACCESS_KEY and AWS_SECRET_KEY:
    con.execute(f"""
    COPY result_shuffled TO 's3://{S3_BUCKET}/{S3_PREFIX}implicit_feedback_s3.csv' (DELIMITER ',', HEADER)
    """)

# 保存:5. S3 に Parquet 形式で保存(オプション)
if AWS_ACCESS_KEY and AWS_SECRET_KEY:
    con.execute(f"""
    COPY result_shuffled TO 's3://{S3_BUCKET}/{S3_PREFIX}implicit_feedback_s3.parquet' (FORMAT PARQUET)
    """)

逆に読み込みはこんな感じになります。

# S3 から Parquet ファイルを読み込み
con.execute(f"""
CREATE TABLE result_shuffled AS
SELECT * FROM read_parquet('{S3_PATH}')
""")

業務使用だと、S3 でデータを保存したくなることや、容量などの効率を考えると、前処理後などの中間データは parquet 形式での保存が良さそうです。

実験に用いたコード

比較に使ったコードは下記になります。

pandas での実装

import pandas as pd
import numpy as np
import time

NEGATIVE_RATIO = 2  # negative sample の数を positive sample の 2 倍に設定
OUTPUT_FILE = "implicit_feedback.csv"

start_time = time.time()

# データ読み込み(区切り文字 ::)
df = pd.read_csv("ratings.csv", sep="::", engine="python", names=["userId", "movieId", "rating", "timestamp"])

# positive sample (rating >= 4)
positive_df = df[df["rating"] >= 4].copy()
positive_df["rating"] = 1

# 全ユーザーと映画
all_users = df["userId"].unique()
all_movies = df["movieId"].unique()

# negative sample の生成
negative_samples = []
for user in all_users:
    # ユーザの positive movieId
    user_positive_movies = set(positive_df[positive_df["userId"] == user]["movieId"])
    # negative 候補 (全映画 - positive)
    negative_candidates = list(set(all_movies) - user_positive_movies)
    num_positive = len(user_positive_movies)
    num_negative = int(num_positive * NEGATIVE_RATIO)
    
    if negative_candidates and num_negative > 0:
        # negative sample をランダムに選択
        selected_negatives = np.random.choice(
            negative_candidates, 
            size=min(num_negative, len(negative_candidates)), 
            replace=False
        )
        # timestamp はユーザの既存 timestamp からランダムに選択
        user_timestamps = df[df["userId"] == user]["timestamp"].values
        for movie in selected_negatives:
            timestamp = np.random.choice(user_timestamps) if len(user_timestamps) > 0 else df["timestamp"].sample(1).iloc[0]
            negative_samples.append([user, movie, 0, timestamp])

# negative sample の DataFrame
negative_df = pd.DataFrame(negative_samples, columns=["userId", "movieId", "rating", "timestamp"])

# positive と negative を結合
result_df = pd.concat([positive_df, negative_df], ignore_index=True)

# シャッフル
result_df = result_df.sample(frac=1, random_state=None).reset_index(drop=True)

# 出力
result_df.to_csv(OUTPUT_FILE, index=False)

print(f"Pandas execution time: {time.time() - start_time:.2f} seconds")

NumPy による実装

import numpy as np
import pandas as pd
import time

NEGATIVE_RATIO = 2  # negative sample の数を positive sample の 2 倍に設定
OUTPUT_FILE = "implicit_feedback.csv"

start_time = time.time()

# データ読み込み(区切り文字 ::)
df = pd.read_csv("ratings.csv", sep="::", engine="python", names=["userId", "movieId", "rating", "timestamp"])
data = df.to_numpy()  # [userId, movieId, rating, timestamp]

# positive sample (rating >= 4)
positive_mask = data[:, 2] >= 4
positive_data = data[positive_mask].copy()
positive_data[:, 2] = 1  # rating を 1 に設定

# 全ユーザーと映画
all_users = np.unique(data[:, 0])
all_movies = np.unique(data[:, 1])

# negative sample の生成
negative_samples = []
for user in all_users:
    user_mask = data[:, 0] == user
    user_positive_movies = set(positive_data[positive_data[:, 0] == user, 1])
    # negative 候補 (全映画 - positive)
    negative_candidates = list(set(all_movies) - user_positive_movies)
    num_positive = len(user_positive_movies)
    num_negative = int(num_positive * NEGATIVE_RATIO)
    
    if negative_candidates and num_negative > 0:
        selected_negatives = np.random.choice(
            negative_candidates, 
            size=min(num_negative, len(negative_candidates)), 
            replace=False
        )
        user_timestamps = data[user_mask, 3]
        for movie in selected_negatives:
            timestamp = np.random.choice(user_timestamps) if len(user_timestamps) > 0 else np.random.choice(data[:, 3])
            negative_samples.append([user, movie, 0, timestamp])

# negative sample を numpy 配列に
negative_data = np.array(negative_samples)

# positive と negative を結合
result_data = np.vstack([positive_data, negative_data])

# シャッフル
np.random.shuffle(result_data)

# pandas に変換して出力
result_df = pd.DataFrame(result_data, columns=["userId", "movieId", "rating", "timestamp"])
result_df.to_csv(OUTPUT_FILE, index=False)

print(f"NumPy execution time: {time.time() - start_time:.2f} seconds")

SQLite による実装

import sqlite3
import pandas as pd
import numpy as np
import time

NEGATIVE_RATIO = 2  # negative sample の数を positive sample の 2 倍に設定
OUTPUT_FILE = "implicit_feedback.csv"

start_time = time.time()

# データ読み込み(区切り文字 ::)
df = pd.read_csv("ratings.csv", sep="::", engine="python", names=["userId", "movieId", "rating", "timestamp"])

# SQLite データベース作成
conn = sqlite3.connect(":memory:")
df.to_sql("ratings", conn, index=False)

# positive sample の抽出
query_positive = """
SELECT userId, movieId, 1 as rating, timestamp
FROM ratings
WHERE rating >= 4
"""
positive_df = pd.read_sql_query(query_positive, conn)

# 全ユーザーと映画
all_users = pd.read_sql_query("SELECT DISTINCT userId FROM ratings", conn)["userId"].values
all_movies = pd.read_sql_query("SELECT DISTINCT movieId FROM ratings", conn)["movieId"].values

# negative sample の生成
negative_samples = []
for user in all_users:
    user_positive_movies = set(pd.read_sql_query(
        f"SELECT movieId FROM ratings WHERE userId = {user} AND rating >= 4", conn
    )["movieId"])
    # negative 候補 (全映画 - positive)
    negative_candidates = list(set(all_movies) - user_positive_movies)
    num_positive = len(user_positive_movies)
    num_negative = int(num_positive * NEGATIVE_RATIO)
    
    if negative_candidates and num_negative > 0:
        selected_negatives = np.random.choice(
            negative_candidates, 
            size=min(num_negative, len(negative_candidates)), 
            replace=False
        )
        user_timestamps = pd.read_sql_query(
            f"SELECT timestamp FROM ratings WHERE userId = {user}", conn
        )["timestamp"].values
        for movie in selected_negatives:
            timestamp = np.random.choice(user_timestamps) if len(user_timestamps) > 0 else pd.read_sql_query(
                "SELECT timestamp FROM ratings LIMIT 1", conn
            )["timestamp"].iloc[0]
            negative_samples.append([user, movie, 0, timestamp])

# negative sample の DataFrame
negative_df = pd.DataFrame(negative_samples, columns=["userId", "movieId", "rating", "timestamp"])

# positive と negative を結合
result_df = pd.concat([positive_df, negative_df], ignore_index=True)

# シャッフル
result_df = result_df.sample(frac=1, random_state=None).reset_index(drop=True)

# 出力
result_df.to_csv(OUTPUT_FILE, index=False)

conn.close()
print(f"SQLite execution time: {time.time() - start_time:.2f} seconds")

DuckDB による実装

import duckdb
import pandas as pd
import numpy as np
import time

NEGATIVE_RATIO = 2  # negative sample の数を positive sample の 2 倍に設定
OUTPUT_FILE = "implicit_feedback.csv"

start_time = time.time()

# DuckDB データベース作成
con = duckdb.connect(":memory:")
# データ読み込み(ヘッダなし、区切り文字 ::)
con.execute("""
CREATE TABLE ratings AS 
SELECT * FROM read_csv_auto('ratings.csv', delim='::', header=False, names=['userId', 'movieId', 'rating', 'timestamp'])
""")

# positive sample の抽出
positive_df = con.execute("""
SELECT userId, movieId, 1 AS rating, timestamp
FROM ratings
WHERE rating >= 4
""").fetchdf()

# 全ユーザーと映画
all_users = con.execute("SELECT DISTINCT userId FROM ratings").fetchdf()["userId"].values
all_movies = con.execute("SELECT DISTINCT movieId FROM ratings").fetchdf()["movieId"].values

# negative sample の生成
negative_samples = []
for user in all_users:
    user_positive_movies = set(con.execute(
        f"SELECT movieId FROM ratings WHERE userId = {user} AND rating >= 4"
    ).fetchdf()["movieId"])
    # negative 候補 (全映画 - positive)
    negative_candidates = list(set(all_movies) - user_positive_movies)
    num_positive = len(user_positive_movies)
    num_negative = int(num_positive * NEGATIVE_RATIO)
    
    if negative_candidates and num_negative > 0:
        selected_negatives = np.random.choice(
            negative_candidates, 
            size=min(num_negative, len(negative_candidates)), 
            replace=False
        )
        user_timestamps = con.execute(
            f"SELECT timestamp FROM ratings WHERE userId = {user}"
        ).fetchdf()["timestamp"].values
        for movie in selected_negatives:
            timestamp = np.random.choice(user_timestamps) if len(user_timestamps) > 0 else con.execute(
                "SELECT timestamp FROM ratings LIMIT 1"
            ).fetchdf()["timestamp"].iloc[0]
            negative_samples.append([user, movie, 0, timestamp])

# negative sample の DataFrame
negative_df = pd.DataFrame(negative_samples, columns=["userId", "movieId", "rating", "timestamp"])

# positive と negative を結合
result_df = pd.concat([positive_df, negative_df], ignore_index=True)

# シャッフル
result_df = result_df.sample(frac=1, random_state=None).reset_index(drop=True)

# 出力
result_df.to_csv(OUTPUT_FILE, index=False)

con.close()
print(f"DuckDB execution time: {time.time() - start_time:.2f} seconds")

Polars による実装

Polars は区切り文字を1バイト文字しか受け付けないので、事前に

$ sed 's/::/:/g' ratings.csv > ratings.csv2

と変換しておく必要があります。

import polars as pl
import numpy as np
import time

NEGATIVE_RATIO = 2  # negative sample の数を positive sample の 2 倍に設定
OUTPUT_FILE = "implicit_feedback.csv"

start_time = time.time()

# データ読み込み(区切り文字 ::)
df = pl.read_csv("ml-1m/ratings.dat2", separator=":", has_header=False,
    new_columns=["userId", "movieId", "rating", "timestamp"])

# positive sample (rating >= 4)
positive_df = df.filter(pl.col("rating") >= 4).with_columns(pl.lit(float(1.0)).alias("rating"))

# 全ユーザーと映画
all_users = df["userId"].unique().to_numpy()
all_movies = df["movieId"].unique().to_numpy()

# negative sample の生成
negative_samples = []
for user in all_users:
    user_positive_movies = set(positive_df.filter(pl.col("userId") == user)["movieId"].to_numpy())
    # negative 候補 (全映画 - positive)
    negative_candidates = list(set(all_movies) - user_positive_movies)
    num_positive = len(user_positive_movies)
    num_negative = int(num_positive * NEGATIVE_RATIO)
    
    if negative_candidates and num_negative > 0:
        selected_negatives = np.random.choice(
            negative_candidates, 
            size=min(num_negative, len(negative_candidates)), 
            replace=False
        )
        user_timestamps = df.filter(pl.col("userId") == user)["timestamp"].to_numpy()
        for movie in selected_negatives:
            timestamp = np.random.choice(user_timestamps) if len(user_timestamps) > 0 else df["timestamp"].sample(1)[0]
            negative_samples.append([user, movie, float(0.0), timestamp])

# negative sample の DataFrame
negative_df = pl.DataFrame(negative_samples, schema=["userId", "movieId", "rating", "timestamp"], orient="row")

# positive と negative を結合
result_df = pl.concat([positive_df, negative_df])

# シャッフル
result_df = result_df.sample(fraction=1.0, shuffle=True)

# 出力
result_df.write_csv(OUTPUT_FILE)

print(f"Polars execution time: {time.time() - start_time:.2f} seconds")

DuckDB + Polars

import duckdb
import polars as pl
import numpy as np
import time

NEGATIVE_RATIO = 2  # negative sample の数を positive sample の 2 倍に設定
OUTPUT_FILE = "implicit_feedback.csv

start_time = time.time()

# DuckDB データベース作成
con = duckdb.connect(":memory:")
# データ読み込み(ヘッダなし、区切り文字 ::)
con.execute("""
CREATE TABLE ratings AS 
SELECT * FROM read_csv_auto('ratings.csv', delim='::', header=False, names=['userId', 'movieId', 'rating', 'timestamp'])
""")

# positive sample の抽出
positive_df = con.execute("""
SELECT userId, movieId, 1 AS rating, timestamp
FROM ratings
WHERE rating >= 4
""").pl()  # Polars DataFrame に直接変換

positive_df = positive_df.select(
    pl.col("userId").cast(pl.Int64),
    pl.col("movieId").cast(pl.Int64),
    pl.col("rating").cast(pl.Int64),
    pl.col("timestamp").cast(pl.Int64)
)

# 全ユーザーと映画
all_users = con.execute("SELECT DISTINCT userId FROM ratings").pl()["userId"].to_numpy()
all_movies = con.execute("SELECT DISTINCT movieId FROM ratings").pl()["movieId"].to_numpy()

# negative sample の生成
negative_samples = []
for user in all_users:
    user_positive_movies = set(con.execute(
        f"SELECT movieId FROM ratings WHERE userId = {user} AND rating >= 4"
    ).pl()["movieId"].to_numpy())
    # negative 候補 (全映画 - positive)
    negative_candidates = list(set(all_movies) - user_positive_movies)
    num_positive = len(user_positive_movies)
    num_negative = int(num_positive * NEGATIVE_RATIO)
    
    if negative_candidates and num_negative > 0:
        selected_negatives = np.random.choice(
            negative_candidates, 
            size=min(num_negative, len(negative_candidates)), 
            replace=False
        )
        user_timestamps = con.execute(
            f"SELECT timestamp FROM ratings WHERE userId = {user}"
        ).pl()["timestamp"].to_numpy()
        for movie in selected_negatives:
            timestamp = np.random.choice(user_timestamps) if len(user_timestamps) > 0 else con.execute(
                "SELECT timestamp FROM ratings LIMIT 1"
            ).pl()["timestamp"][0]
            negative_samples.append([int(user), int(movie), int(0), int(timestamp)])  # numpy.int64 を int に変換

# negative sample の DataFrame
negative_df = pl.DataFrame(negative_samples, schema=["userId", "movieId", "rating", "timestamp"], orient="row")

# positive と negative を結合
result_df = pl.concat([positive_df, negative_df])

# シャッフル
result_df = result_df.sample(fraction=1.0, shuffle=True)

# 出力(区切り文字 ,)
result_df.write_csv("implicit_feedback.csv")

con.close()
print(f"DuckDB + Polars execution time: {time.time() - start_time:.2f} seconds")

Discussion