🎇

pyspark入門

2023/04/04に公開

※Jupyter Notebook 形式で実行(原本)し、Markdown 形式でエクスポートしたものを一部改変して作っているので若干見づらいかもしれませんがご容赦ください

概要

pandas などは小規模~中規模くらいのデータセットの処理や分析には有効だが、
メモリに乗り切らないレベルのデータ量は処理できない。
また、GIL によって基本的に 1 スレッドでの動作になるため、マシンスペックを最大限活かし切るのは難しい。

遅延評価、並列分散処理によって所謂ビッグデータといわれるレベルのテーブルデータの処理・分析に使うことができ、
更に pandas との連携・併用ができるツールとしてpysparkが存在するため紹介する。

なお、詳細はPySpark Documentationなども参照のこと

環境の作成

Mambaforge をインストール済みの Linux 環境を前提とする。

  • conda 23.1.0
  • mamba 1.4.1

以下のような conda 仮想環境作成用の yaml を用意する:

pyspark_env.yaml
# pyspark_env.yaml
name: pyspark-intro
channels:
  - conda-forge
dependencies:
  - python=3.11
  - pyspark=3.3
  - openjdk=17
  - pandas=1.5
  - pyarrow
  # for testdata
  - numpy
  - seaborn
  # jupyter
  - jupyterlab

(参考)

※先日pandas の 2.0 がリリースされたが、
deprecated になっていたiteritemsが削除された影響で、
pandas から pyspark の DataFrame に変換する部分が一部動作しなくなる現象が起こるため、当面は 1.5 系を使う。

mamba コマンドで仮想環境を作成(conda だと依存関係の解決が遅いので mamba の使用を推奨)

mamba env create -f pyspark_env.yaml

# 出来上がった環境(pyspark-intro)をactivate
conda activate pyspark-intro
# or `mamba activate pyspark-intro`

実行

  • spark session の起動
  • データの読み込みと保存
  • pandas との連携

あたりを簡単に確認する。

データ処理周りの詳細は省略するが、Spark のデータフレーム機能がSpark SQLと対応していることから、
概ね SQL で出来ることようなことを実行出来ると考えれば OK。(というか SQL でも書ける)

以下、上記の手順で作った conda 仮想環境から Jupyter を起動して実行する。

from pyspark.sql.session import SparkSession

# spark sessionの立ち上げ
spark = (SparkSession.builder
  .master("local[*]")
  .appName("LocalTest")
  .getOrCreate()
)
spark

↓pandas との連携のため、config を調整しておく(pyarrow をインストールしておく)

参考: https://learn.microsoft.com/ja-jp/azure/databricks/pandas/pyspark-pandas-conversion

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Spark Session を起動すると、デフォルトではhttp://localhost:4040Spark UIが立ち上がり、
実行状態などをモニタリングすることが出来る(デバッグなどにも便利)

# テストデータの作成
import seaborn as sns

iris = sns.load_dataset("iris")
display(iris)

iris.to_csv("testdata/iris.csv", index=False)

# pysparkでcsvを読み込み

df_iris = spark.read.csv(
    "testdata/iris.csv",
    header=True,
    inferSchema=True,
)

# 遅延評価するので、カラムのスキーマのみ表示
display(df_iris)
DataFrame[sepal_length: string, sepal_width: string, petal_length: string, petal_width: string, species: string]

※(補足):

  • 遅延評価を行う関係で、オンメモリに展開する pandas などと異なり Spark の処理のパフォーマンスはデータソースの形式の影響をもろに受ける
    • データ形式(csv/json/parquet/orc/RDB テーブル/...etc)や、圧縮形式(gzip/xz/snappy/zlib/...etc)、カラムに対するパーティションなど
  • データ分析・データ処理用途であれば行指向形式である csv を読み込んで処理を実行するのは一般に遅く、列指向形式の parquet や orc などの方が有利
# showなどによって初めて評価が行われる
df_iris.show(5) # 5件を表示
+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows
# selectによるカラムの選択
(df_iris
    .select(["sepal_length", "sepal_width"])
    .show(3)
)
+------------+-----------+
|sepal_length|sepal_width|
+------------+-----------+
|         5.1|        3.5|
|         4.9|        3.0|
|         4.7|        3.2|
+------------+-----------+
only showing top 3 rows
# filterによる簡単なクエリ
(df_iris
    .filter(df_iris.species == "virginica")
    .filter(df_iris.sepal_length > 5.0)
    .show(3)
)
+------------+-----------+------------+-----------+---------+
|sepal_length|sepal_width|petal_length|petal_width|  species|
+------------+-----------+------------+-----------+---------+
|         6.3|        3.3|         6.0|        2.5|virginica|
|         5.8|        2.7|         5.1|        1.9|virginica|
|         7.1|        3.0|         5.9|        2.1|virginica|
+------------+-----------+------------+-----------+---------+
only showing top 3 rows
# groupByによる集約(1)
(df_iris
    .groupBy('species')
    .count()
    .show()
)
+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|versicolor|   50|
|    setosa|   50|
+----------+-----+
# groupByによる集約(2)

(df_iris
    .groupBy('species')
    .agg({
        'sepal_length': 'mean',
        'sepal_width': 'mean',
        'petal_length': 'mean',
        'petal_width': 'mean',
    })
    .show()
)
+----------+------------------+------------------+-----------------+------------------+
|   species|  avg(sepal_width)|  avg(petal_width)|avg(sepal_length)| avg(petal_length)|
+----------+------------------+------------------+-----------------+------------------+
| virginica|2.9739999999999998|             2.026|6.587999999999998|             5.552|
|versicolor|2.7700000000000005|1.3259999999999998|            5.936|              4.26|
|    setosa| 3.428000000000001|0.2459999999999999|5.005999999999999|1.4620000000000002|
+----------+------------------+------------------+-----------------+------------------+
# DataFrameのサマリー
df_iris.describe().show()
+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  species|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335|  3.057333333333334|3.7580000000000027| 1.199333333333334|     null|
| stddev|0.8280661279778637|0.43586628493669793|1.7652982332594662|0.7622376689603467|     null|
|    min|               4.3|                2.0|               1.0|               0.1|   setosa|
|    max|               7.9|                4.4|               6.9|               2.5|virginica|
+-------+------------------+-------------------+------------------+------------------+---------+
# SQLでクエリを書くことも可能
df_iris.createOrReplaceTempView('iris')

spark.sql("show tables").show()

tmp = spark.sql("SELECT * FROM iris WHERE species = 'setosa'")
tmp.show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |     iris|       true|
+---------+---------+-----------+

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1|        0.1| setosa|
|         5.8|        4.0|         1.2|        0.2| setosa|
|         5.7|        4.4|         1.5|        0.4| setosa|
|         5.4|        3.9|         1.3|        0.4| setosa|
|         5.1|        3.5|         1.4|        0.3| setosa|
|         5.7|        3.8|         1.7|        0.3| setosa|
|         5.1|        3.8|         1.5|        0.3| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 20 rows

詳細はUser Guide, API Reference,
およびSpark SQL のリファレンスを参照

次に、データのRead/Writeを確認する

# さっきは便宜上csvから読み込んだが、型情報が失われる + pandasとの連携を見るため、pandasのデータフレームから直接pysparkに変換する

display(iris.__class__) # irisはpandas dataframe
df = spark.createDataFrame(iris)

display(df)

df.show()
/home/wsl-user/LocalApps/Mambaforge/envs/pyspark-intro/lib/python3.11/site-packages/pyspark/pandas/__init__.py:49: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
  warnings.warn(



pandas.core.frame.DataFrame


/home/wsl-user/LocalApps/Mambaforge/envs/pyspark-intro/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py:604: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]



DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]


+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1|        0.1| setosa|
|         5.8|        4.0|         1.2|        0.2| setosa|
|         5.7|        4.4|         1.5|        0.4| setosa|
|         5.4|        3.9|         1.3|        0.4| setosa|
|         5.1|        3.5|         1.4|        0.3| setosa|
|         5.7|        3.8|         1.7|        0.3| setosa|
|         5.1|        3.8|         1.5|        0.3| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 20 rows

↑csv から読み込んだときは string 型になってしまっていた数値が、pandas の型を反映して double 型になっている

# spark dataframeからpandas dataframeへの変換

display(df.__class__) # dfはpyspark dataframe

pdf = df.toPandas()
display(pdf)
pyspark.sql.dataframe.DataFrame

↑pyspark で大規模~中規模くらいのデータを処理してサイズダウンし、オンメモリに乗るくらいになったらtoPandasをして扱いやすい pandas に変換する、という使い方が出来る。

最後に色々な形式でデータの書き込みと読み込みを行う

先述の通り、spark はデータ保存形式が処理パフォーマンスにダイレクトに効いてくるため、この辺りの取り扱いは肝になってくる。

# dfはspark dataframe

# csv.gz
df.write.save("testdata/iris_csv", format="csv")

# formatオプションを使う代わりに、↓のように書いてもOK
# df.write.csv("testdata/iris_csv", compression="gzip")

# 実際どのような形式でファイルが保存されているか確認(linuxコマンドを実行している)
!ls testdata/iris_csv
_SUCCESS
part-00000-030e186f-051c-4e9d-94b1-1560776ebfef-c000.csv
part-00001-030e186f-051c-4e9d-94b1-1560776ebfef-c000.csv
part-00002-030e186f-051c-4e9d-94b1-1560776ebfef-c000.csv
part-00003-030e186f-051c-4e9d-94b1-1560776ebfef-c000.csv
part-00004-030e186f-051c-4e9d-94b1-1560776ebfef-c000.csv
part-00005-030e186f-051c-4e9d-94b1-1560776ebfef-c000.csv
part-00006-030e186f-051c-4e9d-94b1-1560776ebfef-c000.csv
part-00007-030e186f-051c-4e9d-94b1-1560776ebfef-c000.csv
# 一応保存されているファイルの1つについて先頭を見てみる
!head testdata/iris_csv/part-00000-*.csv
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
4.6,3.1,1.5,0.2,setosa
5.0,3.6,1.4,0.2,setosa
5.4,3.9,1.7,0.4,setosa
4.6,3.4,1.4,0.3,setosa
5.0,3.4,1.5,0.2,setosa
4.4,2.9,1.4,0.2,setosa
4.9,3.1,1.5,0.1,setosa

↑pandas のように単一のファイルを指定する感じではなく、保存するディレクトリを指定する。
(ファイルが分散して生成される)

読み込む際は個々のファイルを指定することもできる(←Spark 以外で作ったファイルの読み込みなど)が、
基本は保存したディレクトリを指定して読み込む

df_csv = spark.read.load("testdata/iris_csv", format="csv")

# formatを指定せず、↓のようにすることも可能
# df_csv = spark.read.csv("testdata/iris_csv")

df_csv.show(3)
+---+---+---+---+----------+
|_c0|_c1|_c2|_c3|       _c4|
+---+---+---+---+----------+
|4.9|2.4|3.3|1.0|versicolor|
|6.6|2.9|4.6|1.3|versicolor|
|5.2|2.7|3.9|1.4|versicolor|
+---+---+---+---+----------+
only showing top 3 rows

(↑csv だとカラム名などの情報が欠落しがちで使いづらい)

# テキスト形式だとcsvの他にjsonなども指定可能
# compressionで圧縮形式を指定することも出来る

df.write.json("testdata/iris_json", compression='gzip')

!ls testdata/iris_json
_SUCCESS
part-00000-005412a7-1f0e-4ebf-a4d9-bfbbe53b0515-c000.json.gz
part-00001-005412a7-1f0e-4ebf-a4d9-bfbbe53b0515-c000.json.gz
part-00002-005412a7-1f0e-4ebf-a4d9-bfbbe53b0515-c000.json.gz
part-00003-005412a7-1f0e-4ebf-a4d9-bfbbe53b0515-c000.json.gz
part-00004-005412a7-1f0e-4ebf-a4d9-bfbbe53b0515-c000.json.gz
part-00005-005412a7-1f0e-4ebf-a4d9-bfbbe53b0515-c000.json.gz
part-00006-005412a7-1f0e-4ebf-a4d9-bfbbe53b0515-c000.json.gz
part-00007-005412a7-1f0e-4ebf-a4d9-bfbbe53b0515-c000.json.gz
# 読み込みも同様

df_json = spark.read.json("testdata/iris_json")
# or df_json = spark.read.load("testdata/iris_json", format="json")

display(df_json)
df_json.show(3)
DataFrame[petal_length: double, petal_width: double, sepal_length: double, sepal_width: double, species: string]


+------------+-----------+------------+-----------+----------+
|petal_length|petal_width|sepal_length|sepal_width|   species|
+------------+-----------+------------+-----------+----------+
|         4.2|        1.2|         5.7|        3.0|versicolor|
|         4.2|        1.3|         5.7|        2.9|versicolor|
|         4.3|        1.3|         6.2|        2.9|versicolor|
+------------+-----------+------------+-----------+----------+
only showing top 3 rows

(↑json だと csv よりは型情報が保持される)

# データ分析・処理用途として適しているsnappy.parquetやzlib.orc形式など

df.write.parquet("testdata/iris_parquet", compression="snappy")
!ls testdata/iris_parquet

df.write.orc("testdata/iris_orc", compression="zlib")
!ls testdata/iris_orc
23/04/04 01:07:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers




_SUCCESS
part-00000-ccc33d97-5a54-492a-9cd2-12ae8adb52ea-c000.snappy.parquet
part-00001-ccc33d97-5a54-492a-9cd2-12ae8adb52ea-c000.snappy.parquet
part-00002-ccc33d97-5a54-492a-9cd2-12ae8adb52ea-c000.snappy.parquet
part-00003-ccc33d97-5a54-492a-9cd2-12ae8adb52ea-c000.snappy.parquet
part-00004-ccc33d97-5a54-492a-9cd2-12ae8adb52ea-c000.snappy.parquet
part-00005-ccc33d97-5a54-492a-9cd2-12ae8adb52ea-c000.snappy.parquet
part-00006-ccc33d97-5a54-492a-9cd2-12ae8adb52ea-c000.snappy.parquet
part-00007-ccc33d97-5a54-492a-9cd2-12ae8adb52ea-c000.snappy.parquet
_SUCCESS
part-00000-0fbad17c-8b1f-42ce-a3b2-06d173e191ed-c000.zlib.orc
part-00001-0fbad17c-8b1f-42ce-a3b2-06d173e191ed-c000.zlib.orc
part-00002-0fbad17c-8b1f-42ce-a3b2-06d173e191ed-c000.zlib.orc
part-00003-0fbad17c-8b1f-42ce-a3b2-06d173e191ed-c000.zlib.orc
part-00004-0fbad17c-8b1f-42ce-a3b2-06d173e191ed-c000.zlib.orc
part-00005-0fbad17c-8b1f-42ce-a3b2-06d173e191ed-c000.zlib.orc
part-00006-0fbad17c-8b1f-42ce-a3b2-06d173e191ed-c000.zlib.orc
part-00007-0fbad17c-8b1f-42ce-a3b2-06d173e191ed-c000.zlib.orc
# 読み込み

# parquet
print("parquet:")
df_parquet = spark.read.parquet("testdata/iris_parquet")
display(df_parquet)
df_parquet.show(3)

print("\n------------------\n")

# orc
print("orc:")
df_orc = spark.read.orc("testdata/iris_orc")
display(df_orc)
df_orc.show(3)
parquet:



DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]


+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|
+------------+-----------+------------+-----------+----------+
|         5.7|        3.0|         4.2|        1.2|versicolor|
|         5.7|        2.9|         4.2|        1.3|versicolor|
|         6.2|        2.9|         4.3|        1.3|versicolor|
+------------+-----------+------------+-----------+----------+
only showing top 3 rows


------------------

orc:



DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]


+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|
+------------+-----------+------------+-----------+----------+
|         5.7|        3.0|         4.2|        1.2|versicolor|
|         5.7|        2.9|         4.2|        1.3|versicolor|
|         6.2|        2.9|         4.3|        1.3|versicolor|
+------------+-----------+------------+-----------+----------+
only showing top 3 rows
# partitionを切って保存することも可能

df.write.save(
    "testdata/iris_with_partition",
    format="parquet",
    compression="snappy",
    partitionBy="species"
    )

!cd testdata/iris_with_partition && tree

.
├── _SUCCESS
├── species=setosa
│ ├── part-00000-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet
│ ├── part-00001-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet
│ └── part-00002-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet
├── species=versicolor
│ ├── part-00002-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet
│ ├── part-00003-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet
│ ├── part-00004-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet
│ └── part-00005-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet
└── species=virginica
├── part-00005-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet
├── part-00006-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet
└── part-00007-a847cdcf-1aea-4b71-a3cc-7ec2adddc8b0.c000.snappy.parquet

4 directories, 11 files

カラム"species"の値ごとに別ディレクトリが切られてデータが保存される

RDB のインデックスと同様、データアクセスの仕方に応じて適切に設定することでパフォーマンスを良くすることが出来る(し、不適切だと悪化する)

↑ の例だと、species を指定して分析を行うような場合、興味のある species しか見ないためアクセスするデータ量を限定することができる。
一方、かけるクエリの多くが複数の species にまたがるような場合はパフォーマンスの向上は見込めず、悪化する可能性もある。

# パーティションがある場合でも読み込み方法は同様

df_partition = spark.read.load("testdata/iris_with_partition")

display(df_partition)
df_partition.show(3)
DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]


+------------+-----------+------------+-----------+---------+
|sepal_length|sepal_width|petal_length|petal_width|  species|
+------------+-----------+------------+-----------+---------+
|         5.8|        2.8|         5.1|        2.4|virginica|
|         6.4|        3.2|         5.3|        2.3|virginica|
|         6.5|        3.0|         5.5|        1.8|virginica|
+------------+-----------+------------+-----------+---------+
only showing top 3 rows

他にもsaveAsTableで永続化したテーブルとして書き込みを行ったりも出来る。

扱えるデータ保存形式もData Sourcesのように、
他にも RDB(JDBC を使う)や、Hive Table など色々なものに対応している。

(補足)pyspark における pandas API

参考: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html

最近はかなり pandas に近い API を使って spark を扱う方法が整備されつつある様子

なお、Koalasという、spark を pandas っぽく使えるツールが存在するが、
ここなどを見るとこれが正式に pyspark に取り込まれた?様子。

import pyspark.pandas as ps

# pandas on spark DataFrameを作成する
psdf = ps.read_csv("testdata/iris.csv")

display(psdf.__class__) # pyspark.pandas.frame.DataFrame

psdf.head(5) # pandas DataFrameのようにheadで先頭を表示できる
/home/wsl-user/LocalApps/Mambaforge/envs/pyspark-intro/lib/python3.11/site-packages/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: If `index_col` is not specified for `read_csv`, the default index is attached which can cause additional overhead.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)



pyspark.pandas.frame.DataFrame

# filterや表示も通常のpandasのようにできている
psdf[psdf['sepal_length'] > 7.5]

# pysparkのDataFrameに変換

sdf = psdf.to_spark(index_col=["index"])

sdf.show(5)
+-----+------------+-----------+------------+-----------+-------+
|index|sepal_length|sepal_width|petal_length|petal_width|species|
+-----+------------+-----------+------------+-----------+-------+
|    0|         5.1|        3.5|         1.4|        0.2| setosa|
|    1|         4.9|        3.0|         1.4|        0.2| setosa|
|    2|         4.7|        3.2|         1.3|        0.2| setosa|
|    3|         4.6|        3.1|         1.5|        0.2| setosa|
|    4|         5.0|        3.6|         1.4|        0.2| setosa|
+-----+------------+-----------+------------+-----------+-------+
only showing top 5 rows
# 普通のpandasに変換
pdf = psdf.to_pandas()

display(pdf.__class__) # pandas.core.frame.DataFrame

pdf.head()
/home/wsl-user/LocalApps/Mambaforge/envs/pyspark-intro/lib/python3.11/site-packages/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)



pandas.core.frame.DataFrame

(参考) daskとの比較

pandas および numpy と互換性および連携性の高い API を持ち、遅延評価・並列分散処理を行うことが出来るライブラリ。

例えばデータサイズが大きいテーブルデータに対しては dask のデータフレームで遅延評価・並列分散処理を行っておき、
データサイズが小さくなるタイミングで pandas のデータフレームに変換する、といった今回 pyspark でやろうとしていることとほぼ同じことが出来る。

pyspark は dask に比べると、

  • クラスターなどを組める分、より本格的なビッグデータにも対応出来る
    • (今回紹介するようなやり方で、ライトに使うことも可能)
  • Java をバックエンドに動くことに加え、本格的に動かす場合はクラスター構築などが必要になるため、そのレベルでやると動作環境の構築のハードルは高い
  • pandas、numpy との親和性は dask の方が高い高かった
    • dask は遅延評価される以外は同じメソッドをそのまま使えるケースが多い
    • spark はむしろ SQL と互換性がある
    • ただ、先述の通り pandas API が整備されつつあるので、pyspark もある程度 pandas の使い方そのままで使えるようになっている模様
GitHubで編集を提案

Discussion