🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【データの確認.agg(fn.count(),fn.countDistinct()】

2025/01/12に公開
2

【Python・Pysparkで学ぶ!】データ分析の基礎【データの確認.agg(fn.count(),fn.countDistinct()】

↓トランザクションテーブル(transaction_table)のサンプル

tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 shop0001 770 700 2025 01 09

上記のような決済データを集約したテーブルが存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『データの品質確認をしてほしい』

本稿では、クライアントからの要望に答えながら、 データフレームの確認 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントの要望との認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • 『品質確認の要件整理』

    • 確認観点①: 概観の確認
      • 目的: データの形式や一部の値を確認
    • 確認観点②: カラムの型確認
      • 目的: スキーマ情報(カラム名とデータ型)を確認
    • 確認観点③: 集計によるデータ全体の確認
      • 目的: データ件数、主キーの一意性、日付の範囲を確認
    • 確認観点④: 欠損値や異常値の確認
      • 目的: 欠損率が高い場合、重要なカラムの適切性を確認

◾️PKカラムの確認
 クライアントより、PKは"tran_id"という情報を連携していただきました。

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。
 はじめに、データの概観の確認です。こちらはshow()メソッドを使用し、truncate引数にFalseを渡します。そうすることで、全ての列を切り詰めずに表示にさせます。

確認観点①: 概観の確認
tran_sdf.show(3,truncate=False)
tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48:03 user000005 shop0004 1540 1400 2025 1 3
102 cache 2025/01/04 22:27:16 user000027 shop0006 6050 5500 2025 1 4
103 code 2025/01/08 03:22:07 user000020 shop0003 8140 7400 2025 1 8

次に、カラムの型確認です。printSchema()メソッドはデータフレームのカラム名称とカラム型を表示します。

確認観点②: カラムの型確認
tran_sdf.printSchema()

root
|-- tran_id: string (nullable = true)
|-- pay_method: string (nullable = true)
|-- tran_dt: timestamp (nullable = true)
|-- user_id: string (nullable = true)
|-- shop_id: string (nullable = true)
|-- pay_amount_raw: int (nullable = true)
|-- pay_amount_without_tax: int (nullable = true)
|-- y: int (nullable = true)
|-- m: int (nullable = true)
|-- d: int (nullable = true)

次に、集計によるデータ全体の確認です。groupBy()メソッドで集約してからagg()メソッドで集計することでPKの一意性や期間の整合性が確認できます。

確認観点③: 集計によるデータ全体の確認
(
    tran_sdf
    .withColumn("ymd",fn.concat("y","m","d")) # "年月日"カラムを追加
    .groupBy() # 集約
    .agg(
        fn.count("*").alias("record_cnt"), # レコード数をカウント
        fn.count("tran_id").alias("tran_cnt"), # "tran_id"数をカウント
        fn.countDistinct("tran_id").alias("tran_uu_cnt"), # "tran_id"のUU数をカウント
        fn.min("ymd").alias("oldest_ymd"), # "年月日"が最も古いレコードの値を集計
        fn.max("ymd").alias("latest_ymd") # "年月日"が最も新しいレコードの値を集計
    )
).show() 
record_cnt tran_cnt tran_uu_cnt oldest_ymd latest_ymd
33 33 33 20250103 20250401

そして、欠損値や異常値の確認です。少し理解の難易度が高くなりますが、select()メソッドをうまく使用し、カラムごとの欠損(null)率を算出します。

確認観点④: 欠損値や異常値の確認
(
    tran_sdf
    .select([(
        fn.count(fn.when(fn.col(c).isNull(), c)) / fn.count("*")).alias(f"{c}_null_rate"
    ) for c in tran_sdf.columns])
).show()

tran_id_null_rate pay_method_null_rate tran_dt_null_rate user_id_null_rate shop_id_null_rate pay_amount_raw_null_rate pay_amount_without_tax_null_rate y_null_rate m_null_rate d_null_rate
0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0

最後に、上記のコードスニペットをまとめたスクリプトの全量を紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
# セッション作成
spark = SparkSession.builder.getOrCreate()

# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS transaction_table (
        tran_id                   STRING,     --トランザクションID
        pay_method                STRING,     --決済手段
        tran_dt                   TIMESTAMP,  --トランザクション日時
        user_id                   STRING,     --ユーザーID
        shop_id                   STRING,     --店舗ID
        pay_amount_raw            INT,        --決済金額(税込)
        pay_amount_without_tax    INT,        --決済金額(税抜)
        y                         STRING,     --年
        m                         STRING,     --月
        d                         STRING      --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y string, m string, d string)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/transaction_table/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")

# テーブル読み込み
tran_sdf = spark.read.table("transaction_table")
# データ確認
tran_sdf.show(3) # 確認①:概観
tran_sdf.printSchema() # 確認②:カラムの型確認

(
    tran_sdf
    .withColumn("ymd",fn.concat("y","m","d"))
    .groupBy()
    .agg(
        fn.count("*").alias("record_cnt"),
        fn.count("tran_id").alias("tran_cnt"),
        fn.countDistinct("tran_id").alias("tran_uu_cnt"),
        fn.min("ymd").alias("oldest_ymd"),
        fn.max("ymd").alias("latest_ymd")
    )
).show() # 確認③:PKのUU確認および期間確認
(
    tran_sdf
    .select([(
        fn.count(fn.when(fn.col(c).isNull(), c)) / fn.count("*")).alias(f"{c}_null_rate"
    ) for c in tran_sdf.columns])
).show() # 確認④: 欠損値や異常値の確認

『結論』

◾️データ確認の意義

データ確認の意義はデータの信頼性と品質を確保することにあります。
不整合や欠損値、異常値を特定することで、エラーや誤った分析結果を未然に防ぎ、データ処理や分析の基盤を整えます。

◾️データ確認に必要なエンジニアリング技術を高めるメリット

効率的なスクリプトやツールを活用するスキルは、作業の自動化やミスの低減を実現します。さらに、データ品質の問題を迅速に発見・修正できることで、プロジェクト全体のリスク管理が向上します。結果として、精度の高いシステム開発や分析を提供できるエンジニアとしての信頼性が高まり、キャリアの成長に繋がります。

Discussion

fact601fact601

『品質確認の要件整理』を初めて見たので感動しました!