📘

PySparkのpandas_udfをnumbaで高速化する方法

に公開

はじめに

Apache Spark は、ビッグデータ処理のための分散コンピューティングフレームワークであり、PySpark はその Python API です。PySpark では、ユーザー定義関数(UDF)を使用してカスタム処理を行うことができます。

UDF は、Python の関数を Spark の 分散処理に適用するための便利な方法ですが、パフォーマンスが低下することがあります。特に、Python の関数は遅いため、大量のデータを処理する際にはボトルネックになることがあります。

そこで、今回は、PySpark で numba を使用して UDF を高速化する方法について説明します。

実行環境

  • Azure Databricks
  • 16.3 ML (includes Apache Spark 3.5.2, Scala 2.12)
  • Standard_DS3_v2 メモリ 14GB、4 個のコア
検証用コード

numba とは

numba は、Python の数値計算を高速化するためのライブラリです。特に、NumPy 配列や SciPy 関数を使用する場合に効果的です。numba は、Python コードを JIT(Just-In-Time)コンパイルして、ネイティブコードに変換します。これにより、Python の関数を高速化することができます。

今回は、Vector → Scalar 変換を行う UDF と、Vector → Vector 変換を行う UDF の 2 つの例で、numba を使用して UDF を高速化する方法を説明します。

Vector → Scalar 変換

ベクトルをスカラーに変換する UDF の例を示します。以下の関数は、ベクトルの全ての組み合わせの距離の平均を計算する関数です。

import numpy as np
import numba

@numba.njit
def average_abs(x: np.ndarray) -> float:
    n = len(x)
    tot = 0.0
    cnt = 0
    for i in range(n):
        for j in range(i+1, n):
            tot += abs(x[i] - x[j])
            cnt += 1
    return tot / cnt

この関数は、numba を使用して JIT コンパイルされており、Python の関数よりも高速に動作します。

Python のみ: 10.4589 秒, 結果=0.3338157665593986
Numba JIT 版: 0.0160 秒, 結果=0.3338157665593986
Numba 版は約 653.9 倍高速

処理時間の計測用コード
@numba.njit
def average_abs(x: np.ndarray) -> float:
    n = len(x)
    tot = 0.0
    cnt = 0
    for i in range(n):
        for j in range(i+1, n):
            tot += abs(x[i] - x[j])
            cnt += 1
    return tot / cnt

def py_average_abs(x: np.ndarray) -> float:
    n = len(x)
    tot = 0.0
    cnt = 0
    for i in range(n):
        for j in range(i+1, n):
            tot += abs(x[i] - x[j])
            cnt += 1
    return tot / cnt


N = 1536
x = np.random.rand(N).astype(np.float32)

average_abs(x) # JITコンパイルのため事前実行
py_average_abs(x)

nrepeat = 10

t0 = time.time()
for _ in range(nrepeat):
    res_py = py_average_abs(x)
t1 = time.time()

t2 = time.time()
for _ in range(nrepeat):
    res_nb = average_abs(x)
t3 = time.time()

print(f'Pythonのみ:   {t1-t0:.4f} 秒, 結果={res_py}')
print(f'Numba JIT版:  {t3-t2:.4f} 秒, 結果={res_nb}')
print(f'Numba版は約{((t1-t0)/(t3-t2)):.1f}倍高速')

Vector → Vector 変換

ベクトルをベクトルに変換する UDF の例を示します。以下の関数は、移動平均を計算する関数です。

import numpy as np
import numba

@numba.njit
def moving_average(x: np.ndarray, w: int) -> np.ndarray:
    n = len(x)
    y = np.empty(n-w+1, dtype=x.dtype)
    for i in range(n-w+1):
        s = 0.0
        for j in range(w):
            s += x[i+j]
        y[i] = s / w
    return y

この関数も、numba を使用して JIT コンパイルされており、Python の関数よりも高速に動作します。

Python のみ: 0.2635 秒, 結果=[0.5414437 0.5210445 0.5266397 ... 0.5417057 0.5366109 0.5074862]
Numba JIT 版: 0.0007 秒, 結果=[0.5414437 0.5210445 0.5266397 ... 0.5417057 0.5366109 0.5074862]
Numba 版は約 401.9 倍高速

処理時間の計測用コード
@numba.njit
def moving_average(x: np.ndarray, w: int) -> np.ndarray:
    n = len(x)
    y = np.empty(n-w+1, dtype=x.dtype)
    for i in range(n-w+1):
        s = 0.0
        for j in range(w):
            s += x[i+j]
        y[i] = s / w
    return y

def py_moving_average(x: np.ndarray, w: int) -> np.ndarray:
    n = len(x)
    y = np.empty(n-w+1, dtype=x.dtype)
    for i in range(n-w+1):
        s = 0.0
        for j in range(w):
            s += x[i+j]
        y[i] = s / w
    return y

N = 1536
x = np.random.rand(N).astype(np.float32)
w = 128

moving_average(x, w)
py_moving_average(x, w)

nrepeat = 10

t0 = time.time()
for _ in range(nrepeat):
    res_py = py_moving_average(x, w)
t1 = time.time()

t2 = time.time()
for _ in range(nrepeat):
    res_nb = moving_average(x, w)
t3 = time.time()

print(f'Pythonのみ:   {t1-t0:.4f} 秒, 結果={res_py}')
print(f'Numba JIT版:  {t3-t2:.4f} 秒, 結果={res_nb}')
print(f'Numba版は約{((t1-t0)/(t3-t2)):.1f}倍高速')

PySpark の UDF で numba を使用

このように nambda を利用すると Python の関数を数百倍高速化することができます。PySpark でも同様に numba を使用して UDF を高速化することができます。

PySpark で numba を使用するには、以下のように UDF を定義します。

import pyspark.sql.functions as F

@F.pandas_udf("float")
def udf_func(v: pd.Series) -> pd.Series:
    return v.apply(py_average_abs)

このように、pandas_udf 内で numba を使用することで、UDF を高速化することができます。実際に、numba を使用した UDF の実行時間を計測してみましょう。

入力データは、1536 次元のベクトルを 1000 件用意し、それぞれに対して、numba を使用した UDF を適用します。

入力データ
import pandas as pd
import numpy as np

# サンプル件数・ベクトル次元
ROW_NUM, VEC_DIM = 1000, 1536

# numpyで乱数ベクトルを生成 & pandasへ
np.random.seed(42)
pdf = pd.DataFrame({
    "id": np.arange(ROW_NUM),
    "vector": [np.random.rand(VEC_DIM).astype(np.float32) for _ in range(ROW_NUM)],
})

# PySparkのDataFrameへ変換
sdf = spark.createDataFrame(pdf)

Vector → Scalar 変換の計測結果

udf_func を使用して、ベクトルをスカラーに変換する UDF の実行時間を計測します。

sdf.withColumn("func", udf_func("vector")).show(1000)
実装方法 実行時間
numba UDF 4s
Python UDF 3m 12s

48 倍の高速化が得られました。

Vector → Vector 変換の計測結果

Vector → Vector 変換についても同様に pandas_udf を定義します。

import pyspark.sql.functions as F

@F.pandas_udf("array<float>")
def udf_func(v: pd.Series) -> pd.Series:
    w = 128
    return v.apply(lambda x: moving_average(x, w))

実際に、udf_func を使用して、ベクトルをベクトルに変換する UDF の実行時間を計測します。

sdf.withColumn("func", udf_func("vector")).show(1000)
実装方法 実行時間
numba UDF 2s
Python UDF 18s

9 倍の高速化が得られました。

まとめ

PySpark の UDF を numba を使用して高速化する方法について説明しました。numba を使用することで、Python の関数を数百倍高速化することができ、PySpark でも同様に UDF を高速化することができます。今回は array<float>型のベクトルを扱いましたが、numba は様々なデータ型に対応しているため、他のデータ型でも同様に UDF を高速化することができます。実際のデータ処理においても numba を使用することで、パフォーマンスを大幅に向上させることができるでしょう。

Discussion