Zenn
Closed4

PanderaのDataFrame ModelsからBigQueryのSchemaFieldを自動生成したい

oxonoxon

前提

GCSにアップロードされたcsvやexcelファイルをcloud function上でPanderaを使ってバリデートしつつ※、BigQueryにロードしている。

※ 人間が管理するexcelなどが多いので、出来なりの状態をノーガードでBigQueryにロードしたくない。

やりたいこと

bigquery.LoadJobConfig()で指定するschema(bigquery.SchemaFieldのlist)を定義するのが面倒なのでPanderaのDataFrame Modelから自動で生成できると管理が楽。
あと、descriptionの記入を重要視しており、メタデータの管理も一箇所に集約していきたい。

方針

  1. pa.DataFrameModelから各fieldの情報を取得
  2. bigquery.SchemaFieldに必要な辞書を定義
  3. bigquery.SchemaField.from_api_repr()を使用して辞書をbigquery.SchemaFieldに変換し、listに格納
oxonoxon

実装

from typing import Dict, Generic, List, Type, TypeVar

import pandera as pa
from pandera import dtypes
from google.cloud import bigquery

T = TypeVar("T", bound=pa.DataFrameModel)


class SchemaConverter(Generic[T]):
    """
    PanderaのDataFrameModelをBigQueryスキーマに変換を行うクラス

    Attributes:
        model: 変換元となるPanderaのDataFrameModel

    example:
    ```
    converter = SchemaConverter(model=TestModel)
    # BigQueryのスキーマを生成
    schema = converter.create_bigquery_schema()
    ```
    """

    def __init__(self, model: Type[T]):
        self.model = model

    def map_dtype_to_bq_type(self, dtype: dtypes.DataType) -> str:
        """
        PanderaのフィールドタイプをBigQueryのデータ型に変換
        TODO: 幅広く対応する
        """
        if dtypes.is_int(dtype):
            return "INTEGER"
        elif dtypes.is_float(dtype):
            return "FLOAT"
        elif dtypes.is_bool(dtype):
            return "BOOLEAN"
        elif dtypes.is_string(dtype):
            return "STRING"
        elif dtypes.is_datetime(dtype):
            return "TIMESTAMP"
        else:
            return "STRING"  # デフォルトの型

    def create_bigquery_schema(self) -> List[bigquery.SchemaField]:
        """
        BigQueryのスキーマフィールドリストを生成

        Returns:
            List[bigquery.SchemaField]: BigQueryのスキーマフィールドのリスト
        """
        schema_fields = []

        for field_name, field_info in self.model.to_schema().columns.items():
            # SchemaFieldのAPI表現を作成
            field_repr = {
                "name": field_name,
                "type": self.map_dtype_to_bq_type(field_info.dtype),
                "mode": "NULLABLE" if field_info.nullable else "REQUIRED",
                "description": field_info.description if field_info.description else None,
            }

            schema_field = bigquery.SchemaField.from_api_repr(field_repr)
            schema_fields.append(schema_field)

        return schema_fields

pa.DataFrameModel.to_schema().columnsで各fieldにアクセスし、必要な情報を取得している。

※PanderaのフィールドタイプをBigQueryのデータ型に変換しているところは最低限なので必要に応じて要追加

oxonoxon

Usage

class TestModel(pa.DataFrameModel):
    id: int = pa.Field(nullable=False)
    name: str = pa.Field(nullable=True, description="名前フィールド")
    value: float = pa.Field(nullable=True)
    created_at: pa.dtypes.Timestamp = pa.Field(nullable=False)

    class Config:
        description = "テストモデル"


converter = SchemaConverter(TestModel)
schema = converter.create_bigquery_schema()
print(schema)

出力

[
    SchemaField('id', 'INTEGER', 'REQUIRED', None, None, (), None),
    SchemaField('name', 'STRING', 'NULLABLE', None, '名前フィールド', (), None),
    SchemaField('value', 'FLOAT', 'NULLABLE', None, None, (), None),
    SchemaField('created_at', 'TIMESTAMP', 'REQUIRED', None, None, (), None)
]

SchemaFieldを生成できている。

これでスキーマ管理をPanderaのDataFrameModelに集約することができる。

oxonoxon

追加の構想

データ型のマッピング拡充

pandas-gbqの実装をまねるのが良さそう。

https://github.com/googleapis/python-bigquery-pandas/blob/b32a9c98ec717573ffe45b51ce834a3903df8bc1/pandas_gbq/schema/pandas_to_bigquery.py#L29

metadataの活用

https://pandera.readthedocs.io/en/stable/pyspark_sql.html#adding-metadata-at-the-dataframe-and-field-level

DataFrameModelのfieldやmodel自体にmetadataを記入できるので、bigquery.SchemaFieldさらに細かい制御(ポリシータグとか)をやるのも面白そう。

dlt(data load tool)との統合

dltを使うことでロード処理を共通化できるし、スキーマ指定も可能。
ただしカラム単位でのdescriptionは受け付けてくれない。
bigquery_adapterをオーバライドしたらどうにかできないか。

https://dlthub.com/docs/dlt-ecosystem/destinations/bigquery#bigquery-adapter

このスクラップは3ヶ月前にクローズされました
ログインするとコメントできます