【Cloud Data Fusion】ZIPファイルの取り込み時の苦戦とその解決方法
はじめに
Cloud Data Fusion (以降、CDF)[1] を本格的に利用し始めて8ヶ月が経とうとしています。
だいぶ使い慣れてきましたので、少しずつナレッジを公開していこうと思います^^
今回のテーマ
私は様々なログデータをBigQueryに格納するためにCDFを利用しています。
オンプレミス環境のデータを分析に利用しようとすると
Windows環境上にZIPファイルでアーカイブされていたりすることがあります。
今回はCloud Storage (以降、GCS) に配置された比較的大きいZIPファイルを解凍し
BigQueryに取り込むためのCDF設定方法について紹介したいと思います。
本対象範囲
何にハマったのか
CDFには標準で Field Decompressor [2]というプラグインがあります。
Transformカテゴリの1つで取り込んだデータの解凍処理を行うプラグインです。
使用例は過去投稿したブログも参考にしていただければと思います。
さて、どんな苦労があったのか、、
このプラグインを利用していて、パイプラインが失敗する事象に遭遇することになりました。
具体的には、以下のログがCDFで出力されていました。(ログが読みにくいのがキツい...)
2021-12-09 09:23:56,040 - ERROR [Executor task launch worker for task 0:o.a.s.u.Utils@91] - Aborting task
java.lang.OutOfMemoryError: null
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) ~[na:1.8.0_275]
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) ~[na:1.8.0_275]
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_275]
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_275]
at io.cdap.plugin.Decompressor.unzip(Decompressor.java:257) ~[1631179419462-0/:na]
at io.cdap.plugin.Decompressor.transform(Decompressor.java:202) ~[1631179419462-0/:na]
at io.cdap.plugin.Decompressor.transform(Decompressor.java:55) ~[1631179419462-0/:na]
at io.cdap.cdap.etl.common.plugin.WrappedTransform.lambda$transform$5(WrappedTransform.java:90) ~[cdap-etl-core-6.4.1.jar:na]
at io.cdap.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[cdap-etl-core-6.4.1.jar:na]
at io.cdap.cdap.etl.common.plugin.WrappedTransform.transform(WrappedTransform.java:89) ~[cdap-etl-core-6.4.1.jar:na]
at io.cdap.cdap.etl.common.TrackedTransform.transform(TrackedTransform.java:74) ~[cdap-etl-core-6.4.1.jar:na]
at io.cdap.cdap.etl.spark.function.TransformFunction.call(TransformFunction.java:51) ~[hydrator-spark-core2_2.11-6.4.1.jar:na]
at io.cdap.cdap.etl.spark.Compat$FlatMapAdapter.call(Compat.java:126) ~[hydrator-spark-core2_2.11-6.4.1.jar:na]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.4.jar:2.3.4]
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) ~[scala-library-2.11.8.jar:na]
CDFでは解凍後2GBを超えるZIPファイルを処理できないのか、、と途方に暮れてました。
ところが、HUBでとあるプラグインを追加することで本事象は解消できることがわかりました。
今回紹介するプラグイン
下記のプラグインの使い方を紹介します。
これらは、いずれもActionカテゴリのプラグインになります。
Actionプラグインはパイプラインの途中に挟むことができません。
パイプラインの最初(事前処理)もしくは最後(事後処理)に利用します。
利用環境
Product | version |
---|---|
CDAP | 6.4.1 |
Decompress Action Plugin | 1.2.0 |
GCS Delete Action Plugin | 0.17.3 |
GCS Move Action Plugin | 0.17.3 |
Region | asia-northeast1 |
【構成図】
・ CDFはバッチとストリーミングの両方に対応しています。(今回は前者になります)
パイプライン構成
【補足】
・以下、GCSフォルダの用途になります。
Uploadフォルダ: オンプレミス環境からZIPファイルが配置されます。
Tempフォルダ: 解凍されたCSVファイルが配置されます。
Doneフォルダ: BigQueryに取り込みが終わった処理済みZIPファイルが配置されます。
Decompress Action プラグインとは
今回紹介したい一番重要なプラグインになります。
GCSに配置されたZIPファイルを解凍し、解凍後のファイルをGCSに配置します。
Field Decompressorのように解凍できるファイルのサイズに上限はありません。
(未検証ですが、HDFSなども指定できるようです)
以下、UploadフォルダのZIPファイルの解凍結果をTempフォルダに配置する設定になります。
【設定項目】
Source Path | Destination Path | File Regular Expression | Archived or Compressed | Continue Processing If There Are Errors? |
---|---|---|---|---|
gs://<Bucket> /Upload/ |
gs://<Bucket> /Temp/ |
.*.zip | Archived | false (Default) |
以下、CDFの設定画面になります。
Decompress Action設定画面
対応しているファイル形式は、Apache Commons Compressに依存しています。
今回の.zipや.tarはアーカイブファイルになりますので、Archivedを指定すれば処理してくれます。
処理したいファイル形式に応じて選択してください。
Archived or Compressedの選択肢
また公式ドキュメントにも記載されていますが、example.zip
というファイルを
Tempフォルダに解凍するとgs://Bucket/Temp/example/example.csv
のようなパスになります。
GCS Source プラグインで解凍後のファイルを取得する際は
Read Files RecursivelyをTrue
に設定するのを忘れないでください。
そうすることでPathで指定したパス配下を再帰的にファイル探索するようになります。
【補足】
・ Decompress Action プラグインはHUBからインストールする必要があります。
GCS Delete Action プラグインとは
Decompress Action プラグインで解凍したファイルは
パイプライン処理が全部終わっても勝手には削除されません。
よって、実行のたびにTempフォルダに解凍後のファイルが溜まってしまいます。
GCS Delete Action プラグインでは、パイプライン実行後にファイルを削除してくれます。
Objects to Delete |
---|
gs://<Bucket> /Temp/ |
以下、CDFの設定画面になります。
GCS Delete Action設定画面
GCS Move Action プラグインとは
最後に処理が終わったファイルをUploadフォルダからDoneフォルダに移動します。
Uploadフォルダに処理済みのファイルが置きっぱなしになっていると
ジョブ実行のたびに同じファイルを重複して取り込んでしまう恐れがあります。
そのため、GCS Move Action プラグインを使ってファイルを移動します。
Source Path | Destination Path | Move All Subdirectories | Overwrite Existing Files |
---|---|---|---|
gs://<Bucket> /Upload/ |
gs://<Bucket> /Done/ |
True | False (Default) |
以下、CDFの設定画面になります。
GCS Move Action設定画面
まとめ
さて、いかがでしたでしょうか?
最終的にはこのようなパイプラインになります。
実は、副次的な効果もありました。
Field Decompressorプラグインの場合、GCS Sourceプラグインで取り込むファイルが
ZIP形式であったため、Dataprocでの分散処理がファイル単位となってしまいました。
しかし、Decompress Action プラグインで事前にZIPファイルを解凍したことで
GCS Sourceプラグインで取り込むファイルがCSV形式となりました。
そのため、デフォルトで最大128MBにデータのスプリット(分割)が可能になりました。
よりDataprocで分散できるようになり、処理性能が向上しました。[6]
まだまだ活用できていないデータが多く存在していることと思います。
本投稿が皆様の業務の役に立てると幸いです^^
Cloud Data Fusion 関連投稿
- 本格的なパイプライン作ってみた!
- プライベートインスタンスでS3からデータ収集する方法
- インスタンス作成でどハマりした話
- GCSバケット消えない問題
- パイプライン実行エラー検知方法
- パイプライン失敗時のSlack通知方法
Discussion