👻

Apache Sparkでデータフレームの解析する環境を整える

2024/08/27に公開

Apache Spark

Apache Spark は大規模データ処理のための分散フレームワークなので、複数のマシンを使って並列処理をさせることができる。Sparkを使えばデータフレームの解析を簡単に分散処理させることができる。また、Apache Kafka などのデータソースと連携できる、機械学習ライブラリが使える、AWS(Amazon Web Service)などのクラウドコンピューティングサービス上で使えるなど様々な拡張性がある。また、Scala, Java, Python, SQLなどを使って処理を記述することができる。

Sparkをとりあえずローカルにインストールする

今回はUbuntu 22.04のマシンに sparkとpysparkを入れてpythonから使う環境をセットアップする。
まずはJavaとScalaが必要なのでインストール

sudo apt update
sudo apt install default-jdk
sudo apt install scala
  • 追記

Ubuntu 24.04 では default-jdk が 21 になったので、
sudo apt install openjdk-11-jdk
で version 11をインストールする。また、以下のように11をデフォルトにする。

$ sudo update-alternatives --config java
There are 2 choices for the alternative java (providing /usr/bin/java).

  Selection    Path                                         Priority   Status
------------------------------------------------------------
* 0            /usr/lib/jvm/java-21-openjdk-amd64/bin/java   2111      auto mode
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/java   1111      manual mode
  2            /usr/lib/jvm/java-21-openjdk-amd64/bin/java   2111      manual mode

Press <enter> to keep the current choice[*], or type selection number: 1
update-alternatives: using /usr/lib/jvm/java-11-openjdk-amd64/bin/java to provide /usr/bin/java (java) in manual mode
$ sudo update-alternatives --config javac
There are 2 choices for the alternative javac (providing /usr/bin/javac).

  Selection    Path                                          Priority   Status
------------------------------------------------------------
* 0            /usr/lib/jvm/java-21-openjdk-amd64/bin/javac   2111      auto mode
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/javac   1111      manual mode
  2            /usr/lib/jvm/java-21-openjdk-amd64/bin/javac   2111      manual mode

Press <enter> to keep the current choice[*], or type selection number: 1
update-alternatives: using /usr/lib/jvm/java-11-openjdk-amd64/bin/javac to provide /usr/bin/javac (javac) in manual mode

続いて、Sparkをダウンロード、解凍し、お好きなインストール先にコピーする

wget https://dlcdn.apache.org/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
tar xvzf spark-3.5.2-bin-hadoop3.tgz
mkdir /opt/spark
mv spark-3.5.1-bin-hadoop3/* /opt/spark/

/opt/spark/conf/spark-env.shにJavaの場所を教える

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64

環境変数の設定

~/.bashrc
export SPARK_HOME=/mnt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

次にpysparkのインストール

pip install pyspark

pysparkと打ってこんなのがでればOK

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.2
      /_/

Spark を試してみる

test.parquet を開いて最初の10行を表示

>>> df = spark.read.parquet("test.parquet")
>>> df.show(10)
+---+--------------------+-------+
|fID|             fTiming|fCharge|
+---+--------------------+-------+
|  1|1.114106081875542E14|   5742|
|  0|1.114106105298723...|    155|
|  0|1.114106112329469...|   1703|
|  1|1.114106112329506...|    958|
|  0|1.114106120337208...|   1818|
|  0|1.114106125184000...|    178|
|  1|1.114106132377145E14|    105|
|  1|1.114106144537507...|    794|
|  0|1.114106417419858E14|   4164|
|  0|1.114106424617431...|    200|
+---+--------------------+-------+
only showing top 10 rows

これは2つの検出器(fID=0,1)のヒット情報が入ったTTreeをparquetにしたもの。
例えばfilter()関数を使ってfID=0のものだけ表示することができる。

>>> df.filter("fID=0").show(10)
+---+--------------------+-------+
|fID|             fTiming|fCharge|
+---+--------------------+-------+
|  0|1.114106105298723...|    155|
|  0|1.114106112329469...|   1703|
|  0|1.114106120337208...|   1818|
|  0|1.114106125184000...|    178|
|  0|1.114106417419858E14|   4164|
|  0|1.114106424617431...|    200|
|  0|1.114106434624439...|   1530|
|  0| 1.11410645384044E14|   7726|
|  0|1.114106144537947...|   1012|
|  0|1.114106166012966...|   1625|
+---+--------------------+-------+

pandasでも同じことができるが、sparkだと分散処理ができるのでたくさんのデータを処理する時に向いている。

Spark Clusterを作る

せっかくSparkを使うのであれば分散処理させたい。ということで少し試してみる。

Master の設定

まずはMasterノードの設定。ローカルインストールと同じ手順を終わらせる。ここでは/opt/sparkにインストールされているとする。

/opt/spark/conf/slaves にworkerノードのホスト名をリストする

host-1
host-2
...

/opt/spark/conf/spark-env.sh に以下を追加。

export SPARK_MASTER_HOST='0.0.0.0'
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64

サービスの登録
/etc/systemd/system/spark-master.service

[Unit]
Description=Apache Spark Master
After=network.target

[Service]
Type=forking
User=root
Group=root
ExecStart=/opt/spark/sbin/start-master.sh
ExecStop=/opt/spark/sbin/stop-master.sh

[Install]
WantedBy=multi-user.target

サービスの起動

systemctl daemon-reload
systemctl start spark-master
systemctl enable spark-master

Worker の設定

/opt/spark/conf/spark-env.sh に以下を追加。Masterを起動したホスト名を入れる。また、後述のHDFSのためにHDFS環境を設定したユーザー名をHADOOP_USER_NAMEに入れる。

export SPARK_MASTER_HOST='master-host-hame'
export HADOOP_USER_NAME=hadoop
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64

/mnt/spark/conf/spark-defaults.conf にメモリーの設定を入れる。デフォルトだと1gらしいので、このマシンのスペックに応じて増やしておく。

spark.executor.memory 30g

サービスの登録。ExecStartの引数にMasterのホスト名を入れる。
/etc/systemd/system/spark-worker.service

[Unit]

Description=Apache Spark Worker

After=network.target

[Service]
Type=forking
User=root
Group=root
ExecStart=/mnt/spark/sbin/start-slave.sh spark://master-host-name:7077
ExecStop=/mnt/spark/sbin/stop-slave.sh

[Install]
WantedBy=multi-user.target

起動

systemctl daemon-reload
systemctl start spark-worker
systemctl enable spark-worker

各workerを起動したら、masterの port8080 にWebブラウザでアクセスする。

立てたworkerが確認できる。

クラスタを使うには

pyspark --master spark:master-host-name:7077

で起動

PySparkShellが現れたのが確認できる。

HDFS の設定

クラスタにジョブを投げる場合、入出力ファイルは全てのworkerからアクセスできる必要がある。NFS等で共有しても良いのだが、Permissionの問題がややこしいのでHDFSという分散型のファイルシステムサーバを立ててしまうことにした。ここでは、namenodeとdatanodeを一つずつ立てるシンプルな構成にする。

HDFSのインストール

Sparkの時と同様にHadoopをダウンロードし、インストール先にコピーする

wget https://downloads.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz
tar xvf hadoop-3.4.0.tar.gz
sudo mv hadoop-3.4.0 /opt/hadoop

次にhdfs用のユーザーを作る

sudo adduser hadoop

.bashrcの設定

export HADOOP_HOME=/opt/hadoop
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64

Namenodeの設定

HDFSには実際にファイルをストアするDatanodeと、管理するNamenodeがある。
今回はそれぞれ一つずつの最低限のセットアップをする。

/opt/hadoop/etc/hadoop/core-site.xml
fs.defaultFSにnamenode-serverのホスト名を入れる。また、tmpファイルの場所をHOMEの下にする。

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode-server:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/home/hadoop/tmp</value>
  </property>
</configuration>

/opt/hadoop/etc/hadoop/hdfs-site.xml
namenodeのファイルの保存先もHOMEの中にし、httpで外からブラウザで見れるように設定、dfs.permissionsをfalseにすることで、アップロードされたファイルにpermissionを付けないようにする。

<configuration>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:///home/hadoop/namenode</value>
  </property>
  <property>
    <name>dfs.namenode.http-address</name>
    <value>0.0.0.0:9870</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
</configuration>

/opt/hadoop/etc/hadoop/hadoop-env.sh
logディレクトリもHOME内にして、
HDFS_NAMENODE_USERに新しく作ったユーザー名を入れる。

export HADOOP_LOG_DIR=/home/hadoop/log
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export HDFS_NAMENODE_USER=hadoop

/opt/hadoop/etc/hadoop/slavesにdatanodeをホストするサーバ一覧をリストする。

datanode-server1

namenodeをフォーマット

hdfs namenode --format

起動

hdfs --daemon start namenode

Datanodeの設定

Datanodeを立てるサーバはディスク容量の大きいものが良い。
先ほどと同じくhadoopアカウントを作成してhadoopをインストールする。

/opt/hadoop/etc/hadoop/core-site.xml
fs.defaultFSに先ほど設定したNamenodeサーバのホスト名を入れる。

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode-server:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/home/hadoop/tmp</value>
  </property>
</configuration>

/opt/hadoop/etc/hadoop/hdfs-site.xml
dfs.replication はデータの複製数。今回は1で。
dfs.datanode.hostnameに このサーバのホスト名を入れる。
dfs.datanode.data.dirはデータの保存先。容量のあるパスを選択する。

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.datanode.hostname</name>
    <value>datanode-server</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:///mnt/data/hdfs/datanode</value>
  </property>
</configuration>

/opt/hadoop/etc/hadoop/hadoop-env.sh
HDFS_DATANODE_USERの設定。

export HADOOP_LOG_DIR=/home/hadoop/log
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export HDFS_DATANODE_USER=hadoop

起動

hdfs datanode --format
hdfs --daemon start datanode

ブラウザでnamenode-server:9870にアクセス。

LiveNodesが1になってDatanodeが認識されていたらOK

HDFS にファイルを送る

クライアントからテストファイルをHDFS上に送るには、まずクライアントにもhadoopをインストールする。
ただし、設定に関しては
core-site.xmlでfs.defaultFSにnamenode-server:9000を登録しておくだけでよい。

hdfs dfs -mkdir /test
hdfs dfs -put test.parquet /test/
hdfs dfs -ls /test

namenodeの設定でdfs.permissionsをfalseにしていたので、どのユーザーからでも書き込みができる。

クラスタでsparkを走らせる

sparkでジョブを投げるには python でスクリプトを書いて、spark-submitコマンドを使う。
試しに、Integer型の列"fCharge"の値に0から1の乱数を足してfloat型にするスクリプトを投げてみる。
スクリプトは以下の通り。 CLUSTER_HOST_NAMEにHDFSのNamenodeとSpark masterが走っているホスト名を入れる。

test.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

CLUSTER_HOST_NAME="hostname"
# Initialize Spark session
spark = SparkSession.builder \
        .master("spark://"+CLUSTER_HOST_NAME+":7077") \
        .appName("testApp") \
        .getOrCreate()

# Read the parquet file
df = spark.read.parquet("hdfs://"+CLUSTER_HOST_NAME+":9000/test/test.parquet")
df = df.filter("fID==0").withColumn("fCharge2", F.col("fCharge") + F.rand())

df.write.parquet("hdfs://"+CLUSTER_HOST_NAME+":9000/test/test2.parquet")

データフレームの操作は以下の行だけ。

df = df.filter("fID==0").withColumn("fCharge2", F.col("fCharge") + F.rand())

filter()でID=0を選び、withColumn()でfChargeカラムにrand()を足したものを"fCharge2"として追加する。

実行

spark-submit test.py

結果

>>> df = spark.read.parquet("hdfs://hostname:9000/test/test2.parquet")
>>> df.show(10)
+---+--------------------+-------+------------------+                           
|fID|             fTiming|fCharge|          fCharge2|
+---+--------------------+-------+------------------+
|  0|1.114106105298723...|    155| 155.5410104239401|
|  0|1.114106112329469...|   1703|1703.4675861895814|
|  0|1.114106120337208...|   1818| 1818.762840815937|
|  0|1.114106125184000...|    178|178.68623659663396|
|  0|1.114106417419858E14|   4164| 4164.791661649154|
|  0|1.114106424617431...|    200|200.08292707529353|
|  0|1.114106434624439...|   1530|1530.0493465133006|
|  0| 1.11410645384044E14|   7726| 7726.876387219425|
|  0|1.114106144537947...|   1012|1012.1130774814084|
|  0|1.114106166012966...|   1625|1625.8021897635952|
+---+--------------------+-------+------------------+
only showing top 10 rows

floatの列fCharge2が追加された。

おわりに

ちょっと面倒だけどSpark clusterとHDFSをセットアップすることで、Sparkでデータ解析する環境が整ってやる気になる。次はSparkで簡単に解析できる例を挙げていきたい。

Discussion