🧹

GCPで始めるデータエンジニアリング入門②:ビジネスルールでクレンジングしてBigQueryにロードする

に公開

はじめに

前回の記事では、UCIのOnline RetailデータをGCSに格納し、プロファイリングで品質を把握しました。

プロファイリングの結果、こんな問題が見えていましたね。

問題 件数
キャンセル取引(InvoiceNo が C 始まり) 1,903行
Quantity が負値 2,114行
UnitPrice がゼロ 484行
CustomerID が欠損 約34%(ゲスト購入)

「問題がわかった、じゃあ次はどうするか」——ここからがデータエンジニアの本番です。

この記事では、プロファイリングで把握した問題をビジネスルールとして言語化し、Cloud Run FunctionsでPythonクレンジング処理を実装してBigQueryに格納するまでを体験します。

この記事で作るもの

想定読者

  • 前回の記事を完了している方
  • Cloud Run FunctionsやBigQueryを初めて触る方
  • 「データクレンジング」がなんとなくわかるけど実装イメージが湧かない方

前提条件

  • GCS上に raw/transactions/*.csv が存在する(前回完了済み)
  • gcloud CLIのセットアップ・認証済み

以下の API を有効化します。Cloud Run Functions(Gen2)のデプロイには cloudbuildrunartifactregistry も必要です。

gcloud services enable \
  cloudfunctions.googleapis.com \
  cloudbuild.googleapis.com \
  run.googleapis.com \
  artifactregistry.googleapis.com \
  bigquery.googleapis.com

次に、デプロイユーザーがデフォルトのサービスアカウントを使用できるよう IAM 権限を付与します。

# プロジェクト番号を確認
PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format='value(projectNumber)')

# デプロイユーザーに serviceAccountUser ロールを付与
gcloud iam service-accounts add-iam-policy-binding \
  ${PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --member="user:$(gcloud config get-value account)" \
  --role="roles/iam.serviceAccountUser"

Step 1: クレンジング設計——判断ロジックの言語化

コードを書く前に、これをやります。

「どのデータを除外して、どのデータを残すか」はコードの問題ではなくビジネス判断の問題です。同じデータでも目的によってクレンジングルールは変わります。ここを曖昧にすると、後から「なんでこの行を除外したの?」と詰められたとき答えられなくなります。

今回は次の判断をしました。

問題 データ上の事実 判断 理由
InvoiceNo が C 始まり 1,903行 除外 キャンセル取引は売上集計に含めない
Quantity が負値 2,114行 キャンセル除外で基本的に解消 C始まりと連動しているため
UnitPrice がゼロ 484行 保持 無償提供など業務上の意図がある可能性
CustomerID 欠損 約34% NULL のまま保持 ゲスト購入として扱う
sales_amount Quantity × UnitPrice でカラム追加 後続の集計クエリを簡素化する
数値・日時の変換不能行 少数 除外(dropna) errors="coerce" で NaN になった行を最後に除去

Step 2: BigQueryのテーブル設計

スキーマ定義

02_cleaning/sql/schema.sql を作成します。

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.de_handson.transactions` (
  InvoiceNo     STRING    NOT NULL,
  StockCode     STRING    NOT NULL,
  Description   STRING,
  Quantity      INT64     NOT NULL,
  InvoiceDate   TIMESTAMP NOT NULL,
  UnitPrice     FLOAT64   NOT NULL,
  CustomerID    STRING,
  Country       STRING    NOT NULL,
  sales_amount  FLOAT64   NOT NULL
)
PARTITION BY DATE(InvoiceDate)
OPTIONS (
  require_partition_filter = FALSE
);

3点だけ説明します。

PARTITION BY DATE(InvoiceDate) について

テーブルを日付ごとに物理分割する設定です。「2010-12-01のデータだけ取得して」というクエリを書くと、その日のパーティションだけスキャンするため、コストと速度が大幅に改善します。日次で増えていくトランザクションデータにはほぼ必須の設計です。

sales_amount カラムを追加している理由

Quantity × UnitPrice を毎回クエリで計算してもいいのですが、集計マートを作るときに何度も書くことになります。ロード時に一度計算してカラムとして持つことで、後続のSQLがシンプルになります。

CustomerID を NULL 許容にしている理由

前回のプロファイリングで約34%が欠損していることが判明しています。欠損=ゲスト購入として扱うので、NOT NULL にすると行が捨てられてしまいます。

テーブルとデータセットの作成

# PROJECT_ID を設定
export PROJECT_ID="your-project-id"

# データセット作成
bq mk --dataset \
  --location=us-central1 \
  ${PROJECT_ID}:de_handson

# テーブル作成(schema.sql 内の ${PROJECT_ID} を sed で置換してから実行)
bq query --use_legacy_sql=false \
  "$(sed "s/\${PROJECT_ID}/${PROJECT_ID}/g" 02_cleaning/sql/schema.sql)"

Step 3: Cloud Run Functionsの実装

なぜ Cloud Run Functions を使うのか

今回は1日あたり数MBの小規模データを、「読む → クレンジング → 書く」というシンプルな3ステップで処理します。Cloud Run Functions はコードをデプロイするだけでスケールや常時起動を気にせず動かせるため、このようなケースに適しています。入門編なのでシンプルに始められることを優先しています。

データ量や処理の複雑さが増した場合は、以下のサービスが選択肢になります。

サービス 向いているケース
Cloud Run Functions(今回) 小規模・シンプルなバッチ処理
Cloud Run 長時間実行・大メモリを要する処理
Cloud Dataflow GB〜TB 規模・大規模な並列処理
Cloud Composer 複数ジョブの依存関係管理・複雑なスケジューリング

実務ではシステム要件(データ量・処理の複雑さ・運用コスト)に合わせて選択してください。

ファイル構成

02_cleaning/
├── sql/
│   └── schema.sql
└── cloud_run_function/
    ├── main.py
    └── requirements.txt

main.py

import io
import os

import functions_framework
import pandas as pd
from google.cloud import bigquery, storage


def read_csv_from_gcs(bucket_name: str, date_str: str) -> pd.DataFrame:
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(f"raw/transactions/transactions_{date_str}.csv")
    content = blob.download_as_bytes()
    return pd.read_csv(io.BytesIO(content))


def clean_transactions(df: pd.DataFrame) -> pd.DataFrame:
    # C始まりInvoiceNoはキャンセル取引のため除外
    df = df[~df["InvoiceNo"].astype(str).str.startswith("C")].copy()
    # キャンセル行除外後に全行が数値のみになると pandas が int64 と推論する
    # BigQuery の STRING 列に書き込む際にエラーになるため明示的に変換
    df["InvoiceNo"] = df["InvoiceNo"].astype(str)
    df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"], errors="coerce")
    df["Quantity"] = pd.to_numeric(df["Quantity"], errors="coerce")
    df["UnitPrice"] = pd.to_numeric(df["UnitPrice"], errors="coerce")
    df["sales_amount"] = df["Quantity"] * df["UnitPrice"]
    # CSV読み込み時にfloat64になるためstr変換(例: 12345.0 → "12345")
    df["CustomerID"] = df["CustomerID"].apply(
        lambda x: str(int(x)) if pd.notna(x) else None
    )
    # errors="coerce" で NaN になった行(変換不能な値)を除外
    return df.dropna(subset=["Quantity", "UnitPrice", "InvoiceDate"])


def load_to_bigquery(df: pd.DataFrame, project_id: str, dataset_id: str) -> int:
    client = bigquery.Client(project=project_id)
    table_ref = f"{project_id}.{dataset_id}.transactions"
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="InvoiceDate",
        ),
    )
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()
    return len(df)


@functions_framework.http
def main(request):
    data = request.get_json(silent=True) or {}
    date_str = data.get("date")
    if not date_str:
        return ("date parameter is required", 400)

    bucket_name = os.environ["BUCKET_NAME"]
    project_id = os.environ["PROJECT_ID"]
    dataset_id = os.environ["DATASET_ID"]

    df = read_csv_from_gcs(bucket_name, date_str)
    df = clean_transactions(df)
    count = load_to_bigquery(df, project_id, dataset_id)

    return f"Loaded {count} rows for {date_str}"

関数を3つに分けている理由は、処理の責務を分離するためです。「GCSからの読み込み」「クレンジング」「BigQueryへの書き込み」を独立させることで、どこで問題が起きたか特定しやすくなります。

requirements.txt の生成

Cloud Run Functions は pyproject.toml を直接読まないため、uv export で生成します。

uv export --no-hashes --no-dev -o 02_cleaning/cloud_run_function/requirements.txt

生成された requirements.txt に以下が含まれていることを確認してください。

functions-framework
google-cloud-bigquery
google-cloud-bigquery-storage
google-cloud-storage
pandas
pyarrow
pandasとメモリ効率について

今回はpandasを使っています。pandasはメモリ上にデータをすべて展開するため、ファイルサイズが大きくなるとCloud Run Functionsのメモリ上限(デフォルト256MB)を超えてクラッシュすることがあります。

今回の対象は1日あたり最大数MB程度なので問題ありませんが、データ量が増えた場合の対応策として3つの選択肢があります。

対応策 概要 向いているケース
チャンク読み込み pd.read_csv(chunksize=N) で分割処理 ファイルが大きいが処理ロジックは変えたくない
Polars に切り替える 省メモリ・高速な pandas 互換ライブラリ 同じコードパスで大幅な改善が必要な場合
Cloud Dataflow へ移行 Apache Beam ベースのマネージドデータ処理 GB〜TB 規模・本番パイプライン化する場合

「データが増えたらどうするか」を考えておくのもデータエンジニアの仕事です。このシリーズのGCP中級編でCloud Dataflowを扱う予定です。


Step 4: デプロイと動作確認

ローカルテスト(デプロイ前)

デプロイ前にローカルで動作確認します。functions-framework を使うとCloud Run Functionsをローカルで起動できます。

# 環境変数を設定してからローカル起動
export BUCKET_NAME="de-handson-gc-landing"
export PROJECT_ID="your-project-id"
export DATASET_ID="de_handson"

uv run functions-framework \
  --target=main \
  --source=02_cleaning/cloud_run_function/main.py \
  --debug

別ターミナルからリクエストを送ります。

curl -X POST http://localhost:8080 \
  -H "Content-Type: application/json" \
  -d '{"date": "2010-12-01"}'
2010-12-01: 3082 行をロードしました(クレンジング前: 3108 行)

GCPへのデプロイ

gcloud functions deploy clean-and-load \
  --gen2 \
  --runtime=python312 \
  --region=asia-northeast1 \
  --source=02_cleaning/cloud_run_function \
  --entry-point=main \
  --trigger-http \
  --allow-unauthenticated \
  --memory=512M \
  --set-env-vars PROJECT_ID=${PROJECT_ID},BUCKET_NAME=de-handson-gc-landing,DATASET_ID=de_handson

デプロイが完了すると、関数のURLが発行されます。

https://asia-northeast1-{PROJECT_ID}.cloudfunctions.net/clean-and-load

動作確認:1ファイル分を手動実行

FUNCTION_URL="https://asia-northeast1-${PROJECT_ID}.cloudfunctions.net/clean-and-load"

curl -X POST "${FUNCTION_URL}" \
  -H "Content-Type: application/json" \
  -d '{"date": "2010-12-01"}'
2010-12-01: 3082 行をロードしました(クレンジング前: 3108 行)

BigQueryコンソールで確認します。

SELECT
  DATE(InvoiceDate) AS date,
  COUNT(*) AS row_count
FROM `{PROJECT_ID}.de_handson.transactions`
GROUP BY date
ORDER BY date;

1日分の実行結果

1日分実行後のコンソール


Step 5: 全日付分のバッチ実行

動作確認ができたら、67日分すべてを流します。

# scripts/batch_clean_and_load.sh
FUNCTION_URL="https://asia-northeast1-${PROJECT_ID}.cloudfunctions.net/clean-and-load"

for csv in data/transactions/*.csv; do
  date=$(basename "${csv}" .csv | sed 's/transactions_//')
  echo "Processing ${date}..."
  curl -s -X POST "${FUNCTION_URL}" \
    -H "Content-Type: application/json" \
    -d "{\"date\": \"${date}\"}"
  echo ""
done
chmod +x scripts/batch_clean_and_load.sh
./scripts/batch_clean_and_load.sh

完了後、全件数を確認します。

SELECT
  COUNT(*)                         AS total_rows,
  MIN(DATE(InvoiceDate))           AS start_date,
  MAX(DATE(InvoiceDate))           AS end_date,
  COUNT(DISTINCT DATE(InvoiceDate)) AS date_count,
  ROUND(SUM(sales_amount), 2)      AS total_sales
FROM `{PROJECT_ID}.de_handson.transactions`;
total_rows : 101,695
start_date : 2010-12-01
end_date   : 2011-02-27
date_count : 67
total_sales: 2,016,986.96

元データ103,598行からキャンセル1,903行を除外した 101,695行 が格納されていれば成功です。

全件実行後のコンソール


まとめ

今回やったこと:

  1. プロファイリング結果をもとにクレンジングルールをビジネス判断として言語化
  2. Cloud Run Functions(Python 3.12 / Gen2)でクレンジング処理を実装・デプロイ
  3. BigQuery de_handson.transactions テーブルにパーティション付きで 101,695 行をロード

クレンジングはコードより判断が重要です。「なぜこのデータを除外するのか」を言語化できていないと、後続の分析で結果の信頼性を問われたときに答えられなくなります。実務では必ずビジネスルールとしてドキュメントに残しておきましょう。

次回はBigQueryのSQLで集計マートを作り、Looker Studioで可視化します。今回ロードした transactions テーブルがそのまま入力になります。


参考資料


本シリーズのコードはGitHubで公開しています: https://github.com/y-tsuritani/data-engineering-handson-gc

採用

「あらゆる判断を、Data-Informedに。」

株式会社ギックスでは、一緒に挑戦してくれる仲間を募集しています!

株式会社ギックス HP
株式会社ギックス 採用サイト
エンジニア求人情報

GitHubで編集を提案
株式会社ギックス

Discussion