🌊

PySparkでデータ処理するときのちょっとしたtips集

2021/08/20に公開

はじめに

PySparkでちょっとしたデータ処理するときに自分の備忘録としてまとめています(tipsってほどでもないですが)。

随時更新していくつもりです。間違いあればご指摘ください。

nullを含むカラムでconcatしたい場合

nullを含むカラムでconcatすると片方がnullでなくてもnullになってしまいます。
concat_ws()を使ってあげることで回避できます。coalesce()使ってもいいかもしれません。

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.concat_ws.html

data = [
  ('A', None, 'hoge'),
  ('B', 'hoge', 'fuga'),
  ('B', 'hoge', None),
]

df = spark.createDataFrame(data=data, schema = ['id', 'str1', 'str2'])
df.printSchema()
df.show(truncate=False)

"""
root
 |-- id: string (nullable = true)
 |-- str1: string (nullable = true)
 |-- str2: string (nullable = true)

+---+----+----+
|id |str1|str2|
+---+----+----+
|A  |null|hoge|
|B  |hoge|fuga|
|B  |hoge|null|
+---+----+----+
"""

# だめな例
df.withColumn('concat', concat(col('str1'), col('str2')))\
  .show()
  
"""
+---+----+----+---------+
| id|str1|str2|   concat|
+---+----+----+---------+
|  A|null|hoge|     null|
|  B|hoge|fuga| hogefuga|
|  B|hoge|null|     null|
+---+----+----+---------+
"""

# 良い例
df.withColumn('concat', concat_ws('' ,col('str1'), col('str2')))\
  .show()
  
"""  
+---+----+----+--------+
| id|str1|str2|  concat|
+---+----+----+--------+
|  A|null|hoge|    hoge|
|  B|hoge|fuga|hogefuga|
|  B|hoge|null|    hoge|
+---+----+----+--------+
"""

順序保証された配列を作りたい

collect_list()で配列を作れますが、順序が保持できていません。
なので、特定のカラムでソートされた順番で配列をつくりたい場合には以下のようにしてあげる必要があります。

また、spark sqlで書きたいときはdistribute bysort byを使うことで、同様の処理が可能です。
sort byは各reducerごとにデータをソートするというもので、distribute byは指定されたカラムが同じ値のデータは同じreducerに割り当てられる事を保証するというものです。

https://saurzcode.in/2015/01/hive-sort-order-distribute-cluster/?cn-reloaded=1

data = [
  ('A', 'a', 1),
  ('A', 'b', 2),
  ('A', 'c', 3),
  ('B', 'b', 2),
  ('B', 'c', 1),
  ('C', 'c', 3),
  ('C', 'd', 1),
  ('C', 'e', 2)
]

df = spark.createDataFrame(data=data, schema = ['user_id', 'item_id', 'rank'])
df.printSchema()
df.show(truncate=False)

"""
root
 |-- user_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- rank: long (nullable = true)

+-------+-------+----+
|user_id|item_id|rank|
+-------+-------+----+
|A      |a      |1   |
|A      |b      |2   |
|A      |c      |3   |
|B      |b      |2   |
|B      |c      |1   |
|C      |c      |3   |
|C      |d      |1   |
|C      |e      |2   |
+-------+-------+----+
"""

# だめな例
# このままcollect_list()するとrankの順にならない
df.groupBy('user_id')\
  .agg(collect_list('item_id').alias('item_list'))\
  .show()

"""
+-------+---------+
|user_id|item_list|
+-------+---------+
|      B|   [b, c]|
|      C|[c, d, e]|
|      A|[a, b, c]|
+-------+---------+
"""

# 良い例
df.withColumn('sorted_list', collect_list('item_id').over(Window.partitionBy('user_id').orderBy('rank')))\
  .groupBy('user_id')\
  .agg(max('sorted_list').alias('sorted_item_list'))\
  .show()
  
"""
+-------+----------------+
|user_id|sorted_item_list|
+-------+----------------+
|      B|          [c, b]|
|      C|       [d, e, c]|
|      A|       [a, b, c]|
+-------+----------------+
"""

# spark.sqlで書くときはdistribute byとsort byを使っても可能
df.createOrReplaceTempView('df')
spark.sql('''
with
tmp as (
select
  user_id
, item_id
, rank
from df
distribute by user_id
sort by
  user_id, rank
)
select
  user_id
, collect_list(item_id) as item_list
from tmp
group by
  user_id
''').show()
"""
+-------+---------+
|user_id|item_list|
+-------+---------+
|      B|   [c, b]|
|      C|[d, e, c]|
|      A|[a, b, c]|
+-------+---------+
"""

sklearnのkNNを使って分散推論したい

SparkにはkNNが実装されていません。
PySparNNなどのライブラリもありますが、sklearnでサクッとやりたいときに以下のコードが使えるかと思います。

また、tensorflowやpytorchで作ったモデルの推論にも応用可能です。

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html

from sklearn.neighbors import NearestNeighbors
from pyspark.sql.types import *
from typing import Iterator
import pandas as pd


# データの作成
data = [
  ('A', [-1, -1]),
  ('B', [-2, -1]),
  ('C', [-3, -2]),
  ('D', [1, 1]),
  ('E', [2, 1]),
  ('F', [3, 2]),
]

df = spark.createDataFrame(data=data, schema = ['item_id', 'vector'])
df.printSchema()
df.show(truncate=False)

"""
root
 |-- item_id: string (nullable = true)
 |-- vector: array (nullable = true)
 |    |-- element: long (containsNull = true)

+-------+--------+
|item_id|vector  |
+-------+--------+
|A      |[-1, -1]|
|B      |[-2, -1]|
|C      |[-3, -2]|
|D      |[1, 1]  |
|E      |[2, 1]  |
|F      |[3, 2]  |
+-------+--------+
"""

# ドライバーでknnのモデル作成
df_pd = df.toPandas()
index = list(df_pd.item_id.values)
label = list(range(len(df_pd)))
x = list(df_pd.vector.values)
nbrs = NearestNeighbors(n_neighbors=3, algorithm='ball_tree').fit(x, label)

# pandas_udfを使って推論処理を定義
def predict_func(index, neighbors):

  @pandas_udf(StructType([
      StructField("items", ArrayType(StringType()), True),
      StructField("scores", ArrayType(FloatType()), True),
  ]))
  def predict(iter: Iterator[pd.Series]) -> Iterator[pd.DataFrame]:
    for batch in iter:
      for b in batch:
        scores, indices = neighbors.kneighbors(b.reshape(1, -1), n_neighbors=3)
        indices = [index[int(ind)] for ind in indices[0]]
        scores = [float(s) for s in scores[0]]
        yield pd.DataFrame({
          'items': [indices],
          'scores': [scores],
        })
  return predict

df = df.withColumn('y', predict_func(index, nbrs)('vector'))
df.printSchema()
df.show(truncate=False)

"""
root
 |-- item_id: string (nullable = true)
 |-- vector: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- y: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- scores: array (nullable = true)
 |    |    |-- element: float (containsNull = true)

+-------+--------+---------------------------------------+
|item_id|vector  |y                                      |
+-------+--------+---------------------------------------+
|A      |[-1, -1]|{[A, B, C], [0.0, 1.0, 2.236068]}      |
|B      |[-2, -1]|{[B, A, C], [0.0, 1.0, 1.4142135]}     |
|C      |[-3, -2]|{[C, B, A], [0.0, 1.4142135, 2.236068]}|
|D      |[1, 1]  |{[D, E, F], [0.0, 1.0, 2.236068]}      |
|E      |[2, 1]  |{[E, D, F], [0.0, 1.0, 1.4142135]}     |
|F      |[3, 2]  |{[F, E, D], [0.0, 1.4142135, 2.236068]}|
+-------+--------+---------------------------------------+
"""

複数カラムでexplodeしたい

上記の推論結果を縦持ちしたいときはarray_zipでまとめてからexplodeしましょう

df.show(truncate=False)
"""
+-------+--------+---------------------------------------+
|item_id|vector  |y                                      |
+-------+--------+---------------------------------------+
|A      |[-1, -1]|{[A, B, C], [0.0, 1.0, 2.236068]}      |
|B      |[-2, -1]|{[B, A, C], [0.0, 1.0, 1.4142135]}     |
|C      |[-3, -2]|{[C, B, A], [0.0, 1.4142135, 2.236068]}|
|D      |[1, 1]  |{[D, E, F], [0.0, 1.0, 2.236068]}      |
|E      |[2, 1]  |{[E, D, F], [0.0, 1.0, 1.4142135]}     |
|F      |[3, 2]  |{[F, E, D], [0.0, 1.4142135, 2.236068]}|
+-------+--------+---------------------------------------+
"""

df.withColumn("tmp", explode(arrays_zip(col('y.items'), col('y.scores'))))\
  .select('item_id', col('tmp.0').alias('similar_item'), col('tmp.1').alias('score'))\
  .orderBy(col('item_id'), col('score'))\
  .show()

"""
+-------+--------------+---------+
|item_id|similar_item  |    score|
+-------+--------------+---------+
|      A|             A|      0.0|
|      A|             B|      1.0|
|      A|             C| 2.236068|
|      B|             B|      0.0|
|      B|             A|      1.0|
|      B|             C|1.4142135|
|      C|             C|      0.0|
|      C|             B|1.4142135|
|      C|             A| 2.236068|
|      D|             D|      0.0|
|      D|             E|      1.0|
|      D|             F| 2.236068|
|      E|             E|      0.0|
|      E|             D|      1.0|
|      E|             F|1.4142135|
|      F|             F|      0.0|
|      F|             E|1.4142135|
|      F|             D| 2.236068|
+-------+--------------+---------+
"""

Discussion