❄️

Snowpark for PythonのUDFs(UDTFs)/Stored Proceduresについてまとめてみた

2023/01/04に公開

更新履歴

  • 2023/5/18:SPのReturning Tabular Dataについて追記

はじめに

Snowpark for Pythonがリリースされたことにより、Pythonを実行できるようになった。そこでSnowparkでPythonを実行するやりかたについてまとめてみた。実行パターンは大きくSnowpark API/UDFs/Stored Procedures(以下、SPとも記載)の3つのパターンがある。これらの実装パターンについてまとめる(パータンのまとめのため、各実装のパラメータの詳細などには言及しない。)
Snowpark for Pythonのコンポーネント構成などはこちらの記事がわかりやすい。
https://zenn.dev/tmasuo/articles/aa0634c14e6210

UDF/SPの使い分けについてはこちらを参照。
自分の理解としては、移行する既存資産に合わせることと、処理の対象が部品として呼ばれるものであればUDF、テーブルに対する一連の完結した処理であればStored Procedureを利用するである。そのため、UDFは値を返すことを必須とする一方でStored Procedureは必須ではない。また、UDFは直接Snowflakeに対してアクセスできないが、Stored Procedureはアクセス可能である。

Snowpark API

  • Pythonコード内でSnowpark APIを呼び出し、dataframeを介して処理を実行する。
  • dataframeを介するのでpandasと比べられていることもあるが、自分としてはPysparkと近いと思っている(Sparkもpandas APIサポートしていますがw)。dataframeの各メソッドもPysparkと遜色なく記載できる(UDFのバッチAPIの動きとかも、pysparkのpandas UDFの動きに似ている気がする。実際snowparkもpandas_udfで登録できる。)
from snowflake.snowpark import Session
import snowflake.snowpark.functions as F
import yaml

# コネクションパラメータの取得はそれぞれ
PATH = "$HOME/.dbt/profiles.yml"
with open(PATH, 'r') as yml:
    config = yaml.safe_load(yml)

# 対象データはdbtのチュートリアルを拝借
connection_parameters = {
    "account": config["jaffle_shop"]["outputs"]["dev"]["account"],
    "user": config["jaffle_shop"]["outputs"]["dev"]["user"],
    "password": config["jaffle_shop"]["outputs"]["dev"]["password"],
    "role": config["jaffle_shop"]["outputs"]["dev"]["role"],
    "warehouse": config["jaffle_shop"]["outputs"]["dev"]["warehouse"],
    "database": config["jaffle_shop"]["outputs"]["dev"]["database"],
    "schema": config["jaffle_shop"]["outputs"]["dev"]["schema"]
}

session = Session.builder.configs(connection_parameters).create()
df = session.table('JAFFLE_SHOP.CUSTOMERS')
df = df.with_column('FULL_NAME', F.concat(
    F.col('FIRST_NAME'), F.col('LAST_NAME')))
df.show(3)

---------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMER_ID"  |"FIRST_NAME"  |"LAST_NAME"  |"FIRST_ORDER_DATE"  |"MOST_RECENT_ORDER_DATE"  |"NUMBER_OF_ORDERS"  |"FULL_NAME"  |
---------------------------------------------------------------------------------------------------------------------------------
|1              |Michael       |P.           |2018-01-01          |2018-02-10                |2                   |MichaelP.    |
|2              |Shawn         |M.           |2018-01-11          |2018-01-11                |1                   |ShawnM.      |
|3              |Kathleen      |P.           |2018-01-02          |2018-03-11                |3                   |KathleenP.   |
---------------------------------------------------------------------------------------------------------------------------------

Snowpark APIの仕組みとして、dataframe処理はクライアントサイドでSQLに変換され、Snowflake上でSQLとして実行される。上記の処理はSQLとして次のように実行された。なお、SnowparkもSparkと同様遅延実行されるためshowメソッド呼び出し段階でSQLが実行される。

SELECT  *  FROM ( SELECT "CUSTOMER_ID", "FIRST_NAME", "LAST_NAME", "FIRST_ORDER_DATE", "MOST_RECENT_ORDER_DATE", "NUMBER_OF_ORDERS", concat("FIRST_NAME", "LAST_NAME") AS "FULL_NAME" FROM ( SELECT  *  FROM (JAFFLE_SHOP.CUSTOMERS))) LIMIT 3 OFFSET 0

UDFs/Stored Procedures

次の単純な処理をサンプルとして、UDFとStored Procedureの実装パターンについて記載する。なお、実際のUDFやStored Procedureではこれよりも複雑な実装が当然可能である。また、実装パターンによっては前提があるので、その場合には下記の処理以外で記載する。
UDFとStored Procedureの場合、クライアントサイドでcloudpickleを用いてPythonバイトコードが生成され、そのバイトコードをSnowflake内に送信し、Python secure sandboxと呼ばれる領域で実行される。

def double_py(i: int):
  return i * 2

Python UDF using SQL

  • CREATE FUNCTION[1]で作成する。
CREATE OR REPLACE FUNCTION double(i int)
  returns int not null
  language python
  runtime_version = '3.8'
  handler = 'double_py'
as
$$
def double_py(i: int):
  return i * 2
$$
;

-- 呼び出し
SELECT double(2);

Python Stored Procedure using SQL

  • CREATE PROCEDURE[2]で作成する。
  • SPは第一引数でSnowparkセッションが渡され、当セッションを用いてSnowflakeと直接やりとりを行える。
  • SPの第一引数であるSnowparkセッションを扱うために、snowflake-snowpark-pythonのパッケージ追加が必要となる。
CREATE OR REPLACE PROCEDURE double_sp(i int)
  returns int not null
  language python
  runtime_version = '3.8'
  packages = ('snowflake-snowpark-python')
  handler = 'double_py'
as
$$
def double_py(snowpark_session, i: int):
  return i * 2
$$
;

-- 呼び出し
call double_sp(2);

Python UDF using Python

  • PythonによるUDF作成には、ざっくり2つの関数がある(register_from_fileとかは省略)。
    • snowflake.snowpark.functions.udf[3]
    • snowflake.snowpark.udf.UDFRegistration.register[4]
  • SPと異なり、udfは引数にSnowparkセッションを持たない。
  • UDFの呼び出しにSQLコマンドまたはcall_udf[5]。ちなみにudf関数の戻り値であるUserDefinedFunctionのfunc属性はPython functionであるので、これでUDFではなく通常の関数としても呼び出せる。

snowflake.snowpark.functions.udf

  • udf関数では引数にSnowparkセッションを指定する事が可能である(指定がない場合には、この関数呼び出し前に作成されたセッションが利用される)。複数のセッションを張っている場合には、該当のセッション(つまりはUDFを作成したいデータベース/スキーマ)を指定する必要がある。(この点が、register関数との使い分けになると想定される。)
  • udf関数はデコレータ(@udf)でも利用可能である。
# session作成は省略

import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F

def double_py(i: int):
    return i * 2

double_py_snowpark = F.udf(double_py,  return_type=T.IntegerType(),
      input_types=[T.IntegerType()], name='double_py_snowpark')

df = session.sql('SELECT NUMBER_OF_ORDERS FROM JAFFLE_SHOP.CUSTOMERS')
df = df.with_column('double_NUMBER_OF_ORDERS',
                    double_py_snowpark(F.col("NUMBER_OF_ORDERS")))
df.show(3)

--------------------------------------------------
|"NUMBER_OF_ORDERS"  |"DOUBLE_NUMBER_OF_ORDERS"  |
--------------------------------------------------
|2                   |4                          |
|1                   |2                          |
|3                   |6                          |
--------------------------------------------------
  • Snowpark APIの場合クライアントサイドでSQL変換を行っているため、当然UDFを登録しなくても同等のことが可能である。
import snowflake.snowpark.functions as F

def double_not_snowpark(i: int):
    return i * 2

df = session.sql('SELECT NUMBER_OF_ORDERS FROM JAFFLE_SHOP.CUSTOMERS')
df = df.with_column('double_not_snowpark_NUMBER_OF_ORDERS',
                    double_not_snowpark(F.col("NUMBER_OF_ORDERS")))

df.show(3)
---------------------------------------------------------------
|"NUMBER_OF_ORDERS"  |"DOUBLE_NOT_SNOWPARK_NUMBER_OF_ORDERS"  |
---------------------------------------------------------------
|2                   |4                                       |
|1                   |2                                       |
|3                   |6                                       |
---------------------------------------------------------------

上記の2個のwith_columnの変換されたSQL及びクエリプランは当然異なる。

SQLの比較結果

左がUDF、右が通常の関数

クエリプラン

UDF 通常の関数

上記のため、当然session.sqlでは前者は実行可能で、後者はUnknown functionで失敗する。

# OK
session.sql('select double_py_snowpark(NUMBER_OF_ORDERS) FROM JAFFLE_SHOP.CUSTOMERS')

# NG
session.sql('select double_not_snowpark(NUMBER_OF_ORDERS) FROM JAFFLE_SHOP.CUSTOMERS')

snowflake.snowpark.udf.UDFRegistration.register

  • session.udf[6]はプロパティでudf.UDFRegistration[7]を返す。
  • UDFRegistrationのregister関数でUDFを登録する。なお、UDFRegistrationはsessionオブジェクトで生成されているため、register関数にはSnowparkセッションパラメータはない。
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F

def double_py(i: int):
    return i * 2


double_py_snowpark = session.udf.register(
    func=double_py, return_type=T.IntegerType(), input_types=[T.IntegerType()], name='double_py_snowpark'
)

df = session.sql('SELECT NUMBER_OF_ORDERS FROM JAFFLE_SHOP.CUSTOMERS')
df = df.with_column('double_NUMBER_OF_ORDERS',
                    double_py_snowpark(F.col("NUMBER_OF_ORDERS")))

Python SP using Python

  • PythonによるSPの作成は、基本的にUDFと同様である。
    • snowflake.snowpark.functions.sproc[8]
    • snowflake.snowpark.stored_procedure.StoredProcedureRegistration.register[9]
  • SPの第一引数はSnowparkセッションになる。
  • SPの第一引数であるSnowparkセッションを扱うために、snowflake-snowpark-pythonのパッケージ追加が必要となる。
  • SPの呼び出しにSQLコマンドまたはsession.call。ちなみにsproc関数の戻り値であるStoredProcedureのfunc属性はPython functionであるので、これでSPではなく通常の関数としても呼び出せる。

snowflake.snowpark.functions.sproc

# session作成は省略

import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def double_py(snowpark_session: Session, i: int):
    return i * 2

session.add_packages('snowflake-snowpark-python')

double_py_snowpark = F.sproc(
    double_py,
    return_type=T.IntegerType(),
    input_types=[T.IntegerType()],
    session=session,
    name="double_py_snowpark"
)

session.call("double_py_snowpark",1)
>2

snowflake.snowpark.stored_procedure.StoredProcedureRegistration.register

  • session.sproc[10]はプロパティでstored_procedure.StoredProcedureRegistration[11]を返す。
  • StoredProcedureRegistrationのregister関数でSPを登録する。なお、StoredProcedureRegistrationはsessionオブジェクトで生成されているため、register関数にはSnowparkセッションパラメータはない。
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def double_py(snowpark_session: Session, i: int):
    return i * 2


session.add_packages('snowflake-snowpark-python')

double_py_snowpark = session.sproc.register(
    double_py,
    return_type=T.IntegerType(),
    input_types=[T.IntegerType()],
    name="double_py_snowpark"
)

session.call("double_py_snowpark",1)
>2

SPクエリプラン

Python UDF Batch API(Vectorized UDFs)

Batch APIは通常のUDFは1行ずつに処理をするのに対して、まとまった行を一括で処理(Batch)するためのUDFである。そのためパーフォーマンスの向上が見込め、機械学習などの大量データ時などの利用に想定される。
Batch APIの登録はこちらの例9以降のようにtype_hintやpandas_udf(デコレータでも利用可)また、vectorizedデコレーターで定義可能である。なお、vectorizedデコレーターはsnowflake内で利用可能な_snowflakeモジュールをimportする必要があるのでcreate functionで使う[12]
以下、1パターンを載せる。

import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
import pandas as pd

def double_py_vectorized(df: pd.DataFrame) -> pd.Series:
    return df[0] * 2

double_py__vectorized_snowpark = F.pandas_udf(double_py_vectorized,  return_type=T.IntegerType(),
      input_types=[T.IntegerType()], session=session,name='double_py_vectorized_snowpark')
      
df = session.sql('SELECT NUMBER_OF_ORDERS FROM JAFFLE_SHOP.CUSTOMERS')
df = df.with_column('double_NUMBER_OF_ORDERS',F.call_udf('double_py_vectorized_snowpark',F.col("NUMBER_OF_ORDERS")))
# 結果はこれまでと同じなので割愛

クエリプラン

  • クエリプラン自体はVectorized UDFも通常のUDFも同じであるが、通常の場合、統計の処理レコード(rows processed)と処理発行数(handler invocations)が同じであるが、Vectorizedの場合、レコードをまとめて処理するため処理発行数が減っているのが見て取れる。
Vectorized UDF Vectorized UDF 通常のUDF

UDTFs

  • UDTFの登録はusing SQLもusing Pythonも基本的にUDFと次の点が変わるくらいである。
    • UDTF using SQLでは、return TABLEになる。
    • UDTF using Pythonでは、各登録する関数名がudfからudtfに変わるのと、引数名が表入出力に合わせて、func⇒handlerとreturn_type⇒output_schemaになる。
  • UDFと登録される関数の処理実装が大きく異なる。
    • UDTFはUDFがスカラー値を返すのに対し、表形式を出力する。
    • UDTFでは関数ではなくクラスを定義する。
    • UDTFではクラス内では、入力の1行ずつを処理するprocessメソッドとパーティション単位に一回開始と終了に処理する__init__メソッドとend_partitionメソッドで構成される(__init__とend_partitionはオプション)。

以下ではudtf関数を用いたサンプルを記載する。
対象データはdbtのサンプルのものにavg_priceという適当なカラムを付加したものである。処理もUDTFの動きを見るためのものであり、中身も適当である。

# dataの中身
session.sql("SELECT * FROM JAFFLE_SHOP.CUSTOMERS").show(3)
---------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMER_ID"  |"FIRST_NAME"  |"LAST_NAME"  |"FIRST_ORDER_DATE"  |"MOST_RECENT_ORDER_DATE"  |"NUMBER_OF_ORDERS"  |"AVG_PRICE"  |
---------------------------------------------------------------------------------------------------------------------------------
|1              |Michael       |P.           |2018-01-01          |2018-02-10                |2                   |700          |
|2              |Shawn         |M.           |2018-01-11          |2018-01-11                |1                   |100          |
|3              |Kathleen      |P.           |2018-01-02          |2018-03-11                |3                   |600          |
---------------------------------------------------------------------------------------------------------------------------------

処理内容としては、

  • 各行に対しては(processメソッドでは)、注文数(number_of_orders)と平均金額(avg_price)をかけた金額(cost)とLAST_NAMEを出力するとともに、costを_cost_totalとして合算。
  • パーティション単位では、各行で積み上げた_cost_totalを出力。
  • パーティションはグルーピングできるLAST_NAMEを使う。LAST_NAMEでグルーピングすることにデータとしては何の意味もない。ただ処理の動きを見るためだけである。
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


class LastNameOrderSum:
    def __init__(self):
        self._cost_total = 0
        self._last_name = ""

    def process(self, last_name, number_of_orders, avg_price):
        self._last_name = last_name
        cost = number_of_orders * avg_price
        
        self._cost_total += cost
        yield (last_name, cost)

    def end_partition(self):
        yield (self._last_name, self._cost_total)


F.udtf(
    LastNameOrderSum
    , output_schema=T.StructType([T.StructField("last_name", T.StringType()), T.StructField("cost", T.IntegerType())])
    , input_types=[T.StringType(),T.IntegerType(),T.IntegerType()]
    , session=session
    , name='LastNameOrderSum_snowpark')
    
df = session.sql('''
SELECT
    j.FIRST_NAME,
    t.LAST_NAME,
    t.COST
FROM
    JAFFLE_SHOP.CUSTOMERS as j,
    table(
        LastNameOrderSum_snowpark(LAST_NAME, NUMBER_OF_ORDERS, AVG_PRICE) OVER (PARTITION BY LAST_NAME)
    ) as t
''')

df.sort(F.col('LAST_NAME')).show(20)
# 当然、パーティションごとの処理レコードのFIRST_NAMEはNULLになっている。
---------------------------------------
|"FIRST_NAME"  |"LAST_NAME"  |"COST"  |
---------------------------------------
|Mildred       |A.           |0       |
|Adam          |A.           |500     |
|Harry         |A.           |0       |
|NULL          |A.           |2000    |
|Maria         |A.           |500     |
|Jacqueline    |A.           |0       |
|Anna          |A.           |1000    |
|Phillip       |B.           |0       |
|Scott         |B.           |0       |
|NULL          |B.           |1800    |
|Alan          |B.           |0       |
|Benjamin      |B.           |200     |
|Anne          |B.           |1600    |
|David         |C.           |200     |
|Gerald        |C.           |600     |

ここでprocessメソッドの出力をなくしたUDTFについても動きを見てみる。
当然、end_partitionの出力のみになるので、FIRST_NAMEがnullでcostが合計になった行だけが出力される。

class LastNameOrderOnlySum:
    def __init__(self):
        self._cost_total = 0
        self._last_name = ""

    def process(self, last_name, number_of_orders, avg_price):
        self._last_name = last_name
        cost = number_of_orders * avg_price
        self._cost_total += cost

    def end_partition(self):
        yield (self._last_name, self._cost_total)


F.udtf(
    LastNameOrderOnlySum
    , output_schema=T.StructType([T.StructField("last_name", T.StringType()), T.StructField("cost", T.IntegerType())])
    , input_types=[T.StringType(),T.IntegerType(),T.IntegerType()]
    , session=session
    , name='LastNameOrderOnlySum_snowpark')
 
df = session.sql('''
SELECT
    j.FIRST_NAME,
    t.LAST_NAME,
    t.COST
FROM
    JAFFLE_SHOP.CUSTOMERS as j,
    table(
        LastNameOrderOnlySum_snowpark(LAST_NAME, NUMBER_OF_ORDERS, AVG_PRICE) OVER (PARTITION BY LAST_NAME)
    ) as t
''')

df.sort(F.col('LAST_NAME')).show(20)
---------------------------------------
|"FIRST_NAME"  |"LAST_NAME"  |"COST"  |
---------------------------------------
|NULL          |A.           |2000    |
|NULL          |B.           |1800    |
|NULL          |C.           |2800    |
|NULL          |D.           |600     |
|NULL          |E.           |900     |
|NULL          |F.           |2100    |
|NULL          |G.           |2700    |
|NULL          |H.           |7200    |
|NULL          |J.           |1400    |
|NULL          |K.           |300     |

本記事の検証で得たTips

Snowpark APIセッションにおけるUSE_CACHED_RESULTのOFF

  • セッション確立時のパラメータや専用のメソッドや関数がないので次のようにSQLを発行して行う。
  • 注意する点として、遅延実行となるため、たとえalterコマンドであっても、df.showなどのアクションを伴う処理を実行しないとalter sessionが実行されない。(当然クエリヒストリーにも出てこない)
df = session.sql('alter session set USE_CACHED_RESULT = FALSE')
df.show()
------------------------------------
|"status"                          |
------------------------------------
|Statement executed successfully.  |
------------------------------------

Snowpark dataframeのshowメソッドの出力順について

  • df.show()における出力順は仮にdf自体が例えば、生成時にorder byなどでソートされていた場合にも、ソートされた状態で出力されるとは限らない。なぜなら、df.show()はorder byなしのselect *で出力されているためである。
  • df.show()でソートされた状態で出力したい場合には、df.sort(ソートカラム).show()とする必要がある。sortを付与すると、select * にorder by がつくことになる。
df = session.sql("SELECT * FROM JAFFLE_SHOP.CUSTOMERS ORDER BY LAST_NAME")
df.show()
df.sort(F.col('LAST_NAME')).show()
df.show()の変換されたSQL
SELECT  *  FROM (SELECT * FROM JAFFLE_SHOP.CUSTOMERS ORDER BY LAST_NAME) LIMIT 10 OFFSET 0
df.sort(F.col('LAST_NAME')).show()の変換されたSQL
SELECT  *  FROM (SELECT * FROM JAFFLE_SHOP.CUSTOMERS ORDER BY LAST_NAME) ORDER BY "LAST_NAME" ASC NULLS FIRST LIMIT 10 OFFSET 0

Snowflakeセッション強制終了

上記のUDFやSPの作業の過程で、不要なセッションを残してしまうことがあり、セッションの強制終了を行ったので、その手順を残しておく。(なお、Snowpark APIでSnowparkセッションをクローズする場合には、session.close()を実行する。)

  1. Snowsightの管理者 - セキュリティ - セッションタブを開き、対象セッションのセッションIDを取得する。
  2. SQLにて下記を実行する。
select SYSTEM$ABORT_SESSION(セッションID);

参考

脚注
  1. https://docs.snowflake.com/ja/sql-reference/sql/create-function.html ↩︎

  2. https://docs.snowflake.com/ja/sql-reference/sql/create-procedure.html ↩︎

  3. https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.functions.udf.html ↩︎

  4. https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.udf.UDFRegistration.register.html ↩︎

  5. https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.functions.call_udf.html ↩︎

  6. https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.Session.udf.html ↩︎

  7. https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.udf.UDFRegistration.html#snowflake.snowpark.udf.UDFRegistration ↩︎

  8. https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.functions.sproc.html ↩︎

  9. https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.stored_procedure.StoredProcedureRegistration.register.html ↩︎

  10. https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.Session.sproc.html ↩︎

  11. https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.stored_procedure.StoredProcedureRegistration.html ↩︎

  12. https://docs.snowflake.com/ja/developer-guide/udf/python/udf-python-batch.html ↩︎

Discussion