Dataprocを利用してPySparkを触ってみた(feat.Jupyter Notebook)
はじめに
こんにちは、クラウドエース データ/MLディビジョン所属の金です。
最近ビッグデータの重要度が高くなっているのでビッグデータ処理ができるさまざまな方法の一つのPySparkを試してみます。
今回はGoogle CloudサービスのDataprocでクラスタを起動し、そこからJupyter Notebookを起動してPySparkを試してみます。
Cloud Dataprocとは?
Dataprocは、「Apache Hadoop、Apache Sparkなどのクラスタを簡単かつ効率が高い方法で実行するための」フルマネージドサービスです。
簡単に言うとビッグデータ処理・分析を手伝えるサービスです。
事前準備
Cloud Dataproc API画面でAPIを有効にします。
クラスタの作成
- CREATE CLUSTERボタンをクリックします。
- Compute Engine上のクラスタボタンをクリックします。
- クラスタの設定のタブではさまざまな設定が可能です。
-
名前
適切なクラスタ名を入れてください。 -
ロケーション(リージョンとゾーン)
リージョンとゾーンを選択してください。今回はus上にしました。 -
クラスタタイプ
最初起動するクラスタタイプも自分の状況に合わせて選択できますがタイプによって料金が違うので注意してください。 -
自動スケーリング
自動スケーリングも設定可能です。今回はなしで実行します。 -
バージョン
初期設定のまま(2.0-debian10)にします。 -
コンポーネント
今回は「Jupyter Notebook」上で実行する予定なので
「コンポーネント ゲートウェイを有効にする」と「Jupyter Notebook」にチェックをします。
他にノードの構成、クラスタのカスタマイズ、セキュリティ管理タブもありますが今回は省略します。
-
クラスタの設定が終わったら作成ボタンをクリックします。
-
ステータスが実行中になるまでしばらく待てばクラスタ作成は成功です。
Jupyter Notebook起動
-
実行中のままクラスタの名前をクリックするとクラスタの詳細画面が表示されます。
ウェブインターフェイスタブをクリックしてJupyterまたはJupyterLabをクリックします。
(私はJupyterLabを利用しましたがどっちも可能なので楽な方で起動してください。)
-
Notebook上PySparkをクリックして新しいファイルを作成します。
-
PySpark処理を簡単に確認します。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Sparkセッションの作成
spark = SparkSession.builder.appName("DataFrameSample").getOrCreate()
# スキーマの定義
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
# データの作成
data = [
(1, "Alice"),
(2, "Bob"),
(3, "Charlie"),
(4, "David"),
(5, "Eve")
]
# DataFrameの作成
df = spark.createDataFrame(data, schema)
# 結果を表示する
df.show()
+----+-------+
| id | name|
+----+-------+
| 1 | Alice|
| 2 | Bob|
| 3 |Charlie|
| 4 | David|
| 5 | Eve|
+----+-------+
問題なく処理されるのを確認できました。
せっかくなのでGCSとBigqueryからのデータも取り込んでみます。
GCSからデータの取り込み
準備したCSVファイルをGCSに格納してからgs://<My Bucket>/<My File>
の部分を修正してください。
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
# Spark設定
conf = SparkConf().setAppName("ReadFromGcs")
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
# GCS上のデータをヘッダ付きで取り込む
df = spark.read.csv("gs://<My Bucket>/<My File>", header=True, inferSchema=True)
# 結果を表示する
df.show()
+---+------------+--------+----------+
| ID| mail| name| date|
+---+------------+--------+----------+
| 1|aaa@aaaaa.jp| Alice|2016-08-27|
| 2|bbb@bbbbb.jp| Bob|2016-08-27|
| 3|ccc@ccccc.jp| Charlie|2016-08-27|
+---+------------+--------+----------+
Bigqueryからデータの取り込み
from google.cloud import bigquery
from pyspark.sql import SparkSession
# BigQueryのクエリを定義する
query = """
SELECT
gameId,
gameNumber,
seasonId,
year
FROM `bigquery-public-data.baseball.schedules`
"""
# BigQueryに接続して、クエリを実行する
client = bigquery.Client()
df = client.query(query).to_dataframe()
# SparkSessionを作成する
spark = SparkSession.builder.appName("BigQuery to PySpark").getOrCreate()
# PySpark DataFrameに変換する
df_spark = spark.createDataFrame(df)
# 結果を表示する
df_spark.show()
+--------------------+----------+--------------------+----+
| gameId|gameNumber| seasonId|year|
+--------------------+----------+--------------------+----+
|e14b6493-9e7f-404...| 1|565de4be-dc80-484...|2016|
|1f32b347-cbcb-4c3...| 1|565de4be-dc80-484...|2016|
|0c2292d1-7398-48b...| 1|565de4be-dc80-484...|2016|
|8fbec734-a15a-42a...| 1|565de4be-dc80-484...|2016|
|..... |..... |..... |....|
# 結果をDF中で全部表示する
df_spark.show(truncate=False)
+------------------------------------+----------+------------------------------------+----+
|gameId |gameNumber|seasonId |year|
+------------------------------------+----------+------------------------------------+----+
|e14b6493-9e7f-404f-840a-8a680cc364bf|1 |565de4be-dc80-4849-a7e1-54bc79156cc8|2016|
|1f32b347-cbcb-4c31-a145-0e685306d168|1 |565de4be-dc80-4849-a7e1-54bc79156cc8|2016|
|0c2292d1-7398-48be-bf8e-b41dad5e1a43|1 |565de4be-dc80-4849-a7e1-54bc79156cc8|2016|
|8fbec734-a15a-42ab-8d51-60790de7750b|1 |565de4be-dc80-4849-a7e1-54bc79156cc8|2016|
|..... |..... |..... |....|
追加:Jupyter Notebookで使ったファイルの保存先は?
Jupyter Notebookのファイル保存先が気になりますね、、、
保存先は基本GCSの<自動付与されたバケット名>/notebooks/jupyter
下に格納されます。
ファイルのアップロード・ダウンロード・削除などはGCSまたはJupyter両方で可能です。
自動で生成されるGCSのバケット名が汚い!!と思った方はクラスタ作成の時保存するStorageを指定するのも可能です。
省略してた「クラスタのカスタマイズ」タブの下の部分をみると「Cloud Storageステージング バケット」があるのでそこから参照ボタンをクリックして保存したいバケットを選んでください。それでファイルの保存先が指定できます。
おわりに
今回はDataprocサービスを利用して簡単にJupyter Notebook上でPySparkを試してみました。
PySparkはETLから分析まで高速の処理が可能で、以前からDataFrameを使ってた方は楽に使えるのかと個人的に思いました。
クラスタのハンドリングに少し手数がかかるかもしれませんが、Dataproc Serverlessもサービスしてるらしいので引き続き試してみたいと思います。
なお、充実なPySparkの公式ドキュメントもあるのでもっと触ってみたい方はぜひご覧ください。
PySparkの公式ドキュメント
Discussion