🎇
ApacheSparkでzipファイルに格納されたJSONファイルを読み込む
概要
提供されたデータがSparkの対応する形式であれば、特に問題なく処理可能だ。
しかしzipで圧縮されていた場合はあらかじめ解凍しておくか、少し工夫する必要がある。
この記事ではzipに格納されたJSONファイルをzipのまま読み込み、データフレーム化する方法を紹介する。
前提条件
ファイルのエンコードがUTF-8であること
zipファイルに格納されたJSONファイルのスキーマが全て同一であること
※今回使用したzipファイル
target
├── foo.zip
│ ├── test1.json
│ └── test2.json
└── bar.zip
├── test1.json
└── test2.json
# JSONファイルの中身
# 改行コードはLF
$ cat test1.json
{"_1":"aaa","_2":"111"}
{"_1":"bbb","_2":"222"}
コード全体
解説は次項目をご覧ください。
import java.io._
import java.nio._
import java.util.zip.ZipEntry
import java.util.zip.ZipInputStream
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
lazy val zipFileCharset = java.nio.charset.Charset.forName("UTF8")
lazy val byteArrayCharset = java.nio.charset.StandardCharsets.UTF_8
val df = spark.read.format("binaryFile").option("pathGlobFilter","*.zip").load("./target")
val df2 = df.map{row=>
val ba = row.getAs[Array[Byte]]("content")
val baInStream = new ByteArrayInputStream(ba)
val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
val elemsByteBuffer = Iterator.continually(zipInStream.getNextEntry).takeWhile(_!=null).map{entry=>
entry.getName -> ByteBuffer.wrap(IOUtils.toByteArray(zipInStream))
}.filter(_._1.endsWith(".json"))
val elemsString = elemsByteBuffer.map{bb => byteArrayCharset.decode(bb._2).toString.stripMargin}
Row(elemsString.flatMap(_.split("""\n""")).toSeq)
}(RowEncoder(StructType(Seq(StructField("extract",ArrayType(StringType))))))
val df3 = df2.withColumn("tmp",explode(col("extract")))
val s=schema_of_json(df3.select("tmp").as[String].first)
val df4 = df3.withColumn("tmp",from_json(col("tmp"),s)).select("tmp.*")
解説
まず、targetディレクトリ配下の*.zipを全て取得する。
lazy val zipFileCharset = java.nio.charset.Charset.forName("UTF8")
lazy val byteArrayCharset = java.nio.charset.StandardCharsets.UTF_8
val df = spark.read.format("binaryFile").option("pathGlobFilter","*.zip").load("./target")
生成されたデータフレームは下記のスキーマになっている。
root
|-- path: string (nullable = true)
|-- modificationTime: timestamp (nullable = true)
|-- length: long (nullable = true)
|-- content: binary (nullable = true)
上記のデータフレームから実データ(content)を取得し、ZipInputStreamとして読み込む。
val df2 = df.map{row=>
val ba = row.getAs[Array[Byte]]("content")
val baInStream = new ByteArrayInputStream(ba)
val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
...
}
ファイル名の末尾が.jsonではないファイルを除外する。
val df2 = df.map{row=>
val ba = row.getAs[Array[Byte]]("content")
val baInStream = new ByteArrayInputStream(ba)
val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
val elemsByteBuffer = Iterator.continually(zipInStream.getNextEntry).takeWhile(_!=null).map{entry=>
entry.getName -> ByteBuffer.wrap(IOUtils.toByteArray(zipInStream))
}.filter(_._1.endsWith(".json"))
...
}
ZipInputStreamをデコードし、Stringにする。
val df2 = df.map{row=>
val ba = row.getAs[Array[Byte]]("content")
val baInStream = new ByteArrayInputStream(ba)
val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
val elemsByteBuffer = Iterator.continually(zipInStream.getNextEntry).takeWhile(_!=null).map{entry=>
entry.getName -> ByteBuffer.wrap(IOUtils.toByteArray(zipInStream))
}.filter(_._1.endsWith(".json"))
val elemsString = elemsByteBuffer.map{bb => byteArrayCharset.decode(bb._2).toString.stripMargin}
...
}
今回のJSONファイルは改行区切り(ndjson)になっているため、改行コードでsplitする。
その後スキーマを付与してデータフレームを閉じる。
val df2 = df.map{row=>
val ba = row.getAs[Array[Byte]]("content")
val baInStream = new ByteArrayInputStream(ba)
val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
val elemsByteBuffer = Iterator.continually(zipInStream.getNextEntry).takeWhile(_!=null).map{entry=>
entry.getName -> ByteBuffer.wrap(IOUtils.toByteArray(zipInStream))
}.filter(_._1.endsWith(".json"))
val elemsString = elemsByteBuffer.map{bb => byteArrayCharset.decode(bb._2).toString.stripMargin}
Row(elemsString.flatMap(_.split("""\n""")).toSeq)
}(RowEncoder(StructType(Seq(StructField("extract",ArrayType(StringType))))))
先ほど作った配列を展開し、JSON文字列をパースする。
下記の例ではパース結果をtmpに格納している。
val df3 = df2.withColumn("tmp",explode(col("extract")))
val s=schema_of_json(df3.select("tmp").as[String].first)
val df4 = df3.withColumn("tmp",from_json(col("tmp"),s)).select("tmp.*")
参考(df.showの結果)
scala> df.show
+--------------------+--------------------+------+--------------------+
| path| modificationTime|length| content|
+--------------------+--------------------+------+--------------------+
|file:/home/hoge... |2022-12-11 23:32:...| 1006|[50 4B 03 04 0A 0...|
|file:/home/hoge... |2022-12-11 23:32:...| 1004|[50 4B 03 04 0A 0...|
+--------------------+--------------------+------+--------------------+
scala> df2.show(false)
+--------------------------------------------------------------------------------------------------------+
|extract |
+--------------------------------------------------------------------------------------------------------+
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}] |
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|
+--------------------------------------------------------------------------------------------------------+
scala> df3.show(false)
+--------------------------------------------------------------------------------------------------------+------------------------+
|extract |tmp |
+--------------------------------------------------------------------------------------------------------+------------------------+
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}] |{"_1":"aaa","_2":"111"} |
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}] |{"_1":"bbb","_2":"222"} |
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}] |{"_1":"ccc","_2":"111"} |
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}] |{"_1":"ddd","_2":"222"} |
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|{"_1":"DDD","_2":"1000"}|
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|{"_1":"EEE","_2":"2000"}|
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|{"_1":"ZZZ","_2":"3000"}|
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|{"_1":"zzz","_2":"4000"}|
+--------------------------------------------------------------------------------------------------------+------------------------+
scala> df4.show(false)
+---+----+
|_1 |_2 |
+---+----+
|aaa|111 |
|bbb|222 |
|ccc|111 |
|ddd|222 |
|DDD|1000|
|EEE|2000|
|ZZZ|3000|
|zzz|4000|
+---+----+
Discussion