なぜお前のSparkは遅いのか?(初心者向け)
なぜSparkは遅いのか??
という目を惹けるようなタイトルで初めてみましたが、Sparkが遅いと一言で言ってもいろんなケースがあり、集計が遅いのか読み込みが遅いのか色々あると思います。
あと、簡単にSparkと言いましたがどの言語がどの用途でなどなど制約が色々あるので、タイトルをもう少し厳密にしてから始めたいと思います。
また、私はDatabricks上でPythonで使っていることがメインなので、さらにその話題に限定しまして。。。
改めまして 「Sparkを初めて使い、速くなると聞いていたのになんか色々遅いな?と初心者が特に感じるであろうことをまとめてみた」 というタイトルで進めていきたいと思います。
記事を書くモチベーションをもう少し(雑談)
私はpandasとnumpyと機械学習ライブラリを触りまくる系の仕事をしている中、ビッグデータを高速に処理できるSpark・並列分散処理とやらのカッコ良さに憧れを抱いていました。
初めて業務で触れる機会が到来し、利用を始めるのですが、あれ?普通のpandasより遅いけど何これ?という場面に多く遭遇し、何が悪いのかわかりませんでした。
これらを解消するためにいくつか解放を見つけてきたので、共有したいと思います。
あなたのSparkはいつ遅くなるのか?
本題に入る前にSparkの基本構造だけ抑えましょう。
(Cluster Mode Overvie: https://spark.apache.org/docs/latest/cluster-overview.html)
Spark Clusterは実際に集計・加工処理を行うWorker Nodeと それらを統合・設計するDriver Nodeに分かれます。これらがこの後の話全てに影響するので、この構造が前提にあることを抑えてくださ。
それでは本題に入ります。
1. そもそもデータ量がそんなに大きくない
Sparkは大規模データ処理に特化しており、さらに、別にデータが少なくても全然動きます。
しかし下のベンチマークの結果を見てください。
(Benchmarking Apache Spark on a Single Node Machine: https://www.databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html)
上記は「Spark: Apache Spark 2.3.0 in local cluster mode」で実行した結果です。
データ量がかなりグラフにから見ると大体15GB以下の時はpandasの方が集計処理が早いという結果があります。
理由はいくつか考えられうるのですが、一言大雑把にいうと、少量データにおいてはSparkによる最適化に向けた処理がOverheadになるケースがある。ということです。
リンク先に飛んでもらうとわかるのですが別に全然Sparkの方が遅くなってないケースもありますから注意してください。ただし、個人的には遅くなっていないケースにおいては Local Cluster mode(一台マシン構成)だから少量データにおいても速いのであってStandalone型に少量データの際にはより不利になるのでは?と思っています。ネットワーク間の通信等も発生するはずでそれが不利に働くと考えています。
Pandasであれば基本的に全て一つのマシンのメモリに載っています。
その中でPandasの最適化された処理で集計するので、ネットワーク通信、ストレージ通信なく、全てオンメモリで処理できます。それは速いに決まっています。
一方でSparkはデータを処理する際に RDD(Resilient Distributed Dataset) と呼ばれる並列処理に向けたデータの整理が発生し、尚且つWorkerが個別のインスタンスの場合にはそこにデータを移動させるネットワーク通信が発生しこれが遅くなる原因になっています。
あとから時間がある時に試そうと思いますが、上記リンク先にある query2のベンチマーク結果も、個人的にはそこまで差が出るかな?・・・という感じです。
後ほど少しだけ検証してみようと思います。
2. lazy evaluationを知ろう
lazy evaluationとは日本語に直すと遅延評価であり、「後からまとめて処理するから」という概念です。
「後から」というところをもう少し詳しく説明します。
Sparkの処理は大きく分けて TransformationsとActionsに分かれます。
- Transformationsの例:map, filter, union, distinct, join...
- Actionsの例:collect, count, saveAsTextFile...
そして lazy evaluationとはTrasformationsが記載された際には実際に処理は何も行われず、Actionsが記載されたタイミングでやっと処理が実行される、ということになります。
pandasと見比べると言いたいことがわかりやすいと思うので例を書いていみます。
(ただし、pandasとsparkの処理が必ずしも同質ではないので、そこはざっくりでお願いします。)
例えば以下のコードはl_tax、税率を%表示に変換し、l_orderkeyごとにその平均を出力するコードです。(本来pandasを使うのであればfor文で書くのは最悪ですが、今回 lazy evaluationの説明のためです。)
df = spark.read.table("tpch.lineitem").limit(1000).toPandas()
df["l_tax_percentage"] = df["l_tax"] * 100
for i, (l_orderkey, group_df) in enumerate(df.groupby("l_orderkey")):
print(l_orderkey, group_df["l_tax_percentage"].mean())
上記のコードは3.04 secondsで終わります。一方これを Sparkで再現(?)っぽくしてみたのがこちらのコードです。
from pyspark.sql import functions as F
sdf = spark.read.table("tpch.lineitem").limit(1000)
sdf = sdf.withColumn("l_tax_percentage", F.col("l_tax") * 100)
for row in sdf.select("l_orderkey").distinct().collect():
grouped_sdf = sdf.filter(F.col("l_orderkey") == row.l_orderkey).groupBy("l_orderkey").agg(F.mean("l_tax_percentage").alias("avg_l_tax_percentage"))
print(grouped_sdf.show())
こちらのコードの実行時間は3分経っても終わりません。何故このようなことが起きるのでしょうか?
理由は二つありまして、一つはlazy evaluation、二つ目は少し話がそれますが、collect, showなどの関数はWorker to Driverへのデータ転送が発生するので遅いということです。
まず一つ目、 lazy evaluationですが、実は二つ目のコード、実質pandasで例えると以下のように書いているのと同じです。
df = spark.read.table("tpch.lineitem").limit(1000).toPandas()
for i, (l_orderkey, group_df) in enumerate(df.groupby("l_orderkey")):
df["l_tax_percentage"] = df["l_tax"] * 100
print(l_orderkey, group_df["l_tax_percentage"].mean())
df["l_tax_percentage"] = df["l_tax"] * 100` が for文の中に入っちゃってますね。コードを書ける人なら絶対やらない御法度です。しかし lazy evaluationとはこういうことなのです。
今回で言うとActionsに該当するshowまで一切の計算を行わず、そしてshowの瞬間に今までの全ての処理Transformationsを実行するのです。
厳密にいうとここで cacheを利用すれば回避できるのですが・・・
ただ、知っておいて欲しいことはSparkは書いた瞬間に処理されるわけではないということです。
lazy evaluationの本質はクエリ最適化(実行計画に近い)が実現できることだと思っています。
そしてSparkはpandasのように見えますが、処理はSQLに近い物だと考えた方がいいです。そう考えるとfor文もすごくナンセンスなことがわかりやすいと思います。
Spark DataFrameに対して explain()を使うと実行計画に相当する内容が出力されます。
これは今までのTransformations全ての処理を受けて、最適化された処理手順を自動で考慮し作成されたものです。(とはいえ一部は書き方に当然依存します。)
つまりlazy evaluationは、みなさんの書いた処理を最適化するための仕組みであり、SQLライクに物事を処理するための仕組みです。
しかし、これを知っていないと pandasのように適当に書いて、むしろ遅くなるということがあるので、気をつけましょう。
2点目のcollect, show等が遅い件についてですが、基本的にSparkの処理はWorker側で処理され、Driver nodeで処理は行われていません。showはこれをDriverに持ってくる処理であるため、ネットワークの通信も発生し遅くなります。
for文の中でcollect等使うと最悪なので、もしpandasやlistとしてpython nativeな処理をしたい時には、SparkDataFrameとして全ての集計処理は事前に済ませて、ある程度のまとまりになった時にcollect, toPandas等で一発でDriverに持ってきましょう。
3. Shuffleに気をつけろ
Sparkには Shuffleという重要な概念があります。
Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
上記のShuffleが起きる処理について、十分に注意しましょう。
Shuffleはざっくりいうと Worker node間でのデータ通信及び並び替えが必要な処理です。勘の言い方はこの時点でお気づきかもしれませんが、問題はここでWorker間のネットワーク通信とデータ全体に渡る集計が必要になるという点です。
せっかくpartitionを切って、各マシンに最適に割り振ったデータを、またお互いに繋ぎ合わせる必要が出てきてしまうのです。
ネットワークの通信はCPU to Memory間の処理に比べれば当然速い物ではないので、ここでまた時間がかかってしまいます。
ここで一口に対応と言ってもいろいろとあり、正直一言で対応方法を述べることは難しいです。
データの種類や何に最適化するか(コスト or 速度)、パイプライン全体の構成・前後の処理内容、集計の内容や、joinの話をし始めるとBroadcast Hash Joinが使えそうかなど、キリがありません。
ただし、ここでは初心者がよくやりそうなミスについて、言及しようと思います。
Worker nodeが多いほど有利というわけではない。: 特にこのShuffleが発生する前提の時は、例えば32GB Worker 8台よりも、多分ほとんどのケースにおいて、64GB Worker 4台の方が集計が早くなります。これはマシン同士の間のネットワーク通信を少なくできるからです。
この点を意識して、無駄にWorkerを増やして並列処理がむちゃくちゃできるようになったと思わないことが大切です。
おわりに
以上です。何か参考になりましたでしょうか?
Sparkは上手く使えば最高のツールですし、色々知って思うのは、私は大規模データの処理速度向上の側面と同時に、クラウドの恩恵と相まってスケーラブルなリソース管理もまた大きなbenefitだと感じるようになりました。
ぜひSparkを使って、自社のデータ管理を最適化してみてください。
Discussion