Open7

APIからXMLファイルを取得してBigqueryに格納する

YuichiYuichi

サンプルでXMLファイルを取得できるAPIを調べる

https://qiita.com/nownabe/items/aeac1ce0977be963a740

  • 無料天気予報APIのOpenWeatherMapを使ってみる
  • 無料のFreeプランだと5日後までの3時間毎の天気予報を取得できて、1分間に60回までAPIコールできます。月180ドルのDeveloperプランでは3000 calls/分までOKで、16日後までの天気予報を取得することができます。
  • 登録したら、使えるまで数時間かかる
  • キーがメールで送付される。webサイトでも確認できる
import requests

# APIキーと都市名を設定
api_key = ""  # ここにあなたのAPIキーを入力してください
city_name = "Osaka"  # 取得したい都市名を入力してください

# リクエストURLを構成
url = f"http://api.openweathermap.org/data/2.5/weather?q={city_name}&mode=xml&appid={api_key}"

# APIにリクエストを送信
response = requests.get(url)

# レスポンスのステータスコードをチェック
if response.status_code == 200:
    # XMLデータをファイルに保存
    with open("weather_data.xml", "wb") as file:
        file.write(response.content)
    print("XMLファイルにデータを保存しました。")
else:
    print(f"APIリクエストに失敗しました。ステータスコード: {response.status_code}")
YuichiYuichi

xmlファイルをBigqueryに格納する

サンプルXMLファイル

https://hrbcapi.porters.jp/hc/ja/articles/115012160428-Recruiter-Read

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Recruiter Total="14" Count="10" Start="0">
  <Code>0</Code>
  <Item>
    <Recruiter.P_Id>1234</Recruiter.P_Id>
    <Recruiter.P_Name>テストリクルーター1</Recruiter.P_Name>
    <Recruiter.P_UpdatedBy>
      <User>
        <User.P_Id>1</User.P_Id>
        <User.P_Name>テストエージェント1</User.P_Name>
      </User>
    </Recruiter.P_UpdatedBy>
  </Item>
  <Item>
    <Recruiter.P_Id>1235
</Recruiter.P_Id>
    <Recruiter.P_Name>テストリクルーター2</Recruiter.P_Name>
    <Recruiter.P_UpdatedBy>
      <User>
        <User.P_Id>2</User.P_Id>
        <User.P_Name>テストエージェント2</User.P_Name>
      </User>
    </Recruiter.P_UpdatedBy>
  </Item>
</Recruiter>

XMLをパースするライブラリ

https://nikkie-ftnext.hatenablog.com/entry/xmltodict-is-awesome-first-step

from pprint import pprint
import xmltodict
import json

# XMLファイルを読み込む
with open("Recruiter.xml", 'r', encoding='utf-8') as file:
    xml_content = file.read()

# XMLを辞書にパースする
parsed = xmltodict.parse(xml_content)

# 辞書の内容をきれいに表示する
pprint(parsed)

with open(r"Recruiter.json", "w", encoding='utf-8') as of:
    json.dump(parsed, of, ensure_ascii=False, indent=2)

{
  "Recruiter": {
    "@Total": "14",
    "@Count": "10",
    "@Start": "0",
    "Code": "0",
    "Item": [
      {
        "Recruiter.P_Id": "1234",
        "Recruiter.P_Name": "テストリクルーター1",
        "Recruiter.P_UpdatedBy": {
          "User": {
            "User.P_Id": "1",
            "User.P_Name": "テストエージェント1"
          }
        }
      },
      {
        "Recruiter.P_Id": "1235",
        "Recruiter.P_Name": "テストリクルーター2",
        "Recruiter.P_UpdatedBy": {
          "User": {
            "User.P_Id": "2",
            "User.P_Name": "テストエージェント2"
          }
        }
      }
    ]
  }
}
from pprint import pprint
import xmltodict
import json

resource_name = '' # ファイル名

# XMLファイルを読み込む
with open(f"data/{resource_name}_data.xml", 'r', encoding='utf-8') as file:
    xml_content = file.read()

# XMLを辞書にパースする
parsed = xmltodict.parse(xml_content)

# 辞書の内容をきれいに表示する
pprint(parsed)

with open(f"data/{resource_name}_data.json", "w", encoding='utf-8') as of:
    json.dump(parsed, of, ensure_ascii=False, indent=2) # BQに入れない閲覧用

with open(f"data/{resource_name}.json", "w", encoding='utf-8') as of:
    json.dump(parsed, of, ) # BQに入れる用
YuichiYuichi

JSONL( LDJSON )じゃないとだめなんですね

https://dev.classmethod.jp/articles/bigquery-load-json-data/

JSON ファイルデータ全体を 1 列の CSV ファイル(例: sourceFormat=CSV)として読み込んで、BigQuery のネイティブの JSON 関数を使用して、データを適切な JSON データ型に解析します。
注: BigQuery の行サイズは 100 MB までに制限されています。この読み込み手法では JSON ファイルのすべてのデータを 1 列の行に読み込むため、JSON ファイルのサイズが 100 MB を超えないようにしてください。

https://cloud.google.com/blog/ja/products/data-analytics/load-your-json-data-into-bigquery-despite-wacky-formatting

YuichiYuichi

bigquery json

まず、サンプルJSONデータを以下のように設定します:

WITH input_data AS (
  SELECT SAFE.PARSE_JSON('{
    "AAA": {
      "@BBB": "3",
      "@CCC": "3",
      "@DDD": "0",
      "EEE": "0",
      "FFF": [
        {
          "GGG": "10001",
          "HHH": "山田太郎",
          "III": "東京",
          "JJJ": "xxxx"
        },
        {
          "GGG": "10002",
          "HHH": "佐藤花子",
          "III": "大阪",
          "JJJ": "yyyy"
        },
        {
          "GGG": "10003",
          "HHH": "鈴木一郎",
          "III": "名古屋",
          "JJJ": "zzzz"
        }
      ]
    }
  }') AS json_data
)

UNNEST(JSON_QUERY_ARRAY(json_data...を使用したクエリ:

SELECT
  JSON_VALUE(item, '$.GGG') as col1,
  JSON_VALUE(item, '$.HHH') as col2,
  JSON_VALUE(item, '$.III') as col3,
  JSON_VALUE(item, '$.JJJ') as col4
FROM input_data,
UNNEST(JSON_QUERY_ARRAY(json_data, '$.AAA.FFF')) as item;

このクエリの結果:

col1    col2        col3    col4
10001   山田太郎    東京    xxxx
10002   佐藤花子    大阪    yyyy
10003   鈴木一郎    名古屋  zzzz

FFFを使用したクエリ:

SELECT
  JSON_VALUE(json_data, '$.AAA.FFF[0].GGG') as col1,
  JSON_VALUE(json_data, '$.AAA.FFF[0].HHH') as col2,
  JSON_VALUE(json_data, '$.AAA.FFF[0].III') as col3,
  JSON_VALUE(json_data, '$.AAA.FFF[0].JJJ') as col4
FROM input_data;

このクエリの結果:

col1    col2        col3    col4
10001   山田太郎    東京    xxxx

説明:
UNNEST(JSON_QUERY_ARRAY(...は配列FFFのすべての要素を処理し、複数の行を生成します。
FFFは配列FFFの最初の要素(インデックス0)のみにアクセスし、1行のみを生成します。
これらの例から、配列全体を処理したい場合はUNNESTを使用し、特定の要素にアクセスしたい場合は配列インデックス(例:``)を使用することがわかります。この方法により、JSONデータ内のネストされた配列を効果的に処理することができます。

YuichiYuichi

JSONをDBに格納するアイデア

BQクエリ重くてやめました。没案
BQ想定

APIからの取得データ

{
  "Customer": {
    "Total": "2",
    "Count": "2",
    "Start": "0",
    "Code": "0",
    "Item": [
      {
        "Id": "10003",
        "Name": "山田太郎",
        "Prefecture": "大阪府",
        "City": "大阪市",
        "Deleted": "0"
      },
      {
        "Id": "10004",
        "Name": "鈴木花子",
        "Prefecture": "東京都",
        "City": "新宿区",
        "Deleted": "0"
      }
    ]
  }
}

CSVに変換しデータレイクに流す。

  • JSONLにしてもいいと思う
  • json_dataはJSONでもSTRING型でも入れれるが、データ入れる時にエラーならないように、STRINGで入れる想定
import json
import csv
from datetime import datetime

resource_type = 'customer'  

# JSONファイルを読み込む
with open(f"data/{resource_type}.json", 'r', encoding='utf-8') as file:
    json_data = json.load(file)

# 現在の日時を取得
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# CSVファイルに書き込む
with open(f'data/{resource_type}.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['transferred_at', 'json_data']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    writer.writeheader()

    # リソースオブジェクト全体を保持しつつ、各Itemを個別のレコードとして扱う
    resource_data = json_data[resource_type.title()]
    items = resource_data.pop('Item')  # Itemを取り出し、残りのリソースデータを保持

    for item in items:
        # 各Itemに対して新しいリソースオブジェクトを作成
        new_resource = resource_data.copy()
        new_resource['Item'] = item

        row = {
            'transferred_at': current_time,
            'json_data': json.dumps({resource_type.title(): new_resource}, ensure_ascii=False)
        }
        writer.writerow(row)

print(f"CSVファイル 'data/{resource_type}.csv' が正常に作成されました。")

変換後

transferred_at json_data
2024-08-02 15:49:10 {"Customer": {"Total": "2", "Count": "2", "Start": "0", "Code": "0", "Item": {"Id": "10003", "Name": "山田太郎", "Prefecture": "大阪府", "City": "大阪市", "Deleted": "0"}}}
2024-08-02 15:49:10 {"Customer": {"Total": "2", "Count": "2", "Start": "0", "Code": "0", "Item": {"Id": "10004", "Name": "鈴木花子", "Prefecture": "東京都", "City": "新宿区", "Deleted": "0"}}}

データレイクからデータウェアハウスへ

WITH parsed_data AS (
  SELECT
    transferred_at,
    JSON_EXTRACT(json_data, '$.Customer.Item') AS item
  FROM
    {{ source('saas', 'Customer') }}
)

SELECT
  transferred_at,
  JSON_EXTRACT_SCALAR(item, '$.Id') AS client_id,
  JSON_EXTRACT_SCALAR(item, '$.Name') AS client_name,
  JSON_EXTRACT_SCALAR(item, '$.Prefecture') AS prefecture,
  JSON_EXTRACT_SCALAR(item, '$.City') AS city,
  JSON_EXTRACT_SCALAR(item, '$.Deleted') AS deleted
FROM
  parsed_data
transferred_at client_id client_name prefecture city deleted
2024-08-02 15:49:10 10003 山田太郎 大阪府 大阪市 0
2024-08-02 15:49:10 10004 鈴木花子 東京都 新宿区 0

Pros

  • APIから流れる項目が増えてもデータパイプラインがダウンしない
  • 項目減る分にはクエリエラーになる

Cons

  • APIから流れる項目が増えても検知出来ない
  • BQでやったがパースのクエリが重い印象

JSONPath
python版JSONPath検討
やはりある程度分解してデータ基盤に入れた方がいい気がする