Zenn
🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【データ分析の最適化②fn.format_number()】

2025/01/24に公開
1
2

【Python・Pysparkで学ぶ!】データ分析の基礎【データ分析の最適化②fn.format_number()】

↓市場別時価総額(historical_jika_table)月末時価総額(2024年12月)のサンプル

End_of_Month Prime Standard Growth Tokyo_Pro_Market Total y m d
44679 683684685 21483209 6338312 125677 711631885 2024 31 01
44712 686009079 21426743 6125602 129398 713690824 2024 31 01
44742 672823024 21341379 6086298 130556 700381258 2024 31 01
44771 698834944 21815836 6610347 132764 727393893 2024 31 01
44804 706358642 22253729 6934768 133789 735680930 2024 31 01
44834 660344668 21311117 6706567 135851 688498205 2024 31 01
44865 694464581 22010045 7304680 138851 723918159 2024 31 01

上記のような決済データを集約したSQLテーブルが存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『historical_jika_tableをCSV出力してほしい。』

本稿では、クライアントからの要望に答えながら、 可読性向上のテクニック について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントの要望との認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • SQLテーブルの情報

    • SQLテーブル名称
      • 合意『テーブル名称:historical_jika_table』
    • SQLテーブルの格納先
      • 合意『格納先:s3://data/warehouse/historical_jika_table/』
    • 項目名称・項目型
      • 合意『項目名称と項目型は以下の通りとする。』
        • End_of_Month : int型
        • Prime : int型
        • Standard : int型
        • Growth : int型
        • Tokyo_Pro_Market : int型
        • Total : int型
    • パーティション
      • 合意『SQLテーブルのパーティションはy,m,dで分割される』
    • 保存形式
      • 合意『SQLテーブルの保存形式:parquet形式』
    • Parquetファイルの圧縮アルゴリズム
      • 合意『SQLテーブルの圧縮アルゴリズムはSNAPPYとする』
    • セルの区切り
      • Parquet形式では、区切り文字の概念はありません。※Parquetが列指向のバイナリフォーマットであるためです。
  • テーブルの項目情報

    • 月末(End of Month)
      • Excelの日付シリアル値(基準0日目が1899/12/30)となっている。
        • 合意『最終アウトプットでは、yyyy/mm/dd形式にする』
    • 市場名称(Prime,Standard,Growth,Tokyo Pro Market,Total)
      • 時価の単位が読みづらい
        • 合意『最終アウトプットでは、3桁ごとに,(カンマ)をつける』

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。

◾️作成手順

はじめに、SQLテーブルをDDL(Data Definition Language)を使用して読み込みます。

SQLテーブルを読み込む
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS historical_jika_table")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS historical_jika_table (
        End_of_Month      INT,      --月末
        Prime             INT,      --プライム
        Standard          INT,      --スタンダード
        Growth            INT,      --グロース
        Tokyo_Pro_Market  INT,      --東京プロマーケット
        Total             INT,      --合計金額
        y                 STRING,   --年
        m                 STRING,   --月
        d                 STRING    --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y STRING,m STRING,d STRING)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/historical_jika_table/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE historical_jika_table;""")

次に、タッチポイントの合意に従い、データを加工します。

  • "End_of_Month"の値をyyyy/mm/dd形式にする
  • "Prime","Standard","Growth","Tokyo_Pro_Market","Total"の値を3桁ごとに,(カンマ)をつける
    上記のデータ加工をするには、date_format()メソッドformat_number()メソッドが役に立ちます。
データ加工
historical_jika_df = (
    spark.read.table("historical_jika_table")
    .select(
        fn.date_format(
            to_date(fn.col("End_of_Month"), "yyyy-MM-dd"), "yyyy/MM/dd"
        ).alias("End_of_Month"),
        fn.format_number(fn.col("Prime"), 0).alias("Prime"),
        fn.format_number(fn.col("Standard"), 0).alias("Standard"),
        fn.format_number(fn.col("Growth"), 0).alias("Growth"),
        fn.format_number(fn.col("Tokyo_Pro_Market"), 0).alias("Tokyo_Pro_Market"),
        fn.format_number(fn.col("Total"), 0).alias("Total"),
        "y",
        "m",
        "d"
    )
)
End_of_Month Prime Standard Growth Tokyo_Pro_Market Total y m d
2022/04/28 683,684,685 21,483,209 6,338,312 125,677 711,631,885 2024 31 01
2022/05/31 686,009,079 21,426,743 6,125,602 129,398 713,690,824 2024 31 01
2022/06/30 672,823,024 21,341,379 6,086,298 130,556 700,381,258 2024 31 01
2022/07/29 698,834,944 21,815,836 6,610,347 132,764 727,393,893 2024 31 01
2022/08/31 706,358,642 22,253,729 6,934,768 133,789 735,680,930 2024 31 01
2022/09/30 660,344,668 21,311,117 6,706,567 135,851 688,498,205 2024 31 01
2022/10/31 694,464,581 22,010,045 7,304,680 138,851 723,918,159 2024 31 01

上記の結果より、タッチポイントの合意に沿ったアウトプットを得られたことが確認できました。
次に、データフレームを現在のディレクトリにCSV出力します。

CSV出力
# 現在のディレクトリにCSV形式で保存
historical_jika_df.write.mode("overwrite").option("header", "true").csv("./")

最後に、スクリプト全量を紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

# セッション作成
spark = SparkSession.builder.getOrCreate()

# テーブルの削除
spark.sql("DROP TABLE IF EXISTS historical_jika_table")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS historical_jika_table (
        End_of_Month      INT,      --月末
        Prime             INT,      --プライム
        Standard          INT,      --スタンダード
        Growth            INT,      --グロース
        Tokyo_Pro_Market  INT,      --東京プロマーケット
        Total             INT,      --合計金額
        y                 STRING,   --年
        m                 STRING,   --月
        d                 STRING    --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y STRING,m STRING,d STRING)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/historical_jika_table/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE historical_jika_table;""")

# 現在の"End_of_Month"がExcelの日付シリアル値の場合、それを日付に変換
# "Prime", "Standard", "Growth", "Tokyo_Pro_Market", "Total"をフォーマット
historical_jika_df = (
    spark.read.table("historical_jika_table")
    .select(
        fn.date_format(
            to_date(fn.col("End_of_Month"), "yyyy-MM-dd"), "yyyy/MM/dd"
        ).alias("End_of_Month"),
        fn.format_number(fn.col("Prime"), 0).alias("Prime"),
        fn.format_number(fn.col("Standard"), 0).alias("Standard"),
        fn.format_number(fn.col("Growth"), 0).alias("Growth"),
        fn.format_number(fn.col("Tokyo_Pro_Market"), 0).alias("Tokyo_Pro_Market"),
        fn.format_number(fn.col("Total"), 0).alias("Total"),
        "y",
        "m",
        "d"
    )
)

# 現在のディレクトリにCSV形式で保存
historical_jika_df.write.mode("overwrite").option("header", "true").csv("./")

『結論』

◾️日付変更やカンマを付加するメリット(ビジネスサイド)

  • 日付や数値をわかりやすくフォーマットすることで、データを直感的に理解しやすくなる。特に、非技術者や経営陣がレポートやダッシュボードを見る際に役立つ。
  • 見た目が整ったデータは、外部へのレポートやクライアントとのコミュニケーションで、信頼感を与える。

◾️日付変更やカンマを付加するメリット(エンジニアリングサイド)

  • 日付やカンマ付きのフォーマットにすることで、ダッシュボードやレポート作成時の前処理が不要になる。
  • 一度フォーマットを整えたデータは、他のプロジェクトやシステムで簡単に再利用できる。
2

Discussion

fact601fact601

Excelで日時が数値になるのをPySparkで対応できるのが嬉しいですね!

ログインするとコメントできます