🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【データの読み込み①spark.read.csv()】

2025/01/15に公開
1

【Python・Pysparkで学ぶ!】データ分析の基礎【データの読み込み①spark.read.csv()】

↓加盟店リストCSVファイル(shop_list.csv)のサンプル

上記のようなデータがCSVファイル形式で存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『CSVファイルをPysparkデータフレームにしてほしい』

本稿では、クライアントからの要望に答えながら、 CSVファイルのPysparkデータフレーム化 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • データソースの特性(静的か動的か)

    • 合意『データソースの特性を以下の通りとする』
      • 加盟店リストCSVファイル(shop_list.csv): 静的データ
  • データソースの格納先

    • 合意『データソースの格納先を以下の通りとする』
      • 加盟店リストCSVファイル(shop_list.csv)
        • 格納先:s3://data/content/pj01/static/shop_list.csv
  • データソースのオプション情報
    • 合意『データソースのオプション情報を以下の通りとする』
      • 加盟店リストCSVファイル(shop_list.csv)
        • ヘッダー名称とデータ型
          • shop_id : StringType : nullを許可しない
          • shop_name : StringType : nullを許可しない
          • cache : IntegerType : nullを許可する
          • card : IntegerType : nullを許可する
          • code : IntegerType : nullを許可する
        • データソースのヘッダー行の有無
          • ヘッダー行:無し
        • 空文字の対応
          • 空文字はnullに置換。その後、0埋めする。
        • 複数行が1セル内に存在する場合の対応
          • 複数行が1セル内に存在する場合がそもそもない。
        • 文字コード
          • データソースの文字コードがUTF-8なので、変換する必要なし
        • Indexの必要性
          • 必要なし

◾️アウトプットイメージ
 タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)

shop_id shop_name cache card code
shop000001 佐藤レストラン 1 1 1
shop000002 鈴木カレー 1 1 0
shop000003 高橋カフェ 1 1 1
shop000004 田中オンラインショップ 0 1 1
shop000005 伊藤デジペイストア 0 0 1
shop000006 渡辺ビール 1 1 0
shop000007 山本パン 1 1 1
shop000008 中村クレカ 0 1 0
shop000009 小林ショップ 1 0 1
shop000010 加藤コンビニ 1 1 0

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。

◾️作成手順

はじめに、タッチポイントで合意した「ヘッダー名称とデータ型」に従い、読み込むデータソースの『スキーマ定義』 を行います。

読み込むデータソースの『スキーマ定義』
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# スキーマ定義
shop_list_schema = StructType([
    StructField("shop_id", StringType(), False), # nullを許可しない
    StructField("shop_name", StringType(), False), # nullを許可しない
    StructField("cache", IntegerType(), True), # nullを許可する
    StructField("card", IntegerType(), True), # nullを許可する
    StructField("code", IntegerType(), True) # nullを許可する
])

次に、データソースを読み込みます。ここで、データソースの特性は静的データなので、DataFrameReaderのハンドル read.csv()メソッド を使用します。
また、null値を0埋めすることに注意します。

データソース読み込み
spark.read.csv(
    "s3://content/sample_data/shop_list.csv",
    schema=shop_list_schema,
    header=True,
).fillna(0)
shop_id shop_name cache card code
shop000001 佐藤レストラン 1 1 1
shop000002 鈴木カレー 1 1 0
shop000003 高橋カフェ 1 1 1
shop000004 田中オンラインショップ 0 1 1
shop000005 伊藤デジペイストア 0 0 1
shop000006 渡辺ビール 1 0 0
shop000007 山本パン 1 1 1
shop000008 中村クレカ 0 1 0
shop000009 小林ショップ 1 0 1
shop000010 加藤コンビニ 1 1 0

上記の操作の結果から、加盟店リストCSVファイル(shop_list.csv) が意図した通りのアウトプットになっていることが確認できました。
最終的なスクリプトを紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.window import Window

# セッション作成
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# スキーマ定義
shop_list_schema = StructType([
    StructField("shop_id", StringType(), True),
    StructField("shop_name", StringType(), True),
    StructField("cache", IntegerType(), False),
    StructField("card", IntegerType(), False),
    StructField("code", IntegerType(), False)
])

# データの読み込み
spark.read.csv(
    "s3://content/sample_data/shop_list.csv",
    schema=shop_list_schema,
    header=True,
).fillna(0) # nullを0埋め

『結論』

◾️CSVファイルを読み込む意義(ビジネスサイド)

CSVファイルを読み込むことの意義は以下の通りです。
・CSV形式は多くのビジネスデータで利用されており、PySparkを使って効率的に処理するスキルを学ぶことで、ビッグデータ解析の基礎が身につきます。
・小規模なデータから始めて、ビジネスの成長に合わせてスケールアップ可能なデータ基盤を構築できます。

◾️CSVファイルを読み込み方を学ぶメリット(エンジニアリングサイド)

CSVファイルを読み込むことを学ぶメリットは以下の通りです。
・スキーマの定義や型の管理を学ぶことで、データ品質の向上やエラーの回避を実現します。
・欠損値や文字コード問題への対応を学ぶことで、現場で頻発する課題に柔軟に対処するスキルを身につけられます。

Discussion

fact601fact601

CSVファイルは基本的なモノなので、理解できてよかったです!