📝

Pysparkによるハッシュ化

2022/12/22に公開

モチベーション

データ基盤の個人情報の取り扱いなどにおいて、対象カラムをハッシュ化する場面がよくでてくる。そこでデータ基盤で大量データに対してハッシュ化を行うにあたり、PySparkを用いて実装する形を検討する。
(最近は自前で実装せずとも、クラウドのデータウェアハウスサービスではカラム単位の動的マスクとかしてくれますが。。。)

前提

  • ハッシュ化にはCRYPTREC暗号リストに掲載されているsha256を用いる。
  • ハッシュ化にはソルトとストレッチングを適用できる方式で実装する。
  • Sparkの分散処理が適用された形でハッシュ化実装をめざす。
  • 対象データは適当に下記のアクセスログのようなものを利用し、userNameに対してハッシュ化を行う。
df.printSchema()
df.show()

#root
# |-- access_time: string (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: string (nullable = true)
#
#+-------------------+-----------+--------+
#|        access_time| campaignID|userName|
#+-------------------+-----------+--------+
#|2022-03-27 12:30:40|Campaign063|    Mike|
#|2022-03-27 00:37:45|Campaign038|     Ken|
#|2022-03-27 00:24:51|Campaign099|    Mary|
#                    :
#+-------------------+-----------+--------+

検討

1.pyspark.sql.functions.sha2[1]を使う(微妙)

  • 実装はできたものの、for-loopを用いており、分散処理としてどうなのだろう。
  • pyspark.sql.DataFrame.withColumn[2]でloopによりStackOverflowExceptionの発生の可能性があると注意書きがある。
python
import pyspark.sql.functions as F

salt = "shio"
streching = 100

for i in range(streching):
  df = df.withColumn("userName",F.sha2(F.concat(F.col("userName"),F.lit(salt)), 256))

2.RDDのmap(hashlib)を使う(微妙)

  • ストレッチングのループ処理を改善するため、RDDのmapを用いる。
  • 対象カラムのハッシュ化は実現できたが、ここからハッシュ化したDataframeと元のDataframeに結合するところが微妙である。(ハッシュ化されたDataframeだけでは使いみちが。。。)
    • keyがないので、pandas Dataframeのconcatのように単純にくっつけることをしたいが、Spark Dataframeのconcat[3]ではできない
    • SparkがPandasをサポートしたので、Pandasに変換してくっつけるという手もあるにはあるが。
python
from pyspark.sql import Row

def calc_sha256(content,salt,stretching):
    h = content + salt
    for i in range(stretching):
        h = hashlib.sha256(h.encode()).hexdigest()
    return h

salt = "shio"
streching = 100

df2 =df.rdd.map(lambda x:Row(calc_sha256(x["userName"],salt,streching))).toDF(['userName_hash'])

3.UDF(hashlib)&withColumnを使う(微妙)

  • No.1のストレッチングのループ処理を改善するため、withColumnでUDFを用いる。
  • ハッシュ化は実現できているが、python UDFによるJVM-Pythonプロセス間シリアライズコストや行単位処理のため、パフォーマンス最適ではないと想定される。
python
import hashlib
from pyspark.sql.types import StringType

def calc_sha256(content,salt,stretching):
    h = content + salt
    for i in range(stretching):
        h = hashlib.sha256(h.encode()).hexdigest()
    return h
  
udf_calc_sha256 = udf(calc_sha256,StringType())


salt = "shio"
streching = 100

df2 = df.withColumn("userName",udf_calc_sha256(F.col("userName"),F.lit(salt) ,F.lit( streching )))

4.Pandas UDF(hashlib)&withColumnを使う(今のところの最適解)

  • Python UDFのパフォーマンス改善のためにPandas UDFを用いる。
  • 大量データの処理を考慮し、バッチサイズでの処理分割を想定しIteratorによる処理を実装する。(Iteratorのバッチ処理イメージはこちらがわかりやすかった。[4])
  • 今のところのこれが最適ではと思える。但し、可読性とかは正直劣る気がする。
python
import hashlib
import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql.functions import pandas_udf
from typing import Iterator, Tuple

@pandas_udf("string")
def calc_sha256_pandasudf(iterator: Iterator[Tuple[pd.Series, pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for content_series, salt_series, stretching_series in iterator:
    h = content_series + salt_series
    stretching = stretching_series[0]
    for i in range(stretching):
      h = h.apply(lambda x: hashlib.sha256(x.encode()).hexdigest())
    yield h

salt = "shio"
streching = 100
df2 = df.withColumn("userName",calc_sha256_pandasudf(F.col("userName"),F.lit(salt) ,F.lit( streching )))

上記のようにIteratorによる処理でなくコーディングする場合にはpandas udfは次のとおりとなる。

@pandas_udf("string")
def calc_sha256_pandasudf(content_series: pd.Series,salt_series: pd.Series,stretching_series: pd.Series) -> pd.Series:
  h = content_series + salt_series
  stretching = stretching_series[0]
  for i in range(stretching):
    h = h.apply(lambda x: hashlib.sha256(x.encode()).hexdigest())
  return h

終わりに

  • 現状としては、No.4のやり方が最適では考えられる。しかし、実際に大量データを用いてNo.4がパフォーマンス最適なものなのかを確認する必要がある。
  • 分散処理を軸に記載をしてきたが、可読性については落ちた気もする。他によりよいコーディングがないのか検討の余地はある気がする。
  • はじめにも書いているが、まずは内製せずにデータウェアハウスなどの機能でできないかを確認するのが一番最適である。(この記事自体が元も子もないがw)
脚注
  1. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.sha2.html ↩︎

  2. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html ↩︎

  3. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat.html ↩︎

  4. https://www.miningmassivedata.com/chapters/chapter14-pandas-with-arrow.html#chapter-learning-objectives ↩︎

Discussion