🐍

PySparkとPandasのDataFrameの相互変換

2022/07/29に公開

Pandas を利用して作ったロジックを PySpark を使う処理系(たとえば Databricks)に持っていく場合などに、それぞれのDataFrameを変換することがありますが、その際に気をつけること共有します。

この記事の例は Databricks で実行することを想定しており、spark はプリセットの SparkSession オブジェクトだと解釈してください。

PySpark → Pandas

pyspark.sql.DataFrame オブジェクトには toPandas() というメソッドがあるため、これを使えば変換できます。

import io
data = '''id,value
one,hoge
two,fuga
'''
df = spark.read.csv(io.StringIO(data)) # df は pyspark.sql.DataFrame
pdf = df.toPandas() # pdf は pandas.DataFrame

Pandas → PySpark

SparkSession オブジェクトには createDataFrame というメソッドがあるため、これを使うと pandas.DataFramepyspark.sql.DataFrame に変換できます。

import pandas as pd
pdf = pd.read_csv(io.StringIO(data)) # pdf は pandas.DataFrame
df = spark.createDataFrame(pdf)

ただし、 pandas.DataFrame の Index としてデフォルトの値(1から始まる正数列)以外を使っている場合は、 変換の際に Index の情報が失われてしまうので注意が必要です。

例えば

import pandas as pd
pdf = pd.read_csv(io.StringIO(data), index_col='id') # pdf は pandas.DataFrame
df = spark.createDataFrame(pdf)

とすると、CSVの id 列に記載されていた情報が失われます。

これを防止したいのであれば、pandas.DataFrame.reset_index() を利用し、前述の createDataFrame の部分を以下に差し替えると Index に入っていた情報を取り込むことができます。

df = spark.createDataFrame(pdf.reset_index(drop=False))

Discussion