Open4

PySparkの便利テクニック

会社の人にSparkの処理よくわからないからテクニック集作ってよと言われたので作ってみる。

前提

from pyspark.sql import functions as F, Window as W

複数の列に同じ処理を行う

データフレームの2列目以降に対して1を足す処理の場合

cols = [F.col(x) + 1 for x in df1.columns[1:]]
df2 = df1.select(*cols)

横持から縦持ちに

id key1 key2
id_1 0 100
id_2 1 200
id_3 2 300

id key value
id_1 key1 0
id_1 key2 100
id_2 key1 1
id_2 key2 200
id_3 key1 2
id_3 key2 300

にする。
explodeを使うとarrayに入っている値が縦持ちになるのでそれを利用する。

key_value_arrays = [F.array(F.lit(x), F.col(x)) for x in df1.columns[1:]]
df2 = (
    df1
    .select(
        "id",
        F.explode(F.array(key_value_arrays)).alias("tmp_array")
    )
    .select(
        "id",
        F.col("tmp_array")[0].alias("key"),
        F.col("tmp_array")[1].alias("value")
    )
)

列の値が最大 or 最小のレコードを抽出

x列が最大でy列が最小のレコードを抽出する場合

df2 = (
    df1
    .withColumn(
        "tmp",
         F.row_number().over(W.orderBy(F.col("x").desc(), F.col("y").asc())))
    )
    .where(F.col("tmp") == 1)
    .drop("tmp")
)

ログインするとコメントできます