😎

Sparkでタイムスタンプによるイベントビルドのjoinを効率化する

に公開

タイムスタンプに基づいたデータフレームの結合 (join)

Sparkでは、ある条件を満たす二つのデータフレームの行を join(on=[条件]) で結合することができる。
例として、2つの検出器のデータをタイムスタンプで結合してみる。
元のデータは以下のようになっている

df1.show(10)
+--------------+------------------+-------------------+-------------------+----+
|           tsH|             HDynE|                 xH|                 yH|tsL2|
+--------------+------------------+-------------------+-------------------+----+
|48940144990787| 410.1433554614204| 0.4086835599505562|0.23290070045323444|NULL|
|48940145250196| 245.5186570074473| 0.4076036672554462| 0.6529565144250988|NULL|
|48940145300382|281.06421526436117| 0.3571535468422628|0.42023346303501946|NULL|
|48940145838433|221.37710497586892|0.44733851108164424| 0.3440045463155901|NULL|
|48940147291484|203.63924357729903| 0.3742600530720555| 0.6244131455399061|NULL|
|48940149157943| 297.5155579983453| 0.6746886214903948| 0.6232495953838576|NULL|
|48940150178894| 220.4822707155461| 0.4802576309157099| 0.4032483898067768|NULL|
|48940151692078| 184.0238521247935| 0.4357899382171227| 0.4906222418358341|NULL|
|48940151838777|190.46816005217482| 0.6318445351084389| 0.5490659222675542|NULL|
|48940152635762| 253.4045320449293| 0.3321123321123321| 0.3771265771265771|NULL|
+--------------+------------------+-------------------+-------------------+----+
only showing top 10 rows

df2.show(10)
+---+--------------+------------------+
| ch|            ts|              calE|
+---+--------------+------------------+
|  9|48940142448049| 595.4527703680001|
| 11|48940142464806|1332.2242896120001|
|  9|48940142475096|      31.397075368|
|  8|48940142484635|      478.67409526|
|  8|48940142541363|     476.100046505|
| 10|48940142570545|     907.851600168|
| 10|48940142572804|      97.886188878|
|  9|48940142639022| 781.8382598320001|
|  9|48940142652391|    1146.701715112|
| 10|48940142739709|     197.208823924|
+---+--------------+------------------+

tstsHはタイムスタンプと呼ばれる1ナノ秒単位の時間情報である。
これらが10μs以内であれば同時のイベントとして結合したいとする。
この時、単純には以下のように、tstsHの差の絶対値が10000より小さいという条件で結合すればよい。

from pyspark.sql.functions import col, abs

df_joined = df1.join(df2, on=[abs(col("ts")-col("tsH"))<10000], how="inner")
df_joined.write.mode("overwrite").parquet("test.parquet")

しかし、これだと100万行のデータを結合するのに100秒以上掛かってしまった。

Nested Loop Join は極力避ける

Sparkでは、自動的にjoinの戦略(アルゴリズム)を選択して実行してくれるが、
実際にどのようなjoinが行われているのかは、explain()を使って確認することができる。

df_joined.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildLeft, Inner, (abs((ts#229L - tsH#171L)) < 10000)
   :- BroadcastExchange IdentityBroadcastMode, [plan_id=303]
(以下略)

すると、Physical Plan として、BroadcastNestedLoopJoin が行われていることが分かる。
Sparkには、Bradcast Hash Join, Sort Merge Join, Shuffle Hash Join, Broadcast Nested Loop Join などのjoin戦略があるが、
Broadcast Nested Loop Join は基本的に最も実行コストが高い。
これを回避し、その他のJoinを使用するには、一致するキーに基づいた結合(SQLでいうequi-join)である必要がある。
そこで、データをあるタイムスタンプ幅で区切ったキーを作成し、これが一致するという条件をjoin時に加えてみる。
ここでは、pyspark.sql.functionsのfloorを使って1ms毎のバケットidとなる列を定義する。

bucket_size = 1e6  # 1m sec
from pyspark.sql.functions import floor

df1_b = df1.withColumn("tsH_bucket", floor(col("tsH") / bucket_size))
df2_b = df2.withColumn("ts_bucket", floor(col("ts") / bucket_size))

df1_b.show(10)
df2_b.show(10)

+--------------+------------------+-------------------+-------------------+----+----------+
|           tsH|             HDynE|                 xH|                 yH|tsL2|tsH_bucket|
+--------------+------------------+-------------------+-------------------+----+----------+
|48980000696193|324.92567962878604| 0.5173683876293897|  0.532545881755255|NULL|  48980000|
|48980004562261| 318.0344890242169| 0.6786485955607936| 0.5651803836836247|NULL|  48980004|
|48980005005352| 269.8797596924788|  0.476709996904983| 0.3241256576911173|NULL|  48980005|
|48980005500573|307.78161797545374| 0.5525401069518716| 0.5193181818181818|NULL|  48980005|
|48980006747547|138.15739382976582|0.48994647765080285| 0.5245190221322147|NULL|  48980006|
|48980007873101| 298.0336106789593|  0.507105719237435|0.27625649913344885|NULL|  48980007|
|48980008525457|250.49831420351236|  0.674863387978142| 0.5421086467373835|NULL|  48980008|
|48980009413119|255.15698761708114| 0.5813633419689119| 0.4065738341968912|NULL|  48980009|
|48980010359538| 162.1710182510047| 0.5184912151369947| 0.5111193021255682|NULL|  48980010|
|48980010829082|192.48171694361088| 0.6280746395250212|0.46840542832909243|NULL|  48980010|
+--------------+------------------+-------------------+-------------------+----+----------+
only showing top 10 rows

+---+--------------+------------------+---------+
| ch|            ts|              calE|ts_bucket|
+---+--------------+------------------+---------+
| 10|48990000001461|377.28830089200005| 48990000|
|  8|48990000040631|      931.53216436| 48990000|
| 10|48990000061790|      552.57379372| 48990000|
|  9|48990000103099|    1464.163004176| 48990000|
|  7|48990000126002|     171.778814816| 48990000|
|  7|48990000136350| 80.69233976000001| 48990000|
|  7|48990000139306| 658.9684831520001| 48990000|
| 10|48990000143969|    1160.930381086| 48990000|
|  8|48990000322126|      512.44212469| 48990000|
|  8|48990000483668|      32.491101745| 48990000|
+---+--------------+------------------+---------+
only showing top 10 rows

新たに定義されたts_buckettsH_bucket列は、元のタイムスタンプを1ms単位に荒くしたものになっている。
ただし、このままts_bucket==tsH_bucketの条件で結合すると、バケットの境界を跨ぐデータが結合されないので、
ts_bucketの値を+-1ずつシフトしたデータフレームを作り、それらも合わせて結合する。

# +1, 0, -1だけts_bucketをシフトしたデータフレームを作成
df2_shifted = [
    df2_b.withColumn("ts_bucket", col("ts_bucket") + shift)
    for shift in [-1, 0, 1]
]

from functools import reduce
# シフトしたデータフレームを縦結合
df2_union = reduce(lambda a, b: a.unionByName(b), df2_shifted)

# bucketが一致するかつts差が10us以内の条件でJoin
joined = df1_b.join(df2_union, on=[col("tsH_bucket")==col("ts_bucket"), abs(col("tsH") - col("ts")) < 10000])
joined.write.mode("overwrite").parquet("test.parquet")
joined.show(10)

+--------------+------------------+-------------------+------------------+----+----------+---+--------------+------------------+---------+
|           tsH|             HDynE|                 xH|                yH|tsL2|tsH_bucket| ch|            ts|              calE|ts_bucket|
+--------------+------------------+-------------------+------------------+----+----------+---+--------------+------------------+---------+
|48994081997167|240.23520585721892|0.40530108080288213|0.5108080288214102|NULL|  48994081| 10|48994082000361|       209.1502026| 48994081|
|48994081997167|240.23520585721892|0.40530108080288213|0.5108080288214102|NULL|  48994081|  8|48994082001559|     383.957826665| 48994081|
|48990694998167| 356.7801014112143| 0.4341599720572826|0.6039119804400978|NULL|  48990694|  6|48990695005712|      28.309702974| 48990694|
|48954169994019| 282.8298407861067| 0.5223023700868541|0.5414397173561019|NULL|  48954169|  7|48954170001119|    1156.513790528| 48954169|
|48953053994522|337.32649366646854|0.44238517324738114|0.2744684807537346|NULL|  48953053|  8|48953054004006| 963.8604716049999| 48953053|
|48978863999830|178.12867100829817|0.40998197386210006|0.5138575935105903|NULL|  48978863| 10|48978864002532|     245.802390434| 48978863|
|48971366998021|  223.607676641227| 0.5566225765537235|0.4072295705743794|NULL|  48971366|  6|48971367007143|1024.5787078279998| 48971366|
|48971366998021|  223.607676641227| 0.5566225765537235|0.4072295705743794|NULL|  48971366| 11|48971367007144|137.55736586999998| 48971366|
|48974648998510|295.50443671642586|0.43237962316817863|0.7500348918353106|NULL|  48974648|  9|48974649000888|     1301.15628028| 48974648|
|48974648998510|295.50443671642586|0.43237962316817863|0.7500348918353106|NULL|  48974648|  8|48974649005424|      433.73731191| 48974648|
+--------------+------------------+-------------------+------------------+----+----------+---+--------------+------------------+---------+
only showing top 10 rows

こちらは1秒そこそこで完了した。explain()で見てみると、

joined.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [tsH_bucket#2041L], [ts_bucket#2095L], Inner, BuildLeft, (abs((tsH#171L - ts#229L)) < 10000), false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[5, bigint, true]),false), [plan_id=6510]
(以下略)

BroadcastHashJoin になっていることが分かる。
equi-joinかつ片方のデータフレームが10MB以内だと、BroadcastHashJoinになり、
全てのexecutorのメモリーにデータフレームがコピーされるので、効率的にjoinされる。

大きなデータの場合

片方のデータフレームが10MB以内(あるいはspark.sql.autoBroadcastJoinThresholdで設定した閾値以内)だとBroadcast Hash Joinになるが、大きなデータを上述の方法でjoinするとSort Merge Joinになる。

joined.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [ts_bucket_1#3753L], [ts_bucket_2#3769L], Inner, (abs((ts_1#3746 - ts_2#3762)) < 10000)
   :- Sort [ts_bucket_1#3753L ASC NULLS FIRST], false, 0
(以下略)

Discussion