🙌

Dataprocとpysparkを使って分散処理で行列分解

2022/12/11に公開

やってみること

  • GCP上でDataprocクラスタの作成
  • movielensのデータを使ってmovieIdごとのベクトルをGCSに吐き出すスクリプトを作成
  • gcloudコマンドを使ってジョブを送信して処理を行う

Dataprocクラスタの準備

terraformで

  • dataprocクラスタを作成
  • dataprocクラスタが使うGCSバケット

の作成。お試しなのでcloud shellなどでterraform initとterraform applyをした。

resource "google_storage_bucket" "dataproc_bucket" {
  # Dataprocのファイルを保存するためのGCS Bucket
  name          = "dataproc-sugasuga-bucket"
  location      = "asia-northeast1"
}

resource "google_dataproc_cluster" "dataproc_cluster" {
  # Dataproc本体の設定
  name                          = "dataproc-cluster"
  region = "us-central1"
  graceful_decommission_timeout = "120s"

  cluster_config {
    staging_bucket = "dataproc-sugasuga-bucket"

  }
}

pysparkスクリプトの準備

movielensのデータを事前にcsvとしてGCSにおいておいた

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import *

spark = SparkSession.builder.appName("movielens").getOrCreate()

schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField('movieId', IntegerType(), True),
    StructField('rating', FloatType(), True),
])

# データの読み込み
df = spark.read.format("csv").load('gs://dataproc-script-sugasuga/ratings.csv', schema = schema, header = True)


# モデルの定義
als = ALS(
    rank=20, 
    maxIter=10,
    regParam=0.1,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    seed=0
)

# モデルの学習
model = als.fit(df)

# movieIdごとのベクトルを取得
item_vector = model.itemFactors.cache()

# ベクトルをGCSに出力する
item_vector.write.json("gs://dataproc-script-sugasuga/item_vector")

ジョブを送信する

gcloudコマンドでジョブを送信する。

他にもPOSTだったり、python clientなど、方法はたくさんあるみたい。

gcloud dataproc jobs submit pyspark \
    gs://dataproc-script-sugasuga/script.py \
    --cluster=dataproc-cluster  \
    --region=us-central1

出力がでてくる

Job [272e9ada36744beaabec9e597a61af0d] submitted.
Waiting for job output...
( 略 ) 
jobUuid: ee734b63-51c6-358a-b2cf-7ae8a8913ec4
( 略 ) 
yarnApplications:
- name: movielens
  progress: 1.0
  state: FINISHED
  trackingUrl: http://dataproc-cluster-m:8088/proxy/application_1670689953838_0001/
  

GCSを確認してみると、目的としていた出力が完了している

所感

個人的に、hadoopに対して良い印象がなかったため、Dataprocに対しても印象が良くなかった。

しかし、Dataprocが大規模データに対して分散処理で行列分解を行える、というのは興味をそそれれた。今後、どのくらいの性能になるのかを調べてみようと思う。

Discussion