🎇

ApacheSparkでJSONを手直ししてからパースする

2022/12/10に公開

概要

例えば下記のようなJSONファイルを読み込み、任意のスキーマに修正した後でパースしたい。
調査したところあまり情報を見かけなかったため、記事にすることにした。

[
  {
    "foo": "0123",
    "bar": "2022-12-10T11:00:00+09:00",
    ...

結論

早速だが下記のコードで実現できる。

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val df = spark.read.text("改行を含まない、かつ最上位が配列のJSONファイル")

val df2 = df.map{row=>
  val txt = row.getString(0)
  val mapper = new ObjectMapper
  mapper.registerModule(DefaultScalaModule)
  val tmp = mapper.readValue(txt,classOf[Array[Map[String,String]]])
  val tmp2 = mapper.writeValueAsString(Map("topLevelField"->tmp))
  Row(tmp2)
}(RowEncoder(StructType(Seq(StructField("jsonStr",StringType)))))

val jsonSchema = schema_of_json(df2.select("jsonStr").as[String].first)
val df3 = df2.withColumn("convert",from_json(col("jsonStr"),jsonSchema)).select("convert.*")

解説

まず、JSONとしてパースせずにプレーンテキストの体で読み込む。
(改行を含まないファイルを想定)

val df = spark.read.text("最上位が配列のJSONファイル")

JSON文字列はデータフレームの各行(Row)に格納されている。
データフレームとはRowの束であり、.mapでRow毎に処理をすることができる。

val df2 = df.map{row=>
  ...  
}

Rowはカラム名が付いた配列データだと考えればイメージしやすいと思う。
Rowから0番目のカラムのデータ(JSON文字列)を取り出し。

val df2 = df.map{row=>
  val txt = row.getString(0)
  ...
}

jacksonでJSON文字列をMap型に変換した後、Mapの構造を任意で修正する。
今回は例として、Mapの最上位にフィールド名を付与した。
修正した後、JSON文字列に戻す。

val df2 = df.map{row=>
  val txt = row.getString(0)
  val mapper = new ObjectMapper
  mapper.registerModule(DefaultScalaModule)
  val tmp = mapper.readValue(txt,classOf[Array[Map[String,String]]])
  val tmp2 = mapper.writeValueAsString(Map("topLevelField"->tmp))
  ...
}

出来上がったJSON文字列をRowに入れ、Encoderでデータフレームのスキーマを指定して閉じる。
(Encoderの指定がないと、未指定では処理が不可能だとメッセージが表示される)

val df2 = df.map{row=>
  val txt = row.getString(0)
  val mapper = new ObjectMapper
  mapper.registerModule(DefaultScalaModule)
  // jacksonで文字列をMap型に変換
  val tmp = mapper.readValue(txt,classOf[Array[Map[String,String]]])
  // ここで任意のスキーマに修正する
  // 一例として、最上位にフィールド名を付けた後、JSON文字列に戻す
  val tmp2 = mapper.writeValueAsString(Map("topLevelField"->tmp))
  Row(tmp2)
}(RowEncoder(StructType(Seq(StructField("jsonStr",StringType)))))

JSON文字列をfrom_jsonでStructTypeに変換する。

val df2 = df.map{row=>
  val txt = row.getString(0)
  val mapper = new ObjectMapper
  mapper.registerModule(DefaultScalaModule)
  val tmp = mapper.readValue(txt,classOf[Array[Map[String,String]]])
  val tmp2 = mapper.writeValueAsString(Map("topLevelField"->tmp))
  Row(tmp2)
}(RowEncoder(StructType(Seq(StructField("jsonStr",StringType)))))

val jsonSchema = schema_of_json(df2.select("jsonStr").as[String].first)
val df3 = df2.withColumn("convert",from_json(col("jsonStr"),jsonSchema)).select("convert.*")

Discussion