GCPで始めるデータエンジニアリング入門②:ビジネスルールでクレンジングしてBigQueryにロードする
はじめに
前回の記事では、UCIのOnline RetailデータをGCSに格納し、プロファイリングで品質を把握しました。
プロファイリングの結果、こんな問題が見えていましたね。
| 問題 | 件数 |
|---|---|
| キャンセル取引(InvoiceNo が C 始まり) | 1,903行 |
| Quantity が負値 | 2,114行 |
| UnitPrice がゼロ | 484行 |
| CustomerID が欠損 | 約34%(ゲスト購入) |
「問題がわかった、じゃあ次はどうするか」——ここからがデータエンジニアの本番です。
この記事では、プロファイリングで把握した問題をビジネスルールとして言語化し、Cloud Run FunctionsでPythonクレンジング処理を実装してBigQueryに格納するまでを体験します。
この記事で作るもの
想定読者
- 前回の記事を完了している方
- Cloud Run FunctionsやBigQueryを初めて触る方
- 「データクレンジング」がなんとなくわかるけど実装イメージが湧かない方
前提条件
- GCS上に
raw/transactions/*.csvが存在する(前回完了済み) - gcloud CLIのセットアップ・認証済み
以下の API を有効化します。Cloud Run Functions(Gen2)のデプロイには cloudbuild・run・artifactregistry も必要です。
gcloud services enable \
cloudfunctions.googleapis.com \
cloudbuild.googleapis.com \
run.googleapis.com \
artifactregistry.googleapis.com \
bigquery.googleapis.com
次に、デプロイユーザーがデフォルトのサービスアカウントを使用できるよう IAM 権限を付与します。
# プロジェクト番号を確認
PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format='value(projectNumber)')
# デプロイユーザーに serviceAccountUser ロールを付与
gcloud iam service-accounts add-iam-policy-binding \
${PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
--member="user:$(gcloud config get-value account)" \
--role="roles/iam.serviceAccountUser"
Step 1: クレンジング設計——判断ロジックの言語化
コードを書く前に、これをやります。
「どのデータを除外して、どのデータを残すか」はコードの問題ではなくビジネス判断の問題です。同じデータでも目的によってクレンジングルールは変わります。ここを曖昧にすると、後から「なんでこの行を除外したの?」と詰められたとき答えられなくなります。
今回は次の判断をしました。
| 問題 | データ上の事実 | 判断 | 理由 |
|---|---|---|---|
| InvoiceNo が C 始まり | 1,903行 | 除外 | キャンセル取引は売上集計に含めない |
| Quantity が負値 | 2,114行 | キャンセル除外で基本的に解消 | C始まりと連動しているため |
| UnitPrice がゼロ | 484行 | 保持 | 無償提供など業務上の意図がある可能性 |
| CustomerID 欠損 | 約34% | NULL のまま保持 | ゲスト購入として扱う |
| sales_amount | — | Quantity × UnitPrice でカラム追加 | 後続の集計クエリを簡素化する |
| 数値・日時の変換不能行 | 少数 | 除外(dropna) |
errors="coerce" で NaN になった行を最後に除去 |
Step 2: BigQueryのテーブル設計
スキーマ定義
02_cleaning/sql/schema.sql を作成します。
CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.de_handson.transactions` (
InvoiceNo STRING NOT NULL,
StockCode STRING NOT NULL,
Description STRING,
Quantity INT64 NOT NULL,
InvoiceDate TIMESTAMP NOT NULL,
UnitPrice FLOAT64 NOT NULL,
CustomerID STRING,
Country STRING NOT NULL,
sales_amount FLOAT64 NOT NULL
)
PARTITION BY DATE(InvoiceDate)
OPTIONS (
require_partition_filter = FALSE
);
3点だけ説明します。
PARTITION BY DATE(InvoiceDate) について
テーブルを日付ごとに物理分割する設定です。「2010-12-01のデータだけ取得して」というクエリを書くと、その日のパーティションだけスキャンするため、コストと速度が大幅に改善します。日次で増えていくトランザクションデータにはほぼ必須の設計です。
sales_amount カラムを追加している理由
Quantity × UnitPrice を毎回クエリで計算してもいいのですが、集計マートを作るときに何度も書くことになります。ロード時に一度計算してカラムとして持つことで、後続のSQLがシンプルになります。
CustomerID を NULL 許容にしている理由
前回のプロファイリングで約34%が欠損していることが判明しています。欠損=ゲスト購入として扱うので、NOT NULL にすると行が捨てられてしまいます。
テーブルとデータセットの作成
# PROJECT_ID を設定
export PROJECT_ID="your-project-id"
# データセット作成
bq mk --dataset \
--location=us-central1 \
${PROJECT_ID}:de_handson
# テーブル作成(schema.sql 内の ${PROJECT_ID} を sed で置換してから実行)
bq query --use_legacy_sql=false \
"$(sed "s/\${PROJECT_ID}/${PROJECT_ID}/g" 02_cleaning/sql/schema.sql)"
Step 3: Cloud Run Functionsの実装
なぜ Cloud Run Functions を使うのか
今回は1日あたり数MBの小規模データを、「読む → クレンジング → 書く」というシンプルな3ステップで処理します。Cloud Run Functions はコードをデプロイするだけでスケールや常時起動を気にせず動かせるため、このようなケースに適しています。入門編なのでシンプルに始められることを優先しています。
データ量や処理の複雑さが増した場合は、以下のサービスが選択肢になります。
| サービス | 向いているケース |
|---|---|
| Cloud Run Functions(今回) | 小規模・シンプルなバッチ処理 |
| Cloud Run | 長時間実行・大メモリを要する処理 |
| Cloud Dataflow | GB〜TB 規模・大規模な並列処理 |
| Cloud Composer | 複数ジョブの依存関係管理・複雑なスケジューリング |
実務ではシステム要件(データ量・処理の複雑さ・運用コスト)に合わせて選択してください。
ファイル構成
02_cleaning/
├── sql/
│ └── schema.sql
└── cloud_run_function/
├── main.py
└── requirements.txt
main.py
import io
import os
import functions_framework
import pandas as pd
from google.cloud import bigquery, storage
def read_csv_from_gcs(bucket_name: str, date_str: str) -> pd.DataFrame:
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(f"raw/transactions/transactions_{date_str}.csv")
content = blob.download_as_bytes()
return pd.read_csv(io.BytesIO(content))
def clean_transactions(df: pd.DataFrame) -> pd.DataFrame:
# C始まりInvoiceNoはキャンセル取引のため除外
df = df[~df["InvoiceNo"].astype(str).str.startswith("C")].copy()
# キャンセル行除外後に全行が数値のみになると pandas が int64 と推論する
# BigQuery の STRING 列に書き込む際にエラーになるため明示的に変換
df["InvoiceNo"] = df["InvoiceNo"].astype(str)
df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"], errors="coerce")
df["Quantity"] = pd.to_numeric(df["Quantity"], errors="coerce")
df["UnitPrice"] = pd.to_numeric(df["UnitPrice"], errors="coerce")
df["sales_amount"] = df["Quantity"] * df["UnitPrice"]
# CSV読み込み時にfloat64になるためstr変換(例: 12345.0 → "12345")
df["CustomerID"] = df["CustomerID"].apply(
lambda x: str(int(x)) if pd.notna(x) else None
)
# errors="coerce" で NaN になった行(変換不能な値)を除外
return df.dropna(subset=["Quantity", "UnitPrice", "InvoiceDate"])
def load_to_bigquery(df: pd.DataFrame, project_id: str, dataset_id: str) -> int:
client = bigquery.Client(project=project_id)
table_ref = f"{project_id}.{dataset_id}.transactions"
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
time_partitioning=bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="InvoiceDate",
),
)
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()
return len(df)
@functions_framework.http
def main(request):
data = request.get_json(silent=True) or {}
date_str = data.get("date")
if not date_str:
return ("date parameter is required", 400)
bucket_name = os.environ["BUCKET_NAME"]
project_id = os.environ["PROJECT_ID"]
dataset_id = os.environ["DATASET_ID"]
df = read_csv_from_gcs(bucket_name, date_str)
df = clean_transactions(df)
count = load_to_bigquery(df, project_id, dataset_id)
return f"Loaded {count} rows for {date_str}"
関数を3つに分けている理由は、処理の責務を分離するためです。「GCSからの読み込み」「クレンジング」「BigQueryへの書き込み」を独立させることで、どこで問題が起きたか特定しやすくなります。
requirements.txt の生成
Cloud Run Functions は pyproject.toml を直接読まないため、uv export で生成します。
uv export --no-hashes --no-dev -o 02_cleaning/cloud_run_function/requirements.txt
生成された requirements.txt に以下が含まれていることを確認してください。
functions-framework
google-cloud-bigquery
google-cloud-bigquery-storage
google-cloud-storage
pandas
pyarrow
pandasとメモリ効率について
今回はpandasを使っています。pandasはメモリ上にデータをすべて展開するため、ファイルサイズが大きくなるとCloud Run Functionsのメモリ上限(デフォルト256MB)を超えてクラッシュすることがあります。
今回の対象は1日あたり最大数MB程度なので問題ありませんが、データ量が増えた場合の対応策として3つの選択肢があります。
| 対応策 | 概要 | 向いているケース |
|---|---|---|
| チャンク読み込み |
pd.read_csv(chunksize=N) で分割処理 |
ファイルが大きいが処理ロジックは変えたくない |
| Polars に切り替える | 省メモリ・高速な pandas 互換ライブラリ | 同じコードパスで大幅な改善が必要な場合 |
| Cloud Dataflow へ移行 | Apache Beam ベースのマネージドデータ処理 | GB〜TB 規模・本番パイプライン化する場合 |
「データが増えたらどうするか」を考えておくのもデータエンジニアの仕事です。このシリーズのGCP中級編でCloud Dataflowを扱う予定です。
Step 4: デプロイと動作確認
ローカルテスト(デプロイ前)
デプロイ前にローカルで動作確認します。functions-framework を使うとCloud Run Functionsをローカルで起動できます。
# 環境変数を設定してからローカル起動
export BUCKET_NAME="de-handson-gc-landing"
export PROJECT_ID="your-project-id"
export DATASET_ID="de_handson"
uv run functions-framework \
--target=main \
--source=02_cleaning/cloud_run_function/main.py \
--debug
別ターミナルからリクエストを送ります。
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"date": "2010-12-01"}'
2010-12-01: 3082 行をロードしました(クレンジング前: 3108 行)
GCPへのデプロイ
gcloud functions deploy clean-and-load \
--gen2 \
--runtime=python312 \
--region=asia-northeast1 \
--source=02_cleaning/cloud_run_function \
--entry-point=main \
--trigger-http \
--allow-unauthenticated \
--memory=512M \
--set-env-vars PROJECT_ID=${PROJECT_ID},BUCKET_NAME=de-handson-gc-landing,DATASET_ID=de_handson
デプロイが完了すると、関数のURLが発行されます。
https://asia-northeast1-{PROJECT_ID}.cloudfunctions.net/clean-and-load
動作確認:1ファイル分を手動実行
FUNCTION_URL="https://asia-northeast1-${PROJECT_ID}.cloudfunctions.net/clean-and-load"
curl -X POST "${FUNCTION_URL}" \
-H "Content-Type: application/json" \
-d '{"date": "2010-12-01"}'
2010-12-01: 3082 行をロードしました(クレンジング前: 3108 行)
BigQueryコンソールで確認します。
SELECT
DATE(InvoiceDate) AS date,
COUNT(*) AS row_count
FROM `{PROJECT_ID}.de_handson.transactions`
GROUP BY date
ORDER BY date;


Step 5: 全日付分のバッチ実行
動作確認ができたら、67日分すべてを流します。
# scripts/batch_clean_and_load.sh
FUNCTION_URL="https://asia-northeast1-${PROJECT_ID}.cloudfunctions.net/clean-and-load"
for csv in data/transactions/*.csv; do
date=$(basename "${csv}" .csv | sed 's/transactions_//')
echo "Processing ${date}..."
curl -s -X POST "${FUNCTION_URL}" \
-H "Content-Type: application/json" \
-d "{\"date\": \"${date}\"}"
echo ""
done
chmod +x scripts/batch_clean_and_load.sh
./scripts/batch_clean_and_load.sh
完了後、全件数を確認します。
SELECT
COUNT(*) AS total_rows,
MIN(DATE(InvoiceDate)) AS start_date,
MAX(DATE(InvoiceDate)) AS end_date,
COUNT(DISTINCT DATE(InvoiceDate)) AS date_count,
ROUND(SUM(sales_amount), 2) AS total_sales
FROM `{PROJECT_ID}.de_handson.transactions`;
total_rows : 101,695
start_date : 2010-12-01
end_date : 2011-02-27
date_count : 67
total_sales: 2,016,986.96
元データ103,598行からキャンセル1,903行を除外した 101,695行 が格納されていれば成功です。

まとめ
今回やったこと:
- プロファイリング結果をもとにクレンジングルールをビジネス判断として言語化
- Cloud Run Functions(Python 3.12 / Gen2)でクレンジング処理を実装・デプロイ
- BigQuery
de_handson.transactionsテーブルにパーティション付きで 101,695 行をロード
クレンジングはコードより判断が重要です。「なぜこのデータを除外するのか」を言語化できていないと、後続の分析で結果の信頼性を問われたときに答えられなくなります。実務では必ずビジネスルールとしてドキュメントに残しておきましょう。
次回はBigQueryのSQLで集計マートを作り、Looker Studioで可視化します。今回ロードした transactions テーブルがそのまま入力になります。
参考資料
- Cloud Run Functions ドキュメント(第2世代)
- Cloud Run Functions Python クイックスタート
- BigQuery テーブルのパーティション分割
- BigQuery Python クライアントライブラリ
- functions-framework-python
- uv ドキュメント
- UCI Online Retail Dataset: Chen, D. (2015). Online Retail [Dataset]. UCI Machine Learning Repository. https://doi.org/10.24432/C5BW33
本シリーズのコードはGitHubで公開しています: https://github.com/y-tsuritani/data-engineering-handson-gc
採用
「あらゆる判断を、Data-Informedに。」
株式会社ギックスでは、一緒に挑戦してくれる仲間を募集しています!
Discussion