Sparkでタイムスタンプによるイベントビルドのjoinを効率化する
タイムスタンプに基づいたデータフレームの結合 (join)
Sparkでは、ある条件を満たす二つのデータフレームの行を join(on=[条件]) で結合することができる。
例として、2つの検出器のデータをタイムスタンプで結合してみる。
元のデータは以下のようになっている
df1.show(10)
+--------------+------------------+-------------------+-------------------+----+
| tsH| HDynE| xH| yH|tsL2|
+--------------+------------------+-------------------+-------------------+----+
|48940144990787| 410.1433554614204| 0.4086835599505562|0.23290070045323444|NULL|
|48940145250196| 245.5186570074473| 0.4076036672554462| 0.6529565144250988|NULL|
|48940145300382|281.06421526436117| 0.3571535468422628|0.42023346303501946|NULL|
|48940145838433|221.37710497586892|0.44733851108164424| 0.3440045463155901|NULL|
|48940147291484|203.63924357729903| 0.3742600530720555| 0.6244131455399061|NULL|
|48940149157943| 297.5155579983453| 0.6746886214903948| 0.6232495953838576|NULL|
|48940150178894| 220.4822707155461| 0.4802576309157099| 0.4032483898067768|NULL|
|48940151692078| 184.0238521247935| 0.4357899382171227| 0.4906222418358341|NULL|
|48940151838777|190.46816005217482| 0.6318445351084389| 0.5490659222675542|NULL|
|48940152635762| 253.4045320449293| 0.3321123321123321| 0.3771265771265771|NULL|
+--------------+------------------+-------------------+-------------------+----+
only showing top 10 rows
df2.show(10)
+---+--------------+------------------+
| ch| ts| calE|
+---+--------------+------------------+
| 9|48940142448049| 595.4527703680001|
| 11|48940142464806|1332.2242896120001|
| 9|48940142475096| 31.397075368|
| 8|48940142484635| 478.67409526|
| 8|48940142541363| 476.100046505|
| 10|48940142570545| 907.851600168|
| 10|48940142572804| 97.886188878|
| 9|48940142639022| 781.8382598320001|
| 9|48940142652391| 1146.701715112|
| 10|48940142739709| 197.208823924|
+---+--------------+------------------+
ts
とtsH
はタイムスタンプと呼ばれる1ナノ秒単位の時間情報である。
これらが10μs以内であれば同時のイベントとして結合したいとする。
この時、単純には以下のように、ts
とtsH
の差の絶対値が10000より小さいという条件で結合すればよい。
from pyspark.sql.functions import col, abs
df_joined = df1.join(df2, on=[abs(col("ts")-col("tsH"))<10000], how="inner")
df_joined.write.mode("overwrite").parquet("test.parquet")
しかし、これだと100万行のデータを結合するのに100秒以上掛かってしまった。
Nested Loop Join は極力避ける
Sparkでは、自動的にjoinの戦略(アルゴリズム)を選択して実行してくれるが、
実際にどのようなjoinが行われているのかは、explain()を使って確認することができる。
df_joined.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildLeft, Inner, (abs((ts#229L - tsH#171L)) < 10000)
:- BroadcastExchange IdentityBroadcastMode, [plan_id=303]
(以下略)
すると、Physical Plan として、BroadcastNestedLoopJoin が行われていることが分かる。
Sparkには、Bradcast Hash Join, Sort Merge Join, Shuffle Hash Join, Broadcast Nested Loop Join などのjoin戦略があるが、
Broadcast Nested Loop Join は基本的に最も実行コストが高い。
これを回避し、その他のJoinを使用するには、一致するキーに基づいた結合(SQLでいうequi-join)である必要がある。
そこで、データをあるタイムスタンプ幅で区切ったキーを作成し、これが一致するという条件をjoin時に加えてみる。
ここでは、pyspark.sql.functions
のfloorを使って1ms毎のバケットidとなる列を定義する。
bucket_size = 1e6 # 1m sec
from pyspark.sql.functions import floor
df1_b = df1.withColumn("tsH_bucket", floor(col("tsH") / bucket_size))
df2_b = df2.withColumn("ts_bucket", floor(col("ts") / bucket_size))
df1_b.show(10)
df2_b.show(10)
+--------------+------------------+-------------------+-------------------+----+----------+
| tsH| HDynE| xH| yH|tsL2|tsH_bucket|
+--------------+------------------+-------------------+-------------------+----+----------+
|48980000696193|324.92567962878604| 0.5173683876293897| 0.532545881755255|NULL| 48980000|
|48980004562261| 318.0344890242169| 0.6786485955607936| 0.5651803836836247|NULL| 48980004|
|48980005005352| 269.8797596924788| 0.476709996904983| 0.3241256576911173|NULL| 48980005|
|48980005500573|307.78161797545374| 0.5525401069518716| 0.5193181818181818|NULL| 48980005|
|48980006747547|138.15739382976582|0.48994647765080285| 0.5245190221322147|NULL| 48980006|
|48980007873101| 298.0336106789593| 0.507105719237435|0.27625649913344885|NULL| 48980007|
|48980008525457|250.49831420351236| 0.674863387978142| 0.5421086467373835|NULL| 48980008|
|48980009413119|255.15698761708114| 0.5813633419689119| 0.4065738341968912|NULL| 48980009|
|48980010359538| 162.1710182510047| 0.5184912151369947| 0.5111193021255682|NULL| 48980010|
|48980010829082|192.48171694361088| 0.6280746395250212|0.46840542832909243|NULL| 48980010|
+--------------+------------------+-------------------+-------------------+----+----------+
only showing top 10 rows
+---+--------------+------------------+---------+
| ch| ts| calE|ts_bucket|
+---+--------------+------------------+---------+
| 10|48990000001461|377.28830089200005| 48990000|
| 8|48990000040631| 931.53216436| 48990000|
| 10|48990000061790| 552.57379372| 48990000|
| 9|48990000103099| 1464.163004176| 48990000|
| 7|48990000126002| 171.778814816| 48990000|
| 7|48990000136350| 80.69233976000001| 48990000|
| 7|48990000139306| 658.9684831520001| 48990000|
| 10|48990000143969| 1160.930381086| 48990000|
| 8|48990000322126| 512.44212469| 48990000|
| 8|48990000483668| 32.491101745| 48990000|
+---+--------------+------------------+---------+
only showing top 10 rows
新たに定義されたts_bucket
、tsH_bucket
列は、元のタイムスタンプを1ms単位に荒くしたものになっている。
ただし、このままts_bucket==tsH_bucket
の条件で結合すると、バケットの境界を跨ぐデータが結合されないので、
ts_bucket
の値を+-1ずつシフトしたデータフレームを作り、それらも合わせて結合する。
# +1, 0, -1だけts_bucketをシフトしたデータフレームを作成
df2_shifted = [
df2_b.withColumn("ts_bucket", col("ts_bucket") + shift)
for shift in [-1, 0, 1]
]
from functools import reduce
# シフトしたデータフレームを縦結合
df2_union = reduce(lambda a, b: a.unionByName(b), df2_shifted)
# bucketが一致するかつts差が10us以内の条件でJoin
joined = df1_b.join(df2_union, on=[col("tsH_bucket")==col("ts_bucket"), abs(col("tsH") - col("ts")) < 10000])
joined.write.mode("overwrite").parquet("test.parquet")
joined.show(10)
+--------------+------------------+-------------------+------------------+----+----------+---+--------------+------------------+---------+
| tsH| HDynE| xH| yH|tsL2|tsH_bucket| ch| ts| calE|ts_bucket|
+--------------+------------------+-------------------+------------------+----+----------+---+--------------+------------------+---------+
|48994081997167|240.23520585721892|0.40530108080288213|0.5108080288214102|NULL| 48994081| 10|48994082000361| 209.1502026| 48994081|
|48994081997167|240.23520585721892|0.40530108080288213|0.5108080288214102|NULL| 48994081| 8|48994082001559| 383.957826665| 48994081|
|48990694998167| 356.7801014112143| 0.4341599720572826|0.6039119804400978|NULL| 48990694| 6|48990695005712| 28.309702974| 48990694|
|48954169994019| 282.8298407861067| 0.5223023700868541|0.5414397173561019|NULL| 48954169| 7|48954170001119| 1156.513790528| 48954169|
|48953053994522|337.32649366646854|0.44238517324738114|0.2744684807537346|NULL| 48953053| 8|48953054004006| 963.8604716049999| 48953053|
|48978863999830|178.12867100829817|0.40998197386210006|0.5138575935105903|NULL| 48978863| 10|48978864002532| 245.802390434| 48978863|
|48971366998021| 223.607676641227| 0.5566225765537235|0.4072295705743794|NULL| 48971366| 6|48971367007143|1024.5787078279998| 48971366|
|48971366998021| 223.607676641227| 0.5566225765537235|0.4072295705743794|NULL| 48971366| 11|48971367007144|137.55736586999998| 48971366|
|48974648998510|295.50443671642586|0.43237962316817863|0.7500348918353106|NULL| 48974648| 9|48974649000888| 1301.15628028| 48974648|
|48974648998510|295.50443671642586|0.43237962316817863|0.7500348918353106|NULL| 48974648| 8|48974649005424| 433.73731191| 48974648|
+--------------+------------------+-------------------+------------------+----+----------+---+--------------+------------------+---------+
only showing top 10 rows
こちらは1秒そこそこで完了した。explain()で見てみると、
joined.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [tsH_bucket#2041L], [ts_bucket#2095L], Inner, BuildLeft, (abs((tsH#171L - ts#229L)) < 10000), false
:- BroadcastExchange HashedRelationBroadcastMode(List(input[5, bigint, true]),false), [plan_id=6510]
(以下略)
BroadcastHashJoin になっていることが分かる。
equi-joinかつ片方のデータフレームが10MB以内だと、BroadcastHashJoinになり、
全てのexecutorのメモリーにデータフレームがコピーされるので、効率的にjoinされる。
大きなデータの場合
片方のデータフレームが10MB以内(あるいはspark.sql.autoBroadcastJoinThreshold
で設定した閾値以内)だとBroadcast Hash Joinになるが、大きなデータを上述の方法でjoinするとSort Merge Joinになる。
joined.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [ts_bucket_1#3753L], [ts_bucket_2#3769L], Inner, (abs((ts_1#3746 - ts_2#3762)) < 10000)
:- Sort [ts_bucket_1#3753L ASC NULLS FIRST], false, 0
(以下略)
Discussion