🛠️

Pydantic×Genericsで実現する堅牢なデータ処理スクリプト設計

に公開

ネルドリップのコーヒーが一段と美味しく感じられる涼しい季節が戻ってきましたね。
株式会社天地人で製品のバックエンド開発やデータエンジニアリングを担当している高瀬です。

Jupyter Notebookでササッと作ったデータ処理スクリプトが気づけば本番運用で動いている、なんてことはありませんか?パラメータを変えるたびにコードを直接修正したり、似たような処理なのに毎回コピペで新しいスクリプトを作ったり。最初は便利だったはずのスクリプトが、いつの間にか「誰も触りたくない魔窟」になってしまった経験をお持ちの方も多いのではないでしょうか。

今回は、そんな「その場しのぎのスクリプト」から、本番運用に耐えうる堅牢なデータ処理システムにリプレースする際に役立った3つの設計原則についてご紹介します。

本記事の3行サマリ:

  • ロジックと設定を分離し、保守性・再利用性を高める設計例がわかる
  • Pydanticによる型安全な設定管理の実装方法がわかる
  • Genericsを活用したロジックの共通インターフェース設計による拡張性の高いアーキテクチャがわかる

背景

データ分析やデータ処理を伴う業務では、初期段階でJupyter NotebookやPythonスクリプトを使った探索的なアプローチが取られることが多いです。これらの手法は素早くプロトタイプを作成し、データの特性を把握するには非常に有効な手段です。

しかし、プロトタイピングや検証段階で活躍する小回りの利いたワンショットスクリプトをそのまま本番運用に持ち込むと、以下のような課題に直面します。

  • 設定とロジックの混在: パラメータがコード内にハードコーディングされ、変更のたびにコード修正が必要
  • 再利用性の低さ: 似たような処理を行う別のタスクでもスクリプトごとコードを複製してしまう
  • 保守性の問題: エラーハンドリングやロギングが不統一で、運用時のトラブルシューティングが困難
  • テストの難しさ: 設定とロジックが分離されていないため、単体テストが書きにくい

天地人では、衛星データや地理空間データに対する加工処理を頻繁に行うため、複数の解析対象地域や異なるパラメータに対して同様のデータ処理を繰り返し実行する必要があります。こうした要件に対応するため、保守性・拡張性・再利用性を重視した設計パターンを採用しました。

尚、今回ご紹介する設計事例は、特定のドメインに関わらずデータ処理を伴う様々なプロジェクトに対して適用可能です。

設計の3つの柱

ある観測衛星のデータと地理情報データを共通のID(ここではtenchijin_idとしています)で結合する処理を例に、本アーキテクチャを支える3つの設計原則について、以下に紹介していきます。

1. ロジックと設定の分離

この処理におけるパラメータとなる要素を抽出し「設定ファイル」として切り出します。

# csv_join_config.yml
project_name: satellite_geo_analysis
base_dir: /data/satellite_geo_analysis
left_csv_path: input/satellite_data.csv
right_csv_path: input/geo_info.csv
join_columns:
  left_column: tenchijin_id
  right_column: tenchijin_id
join_type: inner
output_columns:
  - name: location_id
    source_table: left
    source_column: tenchijin_id
  - name: observation_value
    source_table: left
    source_column: ndvi_value
  - name: prefecture
    source_table: right
    source_column: prefecture_name
    dtype: str
output_path: output/satellite_geo_joined.csv

処理ロジックとパラメータを完全に分離することで、コードの変更なしに異なるお客様プロジェクトやデータセットに対応できるようにしました。

2. Pydanticによるパラメータモデル

1.で分離したパラメータたちは、以下のようにPydanticを利用しパラメータモデルとして定義しました。

from pydantic import BaseModel, model_validator
from pathlib import Path
from typing_extension import Literal

class JoinColumnConfig(BaseModel):
    """結合に使用するカラムの設定"""
    left_column: str
    right_column: str

class OutputColumnConfig(BaseModel):
    """出力カラムの設定"""
    name: str
    source_table: Literal["left", "right"]
    source_column: str
    dtype: str | None = None

class CsvJoinJobConfig(BaseModel):
    """CSV結合ジョブの設定"""
    project_name: str
    base_dir: Path
    left_csv_path: Path
    right_csv_path: Path
    join_columns: JoinColumnConfig
    output_columns: list[OutputColumnConfig]
    output_path: Path
    join_type: Literal["inner", "left", "right", "outer"] = "inner"

    @model_validator(mode="after")
    def validate_csv_files_exist(self):
        for path_field in ["left_csv_path", "right_csv_path"]:
            csv_path = getattr(self, path_field)
            full_path = self.base_dir / csv_path
            if not full_path.exists():
                raise ValueError(f"CSV file does not exist: {full_path}")
        return self

Pydanticにより、以下のメリットが得られます。

  • 型安全性による自動的な例外送出: 不正な型や値が設定された場合、実行時ではなく設定読み込み時点で即座に例外が発生
  • 複数属性を跨いだカスタムバリデーションロジックの実装: @model_validatorデコレータにより、全フィールドの値にアクセスして複雑な検証ルールを効率的に実装可能
  • 型ヒントによる開発時の補完とエラー検出: IDEやmypyなどの静的解析ツールによる開発効率の向上

3. ロジックの共通インターフェース設計

データ処理の各ステップを「Job」という単位で抽象化し、全てのJobが共通のJobBaseクラスを継承するように設計します。これにより、ジョブの追加や変更が容易になります。

Genericsとは?

ここで重要な役割を果たすのが、Pythonの ジェネリクス(Generics) です。ジェネリクスは、型を抽象化して扱えるクラスや関数を定義できる仕組みです。例えば、JobBase[CsvJoinJobConfig]JobBase[DataCleaningConfig]のように、ある型Cをパラメータとして受け取り、その型に応じた振る舞いを保持するクラスを一つ用意できるため、個別の型ごとに似たようなクラスを都度書く必要がなくなります。

具体的には、TypeVarで型変数を宣言し、Generic[C]を継承することで、__init__やメソッドの引数・戻り値にCを使って型安全な汎用性を持たせることができます。これにより、JobBase[CsvJoinJobConfig]に対してはconfigプロパティは必ずCsvJoinJobConfig型でなければならず、異なる型を間違って渡すことを防げます。

from abc import ABC, abstractmethod
from typing import Generic, TypeVar, get_args

C = TypeVar("C")  # Configクラスの型変数

class JobBase(ABC, Generic[C]):
    def __init__(self, config: C):
        # 実行時に正しいConfigクラスが渡されているかチェック
        self._validate_config_type(config)
        self.config = config

    def _validate_config_type(self, config: C):
        # __init_subclass__で保存された、期待するConfigクラスを取得
        expected_job_config_type = self.job_config_class

        if not isinstance(config, expected_job_config_type):
            raise TypeError(
                f"Config must be an instance of {expected_job_config_type.__name__}"
            )

    def __init_subclass__(cls, **kwargs):
        """
        重要: サブクラス定義時に自動実行される特殊メソッド
        例: class CsvJoinJob(JobBase[CsvJoinJobConfig]) が定義されると、
        CsvJoinJobConfigクラスをjob_config_classに自動保存
        """
        super().__init_subclass__(**kwargs)
        # Generic型引数からConfigクラスを抽出
        generic_args = get_args(cls.__orig_bases__[0])

        if generic_args:
            # クラス属性として保存(インスタンス化前でもアクセス可能)
            cls.job_config_class = generic_args[0]

    @abstractmethod
    def execute(self):
        """各Jobクラスで具体的な処理を実装"""
        raise NotImplementedError

実装のポイント解説:

この実装の特徴的な点は以下の通りです:

  1. ABC(抽象基底クラス)による強制: JobBaseABCを継承し、execute()メソッドを@abstractmethodで定義することで、すべての子クラスに具体的な実装を強制

  2. 自動的な型情報の保存: __init_subclass__メソッドにより、class CsvJoinJob(JobBase[CsvJoinJobConfig]) と定義するだけで、CsvJoinJobConfig クラスが自動的に job_config_class 属性に保存される

  3. 実行時の型安全性: インスタンス生成時に、期待するConfigクラスの型でない場合は即座にTypeErrorが発生

このGenericパターンの活用により、各Jobクラスと対応するConfigクラスの型安全性が確保されます。

実際の活用例

Jobの実装例

実際のジョブ実装は非常にシンプルです。JobBaseクラスが提供する共通インターフェースにより、以下の点でシンプルかつ自由度の高い実装が可能になります:

  • 最小限の実装: execute()メソッドの実装のみが必須
  • パラメータクラスの自動注入: self.configとして型安全にアクセス可能
  • 実装の自由度: 具体的な処理ロジックは完全に開発者の裁量
from compass_jobs.csv_join.modules.parameter_models import CsvJoinJobConfig
from job_executor.modules.job_base import JobBase

class CsvJoinJob(JobBase[CsvJoinJobConfig]):
    def __init__(self, config: CsvJoinJobConfig):
        super().__init__(config)
        self._setup_file_paths()

    def execute(self):
        """JobBaseの抽象メソッドを実装"""
        self._validate_csv_headers()
        self._process_csv_join()
        ...
        ...

    def _setup_file_paths(self):
        ...

    def _validate_csv_headers(self):
        ...

    def _process_csv_join(self):
        ...
    
    ...
    ...

このように、開発者はexecute()メソッド内で自由に処理フローを設計でき、必要に応じて任意の数のprivateメソッドに分割できます。この設計パターンでは、パラメータの型安全性とインターフェースの統一のみを保証し、具体的な処理の実装については一切制約を課しません。

実行方法

実際の使用は非常にシンプルです。

import yaml

# 設定ファイルから設定を読み込み
with open("csv_join_config.yml", "r") as f:
    config_dict = yaml.safe_load(f)

# Pydanticモデルでバリデーション
config = CsvJoinJobConfig(**config_dict)

# ジョブを実行
job = CsvJoinJob(config)
job.execute()

設定ファイルを切り替えるだけで、異なるデータセットや異なる結合条件での処理が可能になります。

なお、実運用では以下のようにコマンドライン引数で設定ファイルを指定できるようにしています。

python csv_join_job.py --config-filepath /path/to/config.yml

これにより、cron等の定期実行スケジューラからも柔軟に利用できます。

まとめ

ワンショットスクリプトから始まったデータ処理を本番運用可能なシステムに進化させるには、「ロジックと設定の分離」「型安全な設定管理」「ロジックの共通インターフェース設計」という3つの設計原則が有効です。

今回紹介したパターンは、以下のような特徴を持つプロジェクトに特に有効です。

  • 複数の類似したデータソースを処理する必要がある
  • パラメータの変更が頻繁に発生する
  • 新しい処理ステップの追加が継続的に必要になる
  • チームでの保守性が重要

PythonのGenericsや__init_subclass__といった高度な機能を活用することで、高い堅牢性と柔軟性を両立した設計を実現できます。

データエンジニアリングにおいて、初期のプロトタイピングから本番運用への移行は避けて通れない道です。今回ご紹介した設計原則が、同じような課題に取り組んでいる方々の参考になれば幸いです。

参考


株式会社天地人では、人工衛星などの宇宙ビッグデータを活用し、地球規模の課題に取り組むためのオンラインGISプラットフォーム天地人コンパス(Tenchijin COMPASS)を開発しています。

私たちと一緒に天地人コンパスを開発してくれる仲間を募集しております。ご興味のある方は以下のページよりエンジニアリングの募集の求人にてご確認下さい。

https://www.wantedly.com/companies/company_5025838/projects

Discussion