🗂

pysparkで汎用的に取込対象外のデータ行をskipする

2023/12/10に公開

概要

マルチテナントの連携ファイル取り込みetlに関して、
異常データが入っている場合、その行ごとskipして保存できる分だけ保存する処理を
汎用的にするには。

経緯

etl取込を行うにあたって、連携データの中には往々にして取り込めない異常データが存在する。
最終的に数値がで保存するのに文字列型であったり、日付型で保存するのに-のような日付に変換できないようなデータが混じっている場合がマルチテナントでファイル取り込みを行う場合、様々なシステムからの連携ファイルを取り込むので存在する。

マルチテナントのetl取り込みを共通のコードで汎用的に取り込めるようにしたのはまた別の話でやるとして。
努力要件として、様々な連携データに対して、取り込みができないようなデータはその行ごと省いて、取り込みできる行だけ取り込みたいという要件があった。

この要件をpysparkで満たすためにごちゃごちゃやったという経緯がある。

前提

pysparkとは

PythonからApache Sparkを操作する際に使用するAPIのことである。
pysparkを用いることで、容易に大規模なデータを分散処理で扱うことをpythonから行うことができるようになる。

PySpark combines Python’s learnability and ease of use with the power of Apache Spark to enable processing and analysis of data at any size for everyone familiar with Python.

* apache sparkとは:
ビッグデータと機械学習のための非常に高速な分散処理フレームワークのこと。

https://spark.apache.org/docs/latest/api/python/index.html

etlとは

自分の記事でも軽く説明はさせていただいているのですが、以下の定義と認識している。
Extract, Transform, Loadの略で以下のように複数のソースからデータをデータウェアハウスなどにまとめる一連プロセスのこと。最近では処理できる性能などが上がっていることもあり、
先にLoadしてからTransformしようというELTが主流になりつつある。

https://zenn.dev/johndoe/articles/5a680f0aad7384

懸念点

連携データエラー行の異常データ行だけ削除してそのほかを取り込むという要件を満たすには越えなければならない懸念点がいくつかありました。

sparkは一部のデータだけ弾くような構想ではない

そもそもsparkは大規模なデータを分散処理を用いて高速に処理できるフレームワークではありますが、data frameを永続化する際はdata frameごと一気に永続化することが主であり、sqlにて一部のdataframeは永続化できたけれど、異常データは弾くとかはできない。

psycopg2を用いてmergeもしくはinsertを行なっているのだが、このsqlの前にフィルタリングする必要があるということになる。

マルチテナント故にデータ構造が多種多様

この処理を実装するためのetl処理はマルチテナント構成対応であり、汎用的にすることが前提条件となっていた。一つのテナントの決められたファイルならば特定のファイルカラムだけ判定するなどができたが、汎用的にするには工夫する必要がある。

データ量が多く、かつファイルカラム数が多いと処理が終わらなくなる

こちらもマルチテナントゆえの感がありますが、連携ファイルのデータの中には2500万件を超えるようなデータや、ファイルカラム数150の連携ファイルなどがあります。

このような場合でも汎用的にエラー対象データを見つけ、行ごとskipするにはカラム一つ一つのデータをみる必要があり、非常に非効率になり、処理が終わらなくなる事象が発生しました。
考えなしに実装したら、glueでpysparkを動かしていたのですが、driverが上がり、処理が終わらなくなる事象が発生してしまいました...

公式ではdriverのoomは小さなファイルを大量に取り込む際しか記載しておらず、今回のケースにあたらないので自力でごちゃごちゃする必要があった。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-profile-debug-oom-abnormalities.html

方法

結論から言うとexcludeを用いて一度分解してfilter処理を簡潔にする、なおかつ必ずchacheを行う が汎用的かつ処理時間を許容範囲に収める方法でした。

filter処理を簡潔にする

ETLでいうL(Load)開始の時点では永続化対象のdataframeは全て正常データでなければならない。
一時テーブルにぶち込むにしろ型変換は正常にできている必要があるから。
なのでT(Transform)の時点で行う必要がある。

この記事では割愛(またどっかでかく)が、filter処理を始める時点でudfを用いながらdataframeをsparkで永続化できるように型変換を行っておく。
この型変換関数を行う際に変換処理ができず、正常にdbに保存できないようなデータはある特定の文字列(以下"filter_value"と仮定する)に変換するようにする。
NULLなどにしなかったのは意図的にNULLで連携されているデータなども往々にあるのでその際に正常なデータを行ごとskipする危険性があったため。

その後にdataframe全体に対してfilterを用いて全てのカラムのどれかで"filter_value"が存在する行ごとを省けば良い。

この処理を行う際にも気をつけたことが複数あり、
filterの際に全カラムループを回させない
要件として任意のファイルカラムに異常データが入っている場合filterlingする必要があるので普通に書けば以下のようになる。

""" 元のdata frameに連番を振ります
"""
df_with_line_no = df.withColumn(
    "line_no", monotonically_increasing_id()+1)

""" 各カラムに対して"filter_value"文字列を検索し、該当するカラム名をリストとして作成します。
"""
column_conditions = [when(col(c).contains(
    "filter_value"), c).otherwise(None) for c in df.columns]
combined_conditions = array(*column_conditions)

""" "filter_value"文字列を含むカラムの名前とその行番号を持つDataFrame (error_df) を作成します。
"""
error_df = (df_with_line_no
            .withColumn("columns_with_filter", array_filter(combined_conditions, lambda x: x.isNotNull()))
            .filter(F.size(F.col("columns_with_filter")) > 0)
            .select("line_no", "columns_with_filter"))

ただこのようになるとfilter処理を100を超えるファイルカラムに対して都度行うことになり、処理が大変遅くなる。
要件としてはエラー対象の行数とファイルカラム名なので以下のようにしました。

""" 各カラムをカラム名と値のペアとして構造化します
"""
df_with_line_no = df.withColumn(
    "line_no", monotonically_increasing_id()+1)

columns_as_kv_pairs = [F.struct(F.lit(c).alias("column_name"), F.col(c).alias("column_value")) for c in df.columns]

""" explode関数を使用して展開します
"""
exploded_df = df_with_line_no.select("line_no", F.explode(F.array(*columns_as_kv_pairs)).alias("data"))
exploded_df = exploded_df.select("line_no", "data.column_name", "data.column_value")


""" 特定の文字列と完全一致だけフィルタリング
"""
filtered_df = exploded_df.filter(
    df_with_line_no["column_value"] == "filter_value")


explodeを行い、一度分解することで、filterを適用する際に一つのカラムが特定の値かどうかというシンプルな条件でのfitleringが可能になります。

作成されたfiltered_dfをもとに、line_noをdistinctすることでどのファイル行数がエラーになったかを一意に出すこともできますし、元のdataframeの形にしたければdf_with_line_noとfiltered_dfのline_noをjoinさせることで求めることができます。

条件が煩雑になれば煩雑になる程filterの負荷が上がるので部分一致もやめましょう。

当初はcontainsでやってしまっていた。

filtered_df = exploded_df.filter(
    df_with_line_no("column_value").contains("filter_value"))

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html

https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.explode.html

必ずchacheを行う。

まじ大事です。
なぜならばsparkは遅延評価という仕組みを用いており、chacheを行わないとその都度新しいdataframeに対して処理を行おうと試みてしまうからです。

遅延評価による処理速度低下及びメモリオーバーを防ぐために適切なタイミングでのchacheが重要になります。

df_with_line_no.cache()

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.cache.html

結論


まだ改善すべきところとかは山ほどあるのでこの記事も更新し続けたい。
cache大事。

参考文献


https://spark.apache.org/docs/latest/api/python/index.html

https://zenn.dev/johndoe/articles/5a680f0aad7384

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-profile-debug-oom-abnormalities.html

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html

https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.explode.html

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.cache.html

Discussion