🆗
【Python・Pysparkで学ぶ!】データ分析の基礎【データ結合②join(how="full")】
【Python・Pysparkで学ぶ!】データ分析の基礎【データ結合②join(how="full")】
↓料金プラン顧客情報(user_plan_table)のサンプル
acq_date | user_id | cur_price_plan_cd | cur_price_plan_name | new_price_plan_cd | new_price_plan_name | y | m | d |
---|---|---|---|---|---|---|---|---|
20250501 | user000001 | P001 | ベーシックプラン | NULL | NULL | 2025 | 05 | 01 |
20250501 | user000002 | P002 | プレミアムプラン | P001 | ベーシックプラン | 2025 | 05 | 01 |
20250501 | user000004 | P001 | ベーシックプラン | NULL | NULL | 2025 | 05 | 01 |
20250502 | user000001 | P001 | ベーシックプラン | NULL | NULL | 2025 | 05 | 02 |
20250502 | user000002 | P001 | ベーシックプラン | NULL | NULL | 2025 | 05 | 02 |
20250502 | user000005 | NULL | NULL | P001 | ベーシックプラン | 2025 | 05 | 02 |
上記のような決済データを集約したSQLテーブルが存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『前日情報を加えたテーブルを作成したい』
本稿では、クライアントからの要望に答えながら、 完全外部結合 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
- 料金プランは『ベーシックプラン・プレミアムプラン』の2種類
-
- 前日情報とは?
- 料金プラン顧客情報の"acq_dt"取得日における前日のレコード
- 前日情報とは?
-
- 合意『結合キーは"acq_dt"と"user_id"である』
- 合意『当日料金プラン顧客情報と前日料金プラン顧客情報を完全対等に結合する』
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
acq_dt | user_id | cur_price_plan_cd | cur_price_plan_name | new_price_plan_cd | new_price_plan_name | bef_cur_price_plan_cd | bef_cur_price_plan_name | bef_new_price_plan_cd | bef_new_price_plan_name |
---|---|---|---|---|---|---|---|---|---|
20250502 | user000001 | P001 | ベーシックプラン | NULL | NULL | P001 | ベーシックプラン | NULL | NULL |
20250502 | user000002 | P001 | ベーシックプラン | NULL | NULL | P002 | プレミアムプラン | P001 | ベーシックプラン |
20250502 | user000004 | NULL | NULL | NULL | NULL | P001 | ベーシックプラン | NULL | NULL |
20250502 | user000005 | NULL | NULL | P001 | ベーシックプラン | NULL | NULL | NULL | NULL |
20250502 | user000006 | P001 | ベーシックプラン | NULL | NULL | NULL | NULL | P001 | ベーシックプラン |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、spark.sql()メソッドでDDLを書き、SQLテーブルを読み込みます。
SQLテーブルを読み込み
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS user_plan_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS user_plan_table (
acq_date STRING, --取得日
user_id STRING, --ユーザーID
cur_price_plan_cd STRING, --現加入料金プランコード
cur_price_plan_name STRING, --現加入料金プラン名称
new_price_plan_cd STRING, --新加入料金プランコード
new_price_plan_name STRING, --新加入料金プラン名称
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/user_plan_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE user_plan_table;""")
次に、合意に従いデータフレームを結合します。
今回は1つのデータフレームを2つに分けて、再度結合するというケースです。
その場合、注意点は以下の5点です。。。
- 結合キーの欠損データ
- 理由:結合キーが欠損している場合、正確な結合ができず、結果データが意図しないものになる可能性があります。
- 対策:結合前にfilter()を使ってキーがNULLでない行を除外する。
- 項目名称が被る
- 理由:1つのデータフレームに同じ項目名称は一つしか使えないため、エラーが発生します。
- 対策:withColumnRenamed()を使って片方のデータフレームのカラム名を事前に変更する。
- Null値の処理
- 理由:完全外部結合時には、結合キーがPKとなるケースがほとんどです。しかし、完全外部結合では結合キーが完全一致しない限り、PKにしたい項目にNULL値が入ります。その処理方法は事前に考える必要があります。
- 対策:fillna()やwithColumn()を使用してNULL値を埋める。
- パフォーマンスの低下
- 理由:完全外部結合はすべての結合キーの組み合わせを評価するため、データ量が多い場合パフォーマンスに影響します。
- 対策:必要なカラムのみ選択して結合する。(select()で絞る)
fullouterは基本業務における出現率が少ないので、確認はしっかりしましょう。
データフレームを結合
# メインテーブルを作成
target_day_sdf = spark.read.table("user_plan_table")
# サブテーブルを作成
bef_1_day_sdf = (
spark.read.table("user_plan_table")
.select(
fn.date_format(fn.date_sub(fn.to_date(fn.col("acq_date"), "yyyyMMdd"), 1), "yyyyMMdd").alias("base_date"),
fn.col("user_id").alias("bef_user_id"),
fn.col("cur_price_plan_cd").alias("bef_cur_price_plan_cd"),
fn.col("cur_price_plan_name").alias("bef_cur_price_plan_name"),
fn.col("new_price_plan_cd").alias("bef_new_price_plan_cd"),
fn.col("new_price_plan_name").alias("bef_new_price_plan_name"),
)
)
# 完全外部結合
union_sdf = (
target_day_sdf
.join(
bef_1_day_sdf,
on=(
(target_day_sdf.acq_date == bef_1_day_sdf.base_date) & (target_day_sdf.user_id == bef_1_day_sdf.bef_user_id)
),
how="fullouter"
)
.withColumn(
"acq_dt",
fn.when(fn.col("acq_dt").isNull(),fn.col("base_date"))
.otherwise(fn.col("acq_dt"))
)
.withColumn(
"user_id",
fn.when(fn.col("user_id").isNull(),fn.col("bef_user_id"))
.otherwise(fn.col("user_id"))
)
.select(
"acq_dt",
"user_id",
"cur_price_plan_cd",
"cur_price_plan_name",
"new_price_plan_cd",
"new_price_plan_name",
"bef_cur_price_plan_cd",
"bef_cur_price_plan_name",
"bef_new_price_plan_cd",
"bef_new_price_plan_name"
)
)
# 概観確認
union_sdf.show(4)
acq_dt | user_id | cur_price_plan_cd | cur_price_plan_name | new_price_plan_cd | new_price_plan_name | bef_cur_price_plan_cd | bef_cur_price_plan_name | bef_new_price_plan_cd | bef_new_price_plan_name |
---|---|---|---|---|---|---|---|---|---|
20250502 | user000001 | P001 | ベーシックプラン | NULL | NULL | P001 | ベーシックプラン | NULL | NULL |
20250502 | user000002 | P001 | ベーシックプラン | NULL | NULL | P002 | プレミアムプラン | P001 | ベーシックプラン |
20250502 | user000004 | NULL | NULL | NULL | NULL | P001 | ベーシックプラン | NULL | NULL |
20250502 | user000005 | NULL | NULL | P001 | ベーシックプラン | NULL | NULL | NULL | NULL |
20250502 | user000006 | P001 | ベーシックプラン | NULL | NULL | NULL | NULL | P001 | ベーシックプラン |
上記の結果から、操作が意図した通りであることが確認できました。
最後に、スクリプト全量をご紹介します。
スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS user_plan_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS user_plan_table (
acq_date STRING, --取得日
user_id STRING, --ユーザーID
cur_price_plan_cd STRING, --現加入料金プランコード
cur_price_plan_name STRING, --現加入料金プラン名称
new_price_plan_cd STRING, --新加入料金プランコード
new_price_plan_name STRING, --現加入料金プラン名称
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/user_plan_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE user_plan_table;""")
# SQLテーブルからデータフレーム作成
target_day_sdf = spark.read.table("user_plan_table")
bef_1_day_sdf = (
spark.read.table("user_plan_table")
.select(
fn.date_format(fn.date_sub(fn.to_date(fn.col("acq_date"), "yyyyMMdd"), 1), "yyyyMMdd").alias("base_date"),
fn.col("user_id").alias("bef_user_id"),
fn.col("cur_price_plan_cd").alias("bef_cur_price_plan_cd"),
fn.col("cur_price_plan_name").alias("bef_cur_price_plan_name"),
fn.col("new_price_plan_cd").alias("bef_new_price_plan_cd"),
fn.col("new_price_plan_name").alias("bef_new_price_plan_name"),
)
)
# 完全外部結合
union_sdf = (
target_day_sdf
.join(
bef_1_day_sdf,
on=(
(target_day_sdf.acq_date == bef_1_day_sdf.base_date) & (target_day_sdf.user_id == bef_1_day_sdf.bef_user_id)
),
how="fullouter"
)
.withColumn(
"acq_dt",
fn.when(fn.col("acq_dt").isNull(),fn.col("base_date"))
.otherwise(fn.col("acq_dt"))
)
.withColumn(
"user_id",
fn.when(fn.col("user_id").isNull(),fn.col("bef_user_id"))
.otherwise(fn.col("user_id"))
)
.select(
"acq_dt",
"user_id",
"cur_price_plan_cd",
"cur_price_plan_name",
"new_price_plan_cd",
"new_price_plan_name",
"bef_cur_price_plan_cd",
"bef_cur_price_plan_name",
"bef_new_price_plan_cd",
"bef_new_price_plan_name"
)
)
# 概観確認
union_sdf.show(4)
『結論』
◾️Fullouter(完全外部結合)を習得するメリット(ビジネスサイド)
Fullouter(完全外部結合)を習得するメリットは以下の通りです。
-
データの完全性を担保
- 完全外部結合では、両方のデータセットに存在するすべての情報を残せます。
-
レポートの正確性が向上
- データソース間の不一致も含めて分析できるため、透明性と信頼性の高いレポート作成が可能です。
◾️Fullouter(完全外部結合)を習得するメリット(エンジニアリングサイド)
Fullouter(完全外部結合)を習得するメリットは以下の通りです。
-
柔軟なデータ統合スキルが身につく
- 完全外部結合を使えば、システム移行や異種データソースの統合において柔軟な対応が可能となります。
-
エラー対応が効率化される
- 結合に失敗するデータを無視せずすべて保持するため、結合エラーの原因追跡が容易です。
Discussion