Apache Sparkでデータフレームの解析する環境を整える
Apache Spark
Apache Spark は大規模データ処理のための分散フレームワークなので、複数のマシンを使って並列処理をさせることができる。Sparkを使えばデータフレームの解析を簡単に分散処理させることができる。また、Apache Kafka などのデータソースと連携できる、機械学習ライブラリが使える、AWS(Amazon Web Service)などのクラウドコンピューティングサービス上で使えるなど様々な拡張性がある。また、Scala, Java, Python, SQLなどを使って処理を記述することができる。
Sparkをとりあえずローカルにインストールする
今回はUbuntu 22.04のマシンに sparkとpysparkを入れてpythonから使う環境をセットアップする。
まずはJavaとScalaが必要なのでインストール
sudo apt update
sudo apt install default-jdk
sudo apt install scala
続いて、Sparkをダウンロード、解凍し、お好きなインストール先にコピーする
wget https://dlcdn.apache.org/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
tar xvzf spark-3.5.2-bin-hadoop3.tgz
mkdir /opt/spark
mv spark-3.5.1-bin-hadoop3/* /opt/spark/
/opt/spark/conf/spark-env.shにJavaの場所を教える
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
環境変数の設定
export SPARK_HOME=/mnt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
次にpysparkのインストール
pip install pyspark
pyspark
と打ってこんなのがでればOK
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.2
/_/
Spark を試してみる
test.parquet を開いて最初の10行を表示
>>> df = spark.read.parquet("test.parquet")
>>> df.show(10)
+---+--------------------+-------+
|fID| fTiming|fCharge|
+---+--------------------+-------+
| 1|1.114106081875542E14| 5742|
| 0|1.114106105298723...| 155|
| 0|1.114106112329469...| 1703|
| 1|1.114106112329506...| 958|
| 0|1.114106120337208...| 1818|
| 0|1.114106125184000...| 178|
| 1|1.114106132377145E14| 105|
| 1|1.114106144537507...| 794|
| 0|1.114106417419858E14| 4164|
| 0|1.114106424617431...| 200|
+---+--------------------+-------+
only showing top 10 rows
これは2つの検出器(fID=0,1)のヒット情報が入ったTTreeをparquetにしたもの。
例えばfilter()関数を使ってfID=0のものだけ表示することができる。
>>> df.filter("fID=0").show(10)
+---+--------------------+-------+
|fID| fTiming|fCharge|
+---+--------------------+-------+
| 0|1.114106105298723...| 155|
| 0|1.114106112329469...| 1703|
| 0|1.114106120337208...| 1818|
| 0|1.114106125184000...| 178|
| 0|1.114106417419858E14| 4164|
| 0|1.114106424617431...| 200|
| 0|1.114106434624439...| 1530|
| 0| 1.11410645384044E14| 7726|
| 0|1.114106144537947...| 1012|
| 0|1.114106166012966...| 1625|
+---+--------------------+-------+
pandasでも同じことができるが、sparkだと分散処理ができるのでたくさんのデータを処理する時に向いている。
Spark Clusterを作る
せっかくSparkを使うのであれば分散処理させたい。ということで少し試してみる。
Master の設定
まずはMasterノードの設定。ローカルインストールと同じ手順を終わらせる。ここでは/opt/sparkにインストールされているとする。
/opt/spark/conf/slaves にworkerノードのホスト名をリストする
host-1
host-2
...
/opt/spark/conf/spark-env.sh に以下を追加。
export SPARK_MASTER_HOST='0.0.0.0'
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
サービスの登録
/etc/systemd/system/spark-master.service
[Unit]
Description=Apache Spark Master
After=network.target
[Service]
Type=forking
User=root
Group=root
ExecStart=/opt/spark/sbin/start-master.sh
ExecStop=/opt/spark/sbin/stop-master.sh
[Install]
WantedBy=multi-user.target
サービスの起動
systemctl daemon-reload
systemctl start spark-master
systemctl enable spark-master
Worker の設定
/opt/spark/conf/spark-env.sh に以下を追加。Masterを起動したホスト名を入れる。また、後述のHDFSのためにHDFS環境を設定したユーザー名をHADOOP_USER_NAMEに入れる。
export SPARK_MASTER_HOST='master-host-hame'
export HADOOP_USER_NAME=hadoop
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
/mnt/spark/conf/spark-defaults.conf にメモリーの設定を入れる。デフォルトだと1gらしいので、このマシンのスペックに応じて増やしておく。
spark.executor.memory 30g
サービスの登録。ExecStartの引数にMasterのホスト名を入れる。
/etc/systemd/system/spark-worker.service
[Unit]
Description=Apache Spark Worker
After=network.target
[Service]
Type=forking
User=root
Group=root
ExecStart=/mnt/spark/sbin/start-slave.sh spark://master-host-name:7077
ExecStop=/mnt/spark/sbin/stop-slave.sh
[Install]
WantedBy=multi-user.target
起動
systemctl daemon-reload
systemctl start spark-worker
systemctl enable spark-worker
各workerを起動したら、masterの port8080 にWebブラウザでアクセスする。
立てたworkerが確認できる。
クラスタを使うには
pyspark --master spark:master-host-name:7077
で起動
PySparkShellが現れたのが確認できる。
HDFS の設定
クラスタにジョブを投げる場合、入出力ファイルは全てのworkerからアクセスできる必要がある。NFS等で共有しても良いのだが、Permissionの問題がややこしいのでHDFSという分散型のファイルシステムサーバを立ててしまうことにした。ここでは、namenodeとdatanodeを一つずつ立てるシンプルな構成にする。
HDFSのインストール
Sparkの時と同様にHadoopをダウンロードし、インストール先にコピーする
wget https://downloads.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz
tar xvf hadoop-3.4.0.tar.gz
sudo mv hadoop-3.4.0 /opt/hadoop
次にhdfs用のユーザーを作る
sudo adduser hadoop
.bashrcの設定
export HADOOP_HOME=/opt/hadoop
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Namenodeの設定
HDFSには実際にファイルをストアするDatanodeと、管理するNamenodeがある。
今回はそれぞれ一つずつの最低限のセットアップをする。
/opt/hadoop/etc/hadoop/core-site.xml
fs.defaultFSにnamenode-serverのホスト名を入れる。また、tmpファイルの場所をHOMEの下にする。
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://namenode-server:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/tmp</value>
</property>
</configuration>
/opt/hadoop/etc/hadoop/hdfs-site.xml
namenodeのファイルの保存先もHOMEの中にし、httpで外からブラウザで見れるように設定、dfs.permissionsをfalseにすることで、アップロードされたファイルにpermissionを付けないようにする。
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///home/hadoop/namenode</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>0.0.0.0:9870</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
/opt/hadoop/etc/hadoop/hadoop-env.sh
logディレクトリもHOME内にして、
HDFS_NAMENODE_USERに新しく作ったユーザー名を入れる。
export HADOOP_LOG_DIR=/home/hadoop/log
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export HDFS_NAMENODE_USER=hadoop
/opt/hadoop/etc/hadoop/slavesにdatanodeをホストするサーバ一覧をリストする。
datanode-server1
namenodeをフォーマット
hdfs namenode --format
起動
hdfs --daemon start namenode
Datanodeの設定
Datanodeを立てるサーバはディスク容量の大きいものが良い。
先ほどと同じくhadoopアカウントを作成してhadoopをインストールする。
/opt/hadoop/etc/hadoop/core-site.xml
fs.defaultFSに先ほど設定したNamenodeサーバのホスト名を入れる。
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://namenode-server:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/tmp</value>
</property>
</configuration>
/opt/hadoop/etc/hadoop/hdfs-site.xml
dfs.replication はデータの複製数。今回は1で。
dfs.datanode.hostnameに このサーバのホスト名を入れる。
dfs.datanode.data.dirはデータの保存先。容量のあるパスを選択する。
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.datanode.hostname</name>
<value>datanode-server</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///mnt/data/hdfs/datanode</value>
</property>
</configuration>
/opt/hadoop/etc/hadoop/hadoop-env.sh
HDFS_DATANODE_USERの設定。
export HADOOP_LOG_DIR=/home/hadoop/log
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export HDFS_DATANODE_USER=hadoop
起動
hdfs datanode --format
hdfs --daemon start datanode
ブラウザでnamenode-server:9870にアクセス。
LiveNodesが1になってDatanodeが認識されていたらOK
HDFS にファイルを送る
クライアントからテストファイルをHDFS上に送るには、まずクライアントにもhadoopをインストールする。
ただし、設定に関しては
core-site.xmlでfs.defaultFSにnamenode-server:9000を登録しておくだけでよい。
hdfs dfs -mkdir /test
hdfs dfs -put test.parquet /test/
hdfs dfs -ls /test
namenodeの設定でdfs.permissionsをfalseにしていたので、どのユーザーからでも書き込みができる。
クラスタでsparkを走らせる
sparkでジョブを投げるには python でスクリプトを書いて、spark-submitコマンドを使う。
試しに、Integer型の列"fCharge"の値に0から1の乱数を足してfloat型にするスクリプトを投げてみる。
スクリプトは以下の通り。 CLUSTER_HOST_NAME
にHDFSのNamenodeとSpark masterが走っているホスト名を入れる。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
CLUSTER_HOST_NAME="hostname"
# Initialize Spark session
spark = SparkSession.builder \
.master("spark://"+CLUSTER_HOST_NAME+":7077") \
.appName("testApp") \
.getOrCreate()
# Read the parquet file
df = spark.read.parquet("hdfs://"+CLUSTER_HOST_NAME+":9000/test/test.parquet")
df = df.filter("fID==0").withColumn("fCharge2", F.col("fCharge") + F.rand())
df.write.parquet("hdfs://"+CLUSTER_HOST_NAME+":9000/test/test2.parquet")
データフレームの操作は以下の行だけ。
df = df.filter("fID==0").withColumn("fCharge2", F.col("fCharge") + F.rand())
filter()でID=0を選び、withColumn()でfChargeカラムにrand()を足したものを"fCharge2"として追加する。
実行
spark-submit test.py
結果
>>> df = spark.read.parquet("hdfs://hostname:9000/test/test2.parquet")
>>> df.show(10)
+---+--------------------+-------+------------------+
|fID| fTiming|fCharge| fCharge2|
+---+--------------------+-------+------------------+
| 0|1.114106105298723...| 155| 155.5410104239401|
| 0|1.114106112329469...| 1703|1703.4675861895814|
| 0|1.114106120337208...| 1818| 1818.762840815937|
| 0|1.114106125184000...| 178|178.68623659663396|
| 0|1.114106417419858E14| 4164| 4164.791661649154|
| 0|1.114106424617431...| 200|200.08292707529353|
| 0|1.114106434624439...| 1530|1530.0493465133006|
| 0| 1.11410645384044E14| 7726| 7726.876387219425|
| 0|1.114106144537947...| 1012|1012.1130774814084|
| 0|1.114106166012966...| 1625|1625.8021897635952|
+---+--------------------+-------+------------------+
only showing top 10 rows
floatの列fCharge2が追加された。
おわりに
ちょっと面倒だけどSpark clusterとHDFSをセットアップすることで、Sparkでデータ解析する環境が整ってやる気になる。次はSparkで簡単に解析できる例を挙げていきたい。
Discussion