SparkSession での JSON 読み込み時の挙動メモ
背景
DataFrame で JSON を読み込む際は DataFrameReader を利用するケースが多いと思う。
実際には以下のように SparkSession の read メソッドを利用して読み込むことになる。
val df = spark.read.json(path)
このように読み込むと勝手に存在するカラムとその型が推論されるので、入力ファイルに含まれる各レコードのスキーマが統一されていることが保証できる場合はこれだけで十分なケースが多い。だが、入力ファイルの各レコードのスキーマがレコードによって異なる場合はもう少し考える必要が出てくる。
また、 option メソッド経由で JSON 固有のオプションを指定することが出来る。指定できるオプションは spark/DataFrameReader.scala at v2.4.3 · apache/spark にある以下の17個。
オプション一覧
primitivesAsString
(デフォルト値は false
)
- すべてのプリミティブ値を StringType として推論する。
prefersDecimal
(デフォルト値は false
)
- すべての浮動小数点値を DecimalType として推論する。
- 値が DecimalType に収まらない場合は double として推論する。
allowComments
(デフォルト値は false
)
- JSON レコード内の Java/C++ スタイルのコメントを無視する。
allowUnquotedFieldNames
(デフォルト値は false
)
- 引用符で囲まれていない JSON フィールド名を許可する。
allowSingleQuotes
(デフォルト値は true
)
- 二重引用符に加えて一重引用符を許可する。
allowNumericLeadingZeros
(デフォルト値は false
)
- 数値の先頭のゼロを許可する(例:00012)。
allowBackslashEscapingAnyCharacter
(デフォルト値は false
)
- バックスラッシュによるエスケープをすべての文字について許可する。
allowUnquotedControlChars
(デフォルト値は false
)
- JSON 文字列がエスケープされていない制御文字(タブや改行文字を含む 32 未満の値を持つ ASCII 文字)を含むことを許可する。
mode
(デフォルト値は PERMISSIVE
)
- パース中に現れた malform なレコードの処理モードを指定する。
-
PERMISSIVE
- malform なレコードに出会ったとき
columnNameOfCorruptRecord
で設定された名前のフィールドに不正な文字列を入れ、他のフィールドを null に設定する。 - malform なレコードを保持するために、ユーザーはユーザー定義のスキーマに
columnNameOfCorruptRecord
で設定された名前の文字列型フィールドを設定することができる。- スキーマがこのフィールドを持っていない場合、パース時に破損レコードを削除する。
- スキーマが推論される場合、出力スキーマに
columnNameOfCorruptRecord
で設定された名前のフィールドを暗黙のうちに追加する。
- malform なレコードに出会ったとき
-
DROPMALFORMED
- malform なレコードをすべて無視する。
-
FAILFAST
- malform なレコードに遭遇した場合、例外を投げる。
columnNameOfCorruptRecord
(デフォルト値は spark.sql.columnNameOfCorruptRecord
で指定された値)
-
PERMISSIVE
モードで作成された不正な文字列が入るフィールドの名前。 - この値は
spark.sql.columnNameOfCorruptRecord
で指定された値より優先される。
dateFormat
(デフォルト値は yyyy-MM-dd
)
- java.text.SimpleDateFormat のフォーマットに従う日付の書式を表す文字列。
- DateType に適用される。
timestampFormat
(デフォルト値は yyyy-MM-dd'T'HH:mm:ss.SSSXXX
)
- java.text.SimpleDateFormat のフォーマットに従うタイムスタンプの書式を示す文字列。
- TimestampType に適用される。
multiLine
(デフォルト値は false
)
- 読み込み対象の各ファイルを複数行に渡る可能性のあるレコードとしてパースする。
encoding
(デフォルトでは未設定)
- JSON ファイルのエンコーディングを標準の基本または拡張エンコーディング(例: UTF-16BE や UTF-32LE など)のいずれかに強制的に設定する。
- エンコーディングが指定されず
multiLine
が true に設定されている場合、自動的に検出される。
lineSep
(デフォルトでは次をすべてカバーする: \r
, \r\n
, \n
)
- 構文解析に使用するレコードの区切り文字。
samplingRatio
(デフォルト値は 1.0
)
- スキーマ推論に使用する入力 JSON オブジェクトの割合。
dropFieldIfAllNull
(デフォルト値は false
)
- スキーマ推論時に、すべてのレコードで NULL 値または空の配列/構造体のカラムを無視するか否か。
spark/JsonParsingOptionsSuite.scala at v2.4.3 · apache/spark や spark/JsonSuite.scala at v2.4.3 · apache/spark にオプションの挙動に関するテストがあるのでこれを見たほうが理解しやすい。
実際の挙動を見つつ、注意したほうが良さそうなポイントについてメモしておく。
フィールドが足りないレコードが存在
基本
存在しないフィールドの値は null として扱われる。
{"x": "str1", "y": 1, "z": true }
{ "y": 2, "z": true }
{"x": "str3", "z": false}
val df = spark.read.json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- x: string (nullable = true)
|-- y: long (nullable = true)
|-- z: boolean (nullable = true)
+----+----+-----+
|x |y |z |
+----+----+-----+
|str1|1 |true |
|null|2 |true |
|str3|null|false|
+----+----+-----+
関連
注意点
上記ではスキーマが未指定だったが、スキーマを指定して nullable=false
としていても、 上記と同様にフィールドが null になるだけでそのレコード自体が drop されたりはしない。また得られた DataFrame のスキーマ上では nullable=true
となる。
val schema = StructType(Seq(
StructField("x", StringType, true),
StructField("y", IntegerType, false),
StructField("z", BooleanType, false)
))
val df = spark.read.schema(schema).json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- x: string (nullable = true)
|-- y: integer (nullable = true)
|-- z: boolean (nullable = true)
+----+----+-----+
|x |y |z |
+----+----+-----+
|str1|1 |true |
|null|2 |true |
|str3|null|false|
+----+----+-----+
フィールドの型がレコードによって異なる
基本
型の推論は compatibleType メソッド(および compatibleRootType メソッド)を利用して行われるらしい。スキーマ推論に関連する部分としては、大まかには以下のようになるようだ。
- 可能な限り元の型を残すように推論される。
- IntegerType と DoubleType は DoubleType として推論される。
- LongType と DoubleType は DoubleType として推論される。
- それ以外の組み合わせの場合、すべて StringType として推論される。
関連
注意点
型の推論というより型変換での注意点だが、 LongType が DoubleType に推論されるので精度が落ちる可能性がある。また、 DoubleType が StringType に変換される場合、指数表記の文字列として読み込まれる場合がある。
{"x": 1, "y": 9223372036854775807, "z": {"key1": 0.00000000001}}
{"x": 2, "y": 2.0, "z": {"key1": "2" }}
{"x": false, "y": 3, "z": {"key2": 3.0 }}
val df = spark.read.json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- x: string (nullable = true)
|-- y: double (nullable = true)
|-- z: struct (nullable = true)
| |-- key1: string (nullable = true)
| |-- key2: double (nullable = true)
+-----+--------------------+----------+
|x |y |z |
+-----+--------------------+----------+
|1 |9.223372036854776E18|[1.0E-11,]|
|2 |2.0 |[2,] |
|false|3.0 |[, 3.0] |
+-----+--------------------+----------+
malform な行が存在
基本
自動的に _corrupt_record
というカラムが追加され、そこに行全体の文字列が入る。
{"date": "2022-01-01", "time": "2022-01-01T00:00:00+09:00"}
{
val df = spark.read.json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- _corrupt_record: string (nullable = true)
|-- date: string (nullable = true)
|-- time: string (nullable = true)
+---------------+----------+-------------------------+
|_corrupt_record|date |time |
+---------------+----------+-------------------------+
|null |2022-01-01|2022-01-01T00:00:00+09:00|
|{ |null |null |
+---------------+----------+-------------------------+
関連
mode
オプションの値が DROPMALFORMED
_corrupt_record
カラムは追加されず、 malform な行は無視される。
val df = spark.read.option("mode", "DROPMALFORMED").json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
+----------+-------------------------+
|date |time |
+----------+-------------------------+
|2022-01-01|2022-01-01T00:00:00+09:00|
+----------+-------------------------+
関連テスト
columnNameOfCorruptRecord
オプションの値が invalid_record
_corrupt_record
の代わりに invalid_record
というカラムが追加され、そこに行全体の文字列が入る。
val df = spark.read.option("columnNameOfCorruptRecord", "invalid_record").json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- date: string (nullable = true)
|-- invalid_record: string (nullable = true)
|-- time: string (nullable = true)
+----------+--------------+-------------------------+
|date |invalid_record|time |
+----------+--------------+-------------------------+
|2022-01-01|null |2022-01-01T00:00:00+09:00|
|null |{ |null |
+----------+--------------+-------------------------+
関連
注意点
指定されたスキーマと異なる型の値のフィールドが存在して型変換が出来ない場合、そのようなレコードも malform と見做されるようだ。
{"x": "str1", "y": "invalid", "z": 1 }
{"x": 2, "y": 2.0, "z": 2 }
{"x": "str3", "y": 3.0, "z": 3 }
{"x": 1.0, "y": 4.0, "z": 4 }
{"x": false, "y": 5.0, "z": 5 }
{"x": ["invalid"], "y": 6.0, "z": 6 }
{"x": "str7", "y": 7, "z": 7 }
{"x": "str8", "y": 8.0, "z": 8.0}
val schema = StructType(Seq(
StructField("_corrupt_record", StringType, true),
StructField("x", StringType, true),
StructField("y", DoubleType, true),
StructField("z", IntegerType, true)
))
val df = spark.read.schema(schema).json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- _corrupt_record: string (nullable = true)
|-- x: string (nullable = true)
|-- y: double (nullable = true)
|-- z: integer (nullable = true)
+---------------------------------------------+-----------+----+----+
|_corrupt_record |x |y |z |
+---------------------------------------------+-----------+----+----+
|{"x": "str1", "y": "invalid", "z": 1 }|null |null|null|
|null |2 |2.0 |2 |
|null |str3 |3.0 |3 |
|null |1.0 |4.0 |4 |
|null |false |5.0 |5 |
|null |["invalid"]|6.0 |6 |
|null |str7 |7.0 |7 |
|{"x": "str8", "y": 8.0, "z": 8.0}|null |null|null|
+---------------------------------------------+-----------+----+----+
行内に複数のレコードを含む
基本
array で括られている場合、各要素をレコードとして扱う。
[{"x": "10000000000000000", "z": "5"}, {"x": "1", "y": 5}]
{"x": "10", "y": 10}
val df = spark.read.json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- x: string (nullable = true)
|-- y: long (nullable = true)
|-- z: string (nullable = true)
+-----------------+----+----+
|x |y |z |
+-----------------+----+----+
|10000000000000000|null|5 |
|1 |5 |null|
|10 |10 |null|
+-----------------+----+----+
関連テスト
注意
行先頭から始まるレコードの終端以降から、行末までの文字列は無視されるらしい。
なので、1行に複数のレコードが羅列されている場合、最初のレコード以外は無視される。
{"x": "1-1"}{"x": "1-2"}
{"x": "2"}extra text 1
extra text 2
root
|-- _corrupt_record: string (nullable = true)
|-- x: string (nullable = true)
+---------------+----+
|_corrupt_record|x |
+---------------+----+
|null |1-1 |
|null |2 |
|extra text 2 |null|
+---------------+----+
仕様非準拠な JSON 文字列の行が存在
基本
allowComments
, allowUnquotedFieldNames
, allowSingleQuotes
, allowNumericLeadingZeros
, allowBackslashEscapingAnyCharacter
, allowUnquotedControlChars
を on にすることで、仕様非準拠な JSON 文字列も受け付けるように出来る。
注意
JSON 仕様上はシングルクォートをダブルクォートのように利用できないが、デフォルトで allowSingleQuotes
が on になっているのでシングルクォートで括った文字列を許容してしまう。
{"x": "str1"}
{"x": "str2" /* comment */}
{'x': 'str3'}
{x: "str4"}
val df = spark.read.json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- _corrupt_record: string (nullable = true)
|-- x: string (nullable = true)
+---------------------------+----+
|_corrupt_record |x |
+---------------------------+----+
|null |str1|
|{"x": "str2" /* comment */}|null|
|null |str3|
|{x: "str4"} |null|
+---------------------------+----+
日付やタイムスタンプっぽい文字列を含むフィールドが存在
基本
スキーマを指定していない場合は、勝手に DateType や TimestampType に変換されることはなく StringType になる。
{"date": "2022-01-01", "time": "2022-01-01T00:00:00+09:00"}
{"date": "2022-01-02", "time": "2022-01-02T00:00:00+09:00"}
val df = spark.read.json("data.json")
df.printSchema()
df.show(truncate = false)
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
+----------+-------------------------+
|date |time |
+----------+-------------------------+
|2022-01-01|2022-01-01T00:00:00+09:00|
|2022-01-02|2022-01-02T00:00:00+09:00|
+----------+-------------------------+
Discussion