【Databricks】Delta Live Tables (DLT) 入門 - Python編 (SQL編を踏まえて)
はじめに
Delta Live Tables(DLT)では、SQLとPythonの両方でデータパイプラインを構築できます。前回の記事ではSQLでの実装を中心に解説しましたが、今回はPythonでの実装方法を説明します。
PythonでのDLT実装は、SQLよりも複雑なロジックや外部ライブラリとの連携、動的な処理が必要な場合に利用がおすすめです。
SQLとPythonの使い分け
DLTでは、SQLとPythonそれぞれに適した使用場面があります。適切な選択により、開発効率と保守性を向上させることができます。
SQLが適している場面
✅ SQLを選ぶべきケース
- シンプルなデータ変換: 集計、結合、フィルタリングが中心
- ビジネスユーザーとの共有: SQLの方が理解しやすい
- パフォーマンス重視: Catalyst オプティマイザーの恩恵を最大限活用
- 標準的なETL処理: 一般的なデータクレンジングと集計
-- SQLの例:シンプルで読みやすい
CREATE OR REFRESH LIVE TABLE daily_sales_summary
AS SELECT
sale_date,
SUM(amount) as total_sales,
COUNT(*) as transaction_count
FROM LIVE.cleaned_sales
GROUP BY sale_date;
Pythonが適している場面
✅ Pythonを選ぶべきケース
- 複雑なビジネスロジック: 条件分岐や計算ロジックが複雑
- 外部ライブラリの使用: pandas、numpy、scikit-learn等の活用
- 動的な処理: 実行時の条件に応じた処理の変更
- 機械学習との連携: 特徴量エンジニアリングやモデル適用
- 高度なデータ品質制御: 複雑な検証ルールの実装
- 設定の外部化: 環境や条件に応じた動的な設定変更
# Pythonの例:複雑なロジックと外部ライブラリの使用
import dlt
from pyspark.sql.functions import *
import pandas as pd
from datetime import datetime
@dlt.table(comment="Complex customer scoring with ML features")
@dlt.expect_or_drop("valid_score", "customer_score >= 0 AND customer_score <= 100")
def customer_scoring():
# 複雑なスコア計算ロジック
df = dlt.read("customer_transactions")
# 外部ライブラリを使った処理
pandas_df = df.toPandas()
processed_df = calculate_customer_risk_score(pandas_df)
return spark.createDataFrame(processed_df)
組み合わせのベストプラクティス
実際のプロジェクトでは、SQLとPythonを組み合わせて使用することが多いです:
- Bronze Layer: Python(複雑なファイル形式の処理、スキーマ推論)
- Silver Layer: SQL(標準的なクレンジング)または Python(複雑な変換)
- Gold Layer: SQL(集計・レポート用)または Python(機械学習特徴量)
DLT Python の基本概念
デコレータベースの定義
PythonでのDLTは、関数にデコレータを付与することでテーブルを定義します。これにより、Pythonの柔軟性を保ちながら宣言的なパイプラインを構築できます。
import dlt
from pyspark.sql.functions import *
# 基本的なテーブル定義
@dlt.table(
comment="顧客データのクリーニング処理",
table_properties={
"quality": "silver",
"delta.autoOptimize.optimizeWrite": "true"
}
)
def cleaned_customers():
return (
dlt.read("raw_customers")
.filter(col("customer_id").isNotNull())
.select("customer_id", "name", "email", "registration_date")
)
テーブルタイプとデコレータ
Streaming Table
@dlt.table(comment="リアルタイムイベントストリーム")
def streaming_events():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/mnt/events/")
)
Live Table(マテリアライズドビュー)
@dlt.table(comment="日次集計テーブル")
def daily_aggregations():
return (
dlt.read("streaming_events")
.groupBy("date", "event_type")
.agg(count("*").alias("event_count"))
)
View(中間処理用)
@dlt.view(comment="中間処理用のビュー")
def filtered_events():
return (
dlt.read("streaming_events")
.filter(col("event_type").isin(["click", "purchase"]))
)
Expectationsによる品質管理(Python版)
PythonでのExpectationsは、SQLよりも柔軟で強力な機能を提供します。複数の制約をグループ化したり、動的に制約を生成することが可能です。
基本的なExpectations
# 単一の制約
@dlt.table()
@dlt.expect("positive_amount", "amount > 0")
@dlt.expect_or_drop("valid_email", "email RLIKE '^[^@]+@[^@]+\\.[^@]+$'")
@dlt.expect_or_fail("critical_id", "customer_id IS NOT NULL")
def quality_controlled_table():
return dlt.read("raw_data")
複数制約のグループ化
Pythonならではのメリットとして、複数の制約をまとめて管理できます:
# 制約をグループ化
basic_quality_rules = {
"not_null_id": "customer_id IS NOT NULL",
"valid_email": "email RLIKE '^[^@]+@[^@]+\\.[^@]+$'",
"valid_age": "age BETWEEN 0 AND 120",
"positive_amount": "amount > 0"
}
warning_rules = {
"recent_data": "registration_date >= '2020-01-01'",
"reasonable_amount": "amount <= 100000"
}
@dlt.table(comment="包括的な品質制御")
@dlt.expect_all_or_drop(basic_quality_rules)
@dlt.expect_all(warning_rules) # 警告のみ
def comprehensive_quality_check():
return dlt.read("raw_customers")
動的な品質ルール生成
Pythonの強力な機能として、実行時に動的に制約を生成できます:
def create_quality_rules_for_table(table_name):
"""テーブル名に応じて動的に品質ルールを生成"""
base_rules = {
"not_null_id": f"{table_name}_id IS NOT NULL"
}
if table_name == "sales":
base_rules.update({
"positive_amount": "amount > 0",
"valid_currency": "currency IN ('USD', 'EUR', 'JPY')"
})
elif table_name == "customers":
base_rules.update({
"valid_email": "email RLIKE '^[^@]+@[^@]+\\.[^@]+$'",
"valid_age": "age BETWEEN 0 AND 120"
})
return base_rules
@dlt.table(comment="動的品質ルール適用")
@dlt.expect_all_or_drop(create_quality_rules_for_table("sales"))
def dynamic_quality_sales():
return dlt.read("raw_sales")
実践例:売上データパイプライン(Python版)
設定管理クラス
Pythonの利点を活かして、設定を外部化し環境に応じた動的な処理を実装できます:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, date
import json
class SalesDataPipelineConfig:
"""売上データパイプラインの設定管理クラス"""
def __init__(self):
self.env = spark.conf.get("pipeline.environment", "dev")
self.config = self._load_environment_config()
self.quality_thresholds = self._get_quality_thresholds()
def _load_environment_config(self):
"""環境別設定を読み込み"""
configs = {
"dev": {
"source_path": "/mnt/dev/raw/sales/",
"checkpoint_path": "/mnt/dev/checkpoints/",
"max_files_per_trigger": 10,
"enable_advanced_features": False
},
"staging": {
"source_path": "/mnt/staging/raw/sales/",
"checkpoint_path": "/mnt/staging/checkpoints/",
"max_files_per_trigger": 50,
"enable_advanced_features": True
},
"prod": {
"source_path": "/mnt/prod/raw/sales/",
"checkpoint_path": "/mnt/prod/checkpoints/",
"max_files_per_trigger": 100,
"enable_advanced_features": True
}
}
return configs.get(self.env, configs["dev"])
def _get_quality_thresholds(self):
"""環境別の品質閾値を設定"""
return {
"dev": {"min_daily_records": 100, "max_error_rate": 10.0},
"staging": {"min_daily_records": 1000, "max_error_rate": 5.0},
"prod": {"min_daily_records": 10000, "max_error_rate": 1.0}
}.get(self.env, {"min_daily_records": 100, "max_error_rate": 10.0})
def get_sales_quality_rules(self):
"""売上データ用の品質ルールを生成"""
return {
"not_null_transaction_id": "transaction_id IS NOT NULL",
"not_null_customer_id": "customer_id IS NOT NULL",
"positive_amount": "amount > 0",
"valid_date": "sale_date IS NOT NULL",
"not_future_date": "sale_date <= current_date()"
}
def log_pipeline_metrics(self, stage, table_name, record_count):
"""パイプラインメトリクスをログ出力"""
metrics = {
"timestamp": datetime.now().isoformat(),
"environment": self.env,
"stage": stage,
"table_name": table_name,
"record_count": record_count
}
print(f"PIPELINE_METRICS: {json.dumps(metrics)}")
# 設定インスタンスを作成
config = SalesDataPipelineConfig()
Bronze Layer(Python版)
@dlt.table(
comment="S3からの生売上データ取り込み(環境対応版)",
table_properties={
"quality": "bronze",
"environment": config.env
}
)
def bronze_sales():
"""
外部ファイルからの売上データ取り込み
環境に応じてパスと処理設定を動的に変更
"""
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.maxFilesPerTrigger", config.config["max_files_per_trigger"])
.option("cloudFiles.schemaLocation", config.config["checkpoint_path"])
.load(config.config["source_path"])
.select(
"*",
current_timestamp().alias("ingestion_timestamp"),
lit(config.env).alias("environment"),
input_file_name().alias("source_file_path")
)
)
# メトリクスログ出力(開発時のみ)
if config.env == "dev":
record_count = df.count() if not df.isStreaming else "streaming"
config.log_pipeline_metrics("bronze", "sales", record_count)
return df
@dlt.table(comment="顧客マスタデータの取り込み")
def bronze_customers():
"""顧客マスタデータの処理"""
return (
spark.read
.format("delta")
.load("/mnt/master/customers/")
.select(
"*",
current_timestamp().alias("load_timestamp")
)
)
Silver Layer(Python版)
@dlt.table(
comment="クリーニング済み売上データ(高度な品質制御付き)",
table_properties={
"quality": "silver",
"delta.autoOptimize.optimizeWrite": "true"
}
)
@dlt.expect_all_or_drop(config.get_sales_quality_rules())
@dlt.expect("data_freshness", "ingestion_timestamp >= current_timestamp() - interval 2 hours")
def silver_sales_cleaned():
"""
売上データのクリーニングと型変換
複雑なビジネスルールをPythonで実装
"""
df = dlt.read("bronze_sales")
# データ型の統一と変換
cleaned_df = (
df
.select(
col("transaction_id"),
col("customer_id").cast(LongType()),
col("product_id"),
col("amount").cast(DecimalType(12, 2)),
to_date(col("sale_date")).alias("sale_date"),
col("ingestion_timestamp"),
col("source_file_path")
)
# ビジネスルール適用
.filter(col("amount") > 0)
.filter(col("sale_date").isNotNull())
)
# 高度な処理:異常値検出
if config.config["enable_advanced_features"]:
# 統計的な異常値検出
stats = cleaned_df.agg(
avg("amount").alias("mean_amount"),
stddev("amount").alias("std_amount")
).collect()[0]
mean_amount = stats["mean_amount"]
std_amount = stats["std_amount"]
# Z-scoreによる異常値フラグ
cleaned_df = cleaned_df.withColumn(
"is_outlier",
when(
abs((col("amount") - lit(mean_amount)) / lit(std_amount)) > 3,
True
).otherwise(False)
).withColumn(
"outlier_score",
abs((col("amount") - lit(mean_amount)) / lit(std_amount))
)
return cleaned_df
@dlt.table(comment="顧客データの統合とエンリッチメント")
@dlt.expect_or_drop("valid_customer_data", "customer_id IS NOT NULL AND email IS NOT NULL")
def silver_customers_enriched():
"""
顧客データのエンリッチメント
外部データとの結合や計算フィールドの追加
"""
customers_df = dlt.read("bronze_customers")
sales_df = dlt.read("silver_sales_cleaned")
# 顧客別の購入履歴サマリを計算
customer_summary = (
sales_df
.groupBy("customer_id")
.agg(
count("transaction_id").alias("total_purchases"),
sum("amount").alias("total_spent"),
avg("amount").alias("avg_purchase_amount"),
max("sale_date").alias("last_purchase_date"),
min("sale_date").alias("first_purchase_date")
)
)
# 顧客マスタと購入履歴を結合
enriched_customers = (
customers_df
.join(customer_summary, "customer_id", "left")
.select(
col("customer_id"),
col("name"),
col("email"),
col("registration_date"),
coalesce(col("total_purchases"), lit(0)).alias("total_purchases"),
coalesce(col("total_spent"), lit(0)).alias("total_spent"),
col("avg_purchase_amount"),
col("last_purchase_date"),
col("first_purchase_date"),
# 顧客セグメント分類(複雑なビジネスロジック)
when(col("total_spent") >= 50000, "VIP")
.when(col("total_spent") >= 10000, "Premium")
.when(col("total_spent") >= 1000, "Standard")
.when(col("total_purchases") > 0, "Active")
.otherwise("Prospect")
.alias("customer_segment"),
# 顧客ライフタイム日数
when(
col("first_purchase_date").isNotNull() & col("last_purchase_date").isNotNull(),
datediff(col("last_purchase_date"), col("first_purchase_date")) + 1
).otherwise(0).alias("customer_lifetime_days")
)
)
return enriched_customers
Gold Layer(Python版)
@dlt.table(
comment="日次売上サマリー(レポート・ダッシュボード用)",
partition_cols=["sale_date"],
table_properties={
"quality": "gold",
"delta.autoOptimize.optimizeWrite": "true",
"delta.autoOptimize.autoCompact": "true"
}
)
def gold_daily_sales_summary():
"""
日次売上の包括的なサマリー
レポートとダッシュボードで使用
"""
return (
dlt.read("silver_sales_cleaned")
.groupBy("sale_date")
.agg(
# 基本メトリクス
count("transaction_id").alias("transaction_count"),
sum("amount").alias("total_sales"),
avg("amount").alias("avg_transaction_value"),
min("amount").alias("min_transaction"),
max("amount").alias("max_transaction"),
# 顧客メトリクス
countDistinct("customer_id").alias("unique_customers"),
countDistinct("product_id").alias("unique_products"),
# 高度なメトリクス(異常値検出有効時)
*([
count(when(col("is_outlier") == True, 1)).alias("outlier_transactions"),
avg("outlier_score").alias("avg_outlier_score")
] if config.config["enable_advanced_features"] else []),
# タイムスタンプ
current_timestamp().alias("summary_generated_at")
)
)
@dlt.table(comment="顧客RFM分析テーブル(マーケティング用)")
def gold_customer_rfm_analysis():
"""
RFM分析(Recency, Frequency, Monetary)
マーケティング施策立案用
"""
sales_df = dlt.read("silver_sales_cleaned")
# RFMメトリクスの計算
rfm_metrics = (
sales_df
.groupBy("customer_id")
.agg(
# Recency: 最後の購入からの日数
datediff(current_date(), max("sale_date")).alias("recency_days"),
# Frequency: 購入回数
count("transaction_id").alias("frequency"),
# Monetary: 総購入金額
sum("amount").alias("monetary_value")
)
)
# RFMスコアの計算(1-5段階)
rfm_scored = (
rfm_metrics
.withColumn(
"recency_score",
when(col("recency_days") <= 30, 5)
.when(col("recency_days") <= 60, 4)
.when(col("recency_days") <= 90, 3)
.when(col("recency_days") <= 180, 2)
.otherwise(1)
)
.withColumn(
"frequency_score",
when(col("frequency") >= 20, 5)
.when(col("frequency") >= 10, 4)
.when(col("frequency") >= 5, 3)
.when(col("frequency") >= 2, 2)
.otherwise(1)
)
.withColumn(
"monetary_score",
when(col("monetary_value") >= 50000, 5)
.when(col("monetary_value") >= 10000, 4)
.when(col("monetary_value") >= 5000, 3)
.when(col("monetary_value") >= 1000, 2)
.otherwise(1)
)
)
# 総合評価とセグメント分類
final_rfm = (
rfm_scored
.withColumn(
"rfm_score",
concat(col("recency_score"), col("frequency_score"), col("monetary_score"))
)
.withColumn(
"customer_segment",
when((col("recency_score") >= 4) & (col("frequency_score") >= 4) & (col("monetary_score") >= 4), "Champions")
.when((col("recency_score") >= 3) & (col("frequency_score") >= 3) & (col("monetary_score") >= 3), "Loyal Customers")
.when((col("recency_score") >= 4) & (col("frequency_score") <= 2), "New Customers")
.when((col("recency_score") <= 2) & (col("frequency_score") >= 3), "At Risk")
.when((col("recency_score") <= 2) & (col("frequency_score") <= 2), "Lost Customers")
.otherwise("Developing")
)
.withColumn("analysis_date", current_date())
)
return final_rfm
# 機械学習特徴量テーブル
@dlt.table(comment="機械学習用顧客特徴量")
def gold_ml_customer_features():
"""
機械学習モデル用の特徴量テーブル
予測分析やレコメンデーションシステムで使用
"""
customers_df = dlt.read("silver_customers_enriched")
rfm_df = dlt.read("gold_customer_rfm_analysis")
return (
customers_df
.join(rfm_df, "customer_id", "inner")
.select(
# 基本情報
col("customer_id"),
col("customer_segment"),
col("total_purchases"),
col("total_spent"),
col("customer_lifetime_days"),
# RFM特徴量
col("recency_days"),
col("frequency"),
col("monetary_value"),
col("recency_score"),
col("frequency_score"),
col("monetary_score"),
# 派生特徴量
(col("total_spent") / greatest(col("customer_lifetime_days"), lit(1))).alias("daily_spend_rate"),
(col("total_purchases") / greatest(col("customer_lifetime_days"), lit(1))).alias("purchase_frequency_rate"),
# カテゴリ特徴量をエンコード
when(col("customer_segment") == "VIP", 1).otherwise(0).alias("is_vip"),
when(col("customer_segment") == "Premium", 1).otherwise(0).alias("is_premium"),
current_timestamp().alias("feature_generated_at")
)
)
高度な機能実装
外部ライブラリとの連携
Pythonの強力な機能として、pandasやnumpy、scikit-learn等の外部ライブラリを活用できます:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
@dlt.table(comment="高度な統計分析結果")
def advanced_analytics_results():
"""
pandasとnumpyを使った高度な分析
"""
# SparkデータフレームをPandasに変換
spark_df = dlt.read("silver_sales_cleaned")
pandas_df = spark_df.toPandas()
# pandas/numpyを使った分析
# 移動平均の計算
pandas_df['amount_7day_ma'] = pandas_df.groupby('customer_id')['amount'].rolling(7).mean().reset_index(0, drop=True)
# 季節性分析
pandas_df['sale_date'] = pd.to_datetime(pandas_df['sale_date'])
pandas_df['day_of_week'] = pandas_df['sale_date'].dt.dayofweek
pandas_df['month'] = pandas_df['sale_date'].dt.month
# カスタム統計指標
customer_stats = pandas_df.groupby('customer_id').agg({
'amount': ['mean', 'std', 'skew'],
'sale_date': ['min', 'max', 'count']
}).round(2)
# Sparkデータフレームに戻す
return spark.createDataFrame(pandas_df)
# UDF(User Defined Function)を使った処理
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType
@udf(returnType=DoubleType())
def calculate_customer_lifetime_value(total_spent, customer_lifetime_days, frequency):
"""顧客生涯価値を計算するUDF"""
if not all([total_spent, customer_lifetime_days, frequency]) or customer_lifetime_days <= 0:
return 0.0
# 簡単なCLV計算
avg_order_value = total_spent / frequency
purchase_frequency = frequency / customer_lifetime_days * 365 # 年間購入頻度
# 予測生涯価値(3年間)
predicted_clv = avg_order_value * purchase_frequency * 3
return float(predicted_clv)
@dlt.table(comment="CLV計算結果")
def customer_lifetime_value():
"""カスタムUDFを使ったCLV計算"""
return (
dlt.read("silver_customers_enriched")
.withColumn(
"predicted_clv",
calculate_customer_lifetime_value(
col("total_spent"),
col("customer_lifetime_days"),
col("total_purchases")
)
)
.filter(col("predicted_clv") > 0)
)
動的スキーマ処理
@dlt.table(comment="動的スキーマ対応テーブル")
def dynamic_schema_processing():
"""
JSONデータの動的スキーマ処理
新しいフィールドが追加されても自動対応
"""
df = dlt.read("bronze_sales")
# JSONカラムがある場合の動的展開
if "properties" in df.columns:
# JSONスキーマを推論
sample_data = df.select("properties").limit(1000).collect()
json_fields = set()
for row in sample_data:
if row.properties:
try:
import json
parsed = json.loads(row.properties)
json_fields.update(parsed.keys())
except:
continue
# 動的にカラムを展開
for field in json_fields:
df = df.withColumn(
f"prop_{field}",
get_json_object(col("properties"), f"$.{field}")
)
return df
# 設定駆動型の処理
def create_aggregation_table(table_name, group_by_cols, agg_cols):
"""設定に基づいて動的に集計テーブルを生成する関数"""
@dlt.table(name=f"agg_{table_name}", comment=f"Dynamic aggregation for {table_name}")
def dynamic_aggregation():
df = dlt.read(table_name)
# 動的に集計処理を生成
agg_exprs = []
for col_name, agg_type in agg_cols.items():
if agg_type == "sum":
agg_exprs.append(sum(col_name).alias(f"total_{col_name}"))
elif agg_type == "avg":
agg_exprs.append(avg(col_name).alias(f"avg_{col_name}"))
elif agg_type == "count":
agg_exprs.append(count(col_name).alias(f"count_{col_name}"))
return df.groupBy(*group_by_cols).agg(*agg_exprs)
return dynamic_aggregation
# 使用例:設定に基づく動的テーブル生成
sales_agg_config = {
"group_by_cols": ["sale_date", "customer_segment"],
"agg_cols": {
"amount": "sum",
"transaction_id": "count"
}
}
# 動的にテーブル関数を生成
sales_daily_agg = create_aggregation_table("silver_sales_cleaned",
sales_agg_config["group_by_cols"],
sales_agg_config["agg_cols"])
エラーハンドリングと監視
Pythonでは、より詳細なエラーハンドリングと監視機能を実装できます:
高度なエラーハンドリング
import logging
from datetime import datetime
# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def safe_division_udf(numerator, denominator):
"""安全な除算を行うUDF"""
try:
if denominator is None or denominator == 0:
return None
return float(numerator) / float(denominator)
except Exception as e:
logger.error(f"Division error: {e}")
return None
safe_division = udf(safe_division_udf, DoubleType())
@dlt.table(comment="エラーハンドリング付きの計算テーブル")
def robust_calculations():
"""エラーに強い計算処理"""
df = dlt.read("silver_sales_cleaned")
try:
# 安全な計算処理
result_df = (
df
.withColumn(
"profit_margin_safe",
safe_division(col("profit"), col("amount"))
)
.withColumn(
"processing_timestamp",
current_timestamp()
)
)
# データ品質チェック
total_records = result_df.count()
null_margins = result_df.filter(col("profit_margin_safe").isNull()).count()
if total_records > 0:
null_rate = (null_margins / total_records) * 100
logger.info(f"処理完了: 総レコード数={total_records}, NULL率={null_rate:.2f}%")
if null_rate > 50:
logger.warning(f"高いNULL率を検出: {null_rate:.2f}%")
return result_df
except Exception as e:
logger.error(f"テーブル処理エラー: {str(e)}")
# エラー時は空のデータフレームを返す
return spark.createDataFrame([], df.schema)
# データ品質監視テーブル
@dlt.table(comment="データ品質監視ダッシュボード")
def data_quality_monitoring():
"""包括的なデータ品質監視"""
def analyze_table_quality(table_name, df):
"""テーブルの品質分析を行う"""
total_count = df.count()
quality_metrics = {
"table_name": table_name,
"total_records": total_count,
"check_timestamp": datetime.now()
}
# カラムごとのNULL率計算
for col_name in df.columns:
null_count = df.filter(col(col_name).isNull()).count()
null_rate = (null_count / total_count * 100) if total_count > 0 else 0
quality_metrics[f"{col_name}_null_rate"] = round(null_rate, 2)
return quality_metrics
# 複数テーブルの品質をチェック
tables_to_check = [
("bronze_sales", dlt.read("bronze_sales")),
("silver_sales_cleaned", dlt.read("silver_sales_cleaned")),
("silver_customers_enriched", dlt.read("silver_customers_enriched"))
]
quality_results = []
for table_name, df in tables_to_check:
try:
metrics = analyze_table_quality(table_name, df)
quality_results.append(metrics)
except Exception as e:
logger.error(f"品質チェックエラー {table_name}: {str(e)}")
quality_results.append({
"table_name": table_name,
"error": str(e),
"check_timestamp": datetime.now()
})
# 結果をSparkデータフレームに変換
return spark.createDataFrame(quality_results)
アラート機能の実装
@dlt.table(comment="自動アラート生成テーブル")
def automated_alerts():
"""条件に基づく自動アラート生成"""
def generate_alert(alert_type, message, severity="INFO", table_name=None):
"""アラートメッセージを生成"""
return {
"alert_timestamp": datetime.now(),
"alert_type": alert_type,
"message": message,
"severity": severity,
"table_name": table_name,
"environment": config.env
}
alerts = []
# データ量チェック
try:
daily_sales = dlt.read("silver_sales_cleaned").filter(
col("sale_date") == current_date()
).count()
min_expected = config.quality_thresholds["min_daily_records"]
if daily_sales < min_expected:
alerts.append(generate_alert(
"DATA_VOLUME",
f"本日の売上レコード数が少なすぎます: {daily_sales} < {min_expected}",
"ERROR",
"silver_sales_cleaned"
))
except Exception as e:
alerts.append(generate_alert(
"SYSTEM_ERROR",
f"売上データチェック中にエラー: {str(e)}",
"CRITICAL"
))
# データ品質チェック
try:
quality_df = dlt.read("data_quality_monitoring")
latest_quality = quality_df.orderBy(col("check_timestamp").desc()).first()
if latest_quality:
# 高いNULL率をチェック
for col_name in ["amount", "customer_id", "transaction_id"]:
null_rate_col = f"{col_name}_null_rate"
if hasattr(latest_quality, null_rate_col):
null_rate = getattr(latest_quality, null_rate_col)
if null_rate > 5.0: # 5%を超えるNULL率
alerts.append(generate_alert(
"DATA_QUALITY",
f"{col_name}のNULL率が高すぎます: {null_rate}%",
"WARNING",
latest_quality.table_name
))
except Exception as e:
alerts.append(generate_alert(
"SYSTEM_ERROR",
f"品質チェック中にエラー: {str(e)}",
"ERROR"
))
# アラートがない場合は正常メッセージ
if not alerts:
alerts.append(generate_alert(
"HEALTH_CHECK",
"全てのチェックが正常に完了しました",
"INFO"
))
return spark.createDataFrame(alerts)
よくある問題と解決法(Python版)
1. パフォーマンス問題
問題: PythonのUDFやpandas操作でパフォーマンスが劣化する
解決法: 処理の最適化とキャッシュ活用
# ❌ 悪い例:非効率なUDF使用
@udf(returnType=StringType())
def slow_categorization(amount):
if amount > 10000:
return "High"
elif amount > 1000:
return "Medium"
else:
return "Low"
# ✅ 良い例:Spark関数を使用
@dlt.table(comment="効率的なカテゴリ分類")
def efficient_categorization():
return (
dlt.read("silver_sales_cleaned")
.withColumn(
"amount_category",
when(col("amount") > 10000, "High")
.when(col("amount") > 1000, "Medium")
.otherwise("Low")
)
.cache() # 再利用する場合はキャッシュ
)
# pandas操作の最適化
@dlt.table(comment="最適化されたpandas処理")
def optimized_pandas_processing():
spark_df = dlt.read("silver_sales_cleaned")
# 大きなデータセットの場合は分割処理
if spark_df.count() > 1000000: # 100万件以上の場合
# パーティションごとに処理
def process_partition(iterator):
import pandas as pd
for pdf in iterator:
# pandas処理
pdf['processed_amount'] = pdf['amount'] * 1.1
yield pdf
return spark_df.mapInPandas(process_partition, spark_df.schema)
else:
# 小さなデータセットは通常処理
pandas_df = spark_df.toPandas()
pandas_df['processed_amount'] = pandas_df['amount'] * 1.1
return spark.createDataFrame(pandas_df)
2. メモリ不足エラー
解決法: データの分割処理とリソース管理
@dlt.table(comment="メモリ効率的な大容量データ処理")
def memory_efficient_processing():
"""大容量データの効率的な処理"""
# データを日付で分割して処理
base_df = dlt.read("bronze_sales")
# 処理対象期間を分割
date_ranges = [
("2024-01-01", "2024-03-31"),
("2024-04-01", "2024-06-30"),
("2024-07-01", "2024-09-30"),
("2024-10-01", "2024-12-31")
]
processed_dfs = []
for start_date, end_date in date_ranges:
chunk_df = (
base_df
.filter(
(col("sale_date") >= start_date) &
(col("sale_date") <= end_date)
)
.repartition(10) # 適切なパーティション数に調整
)
# 重い処理をチャンクごとに実行
processed_chunk = chunk_df.withColumn(
"complex_calculation",
# 複雑な計算をここに記述
col("amount") * 1.2
)
processed_dfs.append(processed_chunk)
# 結果を結合
return processed_dfs[0].union(processed_dfs[1]).union(processed_dfs[2]).union(processed_dfs[3])
3. スキーマ進化の問題
解決法: 柔軟なスキーマ処理
@dlt.table(comment="スキーマ進化対応テーブル")
def schema_evolution_safe():
"""スキーマ変更に対応した安全な処理"""
df = dlt.read("bronze_sales")
# 必須カラムの存在チェック
required_columns = ["transaction_id", "customer_id", "amount", "sale_date"]
available_columns = df.columns
missing_columns = set(required_columns) - set(available_columns)
if missing_columns:
logger.error(f"必須カラムが不足: {missing_columns}")
# 不足カラムをNULLで追加
for col_name in missing_columns:
df = df.withColumn(col_name, lit(None))
# オプションカラムの安全な追加
optional_columns = {
"discount_amount": DecimalType(10, 2),
"promotion_code": StringType(),
"sales_rep_id": LongType()
}
for col_name, col_type in optional_columns.items():
if col_name not in available_columns:
df = df.withColumn(col_name, lit(None).cast(col_type))
# 型の安全な変換
return (
df
.withColumn("transaction_id", col("transaction_id").cast(StringType()))
.withColumn("customer_id", col("customer_id").cast(LongType()))
.withColumn("amount", col("amount").cast(DecimalType(12, 2)))
.withColumn("sale_date", to_date(col("sale_date")))
)
まとめ
DLT Python編では、SQLでは実現困難な高度で柔軟なデータパイプラインを構築できることを確認しました。
🐍 Python DLTの主要な利点
1. 高度な処理能力
- 外部ライブラリ活用: pandas、numpy、scikit-learn等との連携
- 複雑なビジネスロジック: 条件分岐や計算ロジックの柔軟な実装
- カスタムUDF: 独自の処理関数の作成
- 動的処理: 実行時の条件に応じた処理変更
2. 設定の外部化と環境対応
- 設定クラス: 環境別の設定を体系的に管理
- 動的テーブル生成: 設定に基づくテーブル定義の自動生成
- 環境分離: 開発・ステージング・本番環境の自動切り替え
3. 包括的な監視とエラーハンドリング
- 詳細なログ出力: Pythonのloggingモジュール活用
- 例外処理: try-catch による堅牢なエラーハンドリング
- 自動アラート: 条件に基づくアラート生成
- 品質監視: データ品質の自動チェックと可視化
4. 機械学習との連携
- 特徴量エンジニアリング: ML向けの特徴量自動生成
- モデル適用: 学習済みモデルをパイプラインに組み込み
- 予測分析: リアルタイムでの予測結果生成
適用の指針
SQLを選ぶべき場面
- シンプルな集計・結合処理
- ビジネスユーザーとの共有が重要
- パフォーマンスを最優先する場合
- 標準的なETL処理
Pythonを選ぶべき場面
- 複雑なビジネスロジックの実装
- 外部ライブラリとの連携が必要
- 動的な処理や設定管理が重要
- 機械学習やデータサイエンス要素を含む
- 高度なエラーハンドリングや監視が必要
実装のベストプラクティス
- 設定の外部化: 環境別設定を系統的に管理
- エラーハンドリング: 例外処理とログ出力を徹底
- パフォーマンス最適化: UDFよりもSpark関数を優先
- メモリ管理: 大容量データは分割処理
- テスト可能性: 関数の分離とユニットテスト実装
DLT Pythonは学習コストが高めですが、その分得られる柔軟性と機能性は非常に高く、複雑なデータエンジニアリング要件に対応できると思います。SQLで実現困難な要件がある場合は、Python実装を検討したいですね。
Discussion