【Python・Pysparkで学ぶ!】データ分析の基礎【データの読み込み①spark.read.csv()】
【Python・Pysparkで学ぶ!】データ分析の基礎【データの読み込み①spark.read.csv()】
↓加盟店リストCSVファイル(shop_list.csv)のサンプル
上記のようなデータがCSVファイル形式で存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『CSVファイルをPysparkデータフレームにしてほしい』
本稿では、クライアントからの要望に答えながら、 CSVファイルのPysparkデータフレーム化 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
-
合意『データソースの特性を以下の通りとする』
- 加盟店リストCSVファイル(shop_list.csv): 静的データ
-
合意『データソースの特性を以下の通りとする』
-
-
合意『データソースの格納先を以下の通りとする』
-
加盟店リストCSVファイル(shop_list.csv)
- 格納先:s3://data/content/pj01/static/shop_list.csv
-
加盟店リストCSVファイル(shop_list.csv)
-
合意『データソースの格納先を以下の通りとする』
- データソースのオプション情報
-
合意『データソースのオプション情報を以下の通りとする』
-
加盟店リストCSVファイル(shop_list.csv)
- ヘッダー名称とデータ型
- shop_id : StringType : nullを許可しない
- shop_name : StringType : nullを許可しない
- cache : IntegerType : nullを許可する
- card : IntegerType : nullを許可する
- code : IntegerType : nullを許可する
- データソースのヘッダー行の有無
- ヘッダー行:無し
- 空文字の対応
- 空文字はnullに置換。その後、0埋めする。
- 複数行が1セル内に存在する場合の対応
- 複数行が1セル内に存在する場合がそもそもない。
- 文字コード
- データソースの文字コードがUTF-8なので、変換する必要なし
- Indexの必要性
- 必要なし
- ヘッダー名称とデータ型
-
加盟店リストCSVファイル(shop_list.csv)
-
合意『データソースのオプション情報を以下の通りとする』
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
shop_id | shop_name | cache | card | code |
---|---|---|---|---|
shop000001 | 佐藤レストラン | 1 | 1 | 1 |
shop000002 | 鈴木カレー | 1 | 1 | 0 |
shop000003 | 高橋カフェ | 1 | 1 | 1 |
shop000004 | 田中オンラインショップ | 0 | 1 | 1 |
shop000005 | 伊藤デジペイストア | 0 | 0 | 1 |
shop000006 | 渡辺ビール | 1 | 1 | 0 |
shop000007 | 山本パン | 1 | 1 | 1 |
shop000008 | 中村クレカ | 0 | 1 | 0 |
shop000009 | 小林ショップ | 1 | 0 | 1 |
shop000010 | 加藤コンビニ | 1 | 1 | 0 |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、タッチポイントで合意した「ヘッダー名称とデータ型」に従い、読み込むデータソースの『スキーマ定義』 を行います。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# スキーマ定義
shop_list_schema = StructType([
StructField("shop_id", StringType(), False), # nullを許可しない
StructField("shop_name", StringType(), False), # nullを許可しない
StructField("cache", IntegerType(), True), # nullを許可する
StructField("card", IntegerType(), True), # nullを許可する
StructField("code", IntegerType(), True) # nullを許可する
])
次に、データソースを読み込みます。ここで、データソースの特性は静的データなので、DataFrameReaderのハンドル read.csv()メソッド を使用します。
また、null値を0埋めすることに注意します。
spark.read.csv(
"s3://content/sample_data/shop_list.csv",
schema=shop_list_schema,
header=True,
).fillna(0)
shop_id | shop_name | cache | card | code |
---|---|---|---|---|
shop000001 | 佐藤レストラン | 1 | 1 | 1 |
shop000002 | 鈴木カレー | 1 | 1 | 0 |
shop000003 | 高橋カフェ | 1 | 1 | 1 |
shop000004 | 田中オンラインショップ | 0 | 1 | 1 |
shop000005 | 伊藤デジペイストア | 0 | 0 | 1 |
shop000006 | 渡辺ビール | 1 | 0 | 0 |
shop000007 | 山本パン | 1 | 1 | 1 |
shop000008 | 中村クレカ | 0 | 1 | 0 |
shop000009 | 小林ショップ | 1 | 0 | 1 |
shop000010 | 加藤コンビニ | 1 | 1 | 0 |
上記の操作の結果から、加盟店リストCSVファイル(shop_list.csv) が意図した通りのアウトプットになっていることが確認できました。
最終的なスクリプトを紹介します。
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.window import Window
# セッション作成
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# スキーマ定義
shop_list_schema = StructType([
StructField("shop_id", StringType(), True),
StructField("shop_name", StringType(), True),
StructField("cache", IntegerType(), False),
StructField("card", IntegerType(), False),
StructField("code", IntegerType(), False)
])
# データの読み込み
spark.read.csv(
"s3://content/sample_data/shop_list.csv",
schema=shop_list_schema,
header=True,
).fillna(0) # nullを0埋め
『結論』
◾️CSVファイルを読み込む意義(ビジネスサイド)
CSVファイルを読み込むことの意義は以下の通りです。
・CSV形式は多くのビジネスデータで利用されており、PySparkを使って効率的に処理するスキルを学ぶことで、ビッグデータ解析の基礎が身につきます。
・小規模なデータから始めて、ビジネスの成長に合わせてスケールアップ可能なデータ基盤を構築できます。
◾️CSVファイルを読み込み方を学ぶメリット(エンジニアリングサイド)
CSVファイルを読み込むことを学ぶメリットは以下の通りです。
・スキーマの定義や型の管理を学ぶことで、データ品質の向上やエラーの回避を実現します。
・欠損値や文字コード問題への対応を学ぶことで、現場で頻発する課題に柔軟に対処するスキルを身につけられます。
Discussion
CSVファイルは基本的なモノなので、理解できてよかったです!