❄️

PandasとSnowpark for pythonの対応表

2023/05/21に公開

この記事の対象ユーザー

  • Snowflakeユーザー
  • Pandasはわかるけど,Snowpark for pythonは不慣れな方
  • Snowpark for pythonのチートシートを探している方

背景

「Snowpark for python,書きにくい・・・」
同じ感想を持った方は同士だ.是非,この記事を最後まで読んだうえで「私が知ってるTips」をコメントで残してほしい.
なぜ,Snowpark for python(以下,Snowpark)を書きにくいと感じるのか?理由はいくつかあるが,一番の理由は「Pandas likeなコードでsnowflakeを操作できる」と勘違いしている人が一定いるためなように感じる.実際は「PySpark likeなコード」というのが正しい.

本記事では,普段自分がよく使うPandasの操作をSnowparkで書くにはどうすればよいかをまとめたものである.順次更新予定だが,是非「こう書くと楽だよ」という皆の英知を集約させたいと思っているので,有識者からのコメントお待ちしております.

なお,@takada_tfさんが非常に有益な記事[1]を作成しているため,まずはそちらを参照することをお勧めする.

前提条件

各パッケージのversionは以下の通り.

  • python3.8 ※3.8以上必須
  • pandas==1.3.4
  • snowflake-snowpark-python==1.4.0

以降の操作は以下の通りsnowflakeとのsessionが確立されている状態のものとする.

!pip install -qq snowflake-snowpark-python
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, lit
import snowflake.snowpark.functions as F

CONNECTION_PARAMETERS = {
   "account": <ACCOUNT LOCATOR>,
   "user": <USERNAME>,
   "password": <PASSWORD>,
   "database": <DB>,
   "schema": <SCHEMA>,
   "warehouse": <WH>
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

余談:
CONNECTION_PARAMETERSにはschemadatabaseを指定しないことが私は多い.
色々なデータベースに接続して複数データを結合することが多いので,以下のような書き方を好んでする.

CONNECTION_PARAMETERS = {
   "account": <ACCOUNT LOCATOR>,
   "user": <USERNAME>,
   "password": <PASSWORD>,
   "warehouse": <WH>
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

# 分析時に接続先を指定する.
session.use_database('DB name')
session.use_schema('Schema name')

Part1.基本的なデータフレーム操作

データフレーム作成

# pandas
df = pd.DataFrame({
    'NAME':['Bob', 'Alice', 'Amanda'],
    'AGE':[18, 20, 45],
    'WEIGHT':[75.1, 48.9, 52.0],
    'DATE':['2020/1/1', '2022/10/14', '2002/7/31']
})

df2 = pd.DataFrame({
    'NAME':['Bob', 'Alice', 'Amanda'],
    'GENDER':['M', 'F', 'F']
})
# snowpark
# ↓これはエラーになる
df = pd.DataFrame({
    'NAME':['Bob', 'Alice', 'Amanda'],
    'AGE':[18, 20, 45],
    'WEIGHT':[75.1, 48.9, 52.0],
    'DATE':['2020/1/1', '2022/10/14', '2002/7/31']
})
snow_df = session.create_dataframe(df)

# 列名にはダブルクォーテーションを使う必要がある↓
df = pd.DataFrame({
    "NAME":['Bob', 'Alice', 'Amanda'],
    "AGE":[18, 20, 45],
    "WEIGHT":[75.1, 48.9, 52.0],
    "DATE":['2020/1/1', '2022/10/14', '2002/7/31']
})
snow_df = session.create_dataframe(df)

# こういう書き方もある↓
snow_df2 = session.create_dataframe(
    [['Bob', 'M'], ['Alice', 'F'], ['Amanda', 'F']],
    schema = ["NAME", "GENDER"]
)

列選択

# pandas
df[['NAME', 'WEIGHT']]

# コッチも可.正規表現で列名選択するときによく使う.
df.filter(['NAME', 'WEIGHT'])
# snowpark
snow_df.select(col("NAME"), col("WEIGHT"))

# コッチも可
snow_df.select(["NAME", "WEIGHT"])

行選択(抽出)

# pandas
df.query('NAME == "Bob"')
df[df.NAME == "Bob"]

# 複数条件
df.query('AGE <= 30 & WEIGHT <= 60') # andでも可.
df[(df.AGE <= 30) & (df.WEIGHT <= 60)]
# snowpark
# whereはfilterのalias.どちらを使っても良し.
# snow_df.where('NAME = "Bob"')はダメ.クォーテーションに注意.
# pandas.query likeに書く場合は,演算子「==」ではないことに注意↓
snow_df.where("NAME = 'Bob'")

# コッチも可.
snow_df.where(snow_df.NAME == "Bob")

# 複数条件1.「&」や「|」を使うように.
snow_df.filter((snow_df.AGE <= 30) & (snow_df.WEIGHT <= 60))

# 複数条件2.「and」や「or」を使うように.
snow_df.filter("AGE <= 30 and WEIGHT <= 60")

concat(縦結合)

# pandas
pd.concat([df, df2], axis=0)
# snowpark
# 一致しているデータフレーム同士ならこれでOK.列数が一致していないとエラーになる.
snow_df.union_all(snow_df2)

# 列数が一致していない場合は,以下のように列数を一致させてから結合する.
# 新しいカラムを作成する場合はwith_columnまたはwith_columnsを用いる.
snow_df_tmp = snow_df.with_column("GENDER", lit(None))
snow_df2_tmp = snow_df2.with_columns(["AGE", "WEIGHT", "DATE"], [lit(None), lit(None), lit(None)])
snow_df_tmp.union_all_by_name(snow_df2_tmp)

# 縦結合時に重複行を削除したければ"union_by_name"を使用する.
snow_df_tmp.union_by_name(snow_df2_tmp)

concat(横結合)

# pandas
pd.concat([df1, df2], axis=1)
# snowpark
# 完全に同じ挙動のものはなし.with_columnをうまく使うくらいしか思いつかない.
# snow_df1.join(snow_df2)だとCROSS JOINの結果が返ってるので注意.

merge(inner, left, full outer, cross)

# pandas
pd.merge(df, df2, how='inner', on='NAME')
# snowpark
snow_df.join(snow_df2, how='inner', on="NAME")

groupby(グループ化)

結果を確認しやすいように,下記テーブル(group_df, group_snow_df)を使用する.

NAME AGE WEIGHT DATE GENDER
Bob 18 75.1 2020/1/1 M
Alice 20 48.9 2022/10/14 F
Amanda 45 52.0 2002/7/31 F
# pandas
group_df.groupby('GENDER').size()
group_df.groupby('GENDER')['AGE'].max()
group_df.groupby('GENDER')['AGE'].min()
group_df.groupby('GENDER')['AGE'].mean()
group_df.groupby('GENDER')['AGE'].std()
group_df.groupby('GENDER')['AGE'].median()
# snowpark
group_snow_df.group_by("GENDER").count()
group_snow_df.group_by("GENDER").agg(F.max("AGE"))
group_snow_df.group_by("GENDER").agg(F.min("AGE"))
group_snow_df.group_by("GENDER").agg(F.mean("AGE"))
group_snow_df.group_by("GENDER").agg(F.stddev("AGE"))
group_snow_df.group_by("GENDER").agg(F.median("AGE"))

Part2.個人的によくやる操作

以降の操作は下記のテーブル(sales_df, sales_snow_df)を使用する.

NAME AGE SALES DATE
Bob 18 NULL 2020/1/1
Bob 18 1000 2020/1/20
Bob 18 2000 2020/1/31
Bob 18 0 2020/2/2
Bob 18 NULL 2020/3/14
Alice 20 500 2020/1/1
Alice 20 NULL 2020/1/15
Alice 20 1000 2020/10/27
# DataFrame作成
sales_snow_df = session.create_dataframe([
    ('Bob', 18, None, '2020/1/1'),
    ('Bob', 18, 1000, '2020/1/20'),
    ('Bob', 18, 2000, '2020/1/31'),
    ('Bob', 18, 0, '2020/2/2'),
    ('Bob', 18, None, '2020/3/14'),
    ('Alice', 20, 500, '2020/1/1'),
    ('Alice', 20, None, '2020/1/15'),
    ('Alice', 20, 1000, '2020/10/27')
], ["NAME", "AGE", "SALES", "DATE"])

sales_df = sales_snow_df.to_pandas()

slice, split(文字抽出)

# pandas
# 文字列をスライス
sales_df['YEAR'] = sales_df['DATE'].str[0:4]

# splitを使用
sales_df['MONTH'] = sales_df['DATE'].str.split('/').str[1]
# snowpark
# 文字列をスライス
sales_snow_df.with_column("YEAR", F.substr("DATE", 1, 4))

# splitを使用.F.split("DATE", lit("/")).getItem(1)でも可.
sales_snow_df.with_column("MONTH", F.split("DATE", lit("/"))[1])

# ただし↑だとMONTHのデータ型がVARIANT型なので,↓のようにINT型に変換するとよい.
from snowflake.snowpark.types import IntegerType
sales_snow_df.with_column("MONTH", F.cast(F.split("DATE", lit("/"))[1], IntegerType()))

cumcount, rank(グループ内連番)

# pandas
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['RANK_IN_GROUP'] = sales_df.groupby('NAME').cumcount() + 1

# 単純にrank()を使っても良い
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['RANK_IN_GROUP'] = sales_df.groupby('NAME')['NAME'].transform('rank', method='first')
# snowpark
from snowflake.snowpark.window import Window
order = Window.partitionBy(col('NAME')).orderBy(col('NAME'), col('DATE'))
sales_snow_df.select('NAME', 'AGE', 'SALES', 'DATE', F.rank().over(order).alias('RANK'))

cumsum(グループ内累積和)

# pandas
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['CUMSUM_SALES'] = sales_df.groupby('NAME')['SALES'].transform(pd.Series.cumsum)
# snowpark
from snowflake.snowpark.window import Window
order = Window.partitionBy(col('NAME')).orderBy(col('NAME'), col('DATE'))
sales_snow_df.select('NAME', 'AGE', 'SALES', 'DATE', F.sum('SALES').over(order).alias('CUMSUM_SALES'))

diff(グループ内差分)

# pandas
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['LAG'] = sales_df.groupby('NAME')['SALES'].diff(periods=1)
# snowpark
# pandasとNullを含んだ場合の差分計算の挙動が異なる点,差分計算は絶対値が返される点に注意.
from snowflake.snowpark.window import Window
order = Window.partitionBy(col('NAME')).orderBy(col('NAME'), col('DATE'))
sales_snow_df.select('NAME', 'AGE', 'SALES', 'DATE', F.lag('SALES').over(order).alias('LAG'))

ffill, bfill(グループ内で,前後の値で欠損値補完)

# pandas
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['SALES_WITH_FFILL'] = sales_df.groupby('NAME')['SALES'].apply(lambda x: x.fillna(method='ffill'))
# snowpark
# 一発でfillna('ffill')を再現できなかったので,二段階で処理した.
# bfillしたければ,order作成時のソート順を変更すればよい(col('DATE').asc()).
# 1.一旦,SALESをグループ内で一行ずらしたカラム"SALES2"を作成しておく.
from snowflake.snowpark.window import Window
order = Window.partition_by(col('NAME')).order_by(col('NAME'), col('DATE').desc())
sales_snow_df = sales_snow_df.select("NAME", "DATE", "SALES", F.lead("SALES", ignore_nulls=True).over(order).alias("SALES2")).sort(col("NAME"), col("DATE").asc())

# 2.カラム"SALES"がNULLの場合に,カラム"SALES2"の値を代入.
sales_snow_df.with_column("SALES_WITH_FFILL",
                F.when(F.col("SALES").isNull(), F.col("SALES2"))
                .otherwise(F.col("SALES"))
		)

Part3.Tips(随時更新)

実行されるSQLの確認

# snowpark
sales_snow_df.explain()

所感

  • PySparkで実現できる操作は,たいていSnowparkでもできそう(個人的には,SnowparkのDocよりPySparkのDocの方が参考になった.)
  • Bing AIに「このPandasの処理をSnowpark for pythonで書き直して!」と指示しても,返ってくるコードはPySparkなので注意.出力されたコードを参考に,Snowpark referenceを漁るのがベストプラクティスな気がする.
  • selectwhereでデータ量を削減し,メモリに載るサイズになったらto_pandas()でpandas dataframe化しちゃうのが個人的にはお勧め.
  • Snowparkではvisualizationできないのが致命的.
  • UDFを使えばもっと色々できそう.UDFは別の機会に記事化します.

参考

脚注
  1. https://qiita.com/takada_tf/items/62f0337d80508631db57 ↩︎

Discussion