🎇

ApacheSparkでzipファイルに格納されたJSONファイルを読み込む

2022/12/12に公開

概要

提供されたデータがSparkの対応する形式であれば、特に問題なく処理可能だ。
しかしzipで圧縮されていた場合はあらかじめ解凍しておくか、少し工夫する必要がある。
この記事ではzipに格納されたJSONファイルをzipのまま読み込み、データフレーム化する方法を紹介する。

前提条件

ファイルのエンコードがUTF-8であること
zipファイルに格納されたJSONファイルのスキーマが全て同一であること
※今回使用したzipファイル

target
├── foo.zip
│   ├── test1.json
│   └── test2.json
└── bar.zip
    ├── test1.json
    └── test2.json
# JSONファイルの中身
# 改行コードはLF
$ cat test1.json
{"_1":"aaa","_2":"111"}
{"_1":"bbb","_2":"222"}

コード全体

解説は次項目をご覧ください。

import java.io._
import java.nio._
import java.util.zip.ZipEntry
import java.util.zip.ZipInputStream
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

lazy val zipFileCharset = java.nio.charset.Charset.forName("UTF8")
lazy val byteArrayCharset = java.nio.charset.StandardCharsets.UTF_8

val df = spark.read.format("binaryFile").option("pathGlobFilter","*.zip").load("./target")

val df2 = df.map{row=>
    val ba = row.getAs[Array[Byte]]("content")
    val baInStream = new ByteArrayInputStream(ba)
    val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
    val elemsByteBuffer = Iterator.continually(zipInStream.getNextEntry).takeWhile(_!=null).map{entry=>
      entry.getName -> ByteBuffer.wrap(IOUtils.toByteArray(zipInStream))
    }.filter(_._1.endsWith(".json"))
    val elemsString = elemsByteBuffer.map{bb => byteArrayCharset.decode(bb._2).toString.stripMargin}
    Row(elemsString.flatMap(_.split("""\n""")).toSeq)
  }(RowEncoder(StructType(Seq(StructField("extract",ArrayType(StringType))))))
    
val df3 = df2.withColumn("tmp",explode(col("extract")))
  
val s=schema_of_json(df3.select("tmp").as[String].first)
val df4 = df3.withColumn("tmp",from_json(col("tmp"),s)).select("tmp.*")

解説

まず、targetディレクトリ配下の*.zipを全て取得する。

lazy val zipFileCharset = java.nio.charset.Charset.forName("UTF8")
lazy val byteArrayCharset = java.nio.charset.StandardCharsets.UTF_8

val df = spark.read.format("binaryFile").option("pathGlobFilter","*.zip").load("./target")

生成されたデータフレームは下記のスキーマになっている。

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)

上記のデータフレームから実データ(content)を取得し、ZipInputStreamとして読み込む。

val df2 = df.map{row=>
    val ba = row.getAs[Array[Byte]]("content")
    val baInStream = new ByteArrayInputStream(ba)
    val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
    ...
  }

ファイル名の末尾が.jsonではないファイルを除外する。

  val df2 = df.map{row=>
    val ba = row.getAs[Array[Byte]]("content")
    val baInStream = new ByteArrayInputStream(ba)
    val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
    val elemsByteBuffer = Iterator.continually(zipInStream.getNextEntry).takeWhile(_!=null).map{entry=>
      entry.getName -> ByteBuffer.wrap(IOUtils.toByteArray(zipInStream))
    }.filter(_._1.endsWith(".json"))
    ...
  }

ZipInputStreamをデコードし、Stringにする。

  val df2 = df.map{row=>
    val ba = row.getAs[Array[Byte]]("content")
    val baInStream = new ByteArrayInputStream(ba)
    val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
    val elemsByteBuffer = Iterator.continually(zipInStream.getNextEntry).takeWhile(_!=null).map{entry=>
      entry.getName -> ByteBuffer.wrap(IOUtils.toByteArray(zipInStream))
    }.filter(_._1.endsWith(".json"))
    val elemsString = elemsByteBuffer.map{bb => byteArrayCharset.decode(bb._2).toString.stripMargin}
    ...
  }

今回のJSONファイルは改行区切り(ndjson)になっているため、改行コードでsplitする。
その後スキーマを付与してデータフレームを閉じる。

  val df2 = df.map{row=>
    val ba = row.getAs[Array[Byte]]("content")
    val baInStream = new ByteArrayInputStream(ba)
    val zipInStream = new ZipInputStream(baInStream, zipFileCharset)
    val elemsByteBuffer = Iterator.continually(zipInStream.getNextEntry).takeWhile(_!=null).map{entry=>
      entry.getName -> ByteBuffer.wrap(IOUtils.toByteArray(zipInStream))
    }.filter(_._1.endsWith(".json"))
    val elemsString = elemsByteBuffer.map{bb => byteArrayCharset.decode(bb._2).toString.stripMargin}
    Row(elemsString.flatMap(_.split("""\n""")).toSeq)
  }(RowEncoder(StructType(Seq(StructField("extract",ArrayType(StringType))))))

先ほど作った配列を展開し、JSON文字列をパースする。
下記の例ではパース結果をtmpに格納している。

val df3 = df2.withColumn("tmp",explode(col("extract")))
  
val s=schema_of_json(df3.select("tmp").as[String].first)
val df4 = df3.withColumn("tmp",from_json(col("tmp"),s)).select("tmp.*")

参考(df.showの結果)

scala> df.show
+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|file:/home/hoge...  |2022-12-11 23:32:...|  1006|[50 4B 03 04 0A 0...|
|file:/home/hoge...  |2022-12-11 23:32:...|  1004|[50 4B 03 04 0A 0...|
+--------------------+--------------------+------+--------------------+
scala> df2.show(false)
+--------------------------------------------------------------------------------------------------------+
|extract                                                                                                 |
+--------------------------------------------------------------------------------------------------------+
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}]    |
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|
+--------------------------------------------------------------------------------------------------------+
scala> df3.show(false)
+--------------------------------------------------------------------------------------------------------+------------------------+
|extract                                                                                                 |tmp                     |
+--------------------------------------------------------------------------------------------------------+------------------------+
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}]    |{"_1":"aaa","_2":"111"} |
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}]    |{"_1":"bbb","_2":"222"} |
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}]    |{"_1":"ccc","_2":"111"} |
|[{"_1":"aaa","_2":"111"}, {"_1":"bbb","_2":"222"}, {"_1":"ccc","_2":"111"}, {"_1":"ddd","_2":"222"}]    |{"_1":"ddd","_2":"222"} |
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|{"_1":"DDD","_2":"1000"}|
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|{"_1":"EEE","_2":"2000"}|
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|{"_1":"ZZZ","_2":"3000"}|
|[{"_1":"DDD","_2":"1000"}, {"_1":"EEE","_2":"2000"}, {"_1":"ZZZ","_2":"3000"}, {"_1":"zzz","_2":"4000"}]|{"_1":"zzz","_2":"4000"}|
+--------------------------------------------------------------------------------------------------------+------------------------+
scala> df4.show(false)
+---+----+
|_1 |_2  |
+---+----+
|aaa|111 |
|bbb|222 |
|ccc|111 |
|ddd|222 |
|DDD|1000|
|EEE|2000|
|ZZZ|3000|
|zzz|4000|
+---+----+

Discussion