Amazon AppFlowその2: JSONの読み込みとJSON Lines / Nested JSON
前回以下の記事でAmazon AppFlowを用いてS3バケットの中に存在しているCSVファイルをTiDB Serverlessに投入する手順をまとめました。
今日はJSONを試してみます。
さっそくやってみる
まずはこの手順を終わらせたところから作業を始めます。json
フォルダに格納します。バケット直下では動作しませんので気を付けてください。
{"column1": "value1", "column2": "value2"}
{"column1": "value3", "column2": "value4"}
{"column1": "value5", "column2": "value6"}
{"column1": "value7", "column2": "value8"}
JSON Lines
普段JSONを利用されている方からするとこれはJSONではないことに気づくと思います。これはJSON Linesと言われるものです。AppFlowはJSONではデータを読み込めません。JSON Linesとは複数レコードのデータをJSONで処理する際に使われるもので、1行1JSONのレコードが複数行は言っていることが特徴です。
複数データを扱う際この形式にはいくつかのメリットがあります。
1.データの追加、削除が容易:1行1JSONですから、データの追加は単純にJSONを1行追加、削除は1行JSONを削除すればOKです
2.Validationに要求されるメモリが少ない:1つのJSONに複数行のレコードを持つ以下のような場合、データのバリデーションは全て読み込みが終わらないといけません。一方JSON Lines であれば1行ずつ必要な分だけValidationを行えば済むため、データを一部だけ処理する際などの高速化が期待できます。
[
{
"column1": "value1",
"column2": "value2"
},
{
"column1": "value3",
"column2": "value4"
},
{
"column1": "value5",
"column2": "value6"
},
{
"column1": "value7",
"column2": "value8"
}
]
次にAppFlowでフローを作成
ボタンをクリックします。
適当な名前を入力し次へ
をクリックします。
送信元にJSONが格納されているS3バケットとフォルダを指定します。
前回使用したコネクタ
、接続
、オブジェクト
を選択し次へ
をクリックします。
以下のようにマッピングを行い次へ
を2回クリックし最後にフローを作成
をクリックします。
フローが作成されたらフローの実行
をクリックします。
暫く待つと以下の通りTiDB Serverlessにデータが投入されました。
Nested JSON の場合
ではデータを以下のようにネストされたJSONに変えてみます。
{"column1": "value1", "column2": {"id": 1, "age": 25}}
{"column1": "value3", "column2": {"id": 2, "age": 30}}
{"column1": "value5", "column2": {"id": 3, "age": 22}}
{"column1": "value7", "column2": {"id": 4, "age": 28}}
再度実行するとcolumn2
はnullで投入されています。AppFlowはネストされたJSONは取り扱えずフラット化が必要なようです。フラット化というのは以下のようなJSONに変更する作業のことを言います。
{"column1": "value1", "column2_id": 1, "column2_age": 25}
{"column1": "value3", "column2_id": 2, "column2_age": 30}
例えばこのJSONであれば以下のPythonコードでフラット化を行うことが可能です。
import json
# JSONL形式のデータ
json_data = [
{"column1": "value1", "column2": {"id": 1, "age": 25}},
{"column1": "value3", "column2": {"id": 2, "age": 30}},
{"column1": "value5", "column2": {"id": 3, "age": 22}},
{"column1": "value7", "column2": {"id": 4, "age": 28}}
]
# フラット化する関数
def flatten_json(data):
for item in data:
# column2のネストされた部分を展開
flat_item = {
"column1": item["column1"],
"column2_id": item["column2"]["id"],
"column2_age": item["column2"]["age"]
}
# フラット化されたデータを1行ずつJSON Lines形式で出力
print(json.dumps(flat_item))
# フラット化実行
flatten_json(json_data)
Discussion