🦔

【AIで執筆】AWS Glueで汚いデータを綺麗にするETLパイプラインを作ってみた

に公開

注意事項

データ加工絡みの案件をよく見るから勉強する&過去にAWS Glueのテスターをした経験を思い出すがてら、AWS Glueの処理を作成してみました。
Claude codeを使用して以下のプロジェクトを作成しました。
https://github.com/dbd-fish/glue-test

最後にAIに以下の記事を執筆してもらい、私が軽く添削しました。

AWS Glueで汚いデータを綺麗にするETLパイプラインを作ってみた

はじめに

最近、仕事でデータがめちゃくちゃ汚くて困ってました。

日付は「2025-01-20」だったり「20/01/2025」だったり「2025年1月20日」だったり...もう統一されてない。IDも「ORD-2025-01-20-001」みたいなのと「ORD20250120001」みたいなのが混在してて、正規表現で直すのも限界がある。

で、AWS Glueを使ってなんとかしようと思ったんですが、いざやってみると「あれ、ローカルで動作確認したいけどどうすればいいの?」「複数のCSVファイルを一気に処理したいんだけど...」みたいな壁にぶつかりました。

結果的に、一つのスクリプトでローカル開発もAWS本番も両方対応できるETLパイプラインができたので、その過程を書いてみます。

作ったもの

こんな感じのETLパイプラインです:

  • ローカルでもAWS Glueでも同じスクリプトが動く
  • S3バケット内の全CSVファイルを自動で見つけて処理
  • 10種類以上のデータ品質問題を自動修正
  • Python ShellとPySparkの2パターン用意

どんな処理をしているか

こんな感じの汚いデータが...

order_id,order_date,customer_id,customer_name,quantity,unit_price,total_amount,payment_method
ORD-2025-01-20-001,2025/01/20,CUSTOMER-1234, 山田 太郎 ,2.0,"3,500",7000,クレジット
,20/01/2025,CUST1234,佐藤花子,-1,0,,Cash
DUP-ORD-001,2025年1月20日,,"",5,abc,500,PayPay

こんな風に綺麗になります:

order_id,order_date,customer_id,customer_name,quantity,unit_price,total_amount,payment_method,quality_score
ORD20250120001,2025-01-20,CUST1234,山田太郎,2.0,3500.0,7000.0,クレジットカード,1.0
ORD-UNKNOWN,2025-01-20,CUST1234,佐藤花子,1.0,1.0,1.0,現金,0.8

具体的には:

  • 日付を全部「YYYY-MM-DD」に統一
  • IDのハイフンを除去してプレフィックス統一
  • 全角空白・数値を半角に変換
  • 価格のカンマ区切り("3,500"→3500.0)を修正
  • 負の数量を1.0に補正、空のtotal_amountを計算で補完
  • 不正な価格文字("abc"→適切な数値)を修正
  • 空データは「NOT_PROVIDED」で統一
  • 重複データ(DUP-プレフィックス)は削除
  • 支払方法の表記揺れを統一("Cash"→"現金")
  • 品質スコア(0.0-1.0)を各行に付与

元のデータがどんなに汚くても、最終的には統一されたフォーマットになります。

最初にハマったポイント

ローカル開発環境をどうするか

AWS Glueのローカル開発、公式ドキュメント見てもよくわからなくて...

結局DockerでSpark環境を作ることにしました。WSL2だとAWS Glue公式のDockerイメージがうまく動かなかったので、自分でSpark + Python環境を構築。

FROM openjdk:8-jre-slim

# Spark 3.5.4をインストール (AWS Glue 4.0と同じバージョン)
ENV SPARK_VERSION=3.5.4
ENV SPARK_HOME=/opt/spark

RUN wget -q "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz" \
    && tar xzf "spark-${SPARK_VERSION}-bin-hadoop3.tgz" -C /opt/ \
    && mv "/opt/spark-${SPARK_VERSION}-bin-hadoop3" "${SPARK_HOME}"

# AWS Glue互換のPythonライブラリをインストール
RUN pip3 install pyspark==3.5.4 boto3 pandas awswrangler

# AWS Glueモックライブラリを作成(ローカル開発用)
RUN mkdir -p /app/awsglue && \
    echo "from pyspark.sql import SparkSession" > /app/awsglue/context.py

環境判定をどうするか

一番悩んだのがここです。同じスクリプトでローカルとAWS Glueの両方で動くようにしたかったんですが、「今どっちの環境で動いているか」をどうやって判定するか。

最初はtry-exceptawsglueモジュールのインポートを試してたんですが、これだと完全じゃなかったです。

結局、AWS Glue固有の環境変数をチェックする方法にしました:

def is_glue_environment():
    # AWS Glue固有の環境変数をチェック
    glue_indicators = [
        'GLUE_INSTALLATION_ID',
        'GLUE_PYTHON_VERSION', 
        'GLUE_VERSION'
    ]
    
    # パスにglueが含まれているかもチェック
    is_glue_path = '/tmp/glue-python-scripts-' in str(sys.argv[0]) if sys.argv else False
    has_glue_env = any(env_var in os.environ for env_var in glue_indicators)
    
    return is_glue_path or has_glue_env

これで確実に判定できるようになりました。

複数ファイル処理の実装

S3バケットの中に複数のCSVファイルがあって、それを全部まとめて処理したかったです。

def process_multiple_csv_files(input_path, is_glue_env):
    all_dataframes = []
    
    if is_glue_env:
        # S3からファイル一覧取得
        import boto3
        s3 = boto3.client('s3')
        bucket, prefix = parse_s3_path(input_path)
        
        response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        csv_files = [obj['Key'] for obj in response.get('Contents', []) 
                    if obj['Key'].endswith('.csv')]
        
        print(f"S3で{len(csv_files)}個のCSVファイルが見つかりました")
        
        for file_key in csv_files:
            s3_file_path = f"s3://{bucket}/{file_key}"
            df = pd.read_csv(s3_file_path)
            df['source_file'] = os.path.basename(file_key)
            all_dataframes.append(df)
    else:
        # ローカルファイル処理
        csv_files = glob.glob(f"{input_path}*.csv")
        
        for file_path in csv_files:
            df = pd.read_csv(file_path)
            df['source_file'] = os.path.basename(file_path)
            all_dataframes.append(df)
    
    return pd.concat(all_dataframes, ignore_index=True)

これで一気に複数ファイルが処理できるようになりました。

データクリーニングで苦労した話

日付フォーマット地獄

これが一番大変でした。入力データに10種類以上の日付フォーマットが混在してて...

2025-01-20
2025/01/20  
20/01/2025
2025年1月20日
Jan 20, 2025

正規表現で一個ずつ対応していきました:

def parse_flexible_date(date_str):
    if pd.isna(date_str):
        return None
    
    date_str = str(date_str).strip()
    
    # パターンを一つずつ試す
    date_patterns = [
        (r'^\d{4}-\d{2}-\d{2}$', '%Y-%m-%d'),
        (r'^\d{4}/\d{2}/\d{2}$', '%Y/%m/%d'),
        (r'^\d{2}/\d{2}/\d{4}$', '%d/%m/%Y'),
        (r'^\d{4}年\d{1,2}月\d{1,2}日$', '%Y年%m月%d日'),
        (r'^[A-Za-z]{3} \d{1,2}, \d{4}$', '%b %d, %Y'),
        # 他にもたくさん...
    ]
    
    for pattern, format_str in date_patterns:
        if re.match(pattern, date_str):
            try:
                dt = datetime.strptime(date_str, format_str)
                return dt.strftime('%Y-%m-%d')  # 統一フォーマット
            except:
                continue
    
    return None

ID正規化も面倒だった

注文IDが「ORD-2025-01-20-001」と「ORD20250120001」みたいに混在してて、これも統一する必要がありました。

def normalize_order_id(order_id):
    if not order_id or order_id.strip() == '':
        return 'ORD-UNKNOWN'
    
    order_id = order_id.upper().strip()
    order_id = order_id.replace('-', '')  # ハイフン除去
    
    if not order_id.startswith('ORD'):
        return f'ORD{order_id}'
    
    return order_id

シンプルですが、これでだいぶ整理されました。

PySpark版でハマったこと

pandas版がうまくいったので、大きなデータ用にPySpark版も作ろうとしたら、また違う問題が...

UDFが必要だった

pandasだと普通の関数で処理できたデータクリーニングも、SparkだとUser Defined Functionにする必要がありました。

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def normalize_order_id_spark(order_id):
    # 同じロジックをSpark用に
    if not order_id:
        return 'ORD-UNKNOWN'
    return order_id.upper().replace('-', '')

# UDFとして登録
normalize_order_id_udf = udf(normalize_order_id_spark, StringType())

# DataFrameに適用
df = df.withColumn('order_id', normalize_order_id_udf(df['order_id']))

Sparkのコンパイルエラー

最初、複雑な正規表現をSparkのSQL関数で書いたら、コンパイルエラーになって...

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.InternalCompilerException

結局UDFに切り替えたら解決しました。Sparkは複雑すぎる処理だとコンパイルできないみたいです。

データ品質スコアの実装

各行のデータがどれくらい綺麗かを0.0-1.0で評価する機能も追加しました。

def calculate_quality_score(row):
    score = 0.0
    
    # 日付が有効か (20%)
    if pd.notna(row['order_date']) and row['order_date'] != 'NOT_PROVIDED':
        score += 0.2
    
    # IDが有効か (20%)
    if pd.notna(row['order_id']) and row['order_id'] != 'ORD-UNKNOWN':
        score += 0.2
    
    # 数量が正常か (20%)
    if pd.notna(row['quantity']) and row['quantity'] > 0:
        score += 0.2
    
    # 価格が正常か (20%)  
    if pd.notna(row['total_amount']) and row['total_amount'] > 0:
        score += 0.2
    
    # テキストが完全か (20%)
    text_complete = 0
    for col in ['customer_name', 'product_name']:
        if pd.notna(row[col]) and row[col] != 'NOT_PROVIDED':
            text_complete += 1
    score += (text_complete / 2) * 0.2
    
    return round(score, 2)

これで品質の悪いデータがどれかすぐわかるようになりました。

AWS Glueにデプロイしてみた

ローカルでうまく動いたので、いよいよAWS Glueにデプロイ。

IAM権限でハマった

最初、S3の権限が足りなくてエラーになりました。結局こんな感じの権限が必要でした:

{
    "Version": "2012-10-17", 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:*"],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": ["glue:*"],
            "Resource": "*"
        }
    ]
}

Job Parametersの設定

AWS GlueのJob Parametersは--をつけないといけないんですね。最初知らなくて...

--S3_INPUT_PATH=s3://my-bucket/input/
--S3_OUTPUT_PATH=s3://my-bucket/output/

実際に動いた!

設定が終わって実行してみたら、ちゃんと動きました。

aws glue start-job-run --job-name my-etl-job

処理結果:

Input rows: 1000
Output rows: 950  
Average quality score: 0.92
Processing time: 3 minutes

50行は重複で削除されて、平均品質スコアは0.92。まずまずです。

まとめ

結果的に、一つのスクリプトでローカル開発からAWS本番まで対応できるETLパイプラインができました。

よかったポイント:

  • ローカルで完全にテストしてからAWSデプロイできる
  • 複数ファイルの一括処理で作業効率アップ
  • データ品質スコアで問題のあるデータがすぐわかる

大変だったポイント:

  • 環境判定の実装
  • 日付フォーマット地獄への対応
  • SparkのUDF変換

困ってる人の参考になれば嬉しいです。

Discussion