🎉

Snowparkを用いたS3オブジェクト操作

2024/08/05に公開

背景

保守の簡便さからAWS資産をSnowflakeへ移行していく活動を少しずつ始めています。移行対象に簡単なファイル操作をともなうS3バケット間のファイル転送があるため、今回はこれをNotebook上のSnowparkで実装するとどうなるか検証してみます。

SnowparkのAPIに加えて、PythonのBytesIOやZipFileパッケージもあまりよく分かっていないので、その辺りの基本的な仕様も確認してみます。

アーキテクチャは以下のように、zipファイルを格納しているS3バケットから特定のCSVを抽出して、データ参照用のS3バケットへparquet形式で出力する想定とします。それぞれのS3バケットに対するストレージ統合や外部ステージの作成はここでは省略します。

利用するzipファイルの構成は以下です。

  • abc.zip
    • test.csv

S3バケットのオブジェクト一覧の出力

バケットの一覧を出力するにはlistコマンドをSQLとして実行します。ここではzipの拡張子を持つオブジェクトへフィルタリングしています。

from snowflake.snowpark.context import get_active_session
import pandas as pd

session = get_active_session()
stage_full_name = "db.schema.stage_name"
file_pattern = r".*\\.zip"

s3_objects = session.sql(f"list @{stage_full_name} pattern = '{file_pattern}'").collect()
df_s3_objects = pd.DataFrame(s3_objects)
print(df_s3_objects.iloc[0])

# name             s3://snowflake/abc.zip  
# size                                                  463  
# last_modified                Sun, 4 Aug 2024 03:25:33 GMT 

listの戻り値には、

  • name: オブジェクトのフルパス
  • last_modified: ファイルの最終更新日付

が含まれており、差分更新時に利用ができそうです。

S3オブジェクトの取得

続いて、snowpark.FileOperation.get_streamを利用してオブジェクトを取得します。

from io import BytesIO
from pathlib import Path

p = Path("s3://snowflake/abc.zip")
zipfile_buffer = session.file.get_stream(f"@{stage_full_name}/{p.name}", decompress=False)
print(p.name, type(zipfile_buffer))

# abc.zip <class '_sfstream.SfStream'>

返り値はSfStreamという見慣れない型ですが、公式のget_streamの説明の通りBytesIOと見なしてよいようです。

Returns
An BytesIO object which points to the downloaded file.

zipからcsvを取得しdfとして読み込み

ここの処理はあまりSnowparkは関係ないですが、ZipFileを利用してzipバッファからcsvを取得し、DataFrameとして読み込みます。

import zipfile
from io import StringIO

with zipfile.ZipFile(zipfile_buffer) as f_zip:
    # namelist()に含まれるサブディレクトリをフィルタ
    csv_path = list(filter(lambda csv_path: ".csv" in csv_path, f_zip.namelist()))[0]
    csv_str = f_zip.read(csv_path).decode("utf-8")
    df = pd.read_csv(StringIO(csv_str))
    print(df)

# a   b   c  
# 0  1   2   3  
# 1  4   5   6

ZipFile.read()の返り値であるbytesをstringへdecodeして、string bufferとしてpandas.DataFrameに与えてdfを作成しています。pandas.read_csvの引数にバッファを指定して利用したことが無かったので新鮮です。

運用時には不正なzipに対してBadZipFile Exceptionをキャッチした方が良いかもしれません。

DataFrameをS3へ出力

pandasからsnowparkのDataFrameに変換し、DataFrameWriter.parquetメソッドを用いて外部ステージに書き出します。出力用のS3バケットを検証用に用意する時間が無かったので、入力用のS3バケットにoutputディレクトリを切って出力しています。

output_full_path = f"@{stage_full_name}/output/{p.name.strip('.zip')}.parquet"
sf_df = session.create_dataframe(df)
sf_df.write.parquet(output_full_path, overwrite=True, single=True)

まとめ

S3バケットからzipファイルを読み出し、csvファイルを抽出してS3バケットへ書き出すSnowparkの処理を検証してみました。S3へのアクセスは全てステージを介して実施できるので、boto3の利用やAWSのアクセスキーの管理が不要であり、シンプルに実装できそうです。

大容量のファイルを扱う場合は、pandas.DataFrameを介さずに直接snowpark.DataFrameを作成した方がより高速だと思いますが、StringIOをsnowpark.DataFrameの入力に取る方法が見つからずこちらは断念しました。

備忘

modinのpandasを利用していない理由

pd.read_csvは純正のpandasだとstring bufferを引数に取れるのですが、modinの場合はstageへのパスのみ引数に取るため、DataFrameを作成するのに利用できませんでした。

Discussion