Docker上で動かして学ぶApache Spark
Apache Hadoopとは
Apache Hadoop(以下、Hadoop)の概要を把握しておくとApache Spark(以下、Spark)を理解しやすいため、先にHadoopの紹介をします。
HadoopはSparkと同じく分散処理フレームワークです。
Hadoopの処理の動きとしては大きく分けて以下3つになります。
- データを貯める
HDFS - データ処理のリソースを管理する
YARN - 処理する
MapReduceフレームワーク
Sparkは上記3の「MapReduceフレームワーク」に相当します。
Apache Sparkとは
Sparkはオープンソースのクラスターコンピューティング向けの分散処理フレームワークです。
Sparkを使用することで分散処理の複雑な部分を抽象化できるため、シンプルなコードを実行するだけで並列で数百台というコンピュータでの計算を実現することができます。
Sparkの特徴
高速な処理
Sparkはデータの格納場所をディスクではなくメモリにすることにより、Hadoop/MapReduceと比較しての最大100倍高速に処理できます。
様々な言語をサポートしている
Java, Scala, Python, R, SQLなどを使用してアプリケーションを実行することができます。
標準ライブラリが充実している
Sparkの標準ライブラリに以下が備わっています。
- Spark SQL
- 構造化されたデータを処理できる
- Spark Streaming
- Twitterなどのストリームデータをほぼリアルタイムで処理できる
- MLlib
- 簡易的かつスケーラブルに機械学習をすることができる
- GraphX
- 大容量のグラフデータを分散処理できる
様々な場所で動作する
SparkはHadoopはもちろんのことApache Mesos、Kubernetes、スタンドアロン、クラウド上などで動作します。
また、HDFS、Amazon S3、Google Cloud Storageなど多様なデータソースにアクセスすることができます。
高い耐障害性
Hadoop同様高い耐障害性を持ちます。
RDDのデータが一部欠損したとしてもRDD内に保持している「入力元となるRDD」と「処理内容」を元に復元することができます。
Sparkを学ぶ上での頻出単語
HDFS
Hadoop Distributed File Systemの略。
Hadoopが利用する分散ファイルシステムで、複数のコンピュータのローカルディスクを1つのストレージのように扱うことができます。
(実際はファイルを一定サイズのブロックに分割し、複数のコンピュータのディスクに格納しています。)
YARN
Yet Another Resource Negotiatorの略。
YARNはクラスタのリソース管理を行うフレームワークです。
アプリケーションの実行を依頼されると、必要なリソースの割当を行い全体のリソース量を制御します。
MapReduce
MapReduceはクラスタを用いて大規模なデータを効率的に分散処理するためのプログラミングモデルです。
処理の内容は大きく分けて以下2つに分類されます。
- map処理
- 入力ファイルからKeyとValueの組み合わせを作る処理
- reduce処理
- map処理で作られたKeyとValueの組み合わせから別のKeyとValueの組み合わせを作る処理です。
RDD
Resilient Distributed Datasetsの略。
RDDは繰り返し利用するデータをメモリ上に保持することが可能な分散処理用データセットです。
HadoopのMapReduceが保持していた耐障害性、データ局所性、スケーラビリティを引き継いでいます。
DataFrame
DataFrameはRDDをより使いやすくしたデータセットです。
テーブルのようなデータ構造を持つ分散処理用データセットで、SQLライクにデータを操作することができます。
また、filterやjoinなどのmethodも用意されているため、シンプルなコードでデータを操作することも可能です。
DataSet
RDDとDataFrameの長所合わせもつAPIです。
Dataset APIはScalaやJavaはサポートしていますが、Pythonはサポートされていません。
Partition
PartitionはSparkの分散処理の単位です。
RDDをpartitionごとに複数のコンピュータで処理することで、並行して分散処理を行います。
Executor
Executorはアプリケーションを実行するためにワーカーノード上に起動されたプロセスです。
タスクの実行やデータをメモリまたはディスクストレージに保持する役割があります。
Job・Stage・Task
Sparkアプリケーションの処理はJob, Stage, Taskという段階構造になっております。
単位 | 説明 |
---|---|
Job | 独立した実行アクション |
Stage | 最後にキャッシュされたアクションまたはシャッフルイベントをに基づいたJobの分割単位 |
Task | Executorに送られる最小実行単位 |
Job, Stage, Taskについては下記の記事が非常に分かりやすかったです。
Dockerで実際に動かしてみる
Dockerを使用してローカル環境にクラスタを構築し、Sparkのアプリケーションを実行してみます。
今回使用するバージョンは以下になります。
- Python 3.7.7
- Spark 3.0.1
- Hadoop 3.2
1. ローカルに以下リポジトリをClone
2. データセットのダウンロード
Sparkアプリケーションを実行する際に使用するデータをKaggleからダウンロードします。
ダウンロードしたファイルのファイル名を変更し以下ディレクトリに配置します。
learning-apache-spark/data/bread_basket.csv
3. マスター&ワーカーの起動
docker-composeでマスター用コンテナとワーカー用コンテナを立ち上げます。
cd [WORK_DIRECTORY]/learning-apache-spark/deployments/local/
docker-compose up -d
下記URLにアクセスすると処理の状況をWebUIから確認でき、クラスタやマスターの詳細を確認できます。
この時点ではワーカーの起動をしていないかつアプリケーションの実行もしていないため、何も表示されません。
ワーカーを起動してみます。
docker exec -it spark-worker-1 bash
/spark/sbin/start-slave.sh spark://spark-master:7077
WebUIからワーカーが立ち上がったことが確認できます。
もう1台のワーカーも起動します。
docker exec -it spark-worker-2 bash
/spark/sbin/start-slave.sh spark://spark-master:7077
4. Historyサーバーの起動
アプリケーションのイベントログをWeb UIから確認するためのサーバーを起動します。
docker exec -it spark-master bash
/spark/sbin/start-history-server.sh
5. Sparkアプリケーションの実行
Sparkのアプリケーションを実行する準備が整ったため、Sparkアプリケーションを実行します。
main.pyの以下クエリを実行しDataFrameを操作します。
query = f"""
SELECT
*
FROM
BreadBasket
WHERE
Item = 'Coffee'
AND
format_date_time BETWEEN '2016-11-01 00:00' AND '2016-12-01 00:00'
LIMIT 10
"""
アプリケーションの実行
docker exec -it spark-master bash
/spark/bin/spark-submit /root/main.py
結果
+-----------+------+----------------+----------+---------------+-------------------+
|Transaction| Item| date_time|period_day|weekday_weekend| format_date_time|
+-----------+------+----------------+----------+---------------+-------------------+
| 178|Coffee|01-11-2016 07:51| morning| weekday|2016-11-01 07:51:00|
| 179|Coffee|01-11-2016 08:20| morning| weekday|2016-11-01 08:20:00|
| 188|Coffee|01-11-2016 10:04| morning| weekday|2016-11-01 10:04:00|
| 189|Coffee|01-11-2016 10:29| morning| weekday|2016-11-01 10:29:00|
| 190|Coffee|01-11-2016 10:34| morning| weekday|2016-11-01 10:34:00|
| 193|Coffee|01-11-2016 11:00| morning| weekday|2016-11-01 11:00:00|
| 197|Coffee|01-11-2016 11:10| morning| weekday|2016-11-01 11:10:00|
| 197|Coffee|01-11-2016 11:10| morning| weekday|2016-11-01 11:10:00|
| 198|Coffee|01-11-2016 11:11| morning| weekday|2016-11-01 11:11:00|
| 199|Coffee|01-11-2016 11:13| morning| weekday|2016-11-01 11:13:00|
+-----------+------+----------------+----------+---------------+-------------------+
また、SQLだけでなくDataFrameに用意されているmethodを使って操作することもできます。
df.groupBy("Item").count().sort("count", ascending=False).show()
+-------------+----------+
| Item|item_count|
+-------------+----------+
| Coffee| 5471|
| Bread| 3325|
| Tea| 1435|
| Cake| 1025|
| Pastry| 856|
| Sandwich| 771|
| Medialuna| 616|
|Hot chocolate| 590|
| Cookies| 540|
| Brownie| 379|
| Farm House| 374|
| Muffin| 370|
| Alfajores| 369|
| Juice| 369|
| Soup| 342|
| Scone| 327|
| Toast| 318|
| Scandinavian| 277|
| Truffles| 193|
| Coke| 185|
+-------------+----------+
使用できるmethodは以下にまとまってます。
6. 実行したアプリケーションの確認
ステップ4で立ち上げたHistoryサーバーにアクセスしてJobの詳細を確認します。
Task毎のステータスや処理時間などを確認することができるため、デバッグする際に便利です。
参考
総括
今回Docker上で動かしたSparkアプリケーションはDataFrameをSQLやmethodを使用して操作するだけなので、あまり分散処理の恩恵を感じなかったと思います。
大量のデータをSparkを使用して処理してみると分散処理の恩恵を感じることができるはずです。
私は最近業務でGCPのGCEで大量のデータを処理していたバッチ処理をDataprocに移行しました。
Dataprocに切り替えたことで、処理速度が向上したことはもちろんのこと、バッチ処理のソースコードが非常にシンプルになったため移行したことによる恩恵を非常に感じております。
(Dataprocを導入したばかりのため、チューニングなどを行いまだまだ処理速度の向上は見込めそうですが・・・笑)
SQLやmethodを使用して整形したデータをMySQLに保存することやCSVファイルに出力することも簡単にできるためバッチ処理などの計算処理時間に悩んでいる方はSparkの導入を検討してみてください。
※Dataprocとは
Dataproc は、オープンソースのデータツールを利用してバッチ処理、クエリ実行、ストリーミング、機械学習を行えるマネージド Spark / Hadoop サービスです。
本記事についてご指摘やアドバイスなどございましたらコメントしていただけますと幸いです。
Discussion