❄️
PandasとSnowpark for pythonの対応表
この記事の対象ユーザー
- Snowflakeユーザー
- Pandasはわかるけど,Snowpark for pythonは不慣れな方
- Snowpark for pythonのチートシートを探している方
背景
「Snowpark for python,書きにくい・・・」
同じ感想を持った方は同士だ.是非,この記事を最後まで読んだうえで「私が知ってるTips」をコメントで残してほしい.
なぜ,Snowpark for python(以下,Snowpark)を書きにくいと感じるのか?理由はいくつかあるが,一番の理由は「Pandas likeなコードでsnowflakeを操作できる」と勘違いしている人が一定いるためなように感じる.実際は「PySpark likeなコード」というのが正しい.
本記事では,普段自分がよく使うPandasの操作をSnowparkで書くにはどうすればよいかをまとめたものである.順次更新予定だが,是非「こう書くと楽だよ」という皆の英知を集約させたいと思っているので,有識者からのコメントお待ちしております.
なお,@takada_tfさんが非常に有益な記事[1]を作成しているため,まずはそちらを参照することをお勧めする.
前提条件
各パッケージのversionは以下の通り.
- python3.8 ※3.8以上必須
- pandas==1.3.4
- snowflake-snowpark-python==1.4.0
以降の操作は以下の通りsnowflakeとのsessionが確立されている状態のものとする.
!pip install -qq snowflake-snowpark-python
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, lit
import snowflake.snowpark.functions as F
CONNECTION_PARAMETERS = {
"account": <ACCOUNT LOCATOR>,
"user": <USERNAME>,
"password": <PASSWORD>,
"database": <DB>,
"schema": <SCHEMA>,
"warehouse": <WH>
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
余談:
CONNECTION_PARAMETERSにはschema
とdatabase
を指定しないことが私は多い.
色々なデータベースに接続して複数データを結合することが多いので,以下のような書き方を好んでする.
CONNECTION_PARAMETERS = {
"account": <ACCOUNT LOCATOR>,
"user": <USERNAME>,
"password": <PASSWORD>,
"warehouse": <WH>
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
# 分析時に接続先を指定する.
session.use_database('DB name')
session.use_schema('Schema name')
Part1.基本的なデータフレーム操作
データフレーム作成
# pandas
df = pd.DataFrame({
'NAME':['Bob', 'Alice', 'Amanda'],
'AGE':[18, 20, 45],
'WEIGHT':[75.1, 48.9, 52.0],
'DATE':['2020/1/1', '2022/10/14', '2002/7/31']
})
df2 = pd.DataFrame({
'NAME':['Bob', 'Alice', 'Amanda'],
'GENDER':['M', 'F', 'F']
})
# snowpark
# ↓これはエラーになる
df = pd.DataFrame({
'NAME':['Bob', 'Alice', 'Amanda'],
'AGE':[18, 20, 45],
'WEIGHT':[75.1, 48.9, 52.0],
'DATE':['2020/1/1', '2022/10/14', '2002/7/31']
})
snow_df = session.create_dataframe(df)
# 列名にはダブルクォーテーションを使う必要がある↓
df = pd.DataFrame({
"NAME":['Bob', 'Alice', 'Amanda'],
"AGE":[18, 20, 45],
"WEIGHT":[75.1, 48.9, 52.0],
"DATE":['2020/1/1', '2022/10/14', '2002/7/31']
})
snow_df = session.create_dataframe(df)
# こういう書き方もある↓
snow_df2 = session.create_dataframe(
[['Bob', 'M'], ['Alice', 'F'], ['Amanda', 'F']],
schema = ["NAME", "GENDER"]
)
列選択
# pandas
df[['NAME', 'WEIGHT']]
# コッチも可.正規表現で列名選択するときによく使う.
df.filter(['NAME', 'WEIGHT'])
# snowpark
snow_df.select(col("NAME"), col("WEIGHT"))
# コッチも可
snow_df.select(["NAME", "WEIGHT"])
行選択(抽出)
# pandas
df.query('NAME == "Bob"')
df[df.NAME == "Bob"]
# 複数条件
df.query('AGE <= 30 & WEIGHT <= 60') # andでも可.
df[(df.AGE <= 30) & (df.WEIGHT <= 60)]
# snowpark
# whereはfilterのalias.どちらを使っても良し.
# snow_df.where('NAME = "Bob"')はダメ.クォーテーションに注意.
# pandas.query likeに書く場合は,演算子「==」ではないことに注意↓
snow_df.where("NAME = 'Bob'")
# コッチも可.
snow_df.where(snow_df.NAME == "Bob")
# 複数条件1.「&」や「|」を使うように.
snow_df.filter((snow_df.AGE <= 30) & (snow_df.WEIGHT <= 60))
# 複数条件2.「and」や「or」を使うように.
snow_df.filter("AGE <= 30 and WEIGHT <= 60")
concat(縦結合)
# pandas
pd.concat([df, df2], axis=0)
# snowpark
# 一致しているデータフレーム同士ならこれでOK.列数が一致していないとエラーになる.
snow_df.union_all(snow_df2)
# 列数が一致していない場合は,以下のように列数を一致させてから結合する.
# 新しいカラムを作成する場合はwith_columnまたはwith_columnsを用いる.
snow_df_tmp = snow_df.with_column("GENDER", lit(None))
snow_df2_tmp = snow_df2.with_columns(["AGE", "WEIGHT", "DATE"], [lit(None), lit(None), lit(None)])
snow_df_tmp.union_all_by_name(snow_df2_tmp)
# 縦結合時に重複行を削除したければ"union_by_name"を使用する.
snow_df_tmp.union_by_name(snow_df2_tmp)
concat(横結合)
# pandas
pd.concat([df1, df2], axis=1)
# snowpark
# 完全に同じ挙動のものはなし.with_columnをうまく使うくらいしか思いつかない.
# snow_df1.join(snow_df2)だとCROSS JOINの結果が返ってるので注意.
merge(inner, left, full outer, cross)
# pandas
pd.merge(df, df2, how='inner', on='NAME')
# snowpark
snow_df.join(snow_df2, how='inner', on="NAME")
groupby(グループ化)
結果を確認しやすいように,下記テーブル(group_df, group_snow_df)を使用する.
NAME | AGE | WEIGHT | DATE | GENDER |
---|---|---|---|---|
Bob | 18 | 75.1 | 2020/1/1 | M |
Alice | 20 | 48.9 | 2022/10/14 | F |
Amanda | 45 | 52.0 | 2002/7/31 | F |
# pandas
group_df.groupby('GENDER').size()
group_df.groupby('GENDER')['AGE'].max()
group_df.groupby('GENDER')['AGE'].min()
group_df.groupby('GENDER')['AGE'].mean()
group_df.groupby('GENDER')['AGE'].std()
group_df.groupby('GENDER')['AGE'].median()
# snowpark
group_snow_df.group_by("GENDER").count()
group_snow_df.group_by("GENDER").agg(F.max("AGE"))
group_snow_df.group_by("GENDER").agg(F.min("AGE"))
group_snow_df.group_by("GENDER").agg(F.mean("AGE"))
group_snow_df.group_by("GENDER").agg(F.stddev("AGE"))
group_snow_df.group_by("GENDER").agg(F.median("AGE"))
Part2.個人的によくやる操作
以降の操作は下記のテーブル(sales_df, sales_snow_df)を使用する.
NAME | AGE | SALES | DATE |
---|---|---|---|
Bob | 18 | NULL | 2020/1/1 |
Bob | 18 | 1000 | 2020/1/20 |
Bob | 18 | 2000 | 2020/1/31 |
Bob | 18 | 0 | 2020/2/2 |
Bob | 18 | NULL | 2020/3/14 |
Alice | 20 | 500 | 2020/1/1 |
Alice | 20 | NULL | 2020/1/15 |
Alice | 20 | 1000 | 2020/10/27 |
# DataFrame作成
sales_snow_df = session.create_dataframe([
('Bob', 18, None, '2020/1/1'),
('Bob', 18, 1000, '2020/1/20'),
('Bob', 18, 2000, '2020/1/31'),
('Bob', 18, 0, '2020/2/2'),
('Bob', 18, None, '2020/3/14'),
('Alice', 20, 500, '2020/1/1'),
('Alice', 20, None, '2020/1/15'),
('Alice', 20, 1000, '2020/10/27')
], ["NAME", "AGE", "SALES", "DATE"])
sales_df = sales_snow_df.to_pandas()
slice, split(文字抽出)
# pandas
# 文字列をスライス
sales_df['YEAR'] = sales_df['DATE'].str[0:4]
# splitを使用
sales_df['MONTH'] = sales_df['DATE'].str.split('/').str[1]
# snowpark
# 文字列をスライス
sales_snow_df.with_column("YEAR", F.substr("DATE", 1, 4))
# splitを使用.F.split("DATE", lit("/")).getItem(1)でも可.
sales_snow_df.with_column("MONTH", F.split("DATE", lit("/"))[1])
# ただし↑だとMONTHのデータ型がVARIANT型なので,↓のようにINT型に変換するとよい.
from snowflake.snowpark.types import IntegerType
sales_snow_df.with_column("MONTH", F.cast(F.split("DATE", lit("/"))[1], IntegerType()))
cumcount, rank(グループ内連番)
# pandas
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['RANK_IN_GROUP'] = sales_df.groupby('NAME').cumcount() + 1
# 単純にrank()を使っても良い
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['RANK_IN_GROUP'] = sales_df.groupby('NAME')['NAME'].transform('rank', method='first')
# snowpark
from snowflake.snowpark.window import Window
order = Window.partitionBy(col('NAME')).orderBy(col('NAME'), col('DATE'))
sales_snow_df.select('NAME', 'AGE', 'SALES', 'DATE', F.rank().over(order).alias('RANK'))
cumsum(グループ内累積和)
# pandas
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['CUMSUM_SALES'] = sales_df.groupby('NAME')['SALES'].transform(pd.Series.cumsum)
# snowpark
from snowflake.snowpark.window import Window
order = Window.partitionBy(col('NAME')).orderBy(col('NAME'), col('DATE'))
sales_snow_df.select('NAME', 'AGE', 'SALES', 'DATE', F.sum('SALES').over(order).alias('CUMSUM_SALES'))
diff(グループ内差分)
# pandas
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['LAG'] = sales_df.groupby('NAME')['SALES'].diff(periods=1)
# snowpark
# pandasとNullを含んだ場合の差分計算の挙動が異なる点,差分計算は絶対値が返される点に注意.
from snowflake.snowpark.window import Window
order = Window.partitionBy(col('NAME')).orderBy(col('NAME'), col('DATE'))
sales_snow_df.select('NAME', 'AGE', 'SALES', 'DATE', F.lag('SALES').over(order).alias('LAG'))
ffill, bfill(グループ内で,前後の値で欠損値補完)
# pandas
sales_df.sort_values(['NAME', 'DATE'], ascending=[True, True], inplace=True)
sales_df['SALES_WITH_FFILL'] = sales_df.groupby('NAME')['SALES'].apply(lambda x: x.fillna(method='ffill'))
# snowpark
# 一発でfillna('ffill')を再現できなかったので,二段階で処理した.
# bfillしたければ,order作成時のソート順を変更すればよい(col('DATE').asc()).
# 1.一旦,SALESをグループ内で一行ずらしたカラム"SALES2"を作成しておく.
from snowflake.snowpark.window import Window
order = Window.partition_by(col('NAME')).order_by(col('NAME'), col('DATE').desc())
sales_snow_df = sales_snow_df.select("NAME", "DATE", "SALES", F.lead("SALES", ignore_nulls=True).over(order).alias("SALES2")).sort(col("NAME"), col("DATE").asc())
# 2.カラム"SALES"がNULLの場合に,カラム"SALES2"の値を代入.
sales_snow_df.with_column("SALES_WITH_FFILL",
F.when(F.col("SALES").isNull(), F.col("SALES2"))
.otherwise(F.col("SALES"))
)
Part3.Tips(随時更新)
実行されるSQLの確認
# snowpark
sales_snow_df.explain()
所感
- PySparkで実現できる操作は,たいていSnowparkでもできそう(個人的には,SnowparkのDocよりPySparkのDocの方が参考になった.)
- Bing AIに「このPandasの処理をSnowpark for pythonで書き直して!」と指示しても,返ってくるコードはPySparkなので注意.出力されたコードを参考に,Snowpark referenceを漁るのがベストプラクティスな気がする.
-
select
やwhere
でデータ量を削減し,メモリに載るサイズになったらto_pandas()
でpandas dataframe化しちゃうのが個人的にはお勧め. - Snowparkではvisualizationできないのが致命的.
- UDFを使えばもっと色々できそう.UDFは別の機会に記事化します.
参考
- https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/index.html
- https://docs.snowflake.com/en/sql-reference/identifiers-syntax
- https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf
Discussion