🐍

【Databricks】Delta Live Tables (DLT) 入門 - Python編 (SQL編を踏まえて)

に公開

はじめに

Delta Live Tables(DLT)では、SQLとPythonの両方でデータパイプラインを構築できます。前回の記事ではSQLでの実装を中心に解説しましたが、今回はPythonでの実装方法を説明します。

PythonでのDLT実装は、SQLよりも複雑なロジックや外部ライブラリとの連携、動的な処理が必要な場合に利用がおすすめです。

SQLとPythonの使い分け

DLTでは、SQLとPythonそれぞれに適した使用場面があります。適切な選択により、開発効率と保守性を向上させることができます。

SQLが適している場面

✅ SQLを選ぶべきケース

  • シンプルなデータ変換: 集計、結合、フィルタリングが中心
  • ビジネスユーザーとの共有: SQLの方が理解しやすい
  • パフォーマンス重視: Catalyst オプティマイザーの恩恵を最大限活用
  • 標準的なETL処理: 一般的なデータクレンジングと集計
-- SQLの例:シンプルで読みやすい
CREATE OR REFRESH LIVE TABLE daily_sales_summary
AS SELECT 
  sale_date,
  SUM(amount) as total_sales,
  COUNT(*) as transaction_count
FROM LIVE.cleaned_sales
GROUP BY sale_date;

Pythonが適している場面

✅ Pythonを選ぶべきケース

  • 複雑なビジネスロジック: 条件分岐や計算ロジックが複雑
  • 外部ライブラリの使用: pandas、numpy、scikit-learn等の活用
  • 動的な処理: 実行時の条件に応じた処理の変更
  • 機械学習との連携: 特徴量エンジニアリングやモデル適用
  • 高度なデータ品質制御: 複雑な検証ルールの実装
  • 設定の外部化: 環境や条件に応じた動的な設定変更
# Pythonの例:複雑なロジックと外部ライブラリの使用
import dlt
from pyspark.sql.functions import *
import pandas as pd
from datetime import datetime

@dlt.table(comment="Complex customer scoring with ML features")
@dlt.expect_or_drop("valid_score", "customer_score >= 0 AND customer_score <= 100")
def customer_scoring():
    # 複雑なスコア計算ロジック
    df = dlt.read("customer_transactions")
    
    # 外部ライブラリを使った処理
    pandas_df = df.toPandas()
    processed_df = calculate_customer_risk_score(pandas_df)
    
    return spark.createDataFrame(processed_df)

組み合わせのベストプラクティス

実際のプロジェクトでは、SQLとPythonを組み合わせて使用することが多いです:

  • Bronze Layer: Python(複雑なファイル形式の処理、スキーマ推論)
  • Silver Layer: SQL(標準的なクレンジング)または Python(複雑な変換)
  • Gold Layer: SQL(集計・レポート用)または Python(機械学習特徴量)

DLT Python の基本概念

デコレータベースの定義

PythonでのDLTは、関数にデコレータを付与することでテーブルを定義します。これにより、Pythonの柔軟性を保ちながら宣言的なパイプラインを構築できます。

import dlt
from pyspark.sql.functions import *

# 基本的なテーブル定義
@dlt.table(
    comment="顧客データのクリーニング処理",
    table_properties={
        "quality": "silver",
        "delta.autoOptimize.optimizeWrite": "true"
    }
)
def cleaned_customers():
    return (
        dlt.read("raw_customers")
        .filter(col("customer_id").isNotNull())
        .select("customer_id", "name", "email", "registration_date")
    )

テーブルタイプとデコレータ

Streaming Table

@dlt.table(comment="リアルタイムイベントストリーム")
def streaming_events():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/mnt/events/")
    )

Live Table(マテリアライズドビュー)

@dlt.table(comment="日次集計テーブル")
def daily_aggregations():
    return (
        dlt.read("streaming_events")
        .groupBy("date", "event_type")
        .agg(count("*").alias("event_count"))
    )

View(中間処理用)

@dlt.view(comment="中間処理用のビュー")
def filtered_events():
    return (
        dlt.read("streaming_events")
        .filter(col("event_type").isin(["click", "purchase"]))
    )

Expectationsによる品質管理(Python版)

PythonでのExpectationsは、SQLよりも柔軟で強力な機能を提供します。複数の制約をグループ化したり、動的に制約を生成することが可能です。

基本的なExpectations

# 単一の制約
@dlt.table()
@dlt.expect("positive_amount", "amount > 0")
@dlt.expect_or_drop("valid_email", "email RLIKE '^[^@]+@[^@]+\\.[^@]+$'")
@dlt.expect_or_fail("critical_id", "customer_id IS NOT NULL")
def quality_controlled_table():
    return dlt.read("raw_data")

複数制約のグループ化

Pythonならではのメリットとして、複数の制約をまとめて管理できます:

# 制約をグループ化
basic_quality_rules = {
    "not_null_id": "customer_id IS NOT NULL",
    "valid_email": "email RLIKE '^[^@]+@[^@]+\\.[^@]+$'",
    "valid_age": "age BETWEEN 0 AND 120",
    "positive_amount": "amount > 0"
}

warning_rules = {
    "recent_data": "registration_date >= '2020-01-01'",
    "reasonable_amount": "amount <= 100000"
}

@dlt.table(comment="包括的な品質制御")
@dlt.expect_all_or_drop(basic_quality_rules)
@dlt.expect_all(warning_rules)  # 警告のみ
def comprehensive_quality_check():
    return dlt.read("raw_customers")

動的な品質ルール生成

Pythonの強力な機能として、実行時に動的に制約を生成できます:

def create_quality_rules_for_table(table_name):
    """テーブル名に応じて動的に品質ルールを生成"""
    base_rules = {
        "not_null_id": f"{table_name}_id IS NOT NULL"
    }
    
    if table_name == "sales":
        base_rules.update({
            "positive_amount": "amount > 0",
            "valid_currency": "currency IN ('USD', 'EUR', 'JPY')"
        })
    elif table_name == "customers":
        base_rules.update({
            "valid_email": "email RLIKE '^[^@]+@[^@]+\\.[^@]+$'",
            "valid_age": "age BETWEEN 0 AND 120"
        })
    
    return base_rules

@dlt.table(comment="動的品質ルール適用")
@dlt.expect_all_or_drop(create_quality_rules_for_table("sales"))
def dynamic_quality_sales():
    return dlt.read("raw_sales")

実践例:売上データパイプライン(Python版)

設定管理クラス

Pythonの利点を活かして、設定を外部化し環境に応じた動的な処理を実装できます:

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, date
import json

class SalesDataPipelineConfig:
    """売上データパイプラインの設定管理クラス"""
    
    def __init__(self):
        self.env = spark.conf.get("pipeline.environment", "dev")
        self.config = self._load_environment_config()
        self.quality_thresholds = self._get_quality_thresholds()
    
    def _load_environment_config(self):
        """環境別設定を読み込み"""
        configs = {
            "dev": {
                "source_path": "/mnt/dev/raw/sales/",
                "checkpoint_path": "/mnt/dev/checkpoints/",
                "max_files_per_trigger": 10,
                "enable_advanced_features": False
            },
            "staging": {
                "source_path": "/mnt/staging/raw/sales/",
                "checkpoint_path": "/mnt/staging/checkpoints/",
                "max_files_per_trigger": 50,
                "enable_advanced_features": True
            },
            "prod": {
                "source_path": "/mnt/prod/raw/sales/",
                "checkpoint_path": "/mnt/prod/checkpoints/",
                "max_files_per_trigger": 100,
                "enable_advanced_features": True
            }
        }
        return configs.get(self.env, configs["dev"])
    
    def _get_quality_thresholds(self):
        """環境別の品質閾値を設定"""
        return {
            "dev": {"min_daily_records": 100, "max_error_rate": 10.0},
            "staging": {"min_daily_records": 1000, "max_error_rate": 5.0},
            "prod": {"min_daily_records": 10000, "max_error_rate": 1.0}
        }.get(self.env, {"min_daily_records": 100, "max_error_rate": 10.0})
    
    def get_sales_quality_rules(self):
        """売上データ用の品質ルールを生成"""
        return {
            "not_null_transaction_id": "transaction_id IS NOT NULL",
            "not_null_customer_id": "customer_id IS NOT NULL",
            "positive_amount": "amount > 0",
            "valid_date": "sale_date IS NOT NULL",
            "not_future_date": "sale_date <= current_date()"
        }
    
    def log_pipeline_metrics(self, stage, table_name, record_count):
        """パイプラインメトリクスをログ出力"""
        metrics = {
            "timestamp": datetime.now().isoformat(),
            "environment": self.env,
            "stage": stage,
            "table_name": table_name,
            "record_count": record_count
        }
        print(f"PIPELINE_METRICS: {json.dumps(metrics)}")

# 設定インスタンスを作成
config = SalesDataPipelineConfig()

Bronze Layer(Python版)

@dlt.table(
    comment="S3からの生売上データ取り込み(環境対応版)",
    table_properties={
        "quality": "bronze",
        "environment": config.env
    }
)
def bronze_sales():
    """
    外部ファイルからの売上データ取り込み
    環境に応じてパスと処理設定を動的に変更
    """
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.maxFilesPerTrigger", config.config["max_files_per_trigger"])
        .option("cloudFiles.schemaLocation", config.config["checkpoint_path"])
        .load(config.config["source_path"])
        .select(
            "*",
            current_timestamp().alias("ingestion_timestamp"),
            lit(config.env).alias("environment"),
            input_file_name().alias("source_file_path")
        )
    )
    
    # メトリクスログ出力(開発時のみ)
    if config.env == "dev":
        record_count = df.count() if not df.isStreaming else "streaming"
        config.log_pipeline_metrics("bronze", "sales", record_count)
    
    return df

@dlt.table(comment="顧客マスタデータの取り込み")
def bronze_customers():
    """顧客マスタデータの処理"""
    return (
        spark.read
        .format("delta")
        .load("/mnt/master/customers/")
        .select(
            "*",
            current_timestamp().alias("load_timestamp")
        )
    )

Silver Layer(Python版)

@dlt.table(
    comment="クリーニング済み売上データ(高度な品質制御付き)",
    table_properties={
        "quality": "silver",
        "delta.autoOptimize.optimizeWrite": "true"
    }
)
@dlt.expect_all_or_drop(config.get_sales_quality_rules())
@dlt.expect("data_freshness", "ingestion_timestamp >= current_timestamp() - interval 2 hours")
def silver_sales_cleaned():
    """
    売上データのクリーニングと型変換
    複雑なビジネスルールをPythonで実装
    """
    df = dlt.read("bronze_sales")
    
    # データ型の統一と変換
    cleaned_df = (
        df
        .select(
            col("transaction_id"),
            col("customer_id").cast(LongType()),
            col("product_id"),
            col("amount").cast(DecimalType(12, 2)),
            to_date(col("sale_date")).alias("sale_date"),
            col("ingestion_timestamp"),
            col("source_file_path")
        )
        # ビジネスルール適用
        .filter(col("amount") > 0)
        .filter(col("sale_date").isNotNull())
    )
    
    # 高度な処理:異常値検出
    if config.config["enable_advanced_features"]:
        # 統計的な異常値検出
        stats = cleaned_df.agg(
            avg("amount").alias("mean_amount"),
            stddev("amount").alias("std_amount")
        ).collect()[0]
        
        mean_amount = stats["mean_amount"]
        std_amount = stats["std_amount"]
        
        # Z-scoreによる異常値フラグ
        cleaned_df = cleaned_df.withColumn(
            "is_outlier",
            when(
                abs((col("amount") - lit(mean_amount)) / lit(std_amount)) > 3,
                True
            ).otherwise(False)
        ).withColumn(
            "outlier_score",
            abs((col("amount") - lit(mean_amount)) / lit(std_amount))
        )
    
    return cleaned_df

@dlt.table(comment="顧客データの統合とエンリッチメント")
@dlt.expect_or_drop("valid_customer_data", "customer_id IS NOT NULL AND email IS NOT NULL")
def silver_customers_enriched():
    """
    顧客データのエンリッチメント
    外部データとの結合や計算フィールドの追加
    """
    customers_df = dlt.read("bronze_customers")
    sales_df = dlt.read("silver_sales_cleaned")
    
    # 顧客別の購入履歴サマリを計算
    customer_summary = (
        sales_df
        .groupBy("customer_id")
        .agg(
            count("transaction_id").alias("total_purchases"),
            sum("amount").alias("total_spent"),
            avg("amount").alias("avg_purchase_amount"),
            max("sale_date").alias("last_purchase_date"),
            min("sale_date").alias("first_purchase_date")
        )
    )
    
    # 顧客マスタと購入履歴を結合
    enriched_customers = (
        customers_df
        .join(customer_summary, "customer_id", "left")
        .select(
            col("customer_id"),
            col("name"),
            col("email"),
            col("registration_date"),
            coalesce(col("total_purchases"), lit(0)).alias("total_purchases"),
            coalesce(col("total_spent"), lit(0)).alias("total_spent"),
            col("avg_purchase_amount"),
            col("last_purchase_date"),
            col("first_purchase_date"),
            
            # 顧客セグメント分類(複雑なビジネスロジック)
            when(col("total_spent") >= 50000, "VIP")
            .when(col("total_spent") >= 10000, "Premium")
            .when(col("total_spent") >= 1000, "Standard")
            .when(col("total_purchases") > 0, "Active")
            .otherwise("Prospect")
            .alias("customer_segment"),
            
            # 顧客ライフタイム日数
            when(
                col("first_purchase_date").isNotNull() & col("last_purchase_date").isNotNull(),
                datediff(col("last_purchase_date"), col("first_purchase_date")) + 1
            ).otherwise(0).alias("customer_lifetime_days")
        )
    )
    
    return enriched_customers

Gold Layer(Python版)

@dlt.table(
    comment="日次売上サマリー(レポート・ダッシュボード用)",
    partition_cols=["sale_date"],
    table_properties={
        "quality": "gold",
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
def gold_daily_sales_summary():
    """
    日次売上の包括的なサマリー
    レポートとダッシュボードで使用
    """
    return (
        dlt.read("silver_sales_cleaned")
        .groupBy("sale_date")
        .agg(
            # 基本メトリクス
            count("transaction_id").alias("transaction_count"),
            sum("amount").alias("total_sales"),
            avg("amount").alias("avg_transaction_value"),
            min("amount").alias("min_transaction"),
            max("amount").alias("max_transaction"),
            
            # 顧客メトリクス
            countDistinct("customer_id").alias("unique_customers"),
            countDistinct("product_id").alias("unique_products"),
            
            # 高度なメトリクス(異常値検出有効時)
            *([
                count(when(col("is_outlier") == True, 1)).alias("outlier_transactions"),
                avg("outlier_score").alias("avg_outlier_score")
            ] if config.config["enable_advanced_features"] else []),
            
            # タイムスタンプ
            current_timestamp().alias("summary_generated_at")
        )
    )

@dlt.table(comment="顧客RFM分析テーブル(マーケティング用)")
def gold_customer_rfm_analysis():
    """
    RFM分析(Recency, Frequency, Monetary)
    マーケティング施策立案用
    """
    sales_df = dlt.read("silver_sales_cleaned")
    
    # RFMメトリクスの計算
    rfm_metrics = (
        sales_df
        .groupBy("customer_id")
        .agg(
            # Recency: 最後の購入からの日数
            datediff(current_date(), max("sale_date")).alias("recency_days"),
            
            # Frequency: 購入回数
            count("transaction_id").alias("frequency"),
            
            # Monetary: 総購入金額
            sum("amount").alias("monetary_value")
        )
    )
    
    # RFMスコアの計算(1-5段階)
    rfm_scored = (
        rfm_metrics
        .withColumn(
            "recency_score",
            when(col("recency_days") <= 30, 5)
            .when(col("recency_days") <= 60, 4)
            .when(col("recency_days") <= 90, 3)
            .when(col("recency_days") <= 180, 2)
            .otherwise(1)
        )
        .withColumn(
            "frequency_score",
            when(col("frequency") >= 20, 5)
            .when(col("frequency") >= 10, 4)
            .when(col("frequency") >= 5, 3)
            .when(col("frequency") >= 2, 2)
            .otherwise(1)
        )
        .withColumn(
            "monetary_score",
            when(col("monetary_value") >= 50000, 5)
            .when(col("monetary_value") >= 10000, 4)
            .when(col("monetary_value") >= 5000, 3)
            .when(col("monetary_value") >= 1000, 2)
            .otherwise(1)
        )
    )
    
    # 総合評価とセグメント分類
    final_rfm = (
        rfm_scored
        .withColumn(
            "rfm_score",
            concat(col("recency_score"), col("frequency_score"), col("monetary_score"))
        )
        .withColumn(
            "customer_segment",
            when((col("recency_score") >= 4) & (col("frequency_score") >= 4) & (col("monetary_score") >= 4), "Champions")
            .when((col("recency_score") >= 3) & (col("frequency_score") >= 3) & (col("monetary_score") >= 3), "Loyal Customers")
            .when((col("recency_score") >= 4) & (col("frequency_score") <= 2), "New Customers")
            .when((col("recency_score") <= 2) & (col("frequency_score") >= 3), "At Risk")
            .when((col("recency_score") <= 2) & (col("frequency_score") <= 2), "Lost Customers")
            .otherwise("Developing")
        )
        .withColumn("analysis_date", current_date())
    )
    
    return final_rfm

# 機械学習特徴量テーブル
@dlt.table(comment="機械学習用顧客特徴量")
def gold_ml_customer_features():
    """
    機械学習モデル用の特徴量テーブル
    予測分析やレコメンデーションシステムで使用
    """
    customers_df = dlt.read("silver_customers_enriched")
    rfm_df = dlt.read("gold_customer_rfm_analysis")
    
    return (
        customers_df
        .join(rfm_df, "customer_id", "inner")
        .select(
            # 基本情報
            col("customer_id"),
            col("customer_segment"),
            col("total_purchases"),
            col("total_spent"),
            col("customer_lifetime_days"),
            
            # RFM特徴量
            col("recency_days"),
            col("frequency"),
            col("monetary_value"),
            col("recency_score"),
            col("frequency_score"), 
            col("monetary_score"),
            
            # 派生特徴量
            (col("total_spent") / greatest(col("customer_lifetime_days"), lit(1))).alias("daily_spend_rate"),
            (col("total_purchases") / greatest(col("customer_lifetime_days"), lit(1))).alias("purchase_frequency_rate"),
            
            # カテゴリ特徴量をエンコード
            when(col("customer_segment") == "VIP", 1).otherwise(0).alias("is_vip"),
            when(col("customer_segment") == "Premium", 1).otherwise(0).alias("is_premium"),
            
            current_timestamp().alias("feature_generated_at")
        )
    )

高度な機能実装

外部ライブラリとの連携

Pythonの強力な機能として、pandasやnumpy、scikit-learn等の外部ライブラリを活用できます:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

@dlt.table(comment="高度な統計分析結果")
def advanced_analytics_results():
    """
    pandasとnumpyを使った高度な分析
    """
    # SparkデータフレームをPandasに変換
    spark_df = dlt.read("silver_sales_cleaned")
    pandas_df = spark_df.toPandas()
    
    # pandas/numpyを使った分析
    # 移動平均の計算
    pandas_df['amount_7day_ma'] = pandas_df.groupby('customer_id')['amount'].rolling(7).mean().reset_index(0, drop=True)
    
    # 季節性分析
    pandas_df['sale_date'] = pd.to_datetime(pandas_df['sale_date'])
    pandas_df['day_of_week'] = pandas_df['sale_date'].dt.dayofweek
    pandas_df['month'] = pandas_df['sale_date'].dt.month
    
    # カスタム統計指標
    customer_stats = pandas_df.groupby('customer_id').agg({
        'amount': ['mean', 'std', 'skew'],
        'sale_date': ['min', 'max', 'count']
    }).round(2)
    
    # Sparkデータフレームに戻す
    return spark.createDataFrame(pandas_df)

# UDF(User Defined Function)を使った処理
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType

@udf(returnType=DoubleType())
def calculate_customer_lifetime_value(total_spent, customer_lifetime_days, frequency):
    """顧客生涯価値を計算するUDF"""
    if not all([total_spent, customer_lifetime_days, frequency]) or customer_lifetime_days <= 0:
        return 0.0
    
    # 簡単なCLV計算
    avg_order_value = total_spent / frequency
    purchase_frequency = frequency / customer_lifetime_days * 365  # 年間購入頻度
    
    # 予測生涯価値(3年間)
    predicted_clv = avg_order_value * purchase_frequency * 3
    
    return float(predicted_clv)

@dlt.table(comment="CLV計算結果")
def customer_lifetime_value():
    """カスタムUDFを使ったCLV計算"""
    return (
        dlt.read("silver_customers_enriched")
        .withColumn(
            "predicted_clv",
            calculate_customer_lifetime_value(
                col("total_spent"),
                col("customer_lifetime_days"),
                col("total_purchases")
            )
        )
        .filter(col("predicted_clv") > 0)
    )

動的スキーマ処理

@dlt.table(comment="動的スキーマ対応テーブル")
def dynamic_schema_processing():
    """
    JSONデータの動的スキーマ処理
    新しいフィールドが追加されても自動対応
    """
    df = dlt.read("bronze_sales")
    
    # JSONカラムがある場合の動的展開
    if "properties" in df.columns:
        # JSONスキーマを推論
        sample_data = df.select("properties").limit(1000).collect()
        json_fields = set()
        
        for row in sample_data:
            if row.properties:
                try:
                    import json
                    parsed = json.loads(row.properties)
                    json_fields.update(parsed.keys())
                except:
                    continue
        
        # 動的にカラムを展開
        for field in json_fields:
            df = df.withColumn(
                f"prop_{field}",
                get_json_object(col("properties"), f"$.{field}")
            )
    
    return df

# 設定駆動型の処理
def create_aggregation_table(table_name, group_by_cols, agg_cols):
    """設定に基づいて動的に集計テーブルを生成する関数"""
    
    @dlt.table(name=f"agg_{table_name}", comment=f"Dynamic aggregation for {table_name}")
    def dynamic_aggregation():
        df = dlt.read(table_name)
        
        # 動的に集計処理を生成
        agg_exprs = []
        for col_name, agg_type in agg_cols.items():
            if agg_type == "sum":
                agg_exprs.append(sum(col_name).alias(f"total_{col_name}"))
            elif agg_type == "avg":
                agg_exprs.append(avg(col_name).alias(f"avg_{col_name}"))
            elif agg_type == "count":
                agg_exprs.append(count(col_name).alias(f"count_{col_name}"))
        
        return df.groupBy(*group_by_cols).agg(*agg_exprs)
    
    return dynamic_aggregation

# 使用例:設定に基づく動的テーブル生成
sales_agg_config = {
    "group_by_cols": ["sale_date", "customer_segment"],
    "agg_cols": {
        "amount": "sum",
        "transaction_id": "count"
    }
}

# 動的にテーブル関数を生成
sales_daily_agg = create_aggregation_table("silver_sales_cleaned", 
                                          sales_agg_config["group_by_cols"],
                                          sales_agg_config["agg_cols"])

エラーハンドリングと監視

Pythonでは、より詳細なエラーハンドリングと監視機能を実装できます:

高度なエラーハンドリング

import logging
from datetime import datetime

# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def safe_division_udf(numerator, denominator):
    """安全な除算を行うUDF"""
    try:
        if denominator is None or denominator == 0:
            return None
        return float(numerator) / float(denominator)
    except Exception as e:
        logger.error(f"Division error: {e}")
        return None

safe_division = udf(safe_division_udf, DoubleType())

@dlt.table(comment="エラーハンドリング付きの計算テーブル")
def robust_calculations():
    """エラーに強い計算処理"""
    df = dlt.read("silver_sales_cleaned")
    
    try:
        # 安全な計算処理
        result_df = (
            df
            .withColumn(
                "profit_margin_safe",
                safe_division(col("profit"), col("amount"))
            )
            .withColumn(
                "processing_timestamp",
                current_timestamp()
            )
        )
        
        # データ品質チェック
        total_records = result_df.count()
        null_margins = result_df.filter(col("profit_margin_safe").isNull()).count()
        
        if total_records > 0:
            null_rate = (null_margins / total_records) * 100
            logger.info(f"処理完了: 総レコード数={total_records}, NULL率={null_rate:.2f}%")
            
            if null_rate > 50:
                logger.warning(f"高いNULL率を検出: {null_rate:.2f}%")
        
        return result_df
        
    except Exception as e:
        logger.error(f"テーブル処理エラー: {str(e)}")
        # エラー時は空のデータフレームを返す
        return spark.createDataFrame([], df.schema)

# データ品質監視テーブル
@dlt.table(comment="データ品質監視ダッシュボード")
def data_quality_monitoring():
    """包括的なデータ品質監視"""
    
    def analyze_table_quality(table_name, df):
        """テーブルの品質分析を行う"""
        total_count = df.count()
        
        quality_metrics = {
            "table_name": table_name,
            "total_records": total_count,
            "check_timestamp": datetime.now()
        }
        
        # カラムごとのNULL率計算
        for col_name in df.columns:
            null_count = df.filter(col(col_name).isNull()).count()
            null_rate = (null_count / total_count * 100) if total_count > 0 else 0
            quality_metrics[f"{col_name}_null_rate"] = round(null_rate, 2)
        
        return quality_metrics
    
    # 複数テーブルの品質をチェック
    tables_to_check = [
        ("bronze_sales", dlt.read("bronze_sales")),
        ("silver_sales_cleaned", dlt.read("silver_sales_cleaned")),
        ("silver_customers_enriched", dlt.read("silver_customers_enriched"))
    ]
    
    quality_results = []
    for table_name, df in tables_to_check:
        try:
            metrics = analyze_table_quality(table_name, df)
            quality_results.append(metrics)
        except Exception as e:
            logger.error(f"品質チェックエラー {table_name}: {str(e)}")
            quality_results.append({
                "table_name": table_name,
                "error": str(e),
                "check_timestamp": datetime.now()
            })
    
    # 結果をSparkデータフレームに変換
    return spark.createDataFrame(quality_results)

アラート機能の実装

@dlt.table(comment="自動アラート生成テーブル")
def automated_alerts():
    """条件に基づく自動アラート生成"""
    
    def generate_alert(alert_type, message, severity="INFO", table_name=None):
        """アラートメッセージを生成"""
        return {
            "alert_timestamp": datetime.now(),
            "alert_type": alert_type,
            "message": message,
            "severity": severity,
            "table_name": table_name,
            "environment": config.env
        }
    
    alerts = []
    
    # データ量チェック
    try:
        daily_sales = dlt.read("silver_sales_cleaned").filter(
            col("sale_date") == current_date()
        ).count()
        
        min_expected = config.quality_thresholds["min_daily_records"]
        
        if daily_sales < min_expected:
            alerts.append(generate_alert(
                "DATA_VOLUME",
                f"本日の売上レコード数が少なすぎます: {daily_sales} < {min_expected}",
                "ERROR",
                "silver_sales_cleaned"
            ))
    except Exception as e:
        alerts.append(generate_alert(
            "SYSTEM_ERROR",
            f"売上データチェック中にエラー: {str(e)}",
            "CRITICAL"
        ))
    
    # データ品質チェック
    try:
        quality_df = dlt.read("data_quality_monitoring")
        latest_quality = quality_df.orderBy(col("check_timestamp").desc()).first()
        
        if latest_quality:
            # 高いNULL率をチェック
            for col_name in ["amount", "customer_id", "transaction_id"]:
                null_rate_col = f"{col_name}_null_rate"
                if hasattr(latest_quality, null_rate_col):
                    null_rate = getattr(latest_quality, null_rate_col)
                    if null_rate > 5.0:  # 5%を超えるNULL率
                        alerts.append(generate_alert(
                            "DATA_QUALITY",
                            f"{col_name}のNULL率が高すぎます: {null_rate}%",
                            "WARNING",
                            latest_quality.table_name
                        ))
    except Exception as e:
        alerts.append(generate_alert(
            "SYSTEM_ERROR",
            f"品質チェック中にエラー: {str(e)}",
            "ERROR"
        ))
    
    # アラートがない場合は正常メッセージ
    if not alerts:
        alerts.append(generate_alert(
            "HEALTH_CHECK",
            "全てのチェックが正常に完了しました",
            "INFO"
        ))
    
    return spark.createDataFrame(alerts)

よくある問題と解決法(Python版)

1. パフォーマンス問題

問題: PythonのUDFやpandas操作でパフォーマンスが劣化する

解決法: 処理の最適化とキャッシュ活用

# ❌ 悪い例:非効率なUDF使用
@udf(returnType=StringType())
def slow_categorization(amount):
    if amount > 10000:
        return "High"
    elif amount > 1000:
        return "Medium"
    else:
        return "Low"

# ✅ 良い例:Spark関数を使用
@dlt.table(comment="効率的なカテゴリ分類")
def efficient_categorization():
    return (
        dlt.read("silver_sales_cleaned")
        .withColumn(
            "amount_category",
            when(col("amount") > 10000, "High")
            .when(col("amount") > 1000, "Medium")
            .otherwise("Low")
        )
        .cache()  # 再利用する場合はキャッシュ
    )

# pandas操作の最適化
@dlt.table(comment="最適化されたpandas処理")
def optimized_pandas_processing():
    spark_df = dlt.read("silver_sales_cleaned")
    
    # 大きなデータセットの場合は分割処理
    if spark_df.count() > 1000000:  # 100万件以上の場合
        # パーティションごとに処理
        def process_partition(iterator):
            import pandas as pd
            for pdf in iterator:
                # pandas処理
                pdf['processed_amount'] = pdf['amount'] * 1.1
                yield pdf
        
        return spark_df.mapInPandas(process_partition, spark_df.schema)
    else:
        # 小さなデータセットは通常処理
        pandas_df = spark_df.toPandas()
        pandas_df['processed_amount'] = pandas_df['amount'] * 1.1
        return spark.createDataFrame(pandas_df)

2. メモリ不足エラー

解決法: データの分割処理とリソース管理

@dlt.table(comment="メモリ効率的な大容量データ処理")
def memory_efficient_processing():
    """大容量データの効率的な処理"""
    
    # データを日付で分割して処理
    base_df = dlt.read("bronze_sales")
    
    # 処理対象期間を分割
    date_ranges = [
        ("2024-01-01", "2024-03-31"),
        ("2024-04-01", "2024-06-30"),
        ("2024-07-01", "2024-09-30"),
        ("2024-10-01", "2024-12-31")
    ]
    
    processed_dfs = []
    
    for start_date, end_date in date_ranges:
        chunk_df = (
            base_df
            .filter(
                (col("sale_date") >= start_date) & 
                (col("sale_date") <= end_date)
            )
            .repartition(10)  # 適切なパーティション数に調整
        )
        
        # 重い処理をチャンクごとに実行
        processed_chunk = chunk_df.withColumn(
            "complex_calculation",
            # 複雑な計算をここに記述
            col("amount") * 1.2
        )
        
        processed_dfs.append(processed_chunk)
    
    # 結果を結合
    return processed_dfs[0].union(processed_dfs[1]).union(processed_dfs[2]).union(processed_dfs[3])

3. スキーマ進化の問題

解決法: 柔軟なスキーマ処理

@dlt.table(comment="スキーマ進化対応テーブル")
def schema_evolution_safe():
    """スキーマ変更に対応した安全な処理"""
    
    df = dlt.read("bronze_sales")
    
    # 必須カラムの存在チェック
    required_columns = ["transaction_id", "customer_id", "amount", "sale_date"]
    available_columns = df.columns
    
    missing_columns = set(required_columns) - set(available_columns)
    if missing_columns:
        logger.error(f"必須カラムが不足: {missing_columns}")
        # 不足カラムをNULLで追加
        for col_name in missing_columns:
            df = df.withColumn(col_name, lit(None))
    
    # オプションカラムの安全な追加
    optional_columns = {
        "discount_amount": DecimalType(10, 2),
        "promotion_code": StringType(),
        "sales_rep_id": LongType()
    }
    
    for col_name, col_type in optional_columns.items():
        if col_name not in available_columns:
            df = df.withColumn(col_name, lit(None).cast(col_type))
    
    # 型の安全な変換
    return (
        df
        .withColumn("transaction_id", col("transaction_id").cast(StringType()))
        .withColumn("customer_id", col("customer_id").cast(LongType()))
        .withColumn("amount", col("amount").cast(DecimalType(12, 2)))
        .withColumn("sale_date", to_date(col("sale_date")))
    )

まとめ

DLT Python編では、SQLでは実現困難な高度で柔軟なデータパイプラインを構築できることを確認しました。

🐍 Python DLTの主要な利点

1. 高度な処理能力

  • 外部ライブラリ活用: pandas、numpy、scikit-learn等との連携
  • 複雑なビジネスロジック: 条件分岐や計算ロジックの柔軟な実装
  • カスタムUDF: 独自の処理関数の作成
  • 動的処理: 実行時の条件に応じた処理変更

2. 設定の外部化と環境対応

  • 設定クラス: 環境別の設定を体系的に管理
  • 動的テーブル生成: 設定に基づくテーブル定義の自動生成
  • 環境分離: 開発・ステージング・本番環境の自動切り替え

3. 包括的な監視とエラーハンドリング

  • 詳細なログ出力: Pythonのloggingモジュール活用
  • 例外処理: try-catch による堅牢なエラーハンドリング
  • 自動アラート: 条件に基づくアラート生成
  • 品質監視: データ品質の自動チェックと可視化

4. 機械学習との連携

  • 特徴量エンジニアリング: ML向けの特徴量自動生成
  • モデル適用: 学習済みモデルをパイプラインに組み込み
  • 予測分析: リアルタイムでの予測結果生成

適用の指針

SQLを選ぶべき場面

  • シンプルな集計・結合処理
  • ビジネスユーザーとの共有が重要
  • パフォーマンスを最優先する場合
  • 標準的なETL処理

Pythonを選ぶべき場面

  • 複雑なビジネスロジックの実装
  • 外部ライブラリとの連携が必要
  • 動的な処理や設定管理が重要
  • 機械学習やデータサイエンス要素を含む
  • 高度なエラーハンドリングや監視が必要

実装のベストプラクティス

  1. 設定の外部化: 環境別設定を系統的に管理
  2. エラーハンドリング: 例外処理とログ出力を徹底
  3. パフォーマンス最適化: UDFよりもSpark関数を優先
  4. メモリ管理: 大容量データは分割処理
  5. テスト可能性: 関数の分離とユニットテスト実装

DLT Pythonは学習コストが高めですが、その分得られる柔軟性と機能性は非常に高く、複雑なデータエンジニアリング要件に対応できると思います。SQLで実現困難な要件がある場合は、Python実装を検討したいですね。

Discussion