👏

Snowpark Python and dbt~鉄は熱いうちに叩け!!~

2023/07/29に公開

本日DATUM STUDIO様主催のイベントに参加させていただきました。
講師の方、司会の方、スタッフの皆さん、みんなとてもいい人ばかりでこんな環境で働けるとエンジニアのパフォーマンスが限界突破しそうだなって眺めていました。
※正直カジュアル面談申し込もうとWebフォームまで行ってしまったのは内緒ですww
ホントに楽しくて、機会があれば絶対参加しようと電車のなかで決意しました!
https://datumstudio.jp/information/0729_snowflake_hands-on_seminar/

そんなイベントの中でdbtのモデル定義をPythonでできることを知って驚きました!
家に帰ってすぐ調べたらこんな素敵な記事を発見し、興奮しっぱなしです!
https://zenn.dev/ugmuka/articles/5b28cca40572e3
https://zenn.dev/tmasuo/articles/df953dbbf40616
お題にもある通り、鉄は熱いうちに叩け!と言うことで、早速触っていきます!!
ありがたい事にSnowflakeのクイックスタートがあったので、こちらを進めたいと思います。
https://quickstarts.snowflake.com/guide/data_engineering_with_snowpark_python_and_dbt/index.html#0

Snowparkをdbtで動かす理由

歴史的に見てデータの変換はSQLで行うのが一般的でしたが、最近では、データ変換処理にPythonのDataFrame APIを採用し始めているそうです。
SQLでもPythonでもどちらのアプローチでも同じデータ変換はできます。
なので、どちらを使うかは好みによるところが大きです。
しかし、そうは言っても、特定のデータ変換を SQL で表現できず、別のアプローチが必要になるユースケースもあります。

dbt

dbtバージョン1.3からSQLベースのモデルとPythonベースのモデルを作成できるようになりました。
ありがてぇ〜

クイックスタートより抜粋
dbt で Python 変換を定義すると、Python 変換はプロジェクト内の単なるモデルとなり、
テスト、ドキュメント、およびリネージュに関するすべての同じ機能を備えます。

https://docs.getdbt.com/docs/building-a-dbt-project/building-models/python-models

Snowparkでサポートする機能

  • Python (DataFrame) API
  • Python Scalar User Defined Functions (UDFs)
  • Python UDF Batch API (Vectorized UDFs)
  • Python Table Functions (UDTFs)
  • Python Stored Procedures
  • Integration with Anaconda

Snowpark何が嬉しいの?

嬉しい理由の一つにこれは言えると思います。

クイックスタートより抜粋
Snowflake の Snowpark Python 機能を使用すると、Python コードを Snowflake のエンタープライズ グレードの
データ プラットフォーム内で直接実行できるため、Python コードを実行するために別のインフラストラクチャ/サービスを維持、保護し、支払う必要がなくなりました。

前置きが長くなりましたが、ここからハンズオンです!

dbtプロジェクトの作成

dbt init snowpark2dbt

フォルダ構成は以下のとおりです。

root@3f7c4a32a705:/home/dbt/snowpark2dbt# tree
.
|-- README.md
|-- analyses
|-- dbt_project.yml
|-- macros
|-- models
|   `-- example
|       |-- my_first_dbt_model.sql
|       |-- my_second_dbt_model.sql
|       `-- schema.yml
|-- seeds
|-- snapshots
`-- tests

dbt runは実行しておきます。

dbt runの実行結果
root@3f7c4a32a705:/home/dbt/snowpark2dbt# dbt run
11:56:14  Running with dbt=1.5.2
11:56:15  Registered adapter: snowflake=1.5.2
11:56:15  Unable to do partial parsing because profile has changed
11:56:16  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 322 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
11:56:16
11:56:18  Concurrency: 1 threads (target='dev')
11:56:18
11:56:18  1 of 2 START sql table model PUBLIC.my_first_dbt_model ......................... [RUN]
11:56:21  1 of 2 OK created sql table model PUBLIC.my_first_dbt_model .................... [SUCCESS 1 in 2.71s]
11:56:21  2 of 2 START sql view model PUBLIC.my_second_dbt_model ......................... [RUN]
11:56:22  2 of 2 OK created sql view model PUBLIC.my_second_dbt_model .................... [SUCCESS 1 in 0.87s]
11:56:22
11:56:22  Finished running 1 table model, 1 view model in 0 hours 0 minutes and 6.24 seconds (6.24s).
11:56:22
11:56:22  Completed successfully
11:56:22
11:56:22  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

単純なPythonモデルを作成する

dbt Pythonモデルの説明

Pythonで書くファイルはmodelsディレクトリ配下となります。
これはSQLモデルと一緒ですね。
ただ、Pythonファイル内に定義する必要がある関数があります。
model()です。
この関数には2つのパラメータを取ります。

  • dbt
    dbt coreによってコンパイルされた各モデルに固有のクラス。
    これにより、dbtプロジェクトおよびDAGのコンテキストでPythonコードを実行します。
  • session
    データプラットフォーム(今回はSnowflake)上のPythonバックエンドへの接続を表すクラス。
    セッションは、テーブルをDataFrameとして読み取り、DataFrameをテーブルに書き戻すために必要。

実際にPythonモデルを作成してみる

def model(dbt, session):
    # Must be either table or incremental (view is not currently supported)
    dbt.config(materialized = "table")

    # DataFrame representing an upstream model
    df = dbt.ref("my_first_dbt_model")

    return df

パッと見た感じ、わかりやすいですね。
dbt.configでテーブルを指定して、dbt.refでSQLモデルで作成したモデルを参照してるって感じでしょうか。
実際に実行してみます。

dbt runの実行結果
root@3f7c4a32a705:/home/dbt/snowpark2dbt# dbt run --select my_first_python_model2
12:06:34  Running with dbt=1.5.2
12:06:34  Registered adapter: snowflake=1.5.2
12:06:34  Found 3 models, 4 tests, 0 snapshots, 0 analyses, 322 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
12:06:34  
12:06:36  Concurrency: 1 threads (target='dev')
12:06:36  
12:06:36  1 of 1 START python table model PUBLIC.my_first_python_model2 .................. [RUN]
12:06:42  1 of 1 OK created python table model PUBLIC.my_first_python_model2 ............. [SUCCESS 1 in 6.01s]
12:06:42  
12:06:42  Finished running 1 table model in 0 hours 0 minutes and 7.85 seconds (7.85s).
12:06:42  
12:06:42  Completed successfully
12:06:42
12:06:42  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

注意事項

  • dbt pythonモデルはJinjaを使用しない
  • Snowflake Snowpark Python ライブラリを明示的にインポートする必要はありません。
  • すべてのdbt Pythonモデルは、model(dbt, session) という名前のメソッドを定義する必要があります。
  • 2022年10月17日以降table、incremental実体化のみがサポートされているため、ここで明示的に構成しました。
  • SQLモデルでのJinjaとまったく同じように、dbt.ref()、dbt.source()を使用できます。また、PythonモデルとSQLモデルを参照できます。

dbt Pythonモデルの仕組みについて

実際先ほど実行したdbt run --select my_first_python_model2コマンドの裏では何が起きているのか??
SnowflakeのQuery Historyで確認して見たところ、ストアドプロシージャに変換されてSnowflake上で実行されていました。

プロシージャの中身を見たい人はこちらから
WITH my_first_python_model2__dbt_sp AS PROCEDURE ()

RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')

HANDLER = 'main'
EXECUTE AS CALLER
AS
$$

import sys
sys._xoptions['snowflake_partner_attribution'].append("dbtLabs_dbtPython")


  
    
def model(dbt, session):
    # Must be either table or incremental (view is not currently supported)
    dbt.config(materialized = "table")

    # DataFrame representing an upstream model
    df = dbt.ref("my_first_dbt_model")
 
    return df


# This part is user provided model code
# you will need to copy the next section to run the code
# COMMAND ----------
# this part is dbt logic for get ref work, do not modify

def ref(*args, **kwargs):
    refs = {"my_first_dbt_model": "SNOWPARK2DBT.PUBLIC.my_first_dbt_model"}
    key = '.'.join(args)
    version = kwargs.get("v") or kwargs.get("version")
    if version:
        key += f".v{version}"
    dbt_load_df_function = kwargs.get("dbt_load_df_function")
    return dbt_load_df_function(refs[key])


def source(*args, dbt_load_df_function):
    sources = {}
    key = '.'.join(args)
    return dbt_load_df_function(sources[key])


config_dict = {}


class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)

class this:
    """dbt.this() or dbt.this.identifier"""
    database = "SNOWPARK2DBT"
    schema = "PUBLIC"
    identifier = "my_first_python_model2"
    
    def __repr__(self):
        return 'SNOWPARK2DBT.PUBLIC.my_first_python_model2'


class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args, **kwargs: ref(*args, **kwargs, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = False

# COMMAND ----------

# To run this in snowsight, you need to select entry point to be main
# And you may have to modify the return type to text to get the result back
# def main(session):
#     dbt = dbtObj(session.table)
#     df = model(dbt, session)
#     return df.collect()

# to run this in local notebook, you need to create a session following examples https://github.com/Snowflake-Labs/sfguide-getting-started-snowpark-python
# then you can do the following to run model
# dbt = dbtObj(session.table)
# df = model(dbt, session)


def materialize(session, df, target_relation):
    # make sure pandas exists
    import importlib.util
    package_name = 'pandas'
    if importlib.util.find_spec(package_name):
        import pandas
        if isinstance(df, pandas.core.frame.DataFrame):
          session.use_database(target_relation.database)
          session.use_schema(target_relation.schema)
          # session.write_pandas does not have overwrite function
          df = session.createDataFrame(df)
    
    df.write.mode("overwrite").save_as_table('SNOWPARK2DBT.PUBLIC.my_first_python_model2', create_temp_table=False)

def main(session):
    dbt = dbtObj(session.table)
    df = model(dbt, session)
    materialize(session, df, dbt.this)
    return "OK"

  
$$
CALL my_first_python_model2__dbt_sp();

UDFを使用してPythonモデルを作成する

models配下にmy_second_python_model.pyファイルを作成し、以下を貼り付けます。

from snowflake.snowpark.functions import udf

def model(dbt, session):
    # Must be either table or incremental (view is not currently supported)
    dbt.config(materialized = "table")

    # User defined function
    @udf
    def add_one(x: int) -> int:
        x = 0 if not x else x
        return x + 1

    # DataFrame representing an upstream model
    df = dbt.ref("my_first_dbt_model")

    # Add a new column containing the id incremented by one
    df = df.withColumn("id_plus_one", add_one(df["id"]))

    return df
dbt runの実行結果
root@3f7c4a32a705:/home/dbt/snowpark2dbt# dbt run --select my_second_python_model
12:36:46  Running with dbt=1.5.2
12:36:46  Registered adapter: snowflake=1.5.2
12:36:46  Found 4 models, 4 tests, 0 snapshots, 0 analyses, 322 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
12:36:46  
12:36:48  Concurrency: 1 threads (target='dev')
12:36:48  
12:36:48  1 of 1 START python table model PUBLIC.my_second_python_model .................. [RUN]
12:36:56  1 of 1 OK created python table model PUBLIC.my_second_python_model ............. [SUCCESS 1 in 7.99s]
12:36:56  
12:36:56  Finished running 1 table model in 0 hours 0 minutes and 10.30 seconds (10.30s).
12:36:56  
12:36:56  Completed successfully
12:36:56  
12:36:56  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
root@3f7c4a32a705:/home/dbt/snowpark2dbt#

最後にjaffleのデータでデータの結合とか遊んでみました。

def model(dbt,session):
    dbt.config(materialized = "table")

    orders = dbt.ref("raw_orders").select("ID","USER_ID","order_date")
    payments = dbt.ref("raw_payments").select("ID","AMOUNT")
    customers = dbt.ref("raw_customers").select("ID","FIRST_NAME","LAST_NAME")
    df = payments.join(orders, payments.id == orders.id).join(customers, payments.id == customers.id)

    final = df.select(payments.id.alias("ID"),"AMOUNT","ORDER_DATE","FIRST_NAME","LAST_NAME")

    return final


ただのテーブル結合ですけど、SQLではない書き方でやってできると新鮮で楽しですね〜
ちょっとSparkにも興味出てきたかも。。。
次回はもっと複雑な集計ロジックをPythonモデルでやってみたいと思います〜!!

Discussion