Terraform で作る Snowflake UDF / UDTF
こんにちは!シンプルフォームの山岸です。
Snowflake 上でのデータ基盤構築を進めていく中で、当社でもユーザー定義関数 (UDF) を利用したいユースケースが出てきました。今回は Snowflake の UDF / UDTF を Terraform で管理する方法ついて検討してみたので、その内容についてご紹介できればと思います。
Snowflake UDF / UDTF とは?
本題に入る前に、Snowflake 関数と UDF / UDTF について簡単に触れておきたいと思います。
Snowflake 関数
Snowflake 関数 [1] は、データの処理・変換・操作 等のために Snowflake 環境内で利用可能な関数群です。組み込みで多くの Snowflake 関数を利用できますが、ドキュメントに記載の通り、いくつかの種類が存在します。
- スカラー関数 ... スカラー関数は、呼び出しごとに1つの値を返す関数です。ほとんどの場合、これは行ごとに1つの値を返すと考えることができます。
- 集計関数 ... 集計関数は、行全体の値を操作して、合計、平均、カウント、最小/最大値、標準偏差、推定などの数学的計算、および非数学的な演算を実行します。
- ウィンドウ関数 ... ウィンドウ関数は、関連する行のグループ(「ウィンドウ」)で動作します。ウィンドウ関数が呼び出されるたびに、行(ウィンドウ内の現在の行)と現在の行を含む行のウィンドウが渡されます。ウィンドウ関数は、入力行ごとに1つの出力行を返します。出力は、関数に渡される個々の行と、関数に渡されるウィンドウ内の他の行の値に依存します。
- テーブル関数 ... テーブル関数は、入力行ごとに行のセットを返します。返されるセットには、0行、1行、またはそれ以上の行を含めることができます。各行には、1つ以上の列を含めることができます。
- システム関数 ... Snowflakeは、次のタイプのシステム関数を提供します。
- システムでアクションを実行できるようにする制御関数(例: クエリの中止)
- システムに関する情報を返す情報関数(例: テーブルのクラスタリングの深さの計算)
- クエリに関する情報を返す情報関数(例: EXPLAIN プランに関する情報)
UDF / UDTF
前述の通り、多くの Snowflake 関数を組み込みで利用可能ですが、ユーザー定義関数 (UDF: User Defined Function) を使用すると、ユーザーが独自の処理ロジックを実装できます。
また Snowflake では、ユーザー定義テーブル関数 (UDTF: User Defined Table Function) という、戻り値型がテーブル形式であるような関数を実装することもできます。
UDF, UDTF がサポートする言語はそれぞれ以下の通りです。[2] (2024/9/8 現在)
実装
それでは本題の実装について、以降で説明したいと思います。
前述の通り Snowflake の UDF / UDTF はいくつかの言語に対応していますが、本記事では Python で実装されたシンプルな加算 (add) の関数を扱ってみたいと思います。
ディレクトリ構成
まず、Terraform 関連コードを管理するディレクトリ構成について触れたいと思います。
-
modules/
側に Terraform モジュールを実装しています。当社の場合、原則 Snowflake アカウント用途毎・リソースタイプ毎にディレクトリを分けることにしているため、後述の図もそのような形になっています。 - Snowflake 関数はスキーマレベルオブジェクトであるため、
function/
の配下はデータベース名・スキーマ名でディレクトリを分けて管理しています。
modules 側
modules 側には、Terraform コードの他に、関数処理ロジックを記述したハンドラースクリプト、および単体テストコードを一緒に管理しています。(別の場所にスクリプトを集約して、そのパスをモジュールから参照する形でも良いかもしれません)
envs 側
envs 側には、modules 側で定義した Terraform モジュールを呼び出すためのコードを配置します。Terragrunt を使用している場合、terragrunt.hcl
ファイルを配置します。
Terraform モジュール実装
modules 側に配置する Terraform コードは、snowflake_function のリソースタイプを使用して、例えば以下のように実装できます。(関数処理の実装については "モジュール呼び出し" のセクションで解説します。outputs.tf
は割愛します)
こちらは UDF / UDTF のどちらにも対応しており、複数の関数を管理できます。
variables.tf
variable "database_name" {
type = string
}
variable "schema_name" {
type = string
}
variable "function_configs" {
type = map(object({
runtime_version = optional(string, "3.11")
handler = string
return_type = string
template_vars = optional(map(string), {})
arguments = list(object({
name = string
type = string
}))
packages = optional(list(string), [])
is_secure = optional(bool, false)
null_input_behavior = optional(string, "CALLED ON NULL INPUT")
return_behavior = optional(string, "VOLATILE")
}))
}
main.tf
locals {
database_name = upper(var.database_name)
schema_name = upper(var.schema_name)
}
resource "snowflake_function" "default" {
for_each = var.function_configs
name = upper(each.key)
language = "python"
comment = "Python UDF for '${each.key}'"
database = local.database_name # test_db
schema = local.schema_name # test_schema
dynamic "arguments" {
for_each = each.value.arguments
content {
type = arguments.value.type
name = arguments.value.name
}
}
runtime_version = each.value.runtime_version
handler = each.value.handler
return_type = each.value.return_type
statement = templatefile(
"statements/src/${each.key}.py",
each.value.template_vars
)
is_secure = each.value.is_secure
null_input_behavior = each.value.null_input_behavior
return_behavior = each.value.return_behavior
}
プロパティとその設定方法について、少し補足します。
- arguments ... 関数実行の引数となるカラム名・型を指定します。複数設定されうるため、Dynamic ブロックで実装しています。
- statement ... 関数処理が記述されたハンドラースクリプトを設定します。ヒアドキュメントを使用して Terraform コード内にハードコードすることもできますが、保守性の観点から外部ファイルに切り出しています。file() 関数の引数にファイルパスを指定するか、もしくは templatefile() 関数を使用すると第二引数に指定した map オブジェクトの値をスクリプト内に埋め込むことができます。
-
return_type ... 戻り値型を設定します。UDF の場合は、
VARCHAR
やNUMBER(38,0)
のような Data Type を指定します。UDTF の場合は、TABLE(VARCHAR)
のように TABLE 関数を適用した型を指定します。
動作確認用テーブルの作成
UDF / UDTF 動作確認用のテーブルを作成し、適当なサンプルデータを投入します。
1~100 の整数を持つ NUM_COL
列 (NUMBER 型) に加算の関数を適用していきます。
モジュール呼び出し - UDF の場合
ハンドラースクリプト
UDF 用のハンドラースクリプトとして、以下のようなものを用意します。引数として受け取った 2 つの整数と、Terraform から設定した offset
の値を加算するだけのシンプルな関数です。
def handler(v: int, x: int) -> int:
offset = int("${offset}")
return v + x + offset
terragrunt.hcl
モジュールを呼び出すための terragrunt.hcl
は以下のようになります。handler
には、スクリプトで実装したハンドラー関数名を指定します。また、ハンドラースクリプトのテンプレートに含まれる offset
の値を template_vars
で指定しています。
terragrunt.hcl
include "root" {
path = find_in_parent_folders()
}
locals {
common_vars = yamldecode(file(find_in_parent_folders("common_vars.yaml")))
account_vars = yamldecode(file(find_in_parent_folders("account_vars.yaml")))
aws_account_id = local.account_vars.backend_aws.account_id
account = local.account_vars.account
database_name = "test_db"
schema_name = "test_schema"
language = "python"
usage = "default"
}
terraform {
source = "${dirname(find_in_parent_folders())}/modules/${local.account.type}/function/${local.database_name}/${local.schema_name}//${local.language}/${local.usage}/"
}
inputs = {
database_name = local.database_name
schema_name = local.schema_name
function_configs = {
add_udf = {
handler = "handler"
arguments = [
{ type = "NUMBER", name = "V" },
{ type = "NUMBER", name = "X" },
]
return_type = "NUMBER(38,0)"
template_vars = { offset = 10 }
}
}
}
リソース作成 (Apply)
Terragrunt (または Terraform) で Apply を実行します。
Apply 実行結果
% export SNOWFLAKE_WAREHOUSE=DEFAULT_WH_XSMALL
% terragrunt apply
# ... 略 ...
Terraform will perform the following actions:
# snowflake_function.default["add_udf"] will be created
+ resource "snowflake_function" "default" {
+ comment = "Python UDF for 'add_udf'"
+ database = "TEST_DB"
+ handler = "handler"
+ id = (known after apply)
+ is_secure = false
+ language = "python"
+ name = "ADD_UDF"
+ null_input_behavior = "CALLED ON NULL INPUT"
+ return_behavior = "VOLATILE"
+ return_type = "NUMBER(38,0)"
+ runtime_version = "3.11"
+ schema = "TEST_SCHEMA"
+ statement = <<-EOT
def handler(v: int, x: int) -> int:
offset = int("10")
return v + x + offset
EOT
+ arguments {
+ name = "V"
+ type = "NUMBER"
}
+ arguments {
+ name = "X"
+ type = "NUMBER"
}
}
Plan: 1 to add, 0 to change, 0 to destroy.
Do you want to perform these actions?
Terraform will perform the actions described above.
Only 'yes' will be accepted to approve.
Enter a value: [yes]
snowflake_function.default["add_udf"]: Creating...
snowflake_function.default["add_udf"]: Creation complete after 3s [id="TEST_DB"."TEST_SCHEMA"."ADD_UDF"(NUMBER, NUMBER)]
Apply complete! Resources: 1 added, 0 changed, 0 destroyed.
動作確認
動作確認として、作成した UDF を呼び出してみます。
-- Test UDF
SELECT
id,
num_col,
add_udf(num_col, 5) as result_col -- UDF 呼び出し
FROM
demo_pyfunc
;
[ offset
= 10, x
= 5 ] の条件で実行しているため、実行結果も NUM_COL
カラムの値に 15 足された計算結果が得られていることを確認できます。
モジュール呼び出し - UDTF の場合
ハンドラースクリプト
UTDF 用のハンドラースクリプトとして、以下のようなものを用意します。処理内容は先に示した UDF のものと同様ですが、UDTF の場合は process()
メソッドを持つクラスを定義する必要があります。
class AddHandler:
def __init__(self):
self._offset = int("${offset}")
def process(self, v: int, x: int):
r = v + x + self._offset
yield (r,)
terragrunt.hcl
モジュールを呼び出すための terragrunt.hcl
は以下のようになります。handler
には、スクリプトで実装したハンドラー用のクラス名を指定します。
terragrunt.hcl
# ... 略
inputs = {
database_name = local.database_name
schema_name = local.schema_name
function_configs = {
add_udf = {
# ... 略
}
add_udtf = {
arguments = [
{ type = "NUMBER", name = "V" },
{ type = "NUMBER", name = "X" },
]
handler = "AddHandler"
return_type = "TABLE (R NUMBER)"
template_vars = { offset = 20 }
}
}
}
Apply 実行結果は割愛します。
動作確認
動作確認として、作成した UDTF を呼び出してみます。
-- Test UDTF
SELECT
id,
num_col,
res.r
FROM
demo_pyfunc,
TABLE(add_udtf(num_col, 3)) AS res -- UDTF 呼び出し
;
[ offset
= 20, x
= 3 ] の条件で実行しているため、実行結果も NUM_COL
カラムの値に 23 足された計算結果が得られていることを確認できます。
実装の説明は以上です。
さいごに
Snowflake UDF / UDTF を Terraform で実装する方法について書いてみました。
当社ではまだ実例がありませんが、ストアドプロシージャ (Stored Procedure) の実装などにも応用できるのかなと考えています。
今回は Terraform モジュールの実装方法に焦点を当ててみましたが、UDF / UDTF を使用した実際のユースケースなどに関しても別の機会に発信できたらと思います。
最後まで読んで頂き、ありがとうございました。
-
ユーザー定義関数の概要 - Snowflake Documentation ↩︎
Snowlfake データクラウドのユーザ会 SnowVillage のメンバーで運営しています。 Publication参加方法はこちらをご参照ください。 zenn.dev/dataheroes/articles/db5da0959b4bdd
Discussion