🆗
【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加④fn.coalesce()】
【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加④fn.coalesce()】
↓2025年6月の返金考慮前トランザクションファイル(transaction_table_v001_202506)のサンプル
取引時刻 | 取引内容 | 端末番号 | 伝票番号 | 決済方法 | 支払区分 | 金額 | 備考 |
---|---|---|---|---|---|---|---|
2025/06/01 20:00:00 | 売上 | 9999900199999 | 000017 | コード | 一括 | 3300 | |
2025/06/01 20:05:00 | 返金 | 9999900199999 | 000018(000017) | コード | 一括 | -1100 | |
2025/06/02 10:25:00 | 売上 | 9999900299999 | 000019 | カード | 一括 | 410 | |
2025/06/02 14:30:00 | 売上 | 9999900199999 | 000020 | コード | 一括 | 5120 |
上記のような決済データを集約したCSVファイルが存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『返金考慮前トランザクションテーブルから返金を考慮したテーブルを作成したい』
本稿では、クライアントからの要望に答えながら、 返金考慮 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
- 取引時刻:決済した日時
- 取引内容:『売上・返金』
- 端末番号:レジスター端末の識別番号(13桁)
- 決済方法:『キャッシュ・コード・カード』
- 支払区分:『一括・分割』
- 金額 :税込決済金額
- 備考 :(ユーザー記入)
- データソース格納先
-
合意『データソースの格納先を以下の通りとする』
-
トランザクションエクセルファイル(transaction_table.csv)
- 格納先:s3://data/content/static/transaction_table_v001_202506.csv
-
トランザクションエクセルファイル(transaction_table.csv)
-
合意『データソースの格納先を以下の通りとする』
◾️ アウトプットイメージ
取引時刻 | 取引内容 | 端末番号 | 伝票番号 | 決済方法 | 支払区分 | 金額 | 備考 |
---|---|---|---|---|---|---|---|
2025/06/01 20:00:00 | 売上 | 9999900199999 | 000017 | コード | 一括 | 2200 | |
2025/06/02 10:25:00 | 売上 | 9999900299999 | 000019 | カード | 一括 | 410 | |
2025/06/02 14:30:00 | 売上 | 9999900199999 | 000020 | コード | 一括 | 5120 |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、読み込むデータソースの『スキーマ定義』 を行います。
読み込むデータソースの『スキーマ定義』
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# スキーマ定義
transaction_schema = StructType([
StructField("取引時刻", TimestampType(), False),
StructField("取引内容", StringType(), False),
StructField("端末番号", StringType(), False),
StructField("伝票番号", StringType(), False),
StructField("決済方法", StringType(), False),
StructField("支払区分", StringType(), False),
StructField("金額", IntegerType(), False),
StructField("備考", StringType(), True),
])
上記スキーマを利用して、CSVファイルを読み込みます。
CSVファイルを読み込み
# CSVファイル読み込み
sdf = spark.read.csv("s3a://data/content/static/transaction_table_v001_202506.csv",header=True, schema=transaction_schema)
次に、読み込んだデータを使用して、返金考慮データを作成します。
作成手順は以下の通りです。
- 売上と返金のデータフレームを分離
- 返金データから紐づく伝票番号を抽出
- 売上データと返金データを伝票番号を結合キーにして結合
- 売上と返金を合算集計 ← ここでcoalesce()を使用
考えるコツは、売上データと返金データを異なる種類のテーブルとして考えることです。
返金考慮データを作成
# 売上と返金データフレームを作成
sales_df = sdf.filter(fn.col("取引内容") == "売上")
refund_df = sdf.filter(fn.col("取引内容") == "返金")
# 正規表現を使用して返金データから結合伝票番号と返金金額を抽出
refund_df = (
refund_df
.select(
fn.regexp_extract("伝票番号", "\\((.*?)\\)", 1).alias("結合伝票番号"),
fn.col("金額").alias("返金金額")
)
)
# 売上と返金データの結合
result_df = sales_df.join(
refund_df,
(sales_df.伝票番号 == refund_df.結合伝票番号),
"left"
).select(
sales_df.取引時刻,
sales_df.取引内容,
sales_df.端末番号,
sales_df.伝票番号,
sales_df.決済方法,
sales_df.支払区分,
# coalesceを使用して、金額と返金金額を合算する。
(sales_df.金額 + fn.coalesce(refund_df.返金金額, fn.lit(0))).alias("金額"),
sales_df.備考
)
# データフレームの概観確認
result_df.show(3, truncate=False)
取引時刻 | 取引内容 | 端末番号 | 伝票番号 | 決済方法 | 支払区分 | 金額 | 備考 |
---|---|---|---|---|---|---|---|
2025/06/01 20:00:00 | 売上 | 9999900199999 | 000017 | コード | 一括 | 2200 | |
2025/06/02 10:25:00 | 売上 | 9999900299999 | 000019 | カード | 一括 | 410 | |
2025/06/02 14:30:00 | 売上 | 9999900199999 | 000020 | コード | 一括 | 5120 |
上記の結果から、操作が意図している通りになっていることが確認できました。
最後に、スクリプト全量を紹介します。
スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import pandas as pd
# セッション作成
spark = SparkSession.builder.getOrCreate()
# スキーマ定義
transaction_schema = StructType([
StructField("取引時刻", StringType(), False),
StructField("取引内容", StringType(), False),
StructField("端末番号", StringType(), False),
StructField("伝票番号", StringType(), False),
StructField("決済方法", StringType(), False),
StructField("支払区分", StringType(), False),
StructField("金額", IntegerType(), False),
StructField("備考", StringType(), True),
])
# CSVファイル読み込み
sdf = spark.read.csv("s3a://data/content/static/transaction_table_v001_202506.csv",header=True, schema=transaction_schema)
# 売上と返金データフレームを作成
sales_df = sdf.filter(fn.col("取引内容") == "売上")
refund_df = sdf.filter(fn.col("取引内容") == "返金")
# 正規表現を使用して返金データから結合伝票番号と返金金額を抽出
refund_df = (
refund_df
.select(
fn.regexp_extract("伝票番号", "\\((.*?)\\)", 1).alias("結合伝票番号"),
fn.col("金額").alias("返金金額")
)
)
# 売上と返金データの結合
result_df = sales_df.join(
refund_df,
(sales_df.伝票番号 == refund_df.結合伝票番号),
"left"
).select(
sales_df.取引時刻,
sales_df.取引内容,
sales_df.端末番号,
sales_df.伝票番号,
sales_df.決済方法,
sales_df.支払区分,
# coalesceを使用して、金額と返金金額を合算する。
(sales_df.金額 + fn.coalesce(refund_df.返金金額, fn.lit(0))).alias("金額"),
sales_df.備考
)
『結論』
◾️データフレームの結合からfn.coalesceで集計する方法を習得するメリット(ビジネスサイド)
-
正確なデータ分析
- 返金額や売上額の補正処理を自動化し、正しい売上データを基にビジネス戦略を構築できます。
-
効率的な意思決定
- 返金を考慮した売上データがリアルタイムに近い形で提供され、迅速な意思決定を支援します。
◾️データフレームの結合からfn.coalesceで集計する方法を習得するメリット(エンジニアサイド)
-
NULL値処理の簡略化
- fn.coalesce() によるNULL処理で、計算ミスやエラーを防止し、より堅牢なデータパイプラインを構築できます。
-
再利用可能なロジック
- fn.coalesce() と結合処理はテンプレート化しやすく、他のデータ処理タスクでも再利用可能です。
Discussion