❄️

Snowflake Python UDFのdbtでの管理と運用

2024/10/03に公開

Snowflake Python UDFを作ってdbt上で管理運用しているのでそれについて書きます。

Snowflakeの話

SnowflakeにはPythonでUDFを作成する機能があり、純粋なSQLで記述するとクエリが難解で長大になってしまったり非効率な場合などに便利です。
https://docs.snowflake.com/ja/developer-guide/udf/python/udf-python-introduction

単純なサンプル。

table_name sample_table
with sample_table as (
select
    1 as id,
    ['ALFA', 'BRAVO', 'CHARLIE', 'DELTA'] as arr
union all
select
    2 as id,
    ['ECHO', 'FOXTROT', 'GOLF', 'HOTEL'] as arr
union all
select
    99 as id,
    ['WHISKEY', 'X-RAY', 'YANKEE', 'ZULU'] as arr
)
id arr
1 [ALFA, BRAVO, CHARLIE, DELTA]
2 [ECHO, FOXTROT, GOLF, HOTEL]
...
99 [WHISKEY, X-RAY, YANKEE, ZULU]

例えば上のテーブルで、idごとにarr列内の配列から文字の長さが最も大きいものだけを取得したい場合、SQL経由ではクエリの様に一旦配列をほぐしてからmax_byする必要がありますが...
(※実は一発で配列から任意の関数によってfilter出来るよというのがあれば教えて下さい)
(※regexp_substr_allした後でarrayが返ってきた後などに困っている)

-- 配列を値ごとのレコードにほぐしてからmax_byでlengthの最も大きいものを取得する
with flatten_items as (
    select
        id,
        f.value as val,
        length(val) as val_length
    from
        sample_table as main,
        lateral flatten( input => main.arr) as f
)
select
    id,
    -- https://docs.snowflake.com/ja/sql-reference/functions/max_by
    -- |指定された最大値が複数の行に含まれている場合、関数は非決定的です。
    max_by(val, val_length) as max_item
from
    flatten_items
group by 1

python経由なら単純なlist操作で事足りるので、PythonUDFを作成します。
create functionによって対象のdb.schema内に関数が作られます。

get_max_val.sql
-- ref: https://docs.snowflake.com/ja/developer-guide/udf-stored-procedure-data-type-mapping#sql-python-data-type-mappings
create or replace function db.schema.get_max_val(items array)
returns varchar
language python
runtime_version = '3.11'
handler = 'get_max_val_py'
as
$$
def get_max_val(items):
    filter_length = len(max(items, key=len)
    -- 配列はソート済みなので複数候補ある場合は一番初めのものを採用
    max_item = list(filter(lambda x: filter_length==len(x), items))[0]

    return max_item
$$;

上記をUDFとして登録しておけば、クエリは以下の様になります。

select
    *,
    db.schema.get_max_val(arr) as max_item
from
    sample_table

結果はコレ。

id arr max_item
1 [ALFA, BRAVO, CHARLIE, DELTA] CHARLIE
2 [ECHO, FOXTROT, GOLF, HOTEL] FOXTROT
...
99 [WHISKEY, X-RAY, YANKEE, ZULU] WHISKEY

短くなって嬉しい(一般的にクエリは短いほど良いとされている)。

出来上がったら、変なところに作ってないかとか権限どうなってるか等はshow functionsで確認するといいです。
https://docs.snowflake.com/ja/sql-reference/sql/show-functions

dbtとdeployの話

UDFが作れるのは便利で良いのですが。

自身の環境ではdbt-coreを用いてdev/prodの2DBを単方向に管理しているため、クエリ内で使用するUDFもdbt側で環境に合わせて動的に呼び出せるようにしないといけませんし、dev反映時、prod反映時にUDFが対象db.schema内に自動で作成されるようになっていないといけません。

なのでそうしています。

UDF deploy用のgithub actions

dbt自体のdeploy方法は純粋に開発->dev->prodのステップでgithub actions経由で行っているため、UDF単体のdeploy用actionsも以下パターンに合わせて作成し、自動でdeployされるようにしています。
・開発ブランチからdevにマージ -> devのdeploy
・devからmainにマージ(本番反映) -> prodのdeploy

例えばこんなactionsです。

name: {env}-deploy-snowflake-udfs

on:
  pull_request:
    branches:
      - {dev|main}
    paths:
      - 'udfs/**'
    types:
      - closed

jobs:
  deploy-udfs:
    if: github.event.pull_request.merged == true
    runs-on: ubuntu-latest
    container:
      image: python:3.10
      options: --user root
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Get changed files
        id: get-changed-files
        run: |
          changed_files=$(git diff --name-only --diff-filter=d ${{ github.event.pull_request.base.sha }} HEAD -- 'udfs/**.sql')
          echo "Changed files: $changed_files"
          # https://docs.github.com/ja/actions/using-workflows/workflow-commands-for-github-actions#example-of-a-multiline-string
          {
            echo 'CHANGED_FILES<<EOF'
            echo $changed_files
            echo EOF
          } >> "$GITHUB_ENV"

      - name: Run create function queries
        env:
          SNOWSQL_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWSQL_USER: ${{ secrets.SNOWFLAKE_USERNAME }}
          SNOWSQL_DATABASE: {DATABASE_NAME}
          SNOWSQL_ROLE: {ROLE_NAME}
          SNOWSQL_WAREHOUSE: {WH_NAME}
          SNOWSQL_PWD: ${{ secrets.SNOWFLAKE_PASSWORD }}
        run: |
          chmod +x ./.github/workflows/scripts/deploy_snowflake_udfs.sh
          ./.github/workflows/scripts/deploy_snowflake_udfs.sh

やることはごく簡単で、dev/mainそれぞれのブランチにPRがマージされた後

  • steps: Get changed filesで差分ファイルを確認、udfs下.sqlファイルがある場合はファイル名取得
  • 取得した.sqlファイルをsteps: Run create function queriesに渡して.sh実行でdeploy

実際に実行しているのはこういう.shで、snowsqlをファイル渡して実行しているだけです。

#!/bin/bash
# get snowsql
apt update
apt-get install -y curl unzip

curl -O https://sfc-repo.snowflakecomputing.com/snowsql/bootstrap/1.2/linux_x86_64/snowsql-1.2.30-linux_x86_64.bash
SNOWSQL_DEST=/root/bin SNOWSQL_LOGIN_SHELL=/root/.profile bash ./snowsql-1.2.30-linux_x86_64.bash

for file_name in $CHANGED_FILES; do
  echo "file: ${file_name}"
  /root/bin/snowsql -f "${file_name}" -o exit_on_error=True || { echo "Deploy failed"; exit 1; }

done

https://docs.getdbt.com/best-practices/how-we-structure/1-guide-overview#guide-structure-overview
差分ファイルの取得については、dbtのproject内dir構成はだいたい上記の様になっているかと思うので、ここにudfs/を追加し、その中にudf作成用クエリがそのまま記載された.sqlファイル(上で作ったget_max_val.sqlとか)を配置しています。

なお、開発中はdevDB内にブランチ名のprefixが付いたスキーマが作成されるようになっているので、各自そのスキーマ内にて(手動でcreate functionを叩いたりファイルベースで同じようにlocalのsnowsql走らせたり)作って壊してをするようになっています。
別途、開発ブランチのマージ後にprefixを参照して対象のスキーマを自動的に削除するactionsがあるため、スキーマ内に閉じていると取りこぼしが無くて便利です。


dbtでのUDFを呼び出すsources.yml

環境ごとにUDFを作成出来たらあとはそれをdbt内で読み込めるようにしないといけません。

前提として、Snowflake UDFの呼び出しの構文はdb.schema.function_name(args)となっているので、dbt側で呼び出そうとした場合にdb.schemaの部分を動的にしないといけなくてしんどいという問題があります。

が、つまるところcompile後のqueryが上記d.s.fの形になっていればいいというだけで、それっていつもやってる{{ source('source_name', 'table_name') }}と同じですね。

なのでテーブル管理している場合と同様にsources.ymlに以下で記載しています。

version: 2

sources:
  - name: snowflake_udf
    schema: schema_name
    tables:
      - name: snowflake_udf_1
      - name: snowflake_udf_2

これにより例えば先ほど実行したudf付きのクエリは以下の様になり、試しにdbt compileなどすれば確かにd.s.fの形になっているのが分かります。

select
    *,
    {{ source('snowflake_udf', 'get_max_val') }}(arr) as max_item
from
    sample_table

functionsなのにtables:でいけちゃうんだ...と思い、これが面白いと思って書きました。それが面白いと思って書きたかっただけなのにかなり長くなった。
しかし、何らかの問題を起こしそうな気はする(dbt側でsourcesのリソースを全さらいしようとした場合にtableだけでなくshow functions等まで見ないといけないのはしんどそう)ため、他にうまいやり方があったら教えて欲しいです。

以上。

Discussion