pandas初級者に送りたいTips
こんにちわ alivelimb です。
pandasは言わずと知れた Python のデータ分析ライブラリですが、私が分析業務をする時も利用しています。本記事では自分自身が過去に詰まったところや、後輩のコードレビューをしていてバグの原因になることが多いと感じた部分を Tips として紹介します。
はじめに
本記事では成績ダミーデータを生成して、データ操作を行います。
import random
import pandas as pd
def create_dummy_scores(n):
scores = [
dict(
user_id=f"user{i+1}",
subject=subject,
score=random.randint(0, 100)
)
for i in range(n)
for subject in ["国語", "数学", "英語", "理科", "社会"]
]
return scores
scores = create_dummy_scores(5)
score_df = pd.DataFrame(scores)
ダミーデータ例
user_id | subject | score | |
---|---|---|---|
0 | user1 | 国語 | 57 |
1 | user1 | 数学 | 78 |
2 | user1 | 英語 | 68 |
3 | user1 | 理科 | 87 |
4 | user1 | 社会 | 87 |
5 | user2 | 国語 | 41 |
6 | user2 | 数学 | 24 |
7 | user2 | 英語 | 13 |
8 | user2 | 理科 | 45 |
9 | user2 | 社会 | 28 |
なお、検証した バージョンは以下の通りです
- python:
3.8.7
- pandas:
1.4.2
- numpy:
1.22.3
- pandarallel:
1.6.1
- pandas-profiling:
3.1.0
- pandera:
0.10.1
CRUD を自在に行う
pandas で Create(作成、追加), Read(抽出), U(更新), Delete(削除)の方法について紹介します。
Create: io.StringIO で read_csv
io.StringIO
を用いると以下のように csv っぽい文字列からread_csv
できます。
from io import StringIO
data = """
user_id,subject,score,pres_score
user1,国語,57,85
user2,数学,78,16
"""
pd.read_csv(StringIO(data))
ダミーデータ生成のようにdict
からDataFrame
を作る方が書きやすいと思いますが、ちょっとした挙動確認や以下のような関数の単体テストに利用できます。この関数に関しては今後の章でも言及します。
def safe_read_csv(path):
df = pd.read_csv(path)
# 型や値の検証
Create: 列・行の追加
列・行の追加方法はいくつかありますが、私は以下のように書いています。
列の追加
# 全て同じ値で追加する場合
score_df["new_col"] = "new_value"
# 個別に値を指定して追加する場合
new_column = [seq_no for seq_no in range(len(score_df))]
score_df["new_col"] = new_columns
# 複数列追加する場合 -> DataFrameにしてDataFrame同士の結合
new_columns = dict(
new_col1=[seq_no for seq_no in range(len(score_df))],
new_col2=[seq_no for seq_no in range(len(score_df))],
)
score_df = pd.concat([score_df, pd.DataFrame(new_columns)], axis=1)
行の追加
new_rows = create_dummy_scores(5)
score_df = pd.concat([score_df, pd.DataFrame(new_rows)])
ちなみにappendという関数もありますが、Deprecated になっているため使わない方が良いでしょう。
Read: 条件抽出
こちらもいくつか書き方はありますが、私は以下のように書いています。
# user1の行を全て抽出
score_df[score_df["user_id"] == "user1"]
# user1でかつ教科が国語の行を全て抽出
condition = (score_df["user_id"] == "user1") & (score_df["subject"] == "国語")
score_df[condtion]
DataFrame
で複数条件を指定する時は以下の 2 点に注意が必要です。
- python 標準の
and
,or
,not
ではなく&
,|
,~
を使う必要がある - 条件同士を
()
で囲む必要がある
では条件を満たす行を抽出(選択)した後に、列も抽出(射影)してみます。
例えば user1 の国語の点数を抽出してみましょう。
condition = (score_df["user_id"] == "user1") & (score_df["subject"] == "国語")
score_df[condition]["score"]
これは公式ドキュメントでchained indexing
と呼ばれています。この書き方は値の確認をするだけなら問題ないのですが、抽出結果を更新しようとするとバグの原因となります。これについては次節で紹介します。
ちなみに
- n 行目から m 行目を取り出したいという時は、以下のような書き方ができるようですが使ったことはありません。
score_df[n:m]
- .valuesは非推奨となっています
Update: 値の更新
試しに user1 の国語の点数を 100 点に書き換えてみましょう。前節で紹介した方法で書いてみると
condition = (score_df["user_id"] == "user1") & (score_df["subject"] == "国語")
print(score_df[condition]["score"].to_numpy()[0]) # 57
score_df[condition]["score"] = 100
print(score_df[condition]["score"].to_numpy()[0]) # 57(変わっていない!)
値が更新されず、以下のような Warning が表示されます。
SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame. Try using .loc[row_indexer,col_indexer] = value instead
前節で紹介ようにdf[...][...]
という形式はchained indexing
と呼ばれており([]
が 2 回続いているから)、加えてこれに代入することをchained assignment
と呼ばれています。これらについての詳しい解説は本記事では行わず、私が行なっている対応策と参考にさせて頂いた記事を紹介したいと思います。
対応策としては以下の通りです。
- 一時的に表示したいだけの時は
chained indexing
でも気にせず書く - 代入したい時は
.loc
を使って代入する -
df[]
のように行を条件抽出(選択)または列を抽出(射影)したものを以降の処理でも使う場合は.copy
を使ってコピーを作成する
1 は前節で紹介した通りですが、2 は以下のように書けばchained indexing
にはならず、更新できます。
condition = (score_df["user_id"] == "user1") & (score_df["subject"] == "国語")
print(score_df[condition]["score"].to_numpy()[0]) # 57
score_df.loc[condition, "score"] = 100
print(score_df[condition]["score"].to_numpy()[0]) # 100
3 については[]
を 1 回つけた時点で DataFrame のコピーを作成することでchained indexing
を回避する方法です。新しく作成した DataFrame では 1 回も[]
をしていないため問題ないということです。
SettingWithCopyWarning
について参考にさせて頂いた記事は以下の通りです。
また上記と関係なくDataFrame
を引数に受け取る関数を定義する時も原則.copy
を使うようにしています。これは DataFrame が dict などと同様にミュータブルなオブジェクトだからです。以下の簡単な例で紹介してみます。
data = """
col1,col2
10,20
11,21
12,22
"""
example_df = pd.read_csv(StringIO(data))
def add_col3(df):
df["col3"] = [30, 31, 32]
add_col3(example_df)
example_df.head() # col3が追加されたDataFrame
add_col3
には example_df のコピーではなく実体が渡されてしまうのが原因です。大きなデータを扱っていて、コピーを作成するのはメモリ的に厳しいという理由があれば良いのですが、そうでない場合はこのような書き方は望ましくないと考えているので以下のように書いています。
def add_col3(df):
new_df = df.copy()
new_df["col3"] = [30, 31, 32]
return new_df
example_df_col3_added = add_col3(example_df)
example_df.head() # 元々のDataFrame
Delete: 列, 行の削除
列の削除に関しては素直に.drop
を使います。ただし、元の DataFrame を直接編集するinplace=True
は原則使わないようにしています。デフォルトでinplace=False
なので特に気にしなくても良いでしょう。
# 列の削除(scoreを消して何がしたいんだ...)
score_df_score_dropped = score_df.drop(["score"], axis=1)
行の削除は基本的に書きません。「行を削除する」ではなく「削除しない方を抽出する」という考えです。また、抽出した後は前節で紹介した通りコピーを生成しています。
# 行の削除(赤点じゃない生徒を除外)
score_df_akaten = score_df[score_df["score"] < 60].copy()
groupby を自在に行う
CRUD が終わったところでデータ集計にもよく用いる.groupby
です。私はpandas
を使い始める新人にはまず SQL の勉強を進めています。その理由として私自身が SQL をしっかり学ぶ前に pandas をコネコネしていたので.groupby
や.join
でよく分からんなーとなることが多かったからです。
基本的な例は以下のようなものだと思います。
# 教科(集約キー)ごとの行数を集計: 集約キーをindexに指定する
score_df.groupby("subject").size() # pd.Series
# 教科(集約キー)ごとの行数を集計: 集約キーをindexに指定しない
score_df.groupby("subject", as_index=False).size() # pd.DataFrame
# 教科(集約キー)ごとの点数の平均値を集計
score_df.groupby("subject", as_index=False)["score"].sum()
# ユーザID(集約キー)ごとに最初の1行を取得
score_df.groupby("user_id", as_index=False).head(1)
集計結果に別名をつけたい
# ※pandas1.4.3では実行できない書き方
agg_dict=dict(
score_max="max",
score_mean="mean",
score_std="std",
)
score_df.groupby("subject")["score"].agg(agg_dict)
のように書けていたのですが、pandas0.20.1で Deprecations になったようで、pandas1.4.3 では以下のようなエラーが発生しました。
SpecificationError: nested renamer is not supported
わかりやすく.rename
を使って書けば問題ないです。
(
score_df.groupby("subject")["score"]
.agg(["max", "mean", "std"])
.rename(
columns=dict(
max="score_max",
mean="score_mean",
std="score_std",
)
)
)
ちなみに、pandas でメソッドチェーンが長くなった時は()
で囲むと改行できて見やすいです
※2022.05.02 追記
nkay さんにコメントして頂いた通り、以下のように書けばSpecificationError
を回避できます。
nkay さんコメントありがとうございます!
agg_dict=dict(
score_max="max",
score_mean="mean",
score_std="std",
)
score_df.groupby("subject")["score"].agg(**agg_dict)
集計結果を各行につけたい
成績データに偏差値列をつけたいとしましょう。
偏差値の計算式は
で計算できるので平均点と標準偏差を求める必要がありますが、行単位の操作で計算が完結するように各教科の平均点と標準偏差の列を追加したいです。こんな時には.transform
を使うと便利です。普通に groupby をすると行数が「集約キーの組み合わせ数」に集約されますが、.transform
では行数が変わらないので、列追加に向いています。
for agg_func in ["mean", "std"]:
score_df[f"score_{agg_func}"] = score_df.groupby("subject")["score"].transform(agg_func)
ちなみに、.transform
にもリストで集約関数を渡して
subject_agg_df = (
score_df.groupby("subject")["score"]
.transform(["mean", "std"])
.rename(
columns=dict(
mean="subject_mean",
std="subject_mean",
)
)
)
score_df = pd.concat([score_df, subject_agg_df], axis=1)
と書けると思っていたのですが、エラーが吐かれました。
TypeError: unhashable type: 'list'
公式ドキュメントには
funcfunction, str, list-like or dict-like
list-like of functions and/or function names, e.g. [np.exp, 'sqrt']
とあるので書けそうなのですが、Series
では書けるけどGroupBy
オブジェクトでは出来ないということでしょうか...
さて次章ではここから偏差値を求めていきましょう。
for 文は原則書かないように意識する
Dataframe
でループを回す時に、.iterrows
などがありますが、私は原則 for 文は書かないようにしています。for 分を回さない方法としてはまず.apply
があるでしょう。
# 偏差値は英語でdeviation valueというらしい
# 逆に分かりにくいと思ったのでローマ字で
def get_hensachi(score, mean, std):
return 50 + (score - mean) / std * 10
score_df["hensachi"] = (
score_df[["score", "score_mean", "score_std"]]
.apply(lambda row: get_hensachi(row["score"], row["score_mean"], row["score_std"]), axis=1)
)
DataFrame は二次元, Series は 1 次元であることを意識する
.apply
を使う上でよくある間違いに触れておきます。複数列を選択して apply を行う時に
score_df[["score", "score_mean", "score_std"]]
ではなく
score_df["score", "score_mean", "score_std"]
と書いてしまったり、1 列について.apply
をする時に
score_df["score"].apply(.., axis=1)
とaxis
を指定してしまったりします。
これはDataFrame
とSeries
の次元を意識しておくと間違えにくいかと思います。score_df
から 2 種類の方法でuser_id
を取り出して見ましょう。
print("score_df['user_id']:", type(score_df["user_id"]))
print("score_df[['user_id']]:", type(score_df[["user_id"]]))
実行結果
score_df['user_id']: <class 'pandas.core.series.Series'>
score_df[['user_id']]: <class 'pandas.core.frame.DataFrame'>
-
文字列で切り出すと
Series
-
リストで切り出すと
DataFrame
になります。小難しくいうと「切り出した次元 = 結果の第 2 次元」になると思います。それぞれの次元数を出してみると分かりやすいです。
print("score_df['user_id']:", score_df["user_id"].shape)
print("score_df[['user_id']]:", score_df[["user_id"]].shape)
実行結果
score_df['user_id']: (25,)
score_df[['user_id']]: (25, 1)
axis=1
が列方向というのを理解している方は多いと思うのですが、意外と次元の意識をしておらずSeries
なのに axis を指定してしまうケースをよく見るので、意識してみてはどうでしょうか。
apply が遅いことを認識しておく
ところで「.apply
は遅いんじゃないの?」と思った方もいるのではないでしょうか。私も知識だけあったので、100 万行 x 2 列のデータを生成し、apply で和をとって実際に検証してみます。(本来ならnumpy
で完結させた方が圧倒的に早い処理です)
import time
import numpy as np
# numpyでデータ生成(100万行 x 2列)
start_time = time.time()
benchmark_df = pd.DataFrame(np.random.rand(10**6, 2), columns=["a", "b"])
print(time.time() - start_time) # 0.02182292938232422
# applyで100万行に対して和をとる
start_time = time.time()
benchmark_df["sum"] = benchmark_df.apply(lambda row: row["a"] + row["b"], axis=1)
print(time.time() - start_time) # 11.313948154449463
確かに遅いですね。この解決策としてnp.vectrize
などを使ってベクトル化する方法(Do You Use Apply in Pandas? There is a 600x Faster Way)なども挙げられますが、私は並列処理を可能にしてくれるpandarallelを使っています。
from pandarallel import pandarallel
pandarallel.initialize()
# INFO: Pandarallel will run on 4 workers.
# INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
start_time = time.time()
benchmark_df["sum"] = benchmark_df.parallel_apply(lambda row: row["a"] + row["b"], axis=1)
print(time.time() - start_time) # 3.1791868209838867
4workers で動かしているので処理時間は約 1/4 になっていますね。
pandarallel の利点は以下の 2 点だと思っています。
-
import
して.apply
を.parallel_apply
に変えるだけなので導入・ルール化が簡単 - 並列処理をしてくれるので処理時間が推定しやすい
一人でコードを書くのであれば問題ないと思うのですが、複数名でコードを管理していく場合はルール化や理解しやすいコードを書くことに重きを置きたいと考えているためpandarallel
は重宝しています。後述しますが、DataFrame
で速度面を気にする時は設計の改善で回避できるケースもあると思っています。
parallel_apply
のように導入が簡単なパッケージは積極的に使っていこうと思っています。pandas でオレオレ関数をたくさん定義していくのも楽しいのですが、車輪の再開発になったり、チーム開発の時に使えなかったり、テストをちゃんと書いておく必要があったりします。
parallel_apply
の他にはpandas-profilingがあります。pandas-profiling
はデータの概要を把握するのに非常に便利です(下の gif は公式ドキュメントから引用しました)。
本節で言いたかったことは、あくまで apply が遅いことは認識しておくことであり「.apply
を書くな!」ではありません。自分(or 組織)のルールとして.apply
を使わないというのは 1 つの方針であるとは思いますが、for 文を書く方法しか知らなかった方は.apply
という選択肢があるし、それほどデータ量が多くなければ十分使えるよという意図で書いています。
ファイル IO の時は型を意識する
pandas
は様々なファイル IO(Input/Output)に対応していますが、一番よく使うのはやはり CSV ではないでしょうか?しかし CSV はParquetなどとは異なり、項目ごとの型定義がされているわけではありません。そのため、注意しないと以下のようなバグが生じます。
# 成績のダミーデータを生成する
def create_dummy_scores(n):
scores = [
dict(
user_id=f"{i+1:>03}", # 001, 002, ... に変更
subject=subject,
score=random.randint(0, 100)
)
for i in range(n)
for subject in ["国語", "数学", "英語", "理科", "社会"]
]
return scores
pd.DataFrame(create_dummy_scores(3))
user_id | subject | score | |
---|---|---|---|
0 | 001 | 国語 | 89 |
# 書き込み
f_write = StringIO() # 書き込むファイルの代わり
pd.DataFrame(create_dummy_scores(3)).to_csv(f_write, index=False)
# 読み込み
f_read = StringIO(f_write.getvalue()) # 読み込むファイルの代わり
pd.read_csv(f_read)
user_id | subject | score | |
---|---|---|---|
0 | 1 | 国語 | 89 |
はい、user_id
が「001」から「1」になりました。これはpd.read_csv
の時に 001 を int64
と判断しているのが、原因です。これを回避するには
f_read = StringIO(f_write.getvalue()) # 読み込むファイルの代わり
dtype = dict(
user_id="object",
subject="object",
score="int64"
)
pd.read_csv(f_read, dtype=dtype)
のようにdtype
を指定してあげれば正しく読み込むことが出来ます。
pandera を活用する
panderaは列の型や値の検証に役立つバリデーションツールです。pandera についての詳しい内容は公式ドキュメントを参照して頂ければと思いますが、成績テーブルのスキーマを定義してみましょう。
import pandera as pa
from pandera.typing import Series
class ScoreSchema(pa.SchemaModel):
user_id: Series[str] = pa.Field(nullable=False)
subject: Series[str] = pa.Field(isin=["国語", "数学", "英語", "理科", "社会"], nullable=False)
score: Series[int] = pa.Field(ge=0, le=100, nullable=False)
非常に直感的ですね。
ge
(greater or equal)は「以上」、le
(less or equal)は「以下」になります。試しに 101 点を代入してみてましょう。
df.loc[0, "score"] = 101
ScoreSchema.validate(df)
SchemaModel
にはvalidate
メソッドがあり、これを実行することでスキーマ通りのDataFrame
になっているかの検証を行ってくれます。結果は以下のようなエラーを吐きます。
SchemaError: <Schema Column(name=score, type=DataType(int64))> failed element-wise validator 1: <Check less_than_or_equal_to: less_than_or_equal_to(100)>
df.loc[0, "subject"] = "プログラミング"
のようにしても同じように検証で弾かれます。
また、スキーマから pandas のdtype
を生成することも出来るため、read_csv
する時のdtype
指定にも役立ちます
from typing import Dict, Type
def get_dtype_from_schema(schema: Type[pa.SchemaModel]) -> Dict[str, str]:
return {col: str(dtype) for col, dtype in schema.to_schema().dtypes.items()}
dtype = get_dtype_from_schema(ScoreSchema)
df = pd.read_csv(f_read, dtype=dtype)
pandera は DataFrame をカッチリ運用したいケースで威力を発揮するので、検証用にさらっと jupyter でコードを書くだけであれば不要、もしくは開発スピードを下げてしまうかもしれません。また、Python の静的解析ツールであるmypyとの親和性が良いこともあり、私は jupyter で分析する時にはほとんど用いないですが、PoC でちょっとした集計の自動化が必要な場合は VSCode で開発しつつ pandera を活用するケースもあります。
DB から DataFrame で読み込む
DataFrame
の非 null 制約や主キー制約が気になってくるのであれば、csv ではなく RDB から取得するというのも一つの手段です。あるのであれば検証用 DB を使えばよいですし、なれけば検証用のデータを csv ではなく、sqlite の db 形式で用意するといった方法もあるかと思います。sqlite とread_sqlを用いた例は以下の通りです。
sqlite の DB(ファイル)作成
import sqlite3
DBNAME = "score.db"
conn = sqlite3.connect(DBNAME)
cur = conn.cursor()
# scoreテーブルの作成
cur.execute("""
CREATE TABLE scores(
user_id TEXT,
subject TEXT,
score INTEGER,
PRIMARY KEY(user_id, subject)
)
""")
# データ投入
for score in create_dummy_scores(5):
insert_sql = """
INSERT INTO
scores(user_id, subject, score)
VALUES
(?, ?, ?)
"""
cur.execute(
insert_sql,
(score["user_id"], score["subject"], score["score"])
)
# commit & close
conn.commit()
cur.close()
conn.close()
sqlite から SQL で取得したデータDataFrame
に変換
DBNAME = "score.db"
conn = sqlite3.connect(DBNAME)
df = pd.read_sql("select * from scores", con=conn)
conn.close()
ちなみに、SQLite の型定義でSTRING
があり、こちらを使っていたのですが、「001」ではなく「1」という形式で保存されてしまったのでTEXT
にしています。これは公式ドキュメントに記載がありました(知らなかった)。
And the declared type of "STRING" has an affinity of NUMERIC, not TEXT.
pandas は あくまで小〜中規模データの分析目的だと考える
この章に関しては様々な意見があると思いますが、私の執筆時点での考えを紹介しておきます。
まず第一に pandas は非常に素晴らしいツールだと考えています。機械学習をやる方はもちろん、ちょっとした集計、可視化を行いたいのであればピッタリです。ただ一方で、分析や検証のために使っていたコードをそのまま大規模データに使おうと考えたり、プロダクトのコードに移行しようと考えるのは待ってほしいと思います。
大規模データの対応
データ量がある程度大きいのであればDataFrame
ではなく DB や BigQuery(BQ), Athena などのクラウドサービス, Tableau, Redash などの BI ツールを使うのが良いと思っています。SQL だけでは不十分であれば AWS Glue や Spark(PySpark)という選択肢もあります。Athena のクエリ実行料金は現時点で 5$/1TB なので、リーズナブルに利用できます。
私自身、大規模データを処理しなければならないケースには何度か遭遇したことはありますが、DataFrame
でやらなければならないケースはそれほどありません。(kaggle
などの機械学習コンペに現時点であまり参加したことがないので、コンペではDataFrame
で速度を気にしながら処理する必要があるのかもしれません。)大規模データの集計や分析は BigQuery や Athena で行い、サンプリングしたデータセットをDataFrame
で分析するという方法もあります。
高度な機械学習モデルなどに詳しいデータサイエンティストが、データ加工やデータ基盤に精通しているデータエンジニアの領域もカバーしようとしてプロジェクトが失敗するというケースもよくあるのではないでしょうか。確かにDataFrame
は便利ですが、使う場面を間違えないようにすることも意識して頂きたいと思います。私自身は小規模〜中規模データで利用するようにしています。
品質を担保する
本記事で何度か紹介した通り、DataFrame
は RDB のように主キー制約や非 NULL 制約はないので、様々なことを気にしながらデータ処理を行わないとバグになる可能性があります。集計結果を出して見たけど、実は重複計算していた、実は抜け漏れがあったという経験がある方も多いのではないのでしょうか?
もちろんテストをしっかり書いて品質を担保するのは必須だとして、プロダクトレベルではDataFrame
ではなく BQ などのクラウドサービスを利用したり、dbtなどを利用するのも良いかと思います。この辺りは私自身勉強中なので、キャッチアップしていきたいと思います。
まとめ
当初の想定よりも長文になってしまいました。機械学習や kaggle をやりたくて Python を始めた方は pandas を必須で使うかと思います(私自身は機械学習よりもデータ加工やデータ基盤に興味があります)。この記事を読んで pandas を使う上で少しでも役に立てば幸いです。
Discussion
以下のように変更すれば可能です。
あと、行の追加と列の追加が逆になってますね。
コメントありがとうございます!
どちらのご指摘も仰る通りでしたので、本文の修正&追記をさせて頂きました。