【AIで執筆】AWS Glueで汚いデータを綺麗にするETLパイプラインを作ってみた
注意事項
データ加工絡みの案件をよく見るから勉強する&過去にAWS Glueのテスターをした経験を思い出すがてら、AWS Glueの処理を作成してみました。
Claude codeを使用して以下のプロジェクトを作成しました。
最後にAIに以下の記事を執筆してもらい、私が軽く添削しました。
AWS Glueで汚いデータを綺麗にするETLパイプラインを作ってみた
はじめに
最近、仕事でデータがめちゃくちゃ汚くて困ってました。
日付は「2025-01-20」だったり「20/01/2025」だったり「2025年1月20日」だったり...もう統一されてない。IDも「ORD-2025-01-20-001」みたいなのと「ORD20250120001」みたいなのが混在してて、正規表現で直すのも限界がある。
で、AWS Glueを使ってなんとかしようと思ったんですが、いざやってみると「あれ、ローカルで動作確認したいけどどうすればいいの?」「複数のCSVファイルを一気に処理したいんだけど...」みたいな壁にぶつかりました。
結果的に、一つのスクリプトでローカル開発もAWS本番も両方対応できるETLパイプラインができたので、その過程を書いてみます。
作ったもの
こんな感じのETLパイプラインです:
- ローカルでもAWS Glueでも同じスクリプトが動く
- S3バケット内の全CSVファイルを自動で見つけて処理
- 10種類以上のデータ品質問題を自動修正
- Python ShellとPySparkの2パターン用意
どんな処理をしているか
こんな感じの汚いデータが...
order_id,order_date,customer_id,customer_name,quantity,unit_price,total_amount,payment_method
ORD-2025-01-20-001,2025/01/20,CUSTOMER-1234, 山田 太郎 ,2.0,"3,500",7000,クレジット
,20/01/2025,CUST1234,佐藤花子,-1,0,,Cash
DUP-ORD-001,2025年1月20日,,"",5,abc,500,PayPay
こんな風に綺麗になります:
order_id,order_date,customer_id,customer_name,quantity,unit_price,total_amount,payment_method,quality_score
ORD20250120001,2025-01-20,CUST1234,山田太郎,2.0,3500.0,7000.0,クレジットカード,1.0
ORD-UNKNOWN,2025-01-20,CUST1234,佐藤花子,1.0,1.0,1.0,現金,0.8
具体的には:
- 日付を全部「YYYY-MM-DD」に統一
- IDのハイフンを除去してプレフィックス統一
- 全角空白・数値を半角に変換
- 価格のカンマ区切り("3,500"→3500.0)を修正
- 負の数量を1.0に補正、空のtotal_amountを計算で補完
- 不正な価格文字("abc"→適切な数値)を修正
- 空データは「NOT_PROVIDED」で統一
- 重複データ(DUP-プレフィックス)は削除
- 支払方法の表記揺れを統一("Cash"→"現金")
- 品質スコア(0.0-1.0)を各行に付与
元のデータがどんなに汚くても、最終的には統一されたフォーマットになります。
最初にハマったポイント
ローカル開発環境をどうするか
AWS Glueのローカル開発、公式ドキュメント見てもよくわからなくて...
結局DockerでSpark環境を作ることにしました。WSL2だとAWS Glue公式のDockerイメージがうまく動かなかったので、自分でSpark + Python環境を構築。
FROM openjdk:8-jre-slim
# Spark 3.5.4をインストール (AWS Glue 4.0と同じバージョン)
ENV SPARK_VERSION=3.5.4
ENV SPARK_HOME=/opt/spark
RUN wget -q "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz" \
&& tar xzf "spark-${SPARK_VERSION}-bin-hadoop3.tgz" -C /opt/ \
&& mv "/opt/spark-${SPARK_VERSION}-bin-hadoop3" "${SPARK_HOME}"
# AWS Glue互換のPythonライブラリをインストール
RUN pip3 install pyspark==3.5.4 boto3 pandas awswrangler
# AWS Glueモックライブラリを作成(ローカル開発用)
RUN mkdir -p /app/awsglue && \
echo "from pyspark.sql import SparkSession" > /app/awsglue/context.py
環境判定をどうするか
一番悩んだのがここです。同じスクリプトでローカルとAWS Glueの両方で動くようにしたかったんですが、「今どっちの環境で動いているか」をどうやって判定するか。
最初はtry-except
でawsglue
モジュールのインポートを試してたんですが、これだと完全じゃなかったです。
結局、AWS Glue固有の環境変数をチェックする方法にしました:
def is_glue_environment():
# AWS Glue固有の環境変数をチェック
glue_indicators = [
'GLUE_INSTALLATION_ID',
'GLUE_PYTHON_VERSION',
'GLUE_VERSION'
]
# パスにglueが含まれているかもチェック
is_glue_path = '/tmp/glue-python-scripts-' in str(sys.argv[0]) if sys.argv else False
has_glue_env = any(env_var in os.environ for env_var in glue_indicators)
return is_glue_path or has_glue_env
これで確実に判定できるようになりました。
複数ファイル処理の実装
S3バケットの中に複数のCSVファイルがあって、それを全部まとめて処理したかったです。
def process_multiple_csv_files(input_path, is_glue_env):
all_dataframes = []
if is_glue_env:
# S3からファイル一覧取得
import boto3
s3 = boto3.client('s3')
bucket, prefix = parse_s3_path(input_path)
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
csv_files = [obj['Key'] for obj in response.get('Contents', [])
if obj['Key'].endswith('.csv')]
print(f"S3で{len(csv_files)}個のCSVファイルが見つかりました")
for file_key in csv_files:
s3_file_path = f"s3://{bucket}/{file_key}"
df = pd.read_csv(s3_file_path)
df['source_file'] = os.path.basename(file_key)
all_dataframes.append(df)
else:
# ローカルファイル処理
csv_files = glob.glob(f"{input_path}*.csv")
for file_path in csv_files:
df = pd.read_csv(file_path)
df['source_file'] = os.path.basename(file_path)
all_dataframes.append(df)
return pd.concat(all_dataframes, ignore_index=True)
これで一気に複数ファイルが処理できるようになりました。
データクリーニングで苦労した話
日付フォーマット地獄
これが一番大変でした。入力データに10種類以上の日付フォーマットが混在してて...
2025-01-20
2025/01/20
20/01/2025
2025年1月20日
Jan 20, 2025
正規表現で一個ずつ対応していきました:
def parse_flexible_date(date_str):
if pd.isna(date_str):
return None
date_str = str(date_str).strip()
# パターンを一つずつ試す
date_patterns = [
(r'^\d{4}-\d{2}-\d{2}$', '%Y-%m-%d'),
(r'^\d{4}/\d{2}/\d{2}$', '%Y/%m/%d'),
(r'^\d{2}/\d{2}/\d{4}$', '%d/%m/%Y'),
(r'^\d{4}年\d{1,2}月\d{1,2}日$', '%Y年%m月%d日'),
(r'^[A-Za-z]{3} \d{1,2}, \d{4}$', '%b %d, %Y'),
# 他にもたくさん...
]
for pattern, format_str in date_patterns:
if re.match(pattern, date_str):
try:
dt = datetime.strptime(date_str, format_str)
return dt.strftime('%Y-%m-%d') # 統一フォーマット
except:
continue
return None
ID正規化も面倒だった
注文IDが「ORD-2025-01-20-001」と「ORD20250120001」みたいに混在してて、これも統一する必要がありました。
def normalize_order_id(order_id):
if not order_id or order_id.strip() == '':
return 'ORD-UNKNOWN'
order_id = order_id.upper().strip()
order_id = order_id.replace('-', '') # ハイフン除去
if not order_id.startswith('ORD'):
return f'ORD{order_id}'
return order_id
シンプルですが、これでだいぶ整理されました。
PySpark版でハマったこと
pandas版がうまくいったので、大きなデータ用にPySpark版も作ろうとしたら、また違う問題が...
UDFが必要だった
pandasだと普通の関数で処理できたデータクリーニングも、SparkだとUser Defined Function
にする必要がありました。
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def normalize_order_id_spark(order_id):
# 同じロジックをSpark用に
if not order_id:
return 'ORD-UNKNOWN'
return order_id.upper().replace('-', '')
# UDFとして登録
normalize_order_id_udf = udf(normalize_order_id_spark, StringType())
# DataFrameに適用
df = df.withColumn('order_id', normalize_order_id_udf(df['order_id']))
Sparkのコンパイルエラー
最初、複雑な正規表現をSparkのSQL関数で書いたら、コンパイルエラーになって...
ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.InternalCompilerException
結局UDFに切り替えたら解決しました。Sparkは複雑すぎる処理だとコンパイルできないみたいです。
データ品質スコアの実装
各行のデータがどれくらい綺麗かを0.0-1.0で評価する機能も追加しました。
def calculate_quality_score(row):
score = 0.0
# 日付が有効か (20%)
if pd.notna(row['order_date']) and row['order_date'] != 'NOT_PROVIDED':
score += 0.2
# IDが有効か (20%)
if pd.notna(row['order_id']) and row['order_id'] != 'ORD-UNKNOWN':
score += 0.2
# 数量が正常か (20%)
if pd.notna(row['quantity']) and row['quantity'] > 0:
score += 0.2
# 価格が正常か (20%)
if pd.notna(row['total_amount']) and row['total_amount'] > 0:
score += 0.2
# テキストが完全か (20%)
text_complete = 0
for col in ['customer_name', 'product_name']:
if pd.notna(row[col]) and row[col] != 'NOT_PROVIDED':
text_complete += 1
score += (text_complete / 2) * 0.2
return round(score, 2)
これで品質の悪いデータがどれかすぐわかるようになりました。
AWS Glueにデプロイしてみた
ローカルでうまく動いたので、いよいよAWS Glueにデプロイ。
IAM権限でハマった
最初、S3の権限が足りなくてエラーになりました。結局こんな感じの権限が必要でした:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:*"],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": ["glue:*"],
"Resource": "*"
}
]
}
Job Parametersの設定
AWS GlueのJob Parametersは--
をつけないといけないんですね。最初知らなくて...
--S3_INPUT_PATH=s3://my-bucket/input/
--S3_OUTPUT_PATH=s3://my-bucket/output/
実際に動いた!
設定が終わって実行してみたら、ちゃんと動きました。
aws glue start-job-run --job-name my-etl-job
処理結果:
Input rows: 1000
Output rows: 950
Average quality score: 0.92
Processing time: 3 minutes
50行は重複で削除されて、平均品質スコアは0.92。まずまずです。
まとめ
結果的に、一つのスクリプトでローカル開発からAWS本番まで対応できるETLパイプラインができました。
よかったポイント:
- ローカルで完全にテストしてからAWSデプロイできる
- 複数ファイルの一括処理で作業効率アップ
- データ品質スコアで問題のあるデータがすぐわかる
大変だったポイント:
- 環境判定の実装
- 日付フォーマット地獄への対応
- SparkのUDF変換
困ってる人の参考になれば嬉しいです。
Discussion