Closed4
PanderaのDataFrame ModelsからBigQueryのSchemaFieldを自動生成したい

前提
GCSにアップロードされたcsvやexcelファイルをcloud function上でPanderaを使ってバリデートしつつ※、BigQueryにロードしている。
※ 人間が管理するexcelなどが多いので、出来なりの状態をノーガードでBigQueryにロードしたくない。
やりたいこと
bigquery.LoadJobConfig()
で指定するschema(bigquery.SchemaField
のlist)を定義するのが面倒なのでPanderaのDataFrame Modelから自動で生成できると管理が楽。
あと、descriptionの記入を重要視しており、メタデータの管理も一箇所に集約していきたい。
方針
-
pa.DataFrameModel
から各fieldの情報を取得 -
bigquery.SchemaField
に必要な辞書を定義 -
bigquery.SchemaField.from_api_repr()
を使用して辞書をbigquery.SchemaField
に変換し、listに格納

実装
from typing import Dict, Generic, List, Type, TypeVar
import pandera as pa
from pandera import dtypes
from google.cloud import bigquery
T = TypeVar("T", bound=pa.DataFrameModel)
class SchemaConverter(Generic[T]):
"""
PanderaのDataFrameModelをBigQueryスキーマに変換を行うクラス
Attributes:
model: 変換元となるPanderaのDataFrameModel
example:
```
converter = SchemaConverter(model=TestModel)
# BigQueryのスキーマを生成
schema = converter.create_bigquery_schema()
```
"""
def __init__(self, model: Type[T]):
self.model = model
def map_dtype_to_bq_type(self, dtype: dtypes.DataType) -> str:
"""
PanderaのフィールドタイプをBigQueryのデータ型に変換
TODO: 幅広く対応する
"""
if dtypes.is_int(dtype):
return "INTEGER"
elif dtypes.is_float(dtype):
return "FLOAT"
elif dtypes.is_bool(dtype):
return "BOOLEAN"
elif dtypes.is_string(dtype):
return "STRING"
elif dtypes.is_datetime(dtype):
return "TIMESTAMP"
else:
return "STRING" # デフォルトの型
def create_bigquery_schema(self) -> List[bigquery.SchemaField]:
"""
BigQueryのスキーマフィールドリストを生成
Returns:
List[bigquery.SchemaField]: BigQueryのスキーマフィールドのリスト
"""
schema_fields = []
for field_name, field_info in self.model.to_schema().columns.items():
# SchemaFieldのAPI表現を作成
field_repr = {
"name": field_name,
"type": self.map_dtype_to_bq_type(field_info.dtype),
"mode": "NULLABLE" if field_info.nullable else "REQUIRED",
"description": field_info.description if field_info.description else None,
}
schema_field = bigquery.SchemaField.from_api_repr(field_repr)
schema_fields.append(schema_field)
return schema_fields
pa.DataFrameModel.to_schema().columns
で各fieldにアクセスし、必要な情報を取得している。
※PanderaのフィールドタイプをBigQueryのデータ型に変換しているところは最低限なので必要に応じて要追加

Usage
class TestModel(pa.DataFrameModel):
id: int = pa.Field(nullable=False)
name: str = pa.Field(nullable=True, description="名前フィールド")
value: float = pa.Field(nullable=True)
created_at: pa.dtypes.Timestamp = pa.Field(nullable=False)
class Config:
description = "テストモデル"
converter = SchemaConverter(TestModel)
schema = converter.create_bigquery_schema()
print(schema)
出力
[
SchemaField('id', 'INTEGER', 'REQUIRED', None, None, (), None),
SchemaField('name', 'STRING', 'NULLABLE', None, '名前フィールド', (), None),
SchemaField('value', 'FLOAT', 'NULLABLE', None, None, (), None),
SchemaField('created_at', 'TIMESTAMP', 'REQUIRED', None, None, (), None)
]
SchemaFieldを生成できている。
これでスキーマ管理をPanderaのDataFrameModelに集約することができる。

追加の構想
データ型のマッピング拡充
pandas-gbq
の実装をまねるのが良さそう。
metadataの活用
DataFrameModelのfieldやmodel自体にmetadataを記入できるので、bigquery.SchemaField
さらに細かい制御(ポリシータグとか)をやるのも面白そう。
dlt(data load tool)との統合
dltを使うことでロード処理を共通化できるし、スキーマ指定も可能。
ただしカラム単位でのdescriptionは受け付けてくれない。
bigquery_adapterをオーバライドしたらどうにかできないか。
このスクラップは3ヶ月前にクローズされました
ログインするとコメントできます