Snowparkを用いたS3オブジェクト操作
背景
保守の簡便さからAWS資産をSnowflakeへ移行していく活動を少しずつ始めています。移行対象に簡単なファイル操作をともなうS3バケット間のファイル転送があるため、今回はこれをNotebook上のSnowparkで実装するとどうなるか検証してみます。
SnowparkのAPIに加えて、PythonのBytesIOやZipFileパッケージもあまりよく分かっていないので、その辺りの基本的な仕様も確認してみます。
アーキテクチャは以下のように、zipファイルを格納しているS3バケットから特定のCSVを抽出して、データ参照用のS3バケットへparquet形式で出力する想定とします。それぞれのS3バケットに対するストレージ統合や外部ステージの作成はここでは省略します。
利用するzipファイルの構成は以下です。
- abc.zip
- test.csv
S3バケットのオブジェクト一覧の出力
バケットの一覧を出力するにはlistコマンドをSQLとして実行します。ここではzipの拡張子を持つオブジェクトへフィルタリングしています。
from snowflake.snowpark.context import get_active_session
import pandas as pd
session = get_active_session()
stage_full_name = "db.schema.stage_name"
file_pattern = r".*\\.zip"
s3_objects = session.sql(f"list @{stage_full_name} pattern = '{file_pattern}'").collect()
df_s3_objects = pd.DataFrame(s3_objects)
print(df_s3_objects.iloc[0])
# name s3://snowflake/abc.zip
# size 463
# last_modified Sun, 4 Aug 2024 03:25:33 GMT
listの戻り値には、
- name: オブジェクトのフルパス
- last_modified: ファイルの最終更新日付
が含まれており、差分更新時に利用ができそうです。
S3オブジェクトの取得
続いて、snowpark.FileOperation.get_streamを利用してオブジェクトを取得します。
from io import BytesIO
from pathlib import Path
p = Path("s3://snowflake/abc.zip")
zipfile_buffer = session.file.get_stream(f"@{stage_full_name}/{p.name}", decompress=False)
print(p.name, type(zipfile_buffer))
# abc.zip <class '_sfstream.SfStream'>
返り値はSfStreamという見慣れない型ですが、公式のget_streamの説明の通りBytesIOと見なしてよいようです。
Returns
An BytesIO object which points to the downloaded file.
zipからcsvを取得しdfとして読み込み
ここの処理はあまりSnowparkは関係ないですが、ZipFileを利用してzipバッファからcsvを取得し、DataFrameとして読み込みます。
import zipfile
from io import StringIO
with zipfile.ZipFile(zipfile_buffer) as f_zip:
# namelist()に含まれるサブディレクトリをフィルタ
csv_path = list(filter(lambda csv_path: ".csv" in csv_path, f_zip.namelist()))[0]
csv_str = f_zip.read(csv_path).decode("utf-8")
df = pd.read_csv(StringIO(csv_str))
print(df)
# a b c
# 0 1 2 3
# 1 4 5 6
ZipFile.read()の返り値であるbytesをstringへdecodeして、string bufferとしてpandas.DataFrameに与えてdfを作成しています。pandas.read_csvの引数にバッファを指定して利用したことが無かったので新鮮です。
運用時には不正なzipに対してBadZipFile Exceptionをキャッチした方が良いかもしれません。
DataFrameをS3へ出力
pandasからsnowparkのDataFrameに変換し、DataFrameWriter.parquetメソッドを用いて外部ステージに書き出します。出力用のS3バケットを検証用に用意する時間が無かったので、入力用のS3バケットにoutputディレクトリを切って出力しています。
output_full_path = f"@{stage_full_name}/output/{p.name.strip('.zip')}.parquet"
sf_df = session.create_dataframe(df)
sf_df.write.parquet(output_full_path, overwrite=True, single=True)
まとめ
S3バケットからzipファイルを読み出し、csvファイルを抽出してS3バケットへ書き出すSnowparkの処理を検証してみました。S3へのアクセスは全てステージを介して実施できるので、boto3の利用やAWSのアクセスキーの管理が不要であり、シンプルに実装できそうです。
大容量のファイルを扱う場合は、pandas.DataFrameを介さずに直接snowpark.DataFrameを作成した方がより高速だと思いますが、StringIOをsnowpark.DataFrameの入力に取る方法が見つからずこちらは断念しました。
備忘
modinのpandasを利用していない理由
pd.read_csvは純正のpandasだとstring bufferを引数に取れるのですが、modinの場合はstageへのパスのみ引数に取るため、DataFrameを作成するのに利用できませんでした。
Discussion