💫

Apache Sparkのことはじめ

2024/10/15に公開

これはなに?

Apache Sparkについて、触り始めたのでそのメモです。

環境準備

以下の記事を参考に、DockerでJupyterLabとpyspark環境を立ち上げます。

docker run -it -p 8888:8888 jupyter/pyspark-notebook

コマンド調査

データフレームを作成+集計

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# SparkSessionを使用してDataFrameを作成する
spark = (SparkSession
        .builder
        .appName("AuthorsAges")
        .getOrCreate())
# DataFrameを作成する
data_df = spark.createDataFrame([
    ("Brooke", 20), ("Denny", 31), ("Jules", 30),
    ("TD", 35), ("Brooke", 25)
], ["name", "age"])
# 同じnameでグループ化し、それらのageでaggregateし、averageを算出する
avg_df = data_df.groupBy("name").agg(avg("age"))

# 最後に実行結果を表示する
avg_df.show()

スキーマを定義してDataFrameを作成

# スキーマを定義する
from pyspark.sql import SparkSession

# DDLを使用してデータのスキーマを定義する
schema = "`ID` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

# 静的データを作成する
data = [[1, "Jules", "Damji", "https://tinyurul.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]]]

if __name__ == "__main__":
    # SparkSessionを作成する
    spark = (SparkSession
            .builder
            .appName("Example-3_6")
            .getOrCreate())
    blogs_df = spark.createDataFrame(data, schema)
    blogs_df.show()
    print(blogs_df.printSchema())

行 Row

# 行 Row
from pyspark.sql import Row
blog_row = Row(6, "Reynold", "Xin",["Twitter", "LinkedIn"])
print(blog_row[1])
blog_row[3][0]

Rowオブジェクトを使用して、DataFrameを作成

# Rowオブジェクトを使用して、DataFrameを作成できる
rows = [Row("MZ", "CA"), Row("RX", "CA")]
authors_df = spark.createDataFrame(rows, ["Authors", "State"])
authors_df.show()

参考

【環境構築】
https://qiita.com/mangano-ito/items/dac5582a331d40a484ad

Discussion