【Python・Pysparkで学ぶ!】データ分析の基礎【データの確認.agg(fn.count(),fn.countDistinct()】
【Python・Pysparkで学ぶ!】データ分析の基礎【データの確認.agg(fn.count(),fn.countDistinct()】
↓トランザクションテーブル(transaction_table)のサンプル
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
上記のような決済データを集約したテーブルが存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『データの品質確認をしてほしい』
本稿では、クライアントからの要望に答えながら、 データフレームの確認 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントの要望との認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
-
確認観点①: 概観の確認
- 目的: データの形式や一部の値を確認
-
確認観点②: カラムの型確認
- 目的: スキーマ情報(カラム名とデータ型)を確認
-
確認観点③: 集計によるデータ全体の確認
- 目的: データ件数、主キーの一意性、日付の範囲を確認
-
確認観点④: 欠損値や異常値の確認
- 目的: 欠損率が高い場合、重要なカラムの適切性を確認
-
確認観点①: 概観の確認
◾️PKカラムの確認
クライアントより、PKは"tran_id"という情報を連携していただきました。
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
はじめに、データの概観の確認です。こちらはshow()メソッドを使用し、truncate引数にFalseを渡します。そうすることで、全ての列を切り詰めずに表示にさせます。
tran_sdf.show(3,truncate=False)
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48:03 | user000005 | shop0004 | 1540 | 1400 | 2025 | 1 | 3 |
102 | cache | 2025/01/04 22:27:16 | user000027 | shop0006 | 6050 | 5500 | 2025 | 1 | 4 |
103 | code | 2025/01/08 03:22:07 | user000020 | shop0003 | 8140 | 7400 | 2025 | 1 | 8 |
次に、カラムの型確認です。printSchema()メソッドはデータフレームのカラム名称とカラム型を表示します。
tran_sdf.printSchema()
root
|-- tran_id: string (nullable = true)
|-- pay_method: string (nullable = true)
|-- tran_dt: timestamp (nullable = true)
|-- user_id: string (nullable = true)
|-- shop_id: string (nullable = true)
|-- pay_amount_raw: int (nullable = true)
|-- pay_amount_without_tax: int (nullable = true)
|-- y: int (nullable = true)
|-- m: int (nullable = true)
|-- d: int (nullable = true)
次に、集計によるデータ全体の確認です。groupBy()メソッドで集約してからagg()メソッドで集計することでPKの一意性や期間の整合性が確認できます。
(
tran_sdf
.withColumn("ymd",fn.concat("y","m","d")) # "年月日"カラムを追加
.groupBy() # 集約
.agg(
fn.count("*").alias("record_cnt"), # レコード数をカウント
fn.count("tran_id").alias("tran_cnt"), # "tran_id"数をカウント
fn.countDistinct("tran_id").alias("tran_uu_cnt"), # "tran_id"のUU数をカウント
fn.min("ymd").alias("oldest_ymd"), # "年月日"が最も古いレコードの値を集計
fn.max("ymd").alias("latest_ymd") # "年月日"が最も新しいレコードの値を集計
)
).show()
record_cnt | tran_cnt | tran_uu_cnt | oldest_ymd | latest_ymd |
---|---|---|---|---|
33 | 33 | 33 | 20250103 | 20250401 |
そして、欠損値や異常値の確認です。少し理解の難易度が高くなりますが、select()メソッドをうまく使用し、カラムごとの欠損(null)率を算出します。
(
tran_sdf
.select([(
fn.count(fn.when(fn.col(c).isNull(), c)) / fn.count("*")).alias(f"{c}_null_rate"
) for c in tran_sdf.columns])
).show()
tran_id_null_rate | pay_method_null_rate | tran_dt_null_rate | user_id_null_rate | shop_id_null_rate | pay_amount_raw_null_rate | pay_amount_without_tax_null_rate | y_null_rate | m_null_rate | d_null_rate |
---|---|---|---|---|---|---|---|---|---|
0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
最後に、上記のコードスニペットをまとめたスクリプトの全量を紹介します。
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS transaction_table (
tran_id STRING, --トランザクションID
pay_method STRING, --決済手段
tran_dt TIMESTAMP, --トランザクション日時
user_id STRING, --ユーザーID
shop_id STRING, --店舗ID
pay_amount_raw INT, --決済金額(税込)
pay_amount_without_tax INT, --決済金額(税抜)
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string, m string, d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/transaction_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")
# テーブル読み込み
tran_sdf = spark.read.table("transaction_table")
# データ確認
tran_sdf.show(3) # 確認①:概観
tran_sdf.printSchema() # 確認②:カラムの型確認
(
tran_sdf
.withColumn("ymd",fn.concat("y","m","d"))
.groupBy()
.agg(
fn.count("*").alias("record_cnt"),
fn.count("tran_id").alias("tran_cnt"),
fn.countDistinct("tran_id").alias("tran_uu_cnt"),
fn.min("ymd").alias("oldest_ymd"),
fn.max("ymd").alias("latest_ymd")
)
).show() # 確認③:PKのUU確認および期間確認
(
tran_sdf
.select([(
fn.count(fn.when(fn.col(c).isNull(), c)) / fn.count("*")).alias(f"{c}_null_rate"
) for c in tran_sdf.columns])
).show() # 確認④: 欠損値や異常値の確認
『結論』
◾️データ確認の意義
データ確認の意義はデータの信頼性と品質を確保することにあります。
不整合や欠損値、異常値を特定することで、エラーや誤った分析結果を未然に防ぎ、データ処理や分析の基盤を整えます。
◾️データ確認に必要なエンジニアリング技術を高めるメリット
効率的なスクリプトやツールを活用するスキルは、作業の自動化やミスの低減を実現します。さらに、データ品質の問題を迅速に発見・修正できることで、プロジェクト全体のリスク管理が向上します。結果として、精度の高いシステム開発や分析を提供できるエンジニアとしての信頼性が高まり、キャリアの成長に繋がります。
Discussion
『品質確認の要件整理』を初めて見たので感動しました!
特に "最古日" と "最新日" は nice to haveです!