🌊
PySparkでデータ処理するときのちょっとしたtips集
はじめに
PySparkでちょっとしたデータ処理するときに自分の備忘録としてまとめています(tipsってほどでもないですが)。
随時更新していくつもりです。間違いあればご指摘ください。
nullを含むカラムでconcatしたい場合
nullを含むカラムでconcatすると片方がnullでなくてもnullになってしまいます。
concat_ws()
を使ってあげることで回避できます。coalesce()
使ってもいいかもしれません。
data = [
('A', None, 'hoge'),
('B', 'hoge', 'fuga'),
('B', 'hoge', None),
]
df = spark.createDataFrame(data=data, schema = ['id', 'str1', 'str2'])
df.printSchema()
df.show(truncate=False)
"""
root
|-- id: string (nullable = true)
|-- str1: string (nullable = true)
|-- str2: string (nullable = true)
+---+----+----+
|id |str1|str2|
+---+----+----+
|A |null|hoge|
|B |hoge|fuga|
|B |hoge|null|
+---+----+----+
"""
# だめな例
df.withColumn('concat', concat(col('str1'), col('str2')))\
.show()
"""
+---+----+----+---------+
| id|str1|str2| concat|
+---+----+----+---------+
| A|null|hoge| null|
| B|hoge|fuga| hogefuga|
| B|hoge|null| null|
+---+----+----+---------+
"""
# 良い例
df.withColumn('concat', concat_ws('' ,col('str1'), col('str2')))\
.show()
"""
+---+----+----+--------+
| id|str1|str2| concat|
+---+----+----+--------+
| A|null|hoge| hoge|
| B|hoge|fuga|hogefuga|
| B|hoge|null| hoge|
+---+----+----+--------+
"""
順序保証された配列を作りたい
collect_list()で配列を作れますが、順序が保持できていません。
なので、特定のカラムでソートされた順番で配列をつくりたい場合には以下のようにしてあげる必要があります。
また、spark sqlで書きたいときはdistribute by
とsort by
を使うことで、同様の処理が可能です。
sort byは各reducerごとにデータをソートするというもので、distribute byは指定されたカラムが同じ値のデータは同じreducerに割り当てられる事を保証するというものです。
data = [
('A', 'a', 1),
('A', 'b', 2),
('A', 'c', 3),
('B', 'b', 2),
('B', 'c', 1),
('C', 'c', 3),
('C', 'd', 1),
('C', 'e', 2)
]
df = spark.createDataFrame(data=data, schema = ['user_id', 'item_id', 'rank'])
df.printSchema()
df.show(truncate=False)
"""
root
|-- user_id: string (nullable = true)
|-- item_id: string (nullable = true)
|-- rank: long (nullable = true)
+-------+-------+----+
|user_id|item_id|rank|
+-------+-------+----+
|A |a |1 |
|A |b |2 |
|A |c |3 |
|B |b |2 |
|B |c |1 |
|C |c |3 |
|C |d |1 |
|C |e |2 |
+-------+-------+----+
"""
# だめな例
# このままcollect_list()するとrankの順にならない
df.groupBy('user_id')\
.agg(collect_list('item_id').alias('item_list'))\
.show()
"""
+-------+---------+
|user_id|item_list|
+-------+---------+
| B| [b, c]|
| C|[c, d, e]|
| A|[a, b, c]|
+-------+---------+
"""
# 良い例
df.withColumn('sorted_list', collect_list('item_id').over(Window.partitionBy('user_id').orderBy('rank')))\
.groupBy('user_id')\
.agg(max('sorted_list').alias('sorted_item_list'))\
.show()
"""
+-------+----------------+
|user_id|sorted_item_list|
+-------+----------------+
| B| [c, b]|
| C| [d, e, c]|
| A| [a, b, c]|
+-------+----------------+
"""
# spark.sqlで書くときはdistribute byとsort byを使っても可能
df.createOrReplaceTempView('df')
spark.sql('''
with
tmp as (
select
user_id
, item_id
, rank
from df
distribute by user_id
sort by
user_id, rank
)
select
user_id
, collect_list(item_id) as item_list
from tmp
group by
user_id
''').show()
"""
+-------+---------+
|user_id|item_list|
+-------+---------+
| B| [c, b]|
| C|[d, e, c]|
| A|[a, b, c]|
+-------+---------+
"""
sklearnのkNNを使って分散推論したい
SparkにはkNNが実装されていません。
PySparNNなどのライブラリもありますが、sklearnでサクッとやりたいときに以下のコードが使えるかと思います。
また、tensorflowやpytorchで作ったモデルの推論にも応用可能です。
from sklearn.neighbors import NearestNeighbors
from pyspark.sql.types import *
from typing import Iterator
import pandas as pd
# データの作成
data = [
('A', [-1, -1]),
('B', [-2, -1]),
('C', [-3, -2]),
('D', [1, 1]),
('E', [2, 1]),
('F', [3, 2]),
]
df = spark.createDataFrame(data=data, schema = ['item_id', 'vector'])
df.printSchema()
df.show(truncate=False)
"""
root
|-- item_id: string (nullable = true)
|-- vector: array (nullable = true)
| |-- element: long (containsNull = true)
+-------+--------+
|item_id|vector |
+-------+--------+
|A |[-1, -1]|
|B |[-2, -1]|
|C |[-3, -2]|
|D |[1, 1] |
|E |[2, 1] |
|F |[3, 2] |
+-------+--------+
"""
# ドライバーでknnのモデル作成
df_pd = df.toPandas()
index = list(df_pd.item_id.values)
label = list(range(len(df_pd)))
x = list(df_pd.vector.values)
nbrs = NearestNeighbors(n_neighbors=3, algorithm='ball_tree').fit(x, label)
# pandas_udfを使って推論処理を定義
def predict_func(index, neighbors):
@pandas_udf(StructType([
StructField("items", ArrayType(StringType()), True),
StructField("scores", ArrayType(FloatType()), True),
]))
def predict(iter: Iterator[pd.Series]) -> Iterator[pd.DataFrame]:
for batch in iter:
for b in batch:
scores, indices = neighbors.kneighbors(b.reshape(1, -1), n_neighbors=3)
indices = [index[int(ind)] for ind in indices[0]]
scores = [float(s) for s in scores[0]]
yield pd.DataFrame({
'items': [indices],
'scores': [scores],
})
return predict
df = df.withColumn('y', predict_func(index, nbrs)('vector'))
df.printSchema()
df.show(truncate=False)
"""
root
|-- item_id: string (nullable = true)
|-- vector: array (nullable = true)
| |-- element: long (containsNull = true)
|-- y: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- scores: array (nullable = true)
| | |-- element: float (containsNull = true)
+-------+--------+---------------------------------------+
|item_id|vector |y |
+-------+--------+---------------------------------------+
|A |[-1, -1]|{[A, B, C], [0.0, 1.0, 2.236068]} |
|B |[-2, -1]|{[B, A, C], [0.0, 1.0, 1.4142135]} |
|C |[-3, -2]|{[C, B, A], [0.0, 1.4142135, 2.236068]}|
|D |[1, 1] |{[D, E, F], [0.0, 1.0, 2.236068]} |
|E |[2, 1] |{[E, D, F], [0.0, 1.0, 1.4142135]} |
|F |[3, 2] |{[F, E, D], [0.0, 1.4142135, 2.236068]}|
+-------+--------+---------------------------------------+
"""
複数カラムでexplodeしたい
上記の推論結果を縦持ちしたいときはarray_zip
でまとめてからexplode
しましょう
df.show(truncate=False)
"""
+-------+--------+---------------------------------------+
|item_id|vector |y |
+-------+--------+---------------------------------------+
|A |[-1, -1]|{[A, B, C], [0.0, 1.0, 2.236068]} |
|B |[-2, -1]|{[B, A, C], [0.0, 1.0, 1.4142135]} |
|C |[-3, -2]|{[C, B, A], [0.0, 1.4142135, 2.236068]}|
|D |[1, 1] |{[D, E, F], [0.0, 1.0, 2.236068]} |
|E |[2, 1] |{[E, D, F], [0.0, 1.0, 1.4142135]} |
|F |[3, 2] |{[F, E, D], [0.0, 1.4142135, 2.236068]}|
+-------+--------+---------------------------------------+
"""
df.withColumn("tmp", explode(arrays_zip(col('y.items'), col('y.scores'))))\
.select('item_id', col('tmp.0').alias('similar_item'), col('tmp.1').alias('score'))\
.orderBy(col('item_id'), col('score'))\
.show()
"""
+-------+--------------+---------+
|item_id|similar_item | score|
+-------+--------------+---------+
| A| A| 0.0|
| A| B| 1.0|
| A| C| 2.236068|
| B| B| 0.0|
| B| A| 1.0|
| B| C|1.4142135|
| C| C| 0.0|
| C| B|1.4142135|
| C| A| 2.236068|
| D| D| 0.0|
| D| E| 1.0|
| D| F| 2.236068|
| E| E| 0.0|
| E| D| 1.0|
| E| F|1.4142135|
| F| F| 0.0|
| F| E|1.4142135|
| F| D| 2.236068|
+-------+--------------+---------+
"""
Discussion