🎇
ApacheSparkでJSONを手直ししてからパースする
概要
例えば下記のようなJSONファイルを読み込み、任意のスキーマに修正した後でパースしたい。
調査したところあまり情報を見かけなかったため、記事にすることにした。
[
{
"foo": "0123",
"bar": "2022-12-10T11:00:00+09:00",
...
結論
早速だが下記のコードで実現できる。
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
val df = spark.read.text("改行を含まない、かつ最上位が配列のJSONファイル")
val df2 = df.map{row=>
val txt = row.getString(0)
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
val tmp = mapper.readValue(txt,classOf[Array[Map[String,String]]])
val tmp2 = mapper.writeValueAsString(Map("topLevelField"->tmp))
Row(tmp2)
}(RowEncoder(StructType(Seq(StructField("jsonStr",StringType)))))
val jsonSchema = schema_of_json(df2.select("jsonStr").as[String].first)
val df3 = df2.withColumn("convert",from_json(col("jsonStr"),jsonSchema)).select("convert.*")
解説
まず、JSONとしてパースせずにプレーンテキストの体で読み込む。
(改行を含まないファイルを想定)
val df = spark.read.text("最上位が配列のJSONファイル")
JSON文字列はデータフレームの各行(Row)に格納されている。
データフレームとはRowの束であり、.mapでRow毎に処理をすることができる。
val df2 = df.map{row=>
...
}
Rowはカラム名が付いた配列データだと考えればイメージしやすいと思う。
Rowから0番目のカラムのデータ(JSON文字列)を取り出し。
val df2 = df.map{row=>
val txt = row.getString(0)
...
}
jacksonでJSON文字列をMap型に変換した後、Mapの構造を任意で修正する。
今回は例として、Mapの最上位にフィールド名を付与した。
修正した後、JSON文字列に戻す。
val df2 = df.map{row=>
val txt = row.getString(0)
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
val tmp = mapper.readValue(txt,classOf[Array[Map[String,String]]])
val tmp2 = mapper.writeValueAsString(Map("topLevelField"->tmp))
...
}
出来上がったJSON文字列をRowに入れ、Encoderでデータフレームのスキーマを指定して閉じる。
(Encoderの指定がないと、未指定では処理が不可能だとメッセージが表示される)
val df2 = df.map{row=>
val txt = row.getString(0)
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
// jacksonで文字列をMap型に変換
val tmp = mapper.readValue(txt,classOf[Array[Map[String,String]]])
// ここで任意のスキーマに修正する
// 一例として、最上位にフィールド名を付けた後、JSON文字列に戻す
val tmp2 = mapper.writeValueAsString(Map("topLevelField"->tmp))
Row(tmp2)
}(RowEncoder(StructType(Seq(StructField("jsonStr",StringType)))))
JSON文字列をfrom_jsonでStructTypeに変換する。
val df2 = df.map{row=>
val txt = row.getString(0)
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
val tmp = mapper.readValue(txt,classOf[Array[Map[String,String]]])
val tmp2 = mapper.writeValueAsString(Map("topLevelField"->tmp))
Row(tmp2)
}(RowEncoder(StructType(Seq(StructField("jsonStr",StringType)))))
val jsonSchema = schema_of_json(df2.select("jsonStr").as[String].first)
val df3 = df2.withColumn("convert",from_json(col("jsonStr"),jsonSchema)).select("convert.*")
Discussion