🆗
【Python・Pysparkで学ぶ!】データ分析の基礎【データ分析の最適化②fn.format_number()】
【Python・Pysparkで学ぶ!】データ分析の基礎【データ分析の最適化②fn.format_number()】
↓市場別時価総額(historical_jika_table)月末時価総額(2024年12月)のサンプル
End_of_Month | Prime | Standard | Growth | Tokyo_Pro_Market | Total | y | m | d |
---|---|---|---|---|---|---|---|---|
44679 | 683684685 | 21483209 | 6338312 | 125677 | 711631885 | 2024 | 31 | 01 |
44712 | 686009079 | 21426743 | 6125602 | 129398 | 713690824 | 2024 | 31 | 01 |
44742 | 672823024 | 21341379 | 6086298 | 130556 | 700381258 | 2024 | 31 | 01 |
44771 | 698834944 | 21815836 | 6610347 | 132764 | 727393893 | 2024 | 31 | 01 |
44804 | 706358642 | 22253729 | 6934768 | 133789 | 735680930 | 2024 | 31 | 01 |
44834 | 660344668 | 21311117 | 6706567 | 135851 | 688498205 | 2024 | 31 | 01 |
44865 | 694464581 | 22010045 | 7304680 | 138851 | 723918159 | 2024 | 31 | 01 |
上記のような決済データを集約したSQLテーブルが存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『historical_jika_tableをCSV出力してほしい。』
本稿では、クライアントからの要望に答えながら、 可読性向上のテクニック について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントの要望との認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
-
SQLテーブル名称
- 合意『テーブル名称:historical_jika_table』
- SQLテーブルの格納先
- 合意『格納先:s3://data/warehouse/historical_jika_table/』
-
項目名称・項目型
-
合意『項目名称と項目型は以下の通りとする。』
- End_of_Month : int型
- Prime : int型
- Standard : int型
- Growth : int型
- Tokyo_Pro_Market : int型
- Total : int型
-
合意『項目名称と項目型は以下の通りとする。』
- パーティション
- 合意『SQLテーブルのパーティションはy,m,dで分割される』
- 保存形式
- 合意『SQLテーブルの保存形式:parquet形式』
- Parquetファイルの圧縮アルゴリズム
- 合意『SQLテーブルの圧縮アルゴリズムはSNAPPYとする』
-
セルの区切り
- Parquet形式では、区切り文字の概念はありません。※Parquetが列指向のバイナリフォーマットであるためです。
-
SQLテーブル名称
-
-
月末(End of Month)
-
Excelの日付シリアル値(基準0日目が1899/12/30)となっている。
- 合意『最終アウトプットでは、yyyy/mm/dd形式にする』
-
Excelの日付シリアル値(基準0日目が1899/12/30)となっている。
-
市場名称(Prime,Standard,Growth,Tokyo Pro Market,Total)
- 時価の単位が読みづらい
- 合意『最終アウトプットでは、3桁ごとに,(カンマ)をつける』
- 時価の単位が読みづらい
-
月末(End of Month)
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、SQLテーブルをDDL(Data Definition Language)を使用して読み込みます。
SQLテーブルを読み込む
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS historical_jika_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS historical_jika_table (
End_of_Month INT, --月末
Prime INT, --プライム
Standard INT, --スタンダード
Growth INT, --グロース
Tokyo_Pro_Market INT, --東京プロマーケット
Total INT, --合計金額
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y STRING,m STRING,d STRING) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/historical_jika_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE historical_jika_table;""")
次に、タッチポイントの合意に従い、データを加工します。
- "End_of_Month"の値をyyyy/mm/dd形式にする
-
"Prime","Standard","Growth","Tokyo_Pro_Market","Total"の値を3桁ごとに,(カンマ)をつける
上記のデータ加工をするには、date_format()メソッドとformat_number()メソッドが役に立ちます。
データ加工
historical_jika_df = (
spark.read.table("historical_jika_table")
.select(
fn.date_format(
to_date(fn.col("End_of_Month"), "yyyy-MM-dd"), "yyyy/MM/dd"
).alias("End_of_Month"),
fn.format_number(fn.col("Prime"), 0).alias("Prime"),
fn.format_number(fn.col("Standard"), 0).alias("Standard"),
fn.format_number(fn.col("Growth"), 0).alias("Growth"),
fn.format_number(fn.col("Tokyo_Pro_Market"), 0).alias("Tokyo_Pro_Market"),
fn.format_number(fn.col("Total"), 0).alias("Total"),
"y",
"m",
"d"
)
)
End_of_Month | Prime | Standard | Growth | Tokyo_Pro_Market | Total | y | m | d |
---|---|---|---|---|---|---|---|---|
2022/04/28 | 683,684,685 | 21,483,209 | 6,338,312 | 125,677 | 711,631,885 | 2024 | 31 | 01 |
2022/05/31 | 686,009,079 | 21,426,743 | 6,125,602 | 129,398 | 713,690,824 | 2024 | 31 | 01 |
2022/06/30 | 672,823,024 | 21,341,379 | 6,086,298 | 130,556 | 700,381,258 | 2024 | 31 | 01 |
2022/07/29 | 698,834,944 | 21,815,836 | 6,610,347 | 132,764 | 727,393,893 | 2024 | 31 | 01 |
2022/08/31 | 706,358,642 | 22,253,729 | 6,934,768 | 133,789 | 735,680,930 | 2024 | 31 | 01 |
2022/09/30 | 660,344,668 | 21,311,117 | 6,706,567 | 135,851 | 688,498,205 | 2024 | 31 | 01 |
2022/10/31 | 694,464,581 | 22,010,045 | 7,304,680 | 138,851 | 723,918,159 | 2024 | 31 | 01 |
上記の結果より、タッチポイントの合意に沿ったアウトプットを得られたことが確認できました。
次に、データフレームを現在のディレクトリにCSV出力します。
CSV出力
# 現在のディレクトリにCSV形式で保存
historical_jika_df.write.mode("overwrite").option("header", "true").csv("./")
最後に、スクリプト全量を紹介します。
スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS historical_jika_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS historical_jika_table (
End_of_Month INT, --月末
Prime INT, --プライム
Standard INT, --スタンダード
Growth INT, --グロース
Tokyo_Pro_Market INT, --東京プロマーケット
Total INT, --合計金額
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y STRING,m STRING,d STRING) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/historical_jika_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE historical_jika_table;""")
# 現在の"End_of_Month"がExcelの日付シリアル値の場合、それを日付に変換
# "Prime", "Standard", "Growth", "Tokyo_Pro_Market", "Total"をフォーマット
historical_jika_df = (
spark.read.table("historical_jika_table")
.select(
fn.date_format(
to_date(fn.col("End_of_Month"), "yyyy-MM-dd"), "yyyy/MM/dd"
).alias("End_of_Month"),
fn.format_number(fn.col("Prime"), 0).alias("Prime"),
fn.format_number(fn.col("Standard"), 0).alias("Standard"),
fn.format_number(fn.col("Growth"), 0).alias("Growth"),
fn.format_number(fn.col("Tokyo_Pro_Market"), 0).alias("Tokyo_Pro_Market"),
fn.format_number(fn.col("Total"), 0).alias("Total"),
"y",
"m",
"d"
)
)
# 現在のディレクトリにCSV形式で保存
historical_jika_df.write.mode("overwrite").option("header", "true").csv("./")
『結論』
◾️日付変更やカンマを付加するメリット(ビジネスサイド)
- 日付や数値をわかりやすくフォーマットすることで、データを直感的に理解しやすくなる。特に、非技術者や経営陣がレポートやダッシュボードを見る際に役立つ。
- 見た目が整ったデータは、外部へのレポートやクライアントとのコミュニケーションで、信頼感を与える。
◾️日付変更やカンマを付加するメリット(エンジニアリングサイド)
- 日付やカンマ付きのフォーマットにすることで、ダッシュボードやレポート作成時の前処理が不要になる。
- 一度フォーマットを整えたデータは、他のプロジェクトやシステムで簡単に再利用できる。
Discussion
Excelで日時が数値になるのをPySparkで対応できるのが嬉しいですね!