Snowflake Python UDFのdbtでの管理と運用
Snowflake Python UDFを作ってdbt上で管理運用しているのでそれについて書きます。
Snowflakeの話
SnowflakeにはPythonでUDFを作成する機能があり、純粋なSQLで記述するとクエリが難解で長大になってしまったり非効率な場合などに便利です。
単純なサンプル。
table_name sample_table
with sample_table as (
select
1 as id,
['ALFA', 'BRAVO', 'CHARLIE', 'DELTA'] as arr
union all
select
2 as id,
['ECHO', 'FOXTROT', 'GOLF', 'HOTEL'] as arr
union all
select
99 as id,
['WHISKEY', 'X-RAY', 'YANKEE', 'ZULU'] as arr
)
id | arr |
---|---|
1 | [ALFA, BRAVO, CHARLIE, DELTA] |
2 | [ECHO, FOXTROT, GOLF, HOTEL] |
... | |
99 | [WHISKEY, X-RAY, YANKEE, ZULU] |
例えば上のテーブルで、idごとにarr列内の配列から文字の長さが最も大きいものだけを取得したい場合、SQL経由ではクエリの様に一旦配列をほぐしてからmax_by
する必要がありますが...
(※実は一発で配列から任意の関数によってfilter出来るよというのがあれば教えて下さい)
(※regexp_substr_all
した後でarrayが返ってきた後などに困っている)
-- 配列を値ごとのレコードにほぐしてからmax_byでlengthの最も大きいものを取得する
with flatten_items as (
select
id,
f.value as val,
length(val) as val_length
from
sample_table as main,
lateral flatten( input => main.arr) as f
)
select
id,
-- https://docs.snowflake.com/ja/sql-reference/functions/max_by
-- |指定された最大値が複数の行に含まれている場合、関数は非決定的です。
max_by(val, val_length) as max_item
from
flatten_items
group by 1
python経由なら単純なlist操作で事足りるので、PythonUDFを作成します。
create function
によって対象のdb.schema
内に関数が作られます。
-- ref: https://docs.snowflake.com/ja/developer-guide/udf-stored-procedure-data-type-mapping#sql-python-data-type-mappings
create or replace function db.schema.get_max_val(items array)
returns varchar
language python
runtime_version = '3.11'
handler = 'get_max_val_py'
as
$$
def get_max_val(items):
filter_length = len(max(items, key=len)
-- 配列はソート済みなので複数候補ある場合は一番初めのものを採用
max_item = list(filter(lambda x: filter_length==len(x), items))[0]
return max_item
$$;
上記をUDFとして登録しておけば、クエリは以下の様になります。
select
*,
db.schema.get_max_val(arr) as max_item
from
sample_table
結果はコレ。
id | arr | max_item |
---|---|---|
1 | [ALFA, BRAVO, CHARLIE, DELTA] | CHARLIE |
2 | [ECHO, FOXTROT, GOLF, HOTEL] | FOXTROT |
... | ||
99 | [WHISKEY, X-RAY, YANKEE, ZULU] | WHISKEY |
短くなって嬉しい(一般的にクエリは短いほど良いとされている)。
出来上がったら、変なところに作ってないかとか権限どうなってるか等はshow functions
で確認するといいです。
dbtとdeployの話
UDFが作れるのは便利で良いのですが。
自身の環境ではdbt-coreを用いてdev/prodの2DBを単方向に管理しているため、クエリ内で使用するUDFもdbt側で環境に合わせて動的に呼び出せるようにしないといけませんし、dev反映時、prod反映時にUDFが対象db.schema
内に自動で作成されるようになっていないといけません。
なのでそうしています。
UDF deploy用のgithub actions
dbt自体のdeploy方法は純粋に開発->dev->prodのステップでgithub actions経由で行っているため、UDF単体のdeploy用actionsも以下パターンに合わせて作成し、自動でdeployされるようにしています。
・開発ブランチからdevにマージ -> devのdeploy
・devからmainにマージ(本番反映) -> prodのdeploy
例えばこんなactionsです。
name: {env}-deploy-snowflake-udfs
on:
pull_request:
branches:
- {dev|main}
paths:
- 'udfs/**'
types:
- closed
jobs:
deploy-udfs:
if: github.event.pull_request.merged == true
runs-on: ubuntu-latest
container:
image: python:3.10
options: --user root
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Get changed files
id: get-changed-files
run: |
changed_files=$(git diff --name-only --diff-filter=d ${{ github.event.pull_request.base.sha }} HEAD -- 'udfs/**.sql')
echo "Changed files: $changed_files"
# https://docs.github.com/ja/actions/using-workflows/workflow-commands-for-github-actions#example-of-a-multiline-string
{
echo 'CHANGED_FILES<<EOF'
echo $changed_files
echo EOF
} >> "$GITHUB_ENV"
- name: Run create function queries
env:
SNOWSQL_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
SNOWSQL_USER: ${{ secrets.SNOWFLAKE_USERNAME }}
SNOWSQL_DATABASE: {DATABASE_NAME}
SNOWSQL_ROLE: {ROLE_NAME}
SNOWSQL_WAREHOUSE: {WH_NAME}
SNOWSQL_PWD: ${{ secrets.SNOWFLAKE_PASSWORD }}
run: |
chmod +x ./.github/workflows/scripts/deploy_snowflake_udfs.sh
./.github/workflows/scripts/deploy_snowflake_udfs.sh
やることはごく簡単で、dev/mainそれぞれのブランチにPRがマージされた後
-
steps: Get changed files
で差分ファイルを確認、udfs下.sqlファイルがある場合はファイル名取得 - 取得した.sqlファイルを
steps: Run create function queries
に渡して.sh実行でdeploy
実際に実行しているのはこういう.shで、snowsqlをファイル渡して実行しているだけです。
#!/bin/bash
# get snowsql
apt update
apt-get install -y curl unzip
curl -O https://sfc-repo.snowflakecomputing.com/snowsql/bootstrap/1.2/linux_x86_64/snowsql-1.2.30-linux_x86_64.bash
SNOWSQL_DEST=/root/bin SNOWSQL_LOGIN_SHELL=/root/.profile bash ./snowsql-1.2.30-linux_x86_64.bash
for file_name in $CHANGED_FILES; do
echo "file: ${file_name}"
/root/bin/snowsql -f "${file_name}" -o exit_on_error=True || { echo "Deploy failed"; exit 1; }
done
get_max_val.sql
とか)を配置しています。
なお、開発中はdevDB内にブランチ名のprefixが付いたスキーマが作成されるようになっているので、各自そのスキーマ内にて(手動でcreate functionを叩いたりファイルベースで同じようにlocalのsnowsql走らせたり)作って壊してをするようになっています。
別途、開発ブランチのマージ後にprefixを参照して対象のスキーマを自動的に削除するactionsがあるため、スキーマ内に閉じていると取りこぼしが無くて便利です。
dbtでのUDFを呼び出すsources.yml
環境ごとにUDFを作成出来たらあとはそれをdbt内で読み込めるようにしないといけません。
前提として、Snowflake UDFの呼び出しの構文はdb.schema.function_name(args)
となっているので、dbt側で呼び出そうとした場合にdb.schema
の部分を動的にしないといけなくてしんどいという問題があります。
が、つまるところcompile後のqueryが上記d.s.f
の形になっていればいいというだけで、それっていつもやってる{{ source('source_name', 'table_name') }}
と同じですね。
なのでテーブル管理している場合と同様にsources.ymlに以下で記載しています。
version: 2
sources:
- name: snowflake_udf
schema: schema_name
tables:
- name: snowflake_udf_1
- name: snowflake_udf_2
これにより例えば先ほど実行したudf付きのクエリは以下の様になり、試しにdbt compileなどすれば確かにd.s.f
の形になっているのが分かります。
select
*,
{{ source('snowflake_udf', 'get_max_val') }}(arr) as max_item
from
sample_table
functionsなのにtables:でいけちゃうんだ...と思い、これが面白いと思って書きました。それが面白いと思って書きたかっただけなのにかなり長くなった。
しかし、何らかの問題を起こしそうな気はする(dbt側でsourcesのリソースを全さらいしようとした場合にtableだけでなくshow functions等まで見ないといけないのはしんどそう)ため、他にうまいやり方があったら教えて欲しいです。
以上。
Discussion