📖

sparkの計算結果に再現性が無く嵌った話(dropDuplicates)

2025/01/27に公開

結論

dropDuplicatesを使うと結果に対して再現性がなくなるので注意が必要。

詳細

Dropduplicatesは重複を無くす列をした際に、どの行を残すかはランダムになります。
また、pandasのdrop_duplicatesと異なり、「先頭を残す」等のオプションはありません。
(理由の記述はなかったのですが、各nodeに散った各情報を集計するのは spark上は大変なのでしょうか?)

サンプルコード

from pyspark.sql import functions as F

data = [
    (1, "value_1a"),
    (1, "value_1b"),
    (1, "value_1c"),
    (2, "value_2a"),
    (2, "value_2b"),
    (2, "value_2c"),
    (3, "value_3a"),
    (3, "value_3b"),
    (3, "value_3c")
]

df = spark.createDataFrame(data, ["key", "value"])

print("==== Original DataFrame ====")
display(df)

for i in range(1, 6):
    df_shuffled = df.withColumn("rnd", F.rand())\
                    .orderBy("rnd")\
                    .drop("rnd")
    deduped = df_shuffled.dropDuplicates(["key"])
    
    print(f"\n==== Run {i} ====")
    display(deduped)

上記を実行すると以下のようになります。


==== Run 1 ====
   key     value
0    1  value_1b
1    2  value_2b
2    3  value_3a

==== Run 2 ====
   key     value
0    1  value_1c
1    2  value_2c
2    3  value_3c

==== Run 3 ====
   key     value
0    1  value_1a
1    2  value_2b
2    3  value_3c

==== Run 4 ====
   key     value
0    1  value_1a
1    2  value_2c
2    3  value_3c

==== Run 5 ====
   key     value
0    1  value_1a
1    2  value_2c
2    3  value_3c

対策

求めたい結果にもよりますが、sortして先頭行だけを持ってくるなど、丁寧な処理が必要です。

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w = Window.partitionBy("key").orderBy("value")
df_deterministic = (
    df
    .withColumn("rn", row_number().over(w))
    .filter("rn = 1")
    .drop("rn")
)
df_deterministic.show()

上記のコードはsortした上で row_numberで連番をはり、1行目だけ抜き取るコードです。

結論

dropDuplicatesは再現性がなくなるので巨大なETL処理等行う時は注意しましょう。

Discussion