📖

Pythonによるデータプロファイリング

に公開

データ品質の概念

実装に入る前にデータ品質の概念から整理していきたいと思います。
参考にしたのは以下の記事です。

https://note.com/zono_data/n/n620b48d2b49f

10ステッププロセス

データ品質の改善は、一度行えば終わりというものではなく、継続的に取り組むべき活動です。
データ品質実践ガイドには、以下の10のステップに分解して説明されています。

データ品質評価の14タスク

10ステッププロセスのStep3「データ品質の評価」では、データの品質を一面的な見方ではなく、多角的に捉えることが重要です。ガイドブックでは、そのための評価軸として以下の14のタスクを提示されています。

データプロファイリングの実装

それでは、各タスクの具体的な実装例を見ていきましょう。以下のコードは、pandas、numpy、matplotlib、seabornを使用しています。

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import re

1. レコード数

def check_record_count(df, expected_count=None):
    actual_count = len(df)
    diff_rate = None
    if expected_count is not None:
        diff_rate = abs(actual_count - expected_count) / expected_count
    return actual_count, diff_rate

record_count, diff_rate = check_record_count(df, expected_count=10000)
print("Record count:", record_count, "Difference rate:", diff_rate)

# レコード数の分布を可視化
plt.figure(figsize=(10, 5))
plt.bar(['Actual', 'Expected'], [record_count, 10000])
plt.title('Record Count Comparison')
plt.ylabel('Number of Records')
plt.show()

2. 完全性/充足率

def check_completeness(df, required_fields=None):
    if required_fields is None:
        required_fields = df.columns
    
    # 必須フィールドのNULL値をチェック
    null_counts = df[required_fields].isnull().sum(axis=1)
    complete_records = (null_counts == 0).sum()
    completeness_ratio = complete_records / len(df)
    
    # カラムごとのNULL値の割合
    column_null_ratios = df[required_fields].isnull().mean()
    
    return complete_records, completeness_ratio, column_null_ratios

# 必須フィールドを指定して完全性をチェック
required_fields = ['vendor_id', 'pickup_datetime', 'dropoff_datetime', 
                  'passenger_count', 'trip_distance', 'fare_amount']
complete_records, completeness_ratio, column_null_ratios = check_completeness(df, required_fields)

print(f"完全なレコード数: {complete_records}")
print(f"完全なレコードの割合: {completeness_ratio:.2%}")
print("\nカラムごとのNULL値の割合:")
print(column_null_ratios)

# 完全性の結果を可視化
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.bar(['Complete', 'Incomplete'], 
        [complete_records, len(df) - complete_records])
plt.title('Complete vs Incomplete Records')
plt.ylabel('Number of Records')

plt.subplot(1, 2, 2)
plt.bar(column_null_ratios.index, column_null_ratios.values)
plt.title('NULL Ratio per Required Field')
plt.xlabel('Field')
plt.ylabel('NULL Ratio')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# 必須フィールドのNULL値の分布をヒートマップで表示
plt.figure(figsize=(12, 6))
sns.heatmap(df[required_fields].isnull(), yticklabels=False, cbar=True, cmap='viridis')
plt.title('NULL Values Distribution in Required Fields')
plt.show()

3. NULL

def check_null_values(df):
    null_counts = df.isnull().sum()
    null_ratios = df.isnull().mean()
    
    null_stats = pd.DataFrame({
        'null_count': null_counts,
        'null_ratio': null_ratios
    })
    
    return null_stats

null_stats = check_null_values(df)
print("NULL Statistics:\n", null_stats)

# NULL値の分布を可視化
plt.figure(figsize=(12, 6))
sns.heatmap(df.isnull(), yticklabels=False, cbar=True, cmap='viridis')
plt.title('NULL Values Distribution')
plt.show()

# 各カラムのNULL値の割合を棒グラフで表示
plt.figure(figsize=(10, 5))
sns.barplot(x=null_stats['null_ratio'], y=null_stats.index)
plt.title("NULL Ratio per Column")
plt.xlabel("Ratio")
plt.ylabel("Column")
plt.show()

4. 内容適合性

def check_content_suitability(df, rules_dict):
    results = {}
    for column, rule in rules_dict.items():
        if column in df.columns:
            results[column] = df[column].apply(rule).mean()
    return results

rules = {
    'passenger_count': lambda x: 0 < x,
    'trip_distance': lambda x: x > 0,
    'fare_amount': lambda x: x > 0
}
suitability_results = check_content_suitability(df, rules)
print("Content Suitability Results:", suitability_results)

# 内容適合性の結果を可視化
unsuitable_ratios = {col: 1 - ratio for col, ratio in suitability_results.items()}
plt.figure(figsize=(10, 5))
plt.bar(unsuitable_ratios.keys(), unsuitable_ratios.values())
plt.title('Ratio of Records Not Meeting Content Rules')
plt.xlabel('Column')
plt.ylabel('Unsuitable Ratio')
plt.xticks(rotation=45)
plt.show()

5. 有効性

def check_validity(df, validation_rules):
    validity_results = {}
    for column, rules in validation_rules.items():
        if column in df.columns:
            valid_count = sum(1 for x in df[column] if all(rule(x) for rule in rules))
            validity_results[column] = valid_count / len(df)
    return validity_results

validation_rules = {
    'passenger_count': [lambda x: isinstance(x, (int, float)), lambda x: x >= 0],
    'trip_distance': [lambda x: isinstance(x, (int, float)), lambda x: x >= 0],
    'fare_amount': [lambda x: isinstance(x, (int, float)), lambda x: x >= 0]
}
validity_results = check_validity(df, validation_rules)
print("Validity Results:", validity_results)

# 有効性の結果を可視化
invalid_ratios = {col: 1 - ratio for col, ratio in validity_results.items()}
plt.figure(figsize=(10, 5))
plt.bar(invalid_ratios.keys(), invalid_ratios.values())
plt.title('Ratio of Invalid Records')
plt.xlabel('Column')
plt.ylabel('Invalid Ratio')
plt.xticks(rotation=45)
plt.show()

6. 一意な値

def analyze_unique_values(df):
    unique_stats = pd.DataFrame({
        'unique_count': df.nunique(),
        'unique_ratio': df.nunique() / len(df)
    })
    return unique_stats

unique_stats = analyze_unique_values(df)
print("Unique Values Analysis:\n", unique_stats)

# 一意な値の割合を可視化
plt.figure(figsize=(10, 5))
plt.bar(unique_stats.index, 1 - unique_stats['unique_ratio'])
plt.title('Ratio of Duplicate Values')
plt.xlabel('Column')
plt.ylabel('Duplicate Ratio')
plt.xticks(rotation=45)
plt.show()

7. 度数分布

def analyze_value_distribution(df):
    for column in df.columns:
        plt.figure(figsize=(10, 5))
        if df[column].dtype in ['int64', 'float64']:
            sns.histplot(data=df, x=column)
        else:
            value_counts = df[column].value_counts().head(10)
            sns.barplot(x=value_counts.values, y=value_counts.index)
        plt.title(f"Distribution of {column}")
        plt.show()

# 数値カラムの分布を箱ひげ図で表示
numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
plt.figure(figsize=(12, 6))
df[numeric_cols].boxplot()
plt.title('Value Distribution of Numeric Columns')
plt.xticks(rotation=45)
plt.show()

analyze_value_distribution(df)

8. データ鮮度

def check_data_freshness(df, date_column):
    current_date = datetime.now()
    df[date_column] = pd.to_datetime(df[date_column])
    df['days_since_update'] = (current_date - df[date_column]).dt.days
    
    bins = [0, 365, 730, float('inf')]
    labels = ['Within 12 months', '13-24 months', 'Over 24 months']
    
    frequency_distribution = pd.cut(df['days_since_update'], bins=bins, labels=labels).value_counts(normalize=True)
    
    plt.figure(figsize=(8, 6))
    sns.barplot(x=frequency_distribution.index, y=frequency_distribution.values)
    plt.title(f"Data Freshness Distribution ({date_column})")
    plt.xlabel("Date Range")
    plt.ylabel("Percentage of Records")
    plt.show()
    
    return {
        'frequency_distribution': frequency_distribution.to_dict()
    }

freshness_results = check_data_freshness(df, 'pickup_datetime')
print("Data Freshness:", freshness_results)

9. パターン/フォーマット

def check_patterns(df, pattern_rules):
    results = {}
    for column, pattern in pattern_rules.items():
        if column in df.columns:
            matches = df[column].astype(str).str.match(pattern)
            results[column] = matches.mean()
    return results

pattern_rules = {
    'store_and_fwd_flag': r'^[YN]$',
    'payment_type': r'^[1-4]$'
}
pattern_results = check_patterns(df, pattern_rules)
print("Pattern Validation Results:", pattern_results)

# パターンマッチの結果を可視化
mismatch_ratios = {col: 1 - ratio for col, ratio in pattern_results.items()}
plt.figure(figsize=(10, 5))
plt.bar(mismatch_ratios.keys(), mismatch_ratios.values())
plt.title('Ratio of Records Not Matching Pattern')
plt.xlabel('Column')
plt.ylabel('Mismatch Ratio')
plt.xticks(rotation=45)
plt.show()

10. 値の範囲

def check_value_ranges(df, range_rules):
    results = {}
    for column, (min_val, max_val) in range_rules.items():
        if column in df.columns:
            in_range = (df[column] >= min_val) & (df[column] <= max_val)
            results[column] = in_range.mean()
    return results

range_rules = {
    'passenger_count': (1, 6),
    'trip_distance': (0, 100),
    'fare_amount': (0, 1000)
}
range_results = check_value_ranges(df, range_rules)
print("Value Range Results:", range_results)

# 範囲外の値の割合を可視化
out_of_range_ratios = {col: 1 - ratio for col, ratio in range_results.items()}
plt.figure(figsize=(10, 5))
plt.bar(out_of_range_ratios.keys(), out_of_range_ratios.values())
plt.title('Ratio of Values Outside Valid Range')
plt.xlabel('Column')
plt.ylabel('Out of Range Ratio')
plt.xticks(rotation=45)
plt.show()

11. 精度・粒度

def check_precision(df, precision_rules):
    results = {}
    for column, required_decimals in precision_rules.items():
        if column in df.columns:
            # 小数点以下の桁数を計算
            decimal_places = df[column].astype(str).str.split('.').str[1].str.len()
            # 必要な桁数を持つ値の割合を計算
            results[column] = (decimal_places >= required_decimals).mean()
    return results

# 精度チェックのルールを定義
precision_rules = {
    'pickup_latitude': 6,  # 緯度は6桁の精度が必要
    'pickup_longitude': 6,  # 経度は6桁の精度が必要
    'dropoff_latitude': 6,
    'dropoff_longitude': 6,
    'fare_amount': 2,  # 料金は2桁の精度が必要
    'tip_amount': 2,
    'total_amount': 2
}

precision_results = check_precision(df, precision_rules)
print("Precision Results:")
for column, ratio in precision_results.items():
    print(f"{column}: {ratio:.2%} of values have required precision")

# 精度不足の割合を可視化
insufficient_precision_ratios = {col: 1 - ratio for col, ratio in precision_results.items()}
plt.figure(figsize=(12, 6))
plt.bar(insufficient_precision_ratios.keys(), insufficient_precision_ratios.values())
plt.title('Ratio of Values with Insufficient Precision')
plt.xlabel('Column')
plt.ylabel('Insufficient Precision Ratio')
plt.xticks(rotation=45)
plt.show()

# 各カラムの小数点以下の桁数の分布を可視化
for column in precision_rules.keys():
    if column in df.columns:
        decimal_places = df[column].astype(str).str.split('.').str[1].str.len()
        plt.figure(figsize=(10, 5))
        sns.histplot(decimal_places.dropna(), bins=range(int(decimal_places.max()) + 2))
        plt.axvline(x=precision_rules[column], color='r', linestyle='--', 
                   label=f'Required precision: {precision_rules[column]}')
        plt.title(f'Decimal Places Distribution for {column}')
        plt.xlabel('Number of Decimal Places')
        plt.ylabel('Count')
        plt.legend()
        plt.show()

12. データ型

def check_data_types(df):
    type_info = {}
    for column in df.columns:
        type_info[column] = {
            'dtype': df[column].dtype,
            'unique_types': df[column].apply(type).unique()
        }
    return type_info

type_results = check_data_types(df)
print("Data Type Analysis:", type_results)

# データ型の不一致を可視化
type_mismatch_ratios = {}
for col in df.columns:
    if df[col].dtype in ['int64', 'float64']:
        non_numeric = pd.to_numeric(df[col], errors='coerce').isna().mean()
        type_mismatch_ratios[col] = non_numeric

plt.figure(figsize=(10, 5))
plt.bar(type_mismatch_ratios.keys(), type_mismatch_ratios.values())
plt.title('Ratio of Records with Unexpected Data Types')
plt.xlabel('Column')
plt.ylabel('Type Mismatch Ratio')
plt.xticks(rotation=45)
plt.show()

13. サイズ/長さ

def check_size_length(df, length_rules):
    results = {}
    for column, (min_len, max_len) in length_rules.items():
        if column in df.columns:
            lengths = df[column].astype(str).str.len()
            in_range = (lengths >= min_len) & (lengths <= max_len)
            results[column] = in_range.mean()
    return results

length_rules = {
    'store_and_fwd_flag': (1, 1),
    'payment_type': (1, 1)
}
length_results = check_size_length(df, length_rules)
print("Size/Length Results:", length_results)

# 不正な長さの割合を可視化
invalid_length_ratios = {col: 1 - ratio for col, ratio in length_results.items()}
plt.figure(figsize=(10, 5))
plt.bar(invalid_length_ratios.keys(), invalid_length_ratios.values())
plt.title('Ratio of Records with Invalid Length')
plt.xlabel('Column')
plt.ylabel('Invalid Length Ratio')
plt.xticks(rotation=45)
plt.show()

14. 参照整合性

参照整合性に関しては別テーブルを用意して、その別テーブルに元テーブルの値が入っているかを確認します。

def check_referential_integrity(df, reference_df, key_columns):
    results = {}
    for key in key_columns:
        if key in df.columns and key in reference_df.columns:
            valid_keys = df[key].isin(reference_df[key])
            results[key] = valid_keys.mean()
    return results

reference_df = df.sample(frac=0.1)
reference_integrity_results = check_referential_integrity(
    df, reference_df, ['vendor_id', 'payment_type']
)
print("Referential Integrity Results:", reference_integrity_results)

# 無効な参照の割合を可視化
invalid_reference_ratios = {col: 1 - ratio for col, ratio in reference_integrity_results.items()}
plt.figure(figsize=(10, 5))
plt.bar(invalid_reference_ratios.keys(), invalid_reference_ratios.values())
plt.title('Ratio of Records with Invalid References')
plt.xlabel('Column')
plt.ylabel('Invalid Reference Ratio')
plt.xticks(rotation=45)
plt.show()

Discussion