Open4

PySparkの便利テクニック

KijitoraButiKijitoraButi

会社の人にSparkの処理よくわからないからテクニック集作ってよと言われたので作ってみる。

前提

from pyspark.sql import functions as F, Window as W
KijitoraButiKijitoraButi

複数の列に同じ処理を行う

データフレームの2列目以降に対して1を足す処理の場合

cols = [F.col(x) + 1 for x in df1.columns[1:]]
df2 = df1.select(*cols)
KijitoraButiKijitoraButi

横持から縦持ちに

id key1 key2
id_1 0 100
id_2 1 200
id_3 2 300

id key value
id_1 key1 0
id_1 key2 100
id_2 key1 1
id_2 key2 200
id_3 key1 2
id_3 key2 300

にする。
explodeを使うとarrayに入っている値が縦持ちになるのでそれを利用する。

key_value_arrays = [F.array(F.lit(x), F.col(x)) for x in df1.columns[1:]]
df2 = (
    df1
    .select(
        "id",
        F.explode(F.array(key_value_arrays)).alias("tmp_array")
    )
    .select(
        "id",
        F.col("tmp_array")[0].alias("key"),
        F.col("tmp_array")[1].alias("value")
    )
)
KijitoraButiKijitoraButi

列の値が最大 or 最小のレコードを抽出

x列が最大でy列が最小のレコードを抽出する場合

df2 = (
    df1
    .withColumn(
        "tmp",
         F.row_number().over(W.orderBy(F.col("x").desc(), F.col("y").asc())))
    )
    .where(F.col("tmp") == 1)
    .drop("tmp")
)