Open7
APIからXMLファイルを取得してBigqueryに格納する
サンプルでXMLファイルを取得できるAPIを調べる
- 無料天気予報APIのOpenWeatherMapを使ってみる
- 無料のFreeプランだと5日後までの3時間毎の天気予報を取得できて、1分間に60回までAPIコールできます。月180ドルのDeveloperプランでは3000 calls/分までOKで、16日後までの天気予報を取得することができます。
- 登録したら、使えるまで数時間かかる
- キーがメールで送付される。webサイトでも確認できる
import requests
# APIキーと都市名を設定
api_key = "" # ここにあなたのAPIキーを入力してください
city_name = "Osaka" # 取得したい都市名を入力してください
# リクエストURLを構成
url = f"http://api.openweathermap.org/data/2.5/weather?q={city_name}&mode=xml&appid={api_key}"
# APIにリクエストを送信
response = requests.get(url)
# レスポンスのステータスコードをチェック
if response.status_code == 200:
# XMLデータをファイルに保存
with open("weather_data.xml", "wb") as file:
file.write(response.content)
print("XMLファイルにデータを保存しました。")
else:
print(f"APIリクエストに失敗しました。ステータスコード: {response.status_code}")
xmlファイルをBigqueryに格納する
サンプルXMLファイル
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Recruiter Total="14" Count="10" Start="0">
<Code>0</Code>
<Item>
<Recruiter.P_Id>1234</Recruiter.P_Id>
<Recruiter.P_Name>テストリクルーター1</Recruiter.P_Name>
<Recruiter.P_UpdatedBy>
<User>
<User.P_Id>1</User.P_Id>
<User.P_Name>テストエージェント1</User.P_Name>
</User>
</Recruiter.P_UpdatedBy>
</Item>
<Item>
<Recruiter.P_Id>1235
</Recruiter.P_Id>
<Recruiter.P_Name>テストリクルーター2</Recruiter.P_Name>
<Recruiter.P_UpdatedBy>
<User>
<User.P_Id>2</User.P_Id>
<User.P_Name>テストエージェント2</User.P_Name>
</User>
</Recruiter.P_UpdatedBy>
</Item>
</Recruiter>
XMLをパースするライブラリ
from pprint import pprint
import xmltodict
import json
# XMLファイルを読み込む
with open("Recruiter.xml", 'r', encoding='utf-8') as file:
xml_content = file.read()
# XMLを辞書にパースする
parsed = xmltodict.parse(xml_content)
# 辞書の内容をきれいに表示する
pprint(parsed)
with open(r"Recruiter.json", "w", encoding='utf-8') as of:
json.dump(parsed, of, ensure_ascii=False, indent=2)
{
"Recruiter": {
"@Total": "14",
"@Count": "10",
"@Start": "0",
"Code": "0",
"Item": [
{
"Recruiter.P_Id": "1234",
"Recruiter.P_Name": "テストリクルーター1",
"Recruiter.P_UpdatedBy": {
"User": {
"User.P_Id": "1",
"User.P_Name": "テストエージェント1"
}
}
},
{
"Recruiter.P_Id": "1235",
"Recruiter.P_Name": "テストリクルーター2",
"Recruiter.P_UpdatedBy": {
"User": {
"User.P_Id": "2",
"User.P_Name": "テストエージェント2"
}
}
}
]
}
}
from pprint import pprint
import xmltodict
import json
resource_name = '' # ファイル名
# XMLファイルを読み込む
with open(f"data/{resource_name}_data.xml", 'r', encoding='utf-8') as file:
xml_content = file.read()
# XMLを辞書にパースする
parsed = xmltodict.parse(xml_content)
# 辞書の内容をきれいに表示する
pprint(parsed)
with open(f"data/{resource_name}_data.json", "w", encoding='utf-8') as of:
json.dump(parsed, of, ensure_ascii=False, indent=2) # BQに入れない閲覧用
with open(f"data/{resource_name}.json", "w", encoding='utf-8') as of:
json.dump(parsed, of, ) # BQに入れる用
dltも気になる
既存のコネクタ使えなかったら、素朴にPython 実装した方が早い
Bigqueryのjson型について
JSONL( LDJSON )じゃないとだめなんですね
JSON ファイルデータ全体を 1 列の CSV ファイル(例: sourceFormat=CSV)として読み込んで、BigQuery のネイティブの JSON 関数を使用して、データを適切な JSON データ型に解析します。
注: BigQuery の行サイズは 100 MB までに制限されています。この読み込み手法では JSON ファイルのすべてのデータを 1 列の行に読み込むため、JSON ファイルのサイズが 100 MB を超えないようにしてください。
bigquery json
まず、サンプルJSONデータを以下のように設定します:
WITH input_data AS (
SELECT SAFE.PARSE_JSON('{
"AAA": {
"@BBB": "3",
"@CCC": "3",
"@DDD": "0",
"EEE": "0",
"FFF": [
{
"GGG": "10001",
"HHH": "山田太郎",
"III": "東京",
"JJJ": "xxxx"
},
{
"GGG": "10002",
"HHH": "佐藤花子",
"III": "大阪",
"JJJ": "yyyy"
},
{
"GGG": "10003",
"HHH": "鈴木一郎",
"III": "名古屋",
"JJJ": "zzzz"
}
]
}
}') AS json_data
)
UNNEST(JSON_QUERY_ARRAY(json_data...を使用したクエリ:
SELECT
JSON_VALUE(item, '$.GGG') as col1,
JSON_VALUE(item, '$.HHH') as col2,
JSON_VALUE(item, '$.III') as col3,
JSON_VALUE(item, '$.JJJ') as col4
FROM input_data,
UNNEST(JSON_QUERY_ARRAY(json_data, '$.AAA.FFF')) as item;
このクエリの結果:
col1 col2 col3 col4
10001 山田太郎 東京 xxxx
10002 佐藤花子 大阪 yyyy
10003 鈴木一郎 名古屋 zzzz
FFFを使用したクエリ:
SELECT
JSON_VALUE(json_data, '$.AAA.FFF[0].GGG') as col1,
JSON_VALUE(json_data, '$.AAA.FFF[0].HHH') as col2,
JSON_VALUE(json_data, '$.AAA.FFF[0].III') as col3,
JSON_VALUE(json_data, '$.AAA.FFF[0].JJJ') as col4
FROM input_data;
このクエリの結果:
col1 col2 col3 col4
10001 山田太郎 東京 xxxx
説明:
UNNEST(JSON_QUERY_ARRAY(...は配列FFFのすべての要素を処理し、複数の行を生成します。
FFFは配列FFFの最初の要素(インデックス0)のみにアクセスし、1行のみを生成します。
これらの例から、配列全体を処理したい場合はUNNESTを使用し、特定の要素にアクセスしたい場合は配列インデックス(例:``)を使用することがわかります。この方法により、JSONデータ内のネストされた配列を効果的に処理することができます。
JSONをDBに格納するアイデア
BQクエリ重くてやめました。没案
BQ想定
APIからの取得データ
{
"Customer": {
"Total": "2",
"Count": "2",
"Start": "0",
"Code": "0",
"Item": [
{
"Id": "10003",
"Name": "山田太郎",
"Prefecture": "大阪府",
"City": "大阪市",
"Deleted": "0"
},
{
"Id": "10004",
"Name": "鈴木花子",
"Prefecture": "東京都",
"City": "新宿区",
"Deleted": "0"
}
]
}
}
CSVに変換しデータレイクに流す。
- JSONLにしてもいいと思う
-
json_data
はJSONでもSTRING型でも入れれるが、データ入れる時にエラーならないように、STRINGで入れる想定
import json
import csv
from datetime import datetime
resource_type = 'customer'
# JSONファイルを読み込む
with open(f"data/{resource_type}.json", 'r', encoding='utf-8') as file:
json_data = json.load(file)
# 現在の日時を取得
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# CSVファイルに書き込む
with open(f'data/{resource_type}.csv', 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['transferred_at', 'json_data']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
# リソースオブジェクト全体を保持しつつ、各Itemを個別のレコードとして扱う
resource_data = json_data[resource_type.title()]
items = resource_data.pop('Item') # Itemを取り出し、残りのリソースデータを保持
for item in items:
# 各Itemに対して新しいリソースオブジェクトを作成
new_resource = resource_data.copy()
new_resource['Item'] = item
row = {
'transferred_at': current_time,
'json_data': json.dumps({resource_type.title(): new_resource}, ensure_ascii=False)
}
writer.writerow(row)
print(f"CSVファイル 'data/{resource_type}.csv' が正常に作成されました。")
変換後
transferred_at | json_data |
---|---|
2024-08-02 15:49:10 | {"Customer": {"Total": "2", "Count": "2", "Start": "0", "Code": "0", "Item": {"Id": "10003", "Name": "山田太郎", "Prefecture": "大阪府", "City": "大阪市", "Deleted": "0"}}} |
2024-08-02 15:49:10 | {"Customer": {"Total": "2", "Count": "2", "Start": "0", "Code": "0", "Item": {"Id": "10004", "Name": "鈴木花子", "Prefecture": "東京都", "City": "新宿区", "Deleted": "0"}}} |
データレイクからデータウェアハウスへ
WITH parsed_data AS (
SELECT
transferred_at,
JSON_EXTRACT(json_data, '$.Customer.Item') AS item
FROM
{{ source('saas', 'Customer') }}
)
SELECT
transferred_at,
JSON_EXTRACT_SCALAR(item, '$.Id') AS client_id,
JSON_EXTRACT_SCALAR(item, '$.Name') AS client_name,
JSON_EXTRACT_SCALAR(item, '$.Prefecture') AS prefecture,
JSON_EXTRACT_SCALAR(item, '$.City') AS city,
JSON_EXTRACT_SCALAR(item, '$.Deleted') AS deleted
FROM
parsed_data
transferred_at | client_id | client_name | prefecture | city | deleted |
---|---|---|---|---|---|
2024-08-02 15:49:10 | 10003 | 山田太郎 | 大阪府 | 大阪市 | 0 |
2024-08-02 15:49:10 | 10004 | 鈴木花子 | 東京都 | 新宿区 | 0 |
Pros
- APIから流れる項目が増えてもデータパイプラインがダウンしない
- 項目減る分にはクエリエラーになる
Cons
- APIから流れる項目が増えても検知出来ない
- BQでやったがパースのクエリが重い印象
JSONPath
python版JSONPath検討
やはりある程度分解してデータ基盤に入れた方がいい気がする