🦔

Amazon AppFlowその2: JSONの読み込みとJSON Lines / Nested JSON

2024/10/05に公開

前回以下の記事でAmazon AppFlowを用いてS3バケットの中に存在しているCSVファイルをTiDB Serverlessに投入する手順をまとめました。
https://zenn.dev/kameping/articles/8284c8e159ec52

今日はJSONを試してみます。

さっそくやってみる

まずはこの手順を終わらせたところから作業を始めます。
https://zenn.dev/kameping/articles/8284c8e159ec52
以下のJSONをS3バケットのjsonフォルダに格納します。バケット直下では動作しませんので気を付けてください。

{"column1": "value1", "column2": "value2"}
{"column1": "value3", "column2": "value4"}
{"column1": "value5", "column2": "value6"}
{"column1": "value7", "column2": "value8"}

JSON Lines

普段JSONを利用されている方からするとこれはJSONではないことに気づくと思います。これはJSON Linesと言われるものです。AppFlowはJSONではデータを読み込めません。JSON Linesとは複数レコードのデータをJSONで処理する際に使われるもので、1行1JSONのレコードが複数行は言っていることが特徴です。

複数データを扱う際この形式にはいくつかのメリットがあります。
1.データの追加、削除が容易:1行1JSONですから、データの追加は単純にJSONを1行追加、削除は1行JSONを削除すればOKです
2.Validationに要求されるメモリが少ない:1つのJSONに複数行のレコードを持つ以下のような場合、データのバリデーションは全て読み込みが終わらないといけません。一方JSON Lines であれば1行ずつ必要な分だけValidationを行えば済むため、データを一部だけ処理する際などの高速化が期待できます。

[
  {
    "column1": "value1",
    "column2": "value2"
  },
  {
    "column1": "value3",
    "column2": "value4"
  },
  {
    "column1": "value5",
    "column2": "value6"
  },
  {
    "column1": "value7",
    "column2": "value8"
  }
]

次にAppFlowでフローを作成ボタンをクリックします。

適当な名前を入力し次へをクリックします。
送信元にJSONが格納されているS3バケットとフォルダを指定します。

前回使用したコネクタ接続オブジェクトを選択し次へをクリックします。

以下のようにマッピングを行い次へを2回クリックし最後にフローを作成をクリックします。

フローが作成されたらフローの実行をクリックします。
暫く待つと以下の通りTiDB Serverlessにデータが投入されました。

Nested JSON の場合

ではデータを以下のようにネストされたJSONに変えてみます。

{"column1": "value1", "column2": {"id": 1, "age": 25}}
{"column1": "value3", "column2": {"id": 2, "age": 30}}
{"column1": "value5", "column2": {"id": 3, "age": 22}}
{"column1": "value7", "column2": {"id": 4, "age": 28}}

再度実行するとcolumn2はnullで投入されています。AppFlowはネストされたJSONは取り扱えずフラット化が必要なようです。フラット化というのは以下のようなJSONに変更する作業のことを言います。

{"column1": "value1", "column2_id": 1, "column2_age": 25}
{"column1": "value3", "column2_id": 2, "column2_age": 30}

例えばこのJSONであれば以下のPythonコードでフラット化を行うことが可能です。

import json

# JSONL形式のデータ
json_data = [
    {"column1": "value1", "column2": {"id": 1, "age": 25}},
    {"column1": "value3", "column2": {"id": 2, "age": 30}},
    {"column1": "value5", "column2": {"id": 3, "age": 22}},
    {"column1": "value7", "column2": {"id": 4, "age": 28}}
]

# フラット化する関数
def flatten_json(data):
    for item in data:
        # column2のネストされた部分を展開
        flat_item = {
            "column1": item["column1"],
            "column2_id": item["column2"]["id"],
            "column2_age": item["column2"]["age"]
        }
        # フラット化されたデータを1行ずつJSON Lines形式で出力
        print(json.dumps(flat_item))

# フラット化実行
flatten_json(json_data)

Discussion