🔥

前処理大全をPysparkで試みる(5章〜7章)

2022/07/24に公開

はじめに

こちらの記事のつづき
https://zenn.dev/tjjj/articles/ed6819e3ade170

前提

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

5章:分割

5-1:交差検証

5-2:時系列データの分割

  • 当初takeやheadで取得してtailを考えたが、そもそもtakeやheadはlist型が戻されるのでその後にtailができない。
  • 今回はRDDのzipWithIndex関数を使って、全行の連番をふる(なお連番の生成には複数のやり方がある[1])。この連番をつかってwindowをloop処理でずらしながら実行する。ただし、これはloop処理の1loop処理が分散処理されるのみなので、Spark的にはnot awesomeである。
  • そもそも並列分散処理であるSparkを行の全体の順序に対する処理に使うのが、not awesomeの気もするw
  • 単純にpyspark.sql.DataFrame.collectでlist型にして、indexでやるとかも考えられる。(これを言うなら、pyspark.sql.DataFrame.toPandasも同じではあるが。)
df_monthly_index = spark.read.option('header', True).option(
    'inferSchema', True).csv('../../../data/monthly_index.csv')
df_monthly_index = df_monthly_index.orderBy('year_month')
df1 = df_monthly_index.rdd.zipWithIndex().toDF()
df2 = df1.select(F.col("_1.*"), F.col("_2").alias('seq_num'))

train_window_start = 0
train_window_end = 23
horizon = 12
skip = 12

while (train_window_end + horizon + 1) <= df2.count():
    df2.where((train_window_start <= df2.seq_num)
                      & (df2.seq_num <= train_window_end)).show(100)
    df2.where((train_window_end < df2.seq_num) & (
        df2.seq_num <= train_window_end + horizon)).show(1000)

    # train,testのデータを使った学習

    train_window_start = train_window_start + skip
    train_window_end = train_window_end + skip

6章:生成

6-2:オーバーサンプリング

  • SparkのMLlibなど公式にはSMOTEのライブラリは提供されていない。そのため、ライブラリを見つける必要があるが、現時点ではデファクトになりそうなものはない。自身の手で実装するのは。。。。。
  • 現状SMOTEについては、Pandasでimbalanced-learnを使ってやるのが良い気がする。

7章:展開

7-1:横持ち変換

  • Pysparkもpivot関数が存在するため利用する
  • PysparkのpivotにはPandasのpivot_table[2]のようにfill_valueがないので、pivot後にnullを0にする。
df_7_1_1 = df_reserve.groupBy('customer_id').pivot('people_num').count().fillna(0)

df_7_1_1.show(3)
+-----------+---+---+---+---+
|customer_id|  1|  2|  3|  4|
+-----------+---+---+---+---+
|      c_124|  2|  4|  1|  1|
|      c_291|  1|  0|  3|  1|
|      c_657|  0|  0|  2|  0|
+-----------+---+---+---+---+
only showing top 3 rows
  • pivotは重い処理であり、spark2.0以前はパフォーマンス改善として、次のコーディング手法がある。(ただし、spark3.0ではこれが必要となるかは微妙)
  • 1個目(df_7_1_2)は事前にpivotの列データをpivot関数の引数に指定する。ただし、事前に値がわかっていない場合には下記のように抽出する必要があり、これがトータルとして、パフォーマンス最適となるかは微妙である。
  • 2個目(df_7_1_3)は2フェーズ集約である。
people_num_list = [int(row.people_num) for row in df_reserve.select('people_num').distinct().collect()]
df_7_1_2 = df_reserve.groupBy('customer_id').pivot('people_num',people_num_list).count()
df_7_1_3 = df_reserve.groupBy('customer_id','people_num').count().groupBy('customer_id').pivot('people_num').sum('count')

7-2:スパースマトリクス

  • そもそもsparse matrixとはなんぞやは、はむかずさんのわかりやすい説明[3]を見るのが一番よい。
  • scipy.sparse.csc_matrix[4]の場合、下記のものを含めた複数の引数のパターンがある。前処理大全では、下記の1個目を使っている。
    • csc_matrix((data, (row_ind, col_ind)), [shape=(M, N)])
    • csc_matrix((data, indices, indptr), [shape=(M, N)])
  • SparkにもSparseMatrix[5]のクラスは用意されている。一方でSparkのSparseMatrixは上記で言う2点目のパターンのみが引数として対応している。(なお、SparkのSparMatrixはCSCのみである)
  • 2点目のパターンを生成するということは自身でsparseの内部構造を生成するのと同義である。せっかくのクラスもあり、なるだけロジックを使わずに生成したい。
  • そこでSparseMatrixよりも生成難易度が低いと想定されるDenseMatrix[6]を生成し、toSparse()によるSparseMatrix生成を試みる。
  • DenseMatrixは7-1でpivotで生成したものを縦持ち化して利用する。なお、pivotのデータを用いるのは0のcellが生成されている必要があるため。

結果として、次の2パターンが考えられた。基本的にはどちらも縦持ちに変えて、DenseMatrixを生成しSparseに変える。
わかりやすさ的にはパターン1であるが、汎用性という点においてはパターン2である。

パターン1(select/unionによる縦持ち化)

  • 各列ごとのselectしたものをunionで一つの列にして、それでdense matrix生成して、それをsparse matrixに変える。(select/unionは力技の感がある....)
パターン1
from pyspark.mllib.linalg import DenseMatrix

# パターン2と結果を比較しやすいようにソートする。
df_7_1_1 = df_7_1_1.orderBy('customer_id')
df_7_1_1_union = df_7_1_1.select('customer_id','1').union(df_7_1_1.select('customer_id','2')).union(df_7_1_1.select('customer_id','3')).union(df_7_1_1.select('customer_id','4'))

df_7_1_1_dm = DenseMatrix(df_7_1_1.count(),len(df_7_1_1.columns)-1,df_7_1_1_union.select('1').rdd.flatMap(lambda x: x).collect())
df_7_2_1 = df_7_1_1_dm.toSparse()

SparseMatrix(888, 4, [0, 565, 1150, 1738, 2366], [0, 2, 3, 4, 6, 9, 10, 11, ..., 876, 877, 878, 879, 880, 881, 884, 887], [2.0, 2.0, 1.0, 2.0, 3.0, 2.0, 3.0, 4.0, ..., 2.0, 1.0, 1.0, 4.0, 2.0, 3.0, 3.0, 1.0], False)

パターン2(stackによる縦持ち化)

  • pivotをstack[7]を用いて横持ちから縦持ちに変換し、DenseMatrixのもととなるデータを生成する。stackの場合だと、データが行列の順番になるので、生成後にソートする。
パターン2
from pyspark.mllib.linalg import DenseMatrix

# stack時に数値のカラム名だとエラーになるため、カラム名を文字に変更。
df_7_1_1 = df_7_1_1.withColumnRenamed('1', 'a').withColumnRenamed('2', 'b').withColumnRenamed('3', 'c').withColumnRenamed('4', 'd')

df_7_1_1_v = (
    df_7_1_1
    .selectExpr('customer_id',
                f"""
    stack(
      {len(df_7_1_1.columns )-1 },
      {', '.join(f'"{c}", {c}' for c in df_7_1_1.columns[1:])}
    ) as (column, count)
    """
                )
)

df_7_1_1_v = df_7_1_1_v.orderBy('column', 'customer_id')

df_7_1_1_dm = DenseMatrix(df_7_1_1.count(), len(df_7_1_1.columns)-1, df_7_1_1_v.select('count').rdd.flatMap(lambda x: x).collect())
df_7_2_2 = df_7_1_1_dm.toSparse()

SparseMatrix(888, 4, [0, 565, 1150, 1738, 2366], [0, 2, 3, 4, 6, 9, 10, 11, ..., 876, 877, 878, 879, 880, 881, 884, 887], [2.0, 2.0, 1.0, 2.0, 3.0, 2.0, 3.0, 4.0, ..., 2.0, 1.0, 1.0, 4.0, 2.0, 3.0, 3.0, 1.0], False)
脚注
  1. https://docs.microsoft.com/en-us/azure/databricks/kb/sql/gen-unique-increasing-values ↩︎

  2. https://pandas.pydata.org/docs/reference/api/pandas.pivot_table.html ↩︎

  3. https://hamukazu.com/2014/12/03/internal-data-structure-scipy-sparse/ ↩︎

  4. https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csc_matrix.html ↩︎

  5. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.linalg.SparseMatrix.html ↩︎

  6. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.linalg.DenseMatrix.html ↩︎

  7. https://qiita.com/calderarie/items/9acd3d77e23484bd3bad ↩︎

Discussion