Open4
PySparkの便利テクニック
会社の人にSparkの処理よくわからないからテクニック集作ってよと言われたので作ってみる。
前提
from pyspark.sql import functions as F, Window as W
複数の列に同じ処理を行う
データフレームの2列目以降に対して1を足す処理の場合
cols = [F.col(x) + 1 for x in df1.columns[1:]]
df2 = df1.select(*cols)
横持から縦持ちに
id | key1 | key2 |
---|---|---|
id_1 | 0 | 100 |
id_2 | 1 | 200 |
id_3 | 2 | 300 |
を
id | key | value |
---|---|---|
id_1 | key1 | 0 |
id_1 | key2 | 100 |
id_2 | key1 | 1 |
id_2 | key2 | 200 |
id_3 | key1 | 2 |
id_3 | key2 | 300 |
にする。
explodeを使うとarrayに入っている値が縦持ちになるのでそれを利用する。
key_value_arrays = [F.array(F.lit(x), F.col(x)) for x in df1.columns[1:]]
df2 = (
df1
.select(
"id",
F.explode(F.array(key_value_arrays)).alias("tmp_array")
)
.select(
"id",
F.col("tmp_array")[0].alias("key"),
F.col("tmp_array")[1].alias("value")
)
)
列の値が最大 or 最小のレコードを抽出
x列が最大でy列が最小のレコードを抽出する場合
df2 = (
df1
.withColumn(
"tmp",
F.row_number().over(W.orderBy(F.col("x").desc(), F.col("y").asc())))
)
.where(F.col("tmp") == 1)
.drop("tmp")
)