🙌
Dataprocとpysparkを使って分散処理で行列分解
やってみること
- GCP上でDataprocクラスタの作成
- movielensのデータを使ってmovieIdごとのベクトルをGCSに吐き出すスクリプトを作成
- gcloudコマンドを使ってジョブを送信して処理を行う
Dataprocクラスタの準備
terraformで
- dataprocクラスタを作成
- dataprocクラスタが使うGCSバケット
の作成。お試しなのでcloud shellなどでterraform initとterraform applyをした。
resource "google_storage_bucket" "dataproc_bucket" {
# Dataprocのファイルを保存するためのGCS Bucket
name = "dataproc-sugasuga-bucket"
location = "asia-northeast1"
}
resource "google_dataproc_cluster" "dataproc_cluster" {
# Dataproc本体の設定
name = "dataproc-cluster"
region = "us-central1"
graceful_decommission_timeout = "120s"
cluster_config {
staging_bucket = "dataproc-sugasuga-bucket"
}
}
pysparkスクリプトの準備
movielensのデータを事前にcsvとしてGCSにおいておいた
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import *
spark = SparkSession.builder.appName("movielens").getOrCreate()
schema = StructType([
StructField("userId", IntegerType(), True),
StructField('movieId', IntegerType(), True),
StructField('rating', FloatType(), True),
])
# データの読み込み
df = spark.read.format("csv").load('gs://dataproc-script-sugasuga/ratings.csv', schema = schema, header = True)
# モデルの定義
als = ALS(
rank=20,
maxIter=10,
regParam=0.1,
userCol='userId',
itemCol='movieId',
ratingCol='rating',
seed=0
)
# モデルの学習
model = als.fit(df)
# movieIdごとのベクトルを取得
item_vector = model.itemFactors.cache()
# ベクトルをGCSに出力する
item_vector.write.json("gs://dataproc-script-sugasuga/item_vector")
ジョブを送信する
gcloudコマンドでジョブを送信する。
他にもPOSTだったり、python clientなど、方法はたくさんあるみたい。
gcloud dataproc jobs submit pyspark \
gs://dataproc-script-sugasuga/script.py \
--cluster=dataproc-cluster \
--region=us-central1
出力がでてくる
Job [272e9ada36744beaabec9e597a61af0d] submitted.
Waiting for job output...
( 略 )
jobUuid: ee734b63-51c6-358a-b2cf-7ae8a8913ec4
( 略 )
yarnApplications:
- name: movielens
progress: 1.0
state: FINISHED
trackingUrl: http://dataproc-cluster-m:8088/proxy/application_1670689953838_0001/
GCSを確認してみると、目的としていた出力が完了している
所感
個人的に、hadoopに対して良い印象がなかったため、Dataprocに対しても印象が良くなかった。
しかし、Dataprocが大規模データに対して分散処理で行列分解を行える、というのは興味をそそれれた。今後、どのくらいの性能になるのかを調べてみようと思う。
Discussion