⚙️

【Cloud Data Fusion】ZIPファイルの取り込み時の苦戦とその解決方法

2021/12/13に公開

はじめに

Cloud Data Fusion (以降、CDF)[1] を本格的に利用し始めて8ヶ月が経とうとしています。
だいぶ使い慣れてきましたので、少しずつナレッジを公開していこうと思います^^

今回のテーマ

私は様々なログデータをBigQueryに格納するためにCDFを利用しています。

オンプレミス環境のデータを分析に利用しようとすると
Windows環境上にZIPファイルでアーカイブされていたりすることがあります。

今回はCloud Storage (以降、GCS) に配置された比較的大きいZIPファイルを解凍し
BigQueryに取り込むためのCDF設定方法について紹介したいと思います。


本対象範囲

何にハマったのか

CDFには標準で Field Decompressor [2]というプラグインがあります。
Transformカテゴリの1つで取り込んだデータの解凍処理を行うプラグインです。

使用例は過去投稿したブログも参考にしていただければと思います。
https://zenn.dev/hssh2_bin/articles/478e6a41e103c7#4.-zip解凍処理

さて、どんな苦労があったのか、、
このプラグインを利用していて、パイプラインが失敗する事象に遭遇することになりました。
具体的には、以下のログがCDFで出力されていました。(ログが読みにくいのがキツい...)

サンプルログ
2021-12-09 09:23:56,040 - ERROR [Executor task launch worker for task 0:o.a.s.u.Utils@91] - Aborting task
java.lang.OutOfMemoryError: null
	at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) ~[na:1.8.0_275]
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) ~[na:1.8.0_275]
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_275]
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_275]
	at io.cdap.plugin.Decompressor.unzip(Decompressor.java:257) ~[1631179419462-0/:na]
	at io.cdap.plugin.Decompressor.transform(Decompressor.java:202) ~[1631179419462-0/:na]
	at io.cdap.plugin.Decompressor.transform(Decompressor.java:55) ~[1631179419462-0/:na]
	at io.cdap.cdap.etl.common.plugin.WrappedTransform.lambda$transform$5(WrappedTransform.java:90) ~[cdap-etl-core-6.4.1.jar:na]
	at io.cdap.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[cdap-etl-core-6.4.1.jar:na]
	at io.cdap.cdap.etl.common.plugin.WrappedTransform.transform(WrappedTransform.java:89) ~[cdap-etl-core-6.4.1.jar:na]
	at io.cdap.cdap.etl.common.TrackedTransform.transform(TrackedTransform.java:74) ~[cdap-etl-core-6.4.1.jar:na]
	at io.cdap.cdap.etl.spark.function.TransformFunction.call(TransformFunction.java:51) ~[hydrator-spark-core2_2.11-6.4.1.jar:na]
	at io.cdap.cdap.etl.spark.Compat$FlatMapAdapter.call(Compat.java:126) ~[hydrator-spark-core2_2.11-6.4.1.jar:na]
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.4.jar:2.3.4]
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.4.jar:2.3.4]
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) ~[scala-library-2.11.8.jar:na]

CDFでは解凍後2GBを超えるZIPファイルを処理できないのか、、と途方に暮れてました。
ところが、HUBでとあるプラグインを追加することで本事象は解消できることがわかりました。

今回紹介するプラグイン

下記のプラグインの使い方を紹介します。

  1. Decompress Action プラグイン [3]
  2. GCS Delete Action プラグイン [4]
  3. GCS Move Action プラグイン [5]

これらは、いずれもActionカテゴリのプラグインになります。

Actionプラグインはパイプラインの途中に挟むことができません。
パイプラインの最初(事前処理)もしくは最後(事後処理)に利用します。

利用環境

Product version
CDAP 6.4.1
Decompress Action Plugin 1.2.0
GCS Delete Action Plugin 0.17.3
GCS Move Action Plugin 0.17.3
Region asia-northeast1

【構成図】
・ CDFはバッチとストリーミングの両方に対応しています。(今回は前者になります)


パイプライン構成

【補足】
・以下、GCSフォルダの用途になります。
 Uploadフォルダ: オンプレミス環境からZIPファイルが配置されます。
 Tempフォルダ: 解凍されたCSVファイルが配置されます。
 Doneフォルダ: BigQueryに取り込みが終わった処理済みZIPファイルが配置されます。

Decompress Action プラグインとは

今回紹介したい一番重要なプラグインになります。

GCSに配置されたZIPファイルを解凍し、解凍後のファイルをGCSに配置します。
Field Decompressorのように解凍できるファイルのサイズに上限はありません。
(未検証ですが、HDFSなども指定できるようです)

以下、UploadフォルダのZIPファイルの解凍結果をTempフォルダに配置する設定になります。

【設定項目】

Source Path Destination Path File Regular Expression Archived or Compressed Continue Processing If There Are Errors?
gs://<Bucket>/Upload/ gs://<Bucket>/Temp/ .*.zip Archived false (Default)

以下、CDFの設定画面になります。


Decompress Action設定画面

対応しているファイル形式は、Apache Commons Compressに依存しています。
今回の.zipや.tarはアーカイブファイルになりますので、Archivedを指定すれば処理してくれます。
処理したいファイル形式に応じて選択してください。


Archived or Compressedの選択肢

また公式ドキュメントにも記載されていますが、example.zipというファイルを
Tempフォルダに解凍するとgs://Bucket/Temp/example/example.csvのようなパスになります。

GCS Source プラグインで解凍後のファイルを取得する際は
Read Files RecursivelyTrueに設定するのを忘れないでください。

そうすることでPathで指定したパス配下を再帰的にファイル探索するようになります。

【補足】
・ Decompress Action プラグインはHUBからインストールする必要があります。

GCS Delete Action プラグインとは

Decompress Action プラグインで解凍したファイルは
パイプライン処理が全部終わっても勝手には削除されません。

よって、実行のたびにTempフォルダに解凍後のファイルが溜まってしまいます。
GCS Delete Action プラグインでは、パイプライン実行後にファイルを削除してくれます。

Objects to Delete
gs://<Bucket>/Temp/

以下、CDFの設定画面になります。


GCS Delete Action設定画面

GCS Move Action プラグインとは

最後に処理が終わったファイルをUploadフォルダからDoneフォルダに移動します。

Uploadフォルダに処理済みのファイルが置きっぱなしになっていると
ジョブ実行のたびに同じファイルを重複して取り込んでしまう恐れがあります。
そのため、GCS Move Action プラグインを使ってファイルを移動します。

Source Path Destination Path Move All Subdirectories Overwrite Existing Files
gs://<Bucket>/Upload/ gs://<Bucket>/Done/ True False (Default)

以下、CDFの設定画面になります。


GCS Move Action設定画面

まとめ

さて、いかがでしたでしょうか?

最終的にはこのようなパイプラインになります。

実は、副次的な効果もありました。
Field Decompressorプラグインの場合、GCS Sourceプラグインで取り込むファイルが
ZIP形式であったため、Dataprocでの分散処理がファイル単位となってしまいました。

しかし、Decompress Action プラグインで事前にZIPファイルを解凍したことで
GCS Sourceプラグインで取り込むファイルがCSV形式となりました。

そのため、デフォルトで最大128MBにデータのスプリット(分割)が可能になりました。
よりDataprocで分散できるようになり、処理性能が向上しました。[6]

まだまだ活用できていないデータが多く存在していることと思います。
本投稿が皆様の業務の役に立てると幸いです^^

Cloud Data Fusion 関連投稿

脚注
  1. Cloud Data Fusionの公式ページ ↩︎

  2. Field Decompressor Transformation ↩︎

  3. Decompress Action ↩︎

  4. Google Cloud Storage Delete Action ↩︎

  5. Google Cloud Storage Move Action ↩︎

  6. Data Pipeline Performance Tuning Guide ↩︎

Discussion