🍝

BigQueryを利用したアプリケーションのローカルテスト

2024/06/25に公開

BigQueryを利用するアプリケーションの開発時に、データベースの操作をどのようにテストするかが課題となります。その際の主な選択肢は下記となります。

  1. BigQueryのモックを作成してローカルでテストする
  2. テスト用のBigQuery環境を用意してGCP上でテストする
  3. bigquery-emulatorを利用してローカルでテストする

1. BigQueryのモックを作成してローカルでテストする

概要

BigQueryの操作をモック化し、テスト時に実際のBigQueryに接続せずにテストを行う方法です。

メリット

  • テストの実行が高速
  • 外部サービスに依存しないため、安定したテスト環境を構築可能
  • ネットワーク接続不要

デメリット

  • 実際のBigQueryの動作と異なる場合があるため、信頼性が低い
  • 複雑なクエリや大規模データセットのテストが難しい

2. テスト用のBigQuery環境を用意してGCP上でテストする

概要

GCP上にテスト用のBigQueryプロジェクトを用意し、そこでテストを行う方法です。

メリット

  • 実際のBigQueryと同じ環境でテストを行えるため、信頼性が高い

デメリット

  • テスト実行ごとにコストが発生する
  • ネットワーク接続が必要で、テスト実行速度が遅くなる可能性がある
  • テストデータの管理や環境のクリーンアップが必要

3. bigquery-emulatorを利用してローカルでテストする

概要

ローカル環境でBigQueryの挙動をエミュレートするツールを使用してテストを行う方法です。

メリット

  • ローカル環境で実際のBigQuery環境に近い挙動をシミュレートできる
  • ネットワーク環境に依存せず、コストもかからない

デメリット

  • エミュレータのセットアップと管理が必要
  • エミュレータの機能がBigQueryの全てをカバーしていない

エミュレータで動作しなかったBigQueryの機能

エミュレータで動作しなかったBigQueryの機能を以下に記載します(2024/6/12時点)

  • 「TRUNCATE TABLE」が動作しなかった
  • TIMESTAMPの列を使用してquery()を実行するとエラーとなる(query_and_wait)は動作する
  • COPY TABLEが動作しない
  • 9050以外のポートで動いている時にload_table_from_fileが動作しない場合がある

本記事では以降bigquery-emulatorを利用してローカルでテストする方法について記載します。


bigquery-emulatorを利用したテスト方法

bigquery-emulatorの起動

環境を汚さずにテストをするならDocker-composeを使用した方法が良いでしょう。

version: '3'
services:
  bigquery-emulator:
    image: "ghcr.io/goccy/bigquery-emulator:latest"
    platform: "linux/x86_64"
    ports:
      - "9050:9050"
    volumes:
      - ./bigquery:/bigquery
    command: "bigquery-emulator --project=your-project-id --data=../bigquery/data.yaml"
  • platform: "linux/x86_64"を指定することで、Appleシリコン搭載のMacでも動作します。
  • ../bigquery/data.yamlが初期データ(プロジェクト、データセット、テーブルのスキーマとデータ)として読み込まれます。
  • 9050番ポートがREST API用、9060番ポートがgRPC用として使用されます。

その他、設定の詳細については下記を参照してください
https://github.com/goccy/bigquery-emulator

初期データ(data.yaml)の自動生成

bigquery-emulator用のdata.yamlを手動で作成するのはテーブル数やテーブルのフィールド数が多い場合手間がかかります。そこで、実際のBigQueryに接続してdata.yamlを自動生成するコード例を紹介します。

import yaml
import inflect
from google.cloud import bigquery
from google.cloud.bigquery.schema import SchemaField
from bigquery_model import Project, Dataset, Table, Column


def to_pascal_case(snake_str):
    return "".join(x.title() for x in snake_str.split("_"))


class NoNoneDumper(yaml.SafeDumper):
    def ignore_aliases(self, data):
        return True


def remove_none_values(data):
    if isinstance(data, dict):
        return {k: remove_none_values(v) for k, v in data.items() if v is not None}
    elif isinstance(data, list):
        return [remove_none_values(i) for i in data if i is not None]
    else:
        return data


# Register the custom representer
NoNoneDumper.add_representer(type(None), NoNoneDumper.represent_none)


class BigqueryYamlMaker:
    def __init__(self, target_project_id, test_project_id):
        self.client = bigquery.Client(target_project_id)
        self.project = Project(id=test_project_id)
        self.inflect = inflect.engine()
        self.dict_model_names = {}

    def make_children(self, root_field: SchemaField):
        result = []
        for field in root_field.fields:
            column = Column(name=field.name, type=field.field_type, mode=field.mode)
            if field.field_type == 'RECORD':
                column.fields = self.make_children(field)
            result.append(column)
        return result

    def add_table(self, dataset_id, table_id, model_name=""):
        dataset_ref = self.client.dataset(dataset_id)
        table_ref = dataset_ref.table(table_id)
        table = self.client.get_table(table_ref)
        columns = []
        for field in table.schema:
            column = Column(name=field.name, type=field.field_type, mode=field.mode)
            if field.field_type == 'RECORD':
                column.fields = self.make_children(field)
            columns.append(column)
        if model_name:
            self.dict_model_names[table_id] = model_name
        dataset = next((dataset for dataset in self.project.datasets if dataset.id == dataset_id), None)
        if not dataset:
            dataset = Dataset(id=dataset_id)
            self.project.datasets.append(dataset)

        table_ids = [table.id for table in dataset.tables]
        if table_id in table_ids:
            print(f"Table {table_id} already exists in dataset {dataset_id}.")
            return

        dataset.tables.append(Table(id=table_id, columns=columns))

    def export_yaml(self, output_path):
        with open(output_path, "w") as f:
            projects = {"projects": [self.project.model_dump()]}
            projects = remove_none_values(projects)
            yaml.dump(projects, f, Dumper=NoNoneDumper, default_flow_style=False)

    def export_model(self):
        models = self.project.model_dump()
        for dataset in models["datasets"]:
            for table in dataset["tables"]:
                table_name = table["id"]
                if table_name in self.dict_model_names:
                    table_name = self.dict_model_names[table_name]
                else:
                    noun_name = self.inflect.singular_noun(table_name)
                    if noun_name:
                        table_name = noun_name
                    table_name = to_pascal_case(table_name)
                print(f"class {table_name}(BaseModel):")
                for column in table["columns"]:
                    print(f"    {column['name']}: Optional[{self.convert_type(column)}] = None")
                print("\n")

    def convert_type(self, column: dict) -> str:
        bq_type = column['type']
        prefix = ''
        if bq_type == "STRING":
            prefix = "str"
        elif bq_type == "DATETIME":
            prefix = "datetime"
        elif bq_type == "DATE":
            prefix = "date"
        elif bq_type == "TIMESTAMP":
            prefix = "datetime"
        elif bq_type == "BOOL":
            prefix = "bool"
        elif bq_type == "BOOLEAN":
            prefix = "bool"
        elif bq_type == "INTEGER":
            prefix = "int"
        elif bq_type == "FLOAT":
            prefix = "float"
        elif bq_type == "RECORD":
            prefix = "dict"
        else:
            print(bq_type)
            raise ValueError(f"{bq_type}は未サポートの型です")
        if 'REPEATED' in column['mode']:
            return f"list[{prefix}]"
        else:
            return prefix

テストの実行

bigquery-emulatorによるテストを実施しやすくするためのユーティリティクラスと、それを利用したテストコードのサンプルを以下に記載します。

ユーティリティクラス

import yaml
from pydantic import BaseModel
import json
import subprocess
from google.cloud import bigquery
from bigquery_model import bq, api_end_point, Dataset

class BigqueryRepository:
    SCHEMA_FILE = "tests/docker/bigquery/data.yaml"

    def __init__(self, datasets: list[Dataset]):
        self.__init_db(datasets)

    def __init_db(self, datasets: list[Dataset]):
        # bigquery-emulatorはメモリーリークの問題が報告されているためコンストラクタで再起動する
        subprocess.run(["docker-compose", "restart", "test_project"], check=True, cwd="tests/docker")

        if "127.0.0.1" not in api_end_point:
            raise Exception(
                "接続先がテスト用のBigQueryではありません。データの全削除等を行うためテスト用のBigQueryに接続してください。"
            )
        self._db = bq
        self.delete_all()
        self.__load_schema()
        self.create_datasets_and_tables()
        self.insert_all(datasets)

    def __load_schema(self):
        with open(self.SCHEMA_FILE, "r") as file:
            self.schema_data = yaml.safe_load(file)

    def create_datasets_and_tables(self):
        for dataset_info in self.schema_data["projects"][0]["datasets"]:
            dataset_id = dataset_info["id"]
            dataset_ref = bigquery.DatasetReference(self._db.project, dataset_id)
            dataset = bigquery.Dataset(dataset_ref)
            self._db.create_dataset(dataset, exists_ok=True)

            for table_info in dataset_info["tables"]:
                table_id = table_info["id"]
                schema = [
                    bigquery.SchemaField(field["name"], field["type"], mode=field["mode"]) for field in table_info["columns"]
                ]
                table_ref = bigquery.TableReference(dataset_ref, table_id)
                table = bigquery.Table(table_ref, schema=schema)
                self._db.create_table(table, exists_ok=True)

    def insert_dict_list(self, dataset_id: str, table_id: str, rows_to_insert: list[dict]):
        table_ref = f"{dataset_id}.{table_id}"
        errors = self._db.insert_rows_json(table_ref, rows_to_insert)
        if errors:
            print(f"Errors occurred: {errors}")
        else:
            print(f"Inserted data into {dataset_id}.{table_id}.")

    def insert_model_list(self, dataset_id: str, table_id: str, rows_to_insert: list[BaseModel]):
        table_ref = f"{dataset_id}.{table_id}"
        item_list = [json.loads(item.model_dump_json()) for item in rows_to_insert]

        errors = self._db.insert_rows_json(table_ref, item_list)
        if errors:
            print(f"Errors occurred: {errors}")
        else:
            print(f"Inserted data into {dataset_id}.{table_id}.")

    def insert_all(self, datasets: list[Dataset]):
        for dataset in datasets:
            for table in dataset.tables:
                self.insert_model_list(dataset.id, table.id, table.data)

    def delete_dataset(self, dataset_id):
        self._db.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)

    def delete_all(self):
        for dataset in self._db.list_datasets():
            self.delete_dataset(dataset.dataset_id)

    def query(self, query):
        query_job = self._db.query(query)
        return query_job.result()

テストごとにデータをリセットする必要がありますが、bigquery-emulatorではTRUNCATE文が動作しなかったため、データセットを削除して再作成しています。
また、bigquery-emulatorにはデータセットやテーブルを削除してもメモリが解放されないメモリリークの問題が報告されており、コンストラクタで再起動しています。
https://github.com/goccy/bigquery-emulator/issues/313

テストコードサンプル

import pytest
from bigquery_model import Dataset, Table, User
from bigquery_repository import BigqueryRepository

@pytest.fixture(scope="class")
def init_db():
    users = Table(
        id="users",
        data=[
            User(user_id=1, user_name="Test Name1"),
            User(user_id=2, user_name="Test Name2"),
        ]
    )
    datasets = [
        Dataset(
            id="dataset1",
            tables=[users]
        )
    ]
    bigquery_repository = BigqueryRepository(datasets)
    yield bigquery_repository
    bigquery_repository.delete_all()

class TestExecute:
    def test_1(self, init_db):
        # ここでテスト対象コードを実行
        # 省略

        # 実行結果の検証
        bigquery_repository = init_db
        rows = bigquery_repository.query("SELECT user_id,user_name from dataset1.users")
        results = [User(**dict(row.items())) for row in rows]
        assert results == [
            User(user_id=1, user_name="Test Name1"),
            User(user_id=2, user_name="Test Name2"),
            User(user_id=3, user_name="Test Name3")
        ]

まとめ

BigQueryを利用したアプリケーションのテストには、モックを利用したテスト、GCP上のテスト環境の利用、bigquery-emulatorの利用の方法があります。本記事では特にbigquery-emulatorを使用したテスト方法に焦点を当て、その設定方法やテストコードの実装例を紹介しました。
bigquery-emulatorを使用することで、ローカル環境で高速かつ低コストでBigQueryのテストを行うことができます。ただし、全ての機能がサポートされているわけではないため、必要に応じて他のテスト方法と組み合わせることをおすすめします。
効果的なテスト戦略を立てることで、BigQueryを利用したアプリケーションの品質と信頼性を向上させることができます。

CareNet Engineers

Discussion