🐍

pandas初級者に送りたいTips

2022/04/30に公開
2

こんにちわ alivelimb です。
pandasは言わずと知れた Python のデータ分析ライブラリですが、私が分析業務をする時も利用しています。本記事では自分自身が過去に詰まったところや、後輩のコードレビューをしていてバグの原因になることが多いと感じた部分を Tips として紹介します。

はじめに

本記事では成績ダミーデータを生成して、データ操作を行います。

import random
import pandas as pd

def create_dummy_scores(n):
    scores = [
        dict(
            user_id=f"user{i+1}",
            subject=subject,
            score=random.randint(0, 100)
        )
        for i in range(n)
        for subject in ["国語", "数学", "英語", "理科", "社会"]
    ]
    return scores

scores = create_dummy_scores(5)
score_df = pd.DataFrame(scores)

ダミーデータ例

user_id subject score
0 user1 国語 57
1 user1 数学 78
2 user1 英語 68
3 user1 理科 87
4 user1 社会 87
5 user2 国語 41
6 user2 数学 24
7 user2 英語 13
8 user2 理科 45
9 user2 社会 28

なお、検証した バージョンは以下の通りです

  • python: 3.8.7
  • pandas: 1.4.2
  • numpy: 1.22.3
  • pandarallel: 1.6.1
  • pandas-profiling: 3.1.0
  • pandera: 0.10.1

CRUD を自在に行う

pandas で Create(作成、追加), Read(抽出), U(更新), Delete(削除)の方法について紹介します。

Create: io.StringIO で read_csv

io.StringIOを用いると以下のように csv っぽい文字列からread_csvできます。

from io import StringIO

data = """
user_id,subject,score,pres_score
user1,国語,57,85
user2,数学,78,16
"""
pd.read_csv(StringIO(data))

ダミーデータ生成のようにdictからDataFrameを作る方が書きやすいと思いますが、ちょっとした挙動確認や以下のような関数の単体テストに利用できます。この関数に関しては今後の章でも言及します。

def safe_read_csv(path):
    df = pd.read_csv(path)
    # 型や値の検証

Create: 列・行の追加

列・行の追加方法はいくつかありますが、私は以下のように書いています。
列の追加

# 全て同じ値で追加する場合
score_df["new_col"] = "new_value"

# 個別に値を指定して追加する場合
new_column = [seq_no for seq_no in range(len(score_df))]
score_df["new_col"] = new_columns

# 複数列追加する場合 -> DataFrameにしてDataFrame同士の結合
new_columns = dict(
    new_col1=[seq_no for seq_no in range(len(score_df))],
    new_col2=[seq_no for seq_no in range(len(score_df))],
)
score_df = pd.concat([score_df, pd.DataFrame(new_columns)], axis=1)

行の追加

new_rows = create_dummy_scores(5)
score_df = pd.concat([score_df, pd.DataFrame(new_rows)])

ちなみにappendという関数もありますが、Deprecated になっているため使わない方が良いでしょう。

Read: 条件抽出

こちらもいくつか書き方はありますが、私は以下のように書いています。

# user1の行を全て抽出
score_df[score_df["user_id"] == "user1"]

# user1でかつ教科が国語の行を全て抽出
condition = (score_df["user_id"] == "user1") & (score_df["subject"] == "国語")
score_df[condtion]

DataFrameで複数条件を指定する時は以下の 2 点に注意が必要です。

  1. python 標準のand, or, notではなく&, |, ~を使う必要がある
  2. 条件同士を()で囲む必要がある

では条件を満たす行を抽出(選択)した後に、列も抽出(射影)してみます。
例えば user1 の国語の点数を抽出してみましょう。

condition = (score_df["user_id"] == "user1") & (score_df["subject"] == "国語")
score_df[condition]["score"]

これは公式ドキュメントchained indexingと呼ばれています。この書き方は値の確認をするだけなら問題ないのですが、抽出結果を更新しようとするとバグの原因となります。これについては次節で紹介します。

ちなみに

  1. n 行目から m 行目を取り出したいという時は、以下のような書き方ができるようですが使ったことはありません。
score_df[n:m]
  1. .valuesは非推奨となっています

Update: 値の更新

試しに user1 の国語の点数を 100 点に書き換えてみましょう。前節で紹介した方法で書いてみると

condition = (score_df["user_id"] == "user1") & (score_df["subject"] == "国語")
print(score_df[condition]["score"].to_numpy()[0]) # 57
score_df[condition]["score"] = 100
print(score_df[condition]["score"].to_numpy()[0]) # 57(変わっていない!)

値が更新されず、以下のような Warning が表示されます。

SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame. Try using .loc[row_indexer,col_indexer] = value instead

前節で紹介ようにdf[...][...]という形式はchained indexingと呼ばれており([]が 2 回続いているから)、加えてこれに代入することをchained assignmentと呼ばれています。これらについての詳しい解説は本記事では行わず、私が行なっている対応策と参考にさせて頂いた記事を紹介したいと思います。

対応策としては以下の通りです。

  1. 一時的に表示したいだけの時はchained indexingでも気にせず書く
  2. 代入したい時は.locを使って代入する
  3. df[]のように行を条件抽出(選択)または列を抽出(射影)したものを以降の処理でも使う場合は.copyを使ってコピーを作成する

1 は前節で紹介した通りですが、2 は以下のように書けばchained indexingにはならず、更新できます。

condition = (score_df["user_id"] == "user1") & (score_df["subject"] == "国語")
print(score_df[condition]["score"].to_numpy()[0]) # 57
score_df.loc[condition, "score"] = 100
print(score_df[condition]["score"].to_numpy()[0]) # 100

3 については[]を 1 回つけた時点で DataFrame のコピーを作成することでchained indexingを回避する方法です。新しく作成した DataFrame では 1 回も[]をしていないため問題ないということです。

SettingWithCopyWarningについて参考にさせて頂いた記事は以下の通りです。

また上記と関係なくDataFrameを引数に受け取る関数を定義する時も原則.copyを使うようにしています。これは DataFrame が dict などと同様にミュータブルなオブジェクトだからです。以下の簡単な例で紹介してみます。

data = """
col1,col2
10,20
11,21
12,22
"""
example_df = pd.read_csv(StringIO(data))

def add_col3(df):
    df["col3"] = [30, 31, 32]

add_col3(example_df)
example_df.head() # col3が追加されたDataFrame

add_col3には example_df のコピーではなく実体が渡されてしまうのが原因です。大きなデータを扱っていて、コピーを作成するのはメモリ的に厳しいという理由があれば良いのですが、そうでない場合はこのような書き方は望ましくないと考えているので以下のように書いています。

def add_col3(df):
    new_df = df.copy()
    new_df["col3"] = [30, 31, 32]
    return new_df

example_df_col3_added = add_col3(example_df)
example_df.head() # 元々のDataFrame

Delete: 列, 行の削除

列の削除に関しては素直に.dropを使います。ただし、元の DataFrame を直接編集するinplace=Trueは原則使わないようにしています。デフォルトでinplace=Falseなので特に気にしなくても良いでしょう。

# 列の削除(scoreを消して何がしたいんだ...)
score_df_score_dropped = score_df.drop(["score"], axis=1)

行の削除は基本的に書きません。「行を削除する」ではなく「削除しない方を抽出する」という考えです。また、抽出した後は前節で紹介した通りコピーを生成しています。

# 行の削除(赤点じゃない生徒を除外)
score_df_akaten = score_df[score_df["score"] < 60].copy()

groupby を自在に行う

CRUD が終わったところでデータ集計にもよく用いる.groupbyです。私はpandasを使い始める新人にはまず SQL の勉強を進めています。その理由として私自身が SQL をしっかり学ぶ前に pandas をコネコネしていたので.groupby.joinでよく分からんなーとなることが多かったからです。

基本的な例は以下のようなものだと思います。

# 教科(集約キー)ごとの行数を集計: 集約キーをindexに指定する
score_df.groupby("subject").size() # pd.Series

# 教科(集約キー)ごとの行数を集計: 集約キーをindexに指定しない
score_df.groupby("subject", as_index=False).size() # pd.DataFrame

# 教科(集約キー)ごとの点数の平均値を集計
score_df.groupby("subject", as_index=False)["score"].sum()

# ユーザID(集約キー)ごとに最初の1行を取得
score_df.groupby("user_id", as_index=False).head(1)

集計結果に別名をつけたい

# ※pandas1.4.3では実行できない書き方
agg_dict=dict(
    score_max="max",
    score_mean="mean",
    score_std="std",
)
score_df.groupby("subject")["score"].agg(agg_dict)

のように書けていたのですが、pandas0.20.1で Deprecations になったようで、pandas1.4.3 では以下のようなエラーが発生しました。

SpecificationError: nested renamer is not supported

わかりやすく.renameを使って書けば問題ないです。

(
    score_df.groupby("subject")["score"]
    .agg(["max", "mean", "std"])
    .rename(
        columns=dict(
            max="score_max",
            mean="score_mean",
            std="score_std",
        )
    )
)

ちなみに、pandas でメソッドチェーンが長くなった時は()で囲むと改行できて見やすいです

※2022.05.02 追記

nkay さんにコメントして頂いた通り、以下のように書けばSpecificationErrorを回避できます。
nkay さんコメントありがとうございます!

agg_dict=dict(
    score_max="max",
    score_mean="mean",
    score_std="std",
)
score_df.groupby("subject")["score"].agg(**agg_dict)

集計結果を各行につけたい

成績データに偏差値列をつけたいとしましょう。
偏差値の計算式は

偏差値 = 50 + \frac{得点 - 平均点}{標準偏差 \times 10}

で計算できるので平均点と標準偏差を求める必要がありますが、行単位の操作で計算が完結するように各教科の平均点と標準偏差の列を追加したいです。こんな時には.transformを使うと便利です。普通に groupby をすると行数が「集約キーの組み合わせ数」に集約されますが、.transformでは行数が変わらないので、列追加に向いています。

for agg_func in ["mean", "std"]:
    score_df[f"score_{agg_func}"] = score_df.groupby("subject")["score"].transform(agg_func)

ちなみに、.transformにもリストで集約関数を渡して

subject_agg_df = (
    score_df.groupby("subject")["score"]
    .transform(["mean", "std"])
    .rename(
        columns=dict(
            mean="subject_mean",
            std="subject_mean",
        )
    )
)
score_df = pd.concat([score_df, subject_agg_df], axis=1)

と書けると思っていたのですが、エラーが吐かれました。

TypeError: unhashable type: 'list'

公式ドキュメントには

funcfunction, str, list-like or dict-like

list-like of functions and/or function names, e.g. [np.exp, 'sqrt']

とあるので書けそうなのですが、Seriesでは書けるけどGroupByオブジェクトでは出来ないということでしょうか...

さて次章ではここから偏差値を求めていきましょう。

for 文は原則書かないように意識する

Dataframeでループを回す時に、.iterrowsなどがありますが、私は原則 for 文は書かないようにしています。for 分を回さない方法としてはまず.applyがあるでしょう。

# 偏差値は英語でdeviation valueというらしい
# 逆に分かりにくいと思ったのでローマ字で
def get_hensachi(score, mean, std):
    return 50 + (score - mean) / std * 10

score_df["hensachi"] = (
    score_df[["score", "score_mean", "score_std"]]
    .apply(lambda row: get_hensachi(row["score"], row["score_mean"], row["score_std"]), axis=1)
)

DataFrame は二次元, Series は 1 次元であることを意識する

.applyを使う上でよくある間違いに触れておきます。複数列を選択して apply を行う時に

score_df[["score", "score_mean", "score_std"]]

ではなく

score_df["score", "score_mean", "score_std"]

と書いてしまったり、1 列について.applyをする時に

score_df["score"].apply(.., axis=1)

axisを指定してしまったりします。

これはDataFrameSeriesの次元を意識しておくと間違えにくいかと思います。score_dfから 2 種類の方法でuser_idを取り出して見ましょう。

print("score_df['user_id']:", type(score_df["user_id"]))
print("score_df[['user_id']]:", type(score_df[["user_id"]]))

実行結果

score_df['user_id']: <class 'pandas.core.series.Series'>
score_df[['user_id']]: <class 'pandas.core.frame.DataFrame'>
  • 文字列で切り出すとSeries
  • リストで切り出すとDataFrame

になります。小難しくいうと「切り出した次元 = 結果の第 2 次元」になると思います。それぞれの次元数を出してみると分かりやすいです。

print("score_df['user_id']:", score_df["user_id"].shape)
print("score_df[['user_id']]:", score_df[["user_id"]].shape)

実行結果

score_df['user_id']: (25,)
score_df[['user_id']]: (25, 1)

axis=1が列方向というのを理解している方は多いと思うのですが、意外と次元の意識をしておらずSeriesなのに axis を指定してしまうケースをよく見るので、意識してみてはどうでしょうか。

apply が遅いことを認識しておく

ところで「.applyは遅いんじゃないの?」と思った方もいるのではないでしょうか。私も知識だけあったので、100 万行 x 2 列のデータを生成し、apply で和をとって実際に検証してみます。(本来ならnumpyで完結させた方が圧倒的に早い処理です)

import time
import numpy as np

# numpyでデータ生成(100万行 x 2列)
start_time = time.time()
benchmark_df = pd.DataFrame(np.random.rand(10**6, 2), columns=["a", "b"])
print(time.time() - start_time) # 0.02182292938232422

# applyで100万行に対して和をとる
start_time = time.time()
benchmark_df["sum"] = benchmark_df.apply(lambda row: row["a"] + row["b"], axis=1)
print(time.time() - start_time) # 11.313948154449463

確かに遅いですね。この解決策としてnp.vectrizeなどを使ってベクトル化する方法(Do You Use Apply in Pandas? There is a 600x Faster Way)なども挙げられますが、私は並列処理を可能にしてくれるpandarallelを使っています。

from pandarallel import pandarallel
pandarallel.initialize()

# INFO: Pandarallel will run on 4 workers.
# INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.

start_time = time.time()
benchmark_df["sum"] = benchmark_df.parallel_apply(lambda row: row["a"] + row["b"], axis=1)
print(time.time() - start_time) # 3.1791868209838867

4workers で動かしているので処理時間は約 1/4 になっていますね。
pandarallel の利点は以下の 2 点だと思っています。

  • importして.apply.parallel_applyに変えるだけなので導入・ルール化が簡単
  • 並列処理をしてくれるので処理時間が推定しやすい

一人でコードを書くのであれば問題ないと思うのですが、複数名でコードを管理していく場合はルール化や理解しやすいコードを書くことに重きを置きたいと考えているためpandarallelは重宝しています。後述しますが、DataFrameで速度面を気にする時は設計の改善で回避できるケースもあると思っています。

parallel_applyのように導入が簡単なパッケージは積極的に使っていこうと思っています。pandas でオレオレ関数をたくさん定義していくのも楽しいのですが、車輪の再開発になったり、チーム開発の時に使えなかったり、テストをちゃんと書いておく必要があったりします。

parallel_applyの他にはpandas-profilingがあります。pandas-profilingはデータの概要を把握するのに非常に便利です(下の gif は公式ドキュメントから引用しました)。

pandas-profiling

本節で言いたかったことは、あくまで apply が遅いことは認識しておくことであり「.applyを書くな!」ではありません。自分(or 組織)のルールとして.applyを使わないというのは 1 つの方針であるとは思いますが、for 文を書く方法しか知らなかった方は.applyという選択肢があるし、それほどデータ量が多くなければ十分使えるよという意図で書いています。

ファイル IO の時は型を意識する

pandasは様々なファイル IO(Input/Output)に対応していますが、一番よく使うのはやはり CSV ではないでしょうか?しかし CSV はParquetなどとは異なり、項目ごとの型定義がされているわけではありません。そのため、注意しないと以下のようなバグが生じます。

# 成績のダミーデータを生成する
def create_dummy_scores(n):
    scores = [
        dict(
            user_id=f"{i+1:>03}", # 001, 002, ... に変更
            subject=subject,
            score=random.randint(0, 100)
        )
        for i in range(n)
        for subject in ["国語", "数学", "英語", "理科", "社会"]
    ]
    return scores

pd.DataFrame(create_dummy_scores(3))
user_id subject score
0 001 国語 89
# 書き込み
f_write = StringIO() # 書き込むファイルの代わり
pd.DataFrame(create_dummy_scores(3)).to_csv(f_write, index=False)

# 読み込み
f_read = StringIO(f_write.getvalue()) # 読み込むファイルの代わり
pd.read_csv(f_read)
user_id subject score
0 1 国語 89

はい、user_idが「001」から「1」になりました。これはpd.read_csvの時に 001 を int64 と判断しているのが、原因です。これを回避するには

f_read = StringIO(f_write.getvalue()) # 読み込むファイルの代わり
dtype = dict(
    user_id="object",
    subject="object",
    score="int64"
)
pd.read_csv(f_read, dtype=dtype)

のようにdtypeを指定してあげれば正しく読み込むことが出来ます。

pandera を活用する

panderaは列の型や値の検証に役立つバリデーションツールです。pandera についての詳しい内容は公式ドキュメントを参照して頂ければと思いますが、成績テーブルのスキーマを定義してみましょう。

import pandera as pa
from pandera.typing import Series

class ScoreSchema(pa.SchemaModel):
    user_id: Series[str] = pa.Field(nullable=False)
    subject: Series[str] = pa.Field(isin=["国語", "数学", "英語", "理科", "社会"], nullable=False)
    score: Series[int] = pa.Field(ge=0, le=100, nullable=False)

非常に直感的ですね。
ge(greater or equal)は「以上」、le(less or equal)は「以下」になります。試しに 101 点を代入してみてましょう。

df.loc[0, "score"] = 101
ScoreSchema.validate(df)

SchemaModelにはvalidateメソッドがあり、これを実行することでスキーマ通りのDataFrameになっているかの検証を行ってくれます。結果は以下のようなエラーを吐きます。

SchemaError: <Schema Column(name=score, type=DataType(int64))> failed element-wise validator 1: <Check less_than_or_equal_to: less_than_or_equal_to(100)>

df.loc[0, "subject"] = "プログラミング"のようにしても同じように検証で弾かれます。

また、スキーマから pandas のdtypeを生成することも出来るため、read_csvする時のdtype指定にも役立ちます

from typing import Dict, Type

def get_dtype_from_schema(schema: Type[pa.SchemaModel]) -> Dict[str, str]:
    return {col: str(dtype) for col, dtype in schema.to_schema().dtypes.items()}

dtype = get_dtype_from_schema(ScoreSchema)
df = pd.read_csv(f_read, dtype=dtype)

pandera は DataFrame をカッチリ運用したいケースで威力を発揮するので、検証用にさらっと jupyter でコードを書くだけであれば不要、もしくは開発スピードを下げてしまうかもしれません。また、Python の静的解析ツールであるmypyとの親和性が良いこともあり、私は jupyter で分析する時にはほとんど用いないですが、PoC でちょっとした集計の自動化が必要な場合は VSCode で開発しつつ pandera を活用するケースもあります。

DB から DataFrame で読み込む

DataFrameの非 null 制約や主キー制約が気になってくるのであれば、csv ではなく RDB から取得するというのも一つの手段です。あるのであれば検証用 DB を使えばよいですし、なれけば検証用のデータを csv ではなく、sqlite の db 形式で用意するといった方法もあるかと思います。sqlite とread_sqlを用いた例は以下の通りです。

sqlite の DB(ファイル)作成

import sqlite3
DBNAME = "score.db"
conn = sqlite3.connect(DBNAME)
cur = conn.cursor()

# scoreテーブルの作成
cur.execute("""
CREATE TABLE scores(
  user_id TEXT,
  subject TEXT,
  score INTEGER,
  PRIMARY KEY(user_id, subject)
)
""")

# データ投入
for score in create_dummy_scores(5):
    insert_sql = """
    INSERT INTO
      scores(user_id, subject, score)
    VALUES
      (?, ?, ?)
    """
    cur.execute(
        insert_sql,
        (score["user_id"], score["subject"], score["score"])
    )

# commit & close
conn.commit()
cur.close()
conn.close()

sqlite から SQL で取得したデータDataFrameに変換

DBNAME = "score.db"
conn = sqlite3.connect(DBNAME)
df = pd.read_sql("select * from scores", con=conn)
conn.close()

ちなみに、SQLite の型定義でSTRINGがあり、こちらを使っていたのですが、「001」ではなく「1」という形式で保存されてしまったのでTEXTにしています。これは公式ドキュメントに記載がありました(知らなかった)。

And the declared type of "STRING" has an affinity of NUMERIC, not TEXT.

pandas は あくまで小〜中規模データの分析目的だと考える

この章に関しては様々な意見があると思いますが、私の執筆時点での考えを紹介しておきます。

まず第一に pandas は非常に素晴らしいツールだと考えています。機械学習をやる方はもちろん、ちょっとした集計、可視化を行いたいのであればピッタリです。ただ一方で、分析や検証のために使っていたコードをそのまま大規模データに使おうと考えたり、プロダクトのコードに移行しようと考えるのは待ってほしいと思います。

大規模データの対応

データ量がある程度大きいのであればDataFrameではなく DB や BigQuery(BQ), Athena などのクラウドサービス, Tableau, Redash などの BI ツールを使うのが良いと思っています。SQL だけでは不十分であれば AWS Glue や Spark(PySpark)という選択肢もあります。Athena のクエリ実行料金は現時点で 5$/1TB なので、リーズナブルに利用できます。

私自身、大規模データを処理しなければならないケースには何度か遭遇したことはありますが、DataFrameでやらなければならないケースはそれほどありません。(kaggleなどの機械学習コンペに現時点であまり参加したことがないので、コンペではDataFrameで速度を気にしながら処理する必要があるのかもしれません。)大規模データの集計や分析は BigQuery や Athena で行い、サンプリングしたデータセットをDataFrameで分析するという方法もあります。

高度な機械学習モデルなどに詳しいデータサイエンティストが、データ加工やデータ基盤に精通しているデータエンジニアの領域もカバーしようとしてプロジェクトが失敗するというケースもよくあるのではないでしょうか。確かにDataFrameは便利ですが、使う場面を間違えないようにすることも意識して頂きたいと思います。私自身は小規模〜中規模データで利用するようにしています。

品質を担保する

本記事で何度か紹介した通り、DataFrameは RDB のように主キー制約や非 NULL 制約はないので、様々なことを気にしながらデータ処理を行わないとバグになる可能性があります。集計結果を出して見たけど、実は重複計算していた、実は抜け漏れがあったという経験がある方も多いのではないのでしょうか?

もちろんテストをしっかり書いて品質を担保するのは必須だとして、プロダクトレベルではDataFrameではなく BQ などのクラウドサービスを利用したり、dbtなどを利用するのも良いかと思います。この辺りは私自身勉強中なので、キャッチアップしていきたいと思います。

まとめ

当初の想定よりも長文になってしまいました。機械学習や kaggle をやりたくて Python を始めた方は pandas を必須で使うかと思います(私自身は機械学習よりもデータ加工やデータ基盤に興味があります)。この記事を読んで pandas を使う上で少しでも役に立てば幸いです。

Discussion

nkaynkay

集計結果に別名をつけたい

以下のように変更すれば可能です。

agg_dict = dict(
    score_max="max",
    score_mean="mean",
    score_std="std",
)

- score_df.groupby("subject")["score"].agg(agg_dict)
+ score_df.groupby("subject")["score"].agg(**agg_dict)

あと、行の追加と列の追加が逆になってますね。

alivelimbalivelimb

コメントありがとうございます!
どちらのご指摘も仰る通りでしたので、本文の修正&追記をさせて頂きました。