🔧

Terraform で作る Snowflake UDF / UDTF

こんにちは!シンプルフォームの山岸です。

Snowflake 上でのデータ基盤構築を進めていく中で、当社でもユーザー定義関数 (UDF) を利用したいユースケースが出てきました。今回は Snowflake の UDF / UDTF を Terraform で管理する方法ついて検討してみたので、その内容についてご紹介できればと思います。

Snowflake UDF / UDTF とは?

本題に入る前に、Snowflake 関数と UDF / UDTF について簡単に触れておきたいと思います。

Snowflake 関数

Snowflake 関数 [1] は、データの処理・変換・操作 等のために Snowflake 環境内で利用可能な関数群です。組み込みで多くの Snowflake 関数を利用できますが、ドキュメントに記載の通り、いくつかの種類が存在します。

  • スカラー関数 ... スカラー関数は、呼び出しごとに1つの値を返す関数です。ほとんどの場合、これは行ごとに1つの値を返すと考えることができます。
  • 集計関数 ... 集計関数は、行全体の値を操作して、合計、平均、カウント、最小/最大値、標準偏差、推定などの数学的計算、および非数学的な演算を実行します。
  • ウィンドウ関数 ... ウィンドウ関数は、関連する行のグループ(「ウィンドウ」)で動作します。ウィンドウ関数が呼び出されるたびに、行(ウィンドウ内の現在の行)と現在の行を含む行のウィンドウが渡されます。ウィンドウ関数は、入力行ごとに1つの出力行を返します。出力は、関数に渡される個々の行と、関数に渡されるウィンドウ内の他の行の値に依存します。
  • テーブル関数 ... テーブル関数は、入力行ごとに行のセットを返します。返されるセットには、0行、1行、またはそれ以上の行を含めることができます。各行には、1つ以上の列を含めることができます。
  • システム関数 ... Snowflakeは、次のタイプのシステム関数を提供します。
    • システムでアクションを実行できるようにする制御関数(例: クエリの中止)
    • システムに関する情報を返す情報関数(例: テーブルのクラスタリングの深さの計算)
    • クエリに関する情報を返す情報関数(例: EXPLAIN プランに関する情報)

UDF / UDTF

前述の通り、多くの Snowflake 関数を組み込みで利用可能ですが、ユーザー定義関数 (UDF: User Defined Function) を使用すると、ユーザーが独自の処理ロジックを実装できます。

また Snowflake では、ユーザー定義テーブル関数 (UDTF: User Defined Table Function) という、戻り値型がテーブル形式であるような関数を実装することもできます。

UDF, UDTF がサポートする言語はそれぞれ以下の通りです。[2] (2024/9/8 現在)

実装

それでは本題の実装について、以降で説明したいと思います。
前述の通り Snowflake の UDF / UDTF はいくつかの言語に対応していますが、本記事では Python で実装されたシンプルな加算 (add) の関数を扱ってみたいと思います。

ディレクトリ構成

まず、Terraform 関連コードを管理するディレクトリ構成について触れたいと思います。

  • modules/ 側に Terraform モジュールを実装しています。当社の場合、原則 Snowflake アカウント用途毎・リソースタイプ毎にディレクトリを分けることにしているため、後述の図もそのような形になっています。
  • Snowflake 関数はスキーマレベルオブジェクトであるため、function/ の配下はデータベース名・スキーマ名でディレクトリを分けて管理しています。

modules 側

modules 側には、Terraform コードの他に、関数処理ロジックを記述したハンドラースクリプト、および単体テストコードを一緒に管理しています。(別の場所にスクリプトを集約して、そのパスをモジュールから参照する形でも良いかもしれません)

envs 側

envs 側には、modules 側で定義した Terraform モジュールを呼び出すためのコードを配置します。Terragrunt を使用している場合、terragrunt.hcl ファイルを配置します。

Terraform モジュール実装

modules 側に配置する Terraform コードは、snowflake_function のリソースタイプを使用して、例えば以下のように実装できます。(関数処理の実装については "モジュール呼び出し" のセクションで解説します。outputs.tf は割愛します)
こちらは UDF / UDTF のどちらにも対応しており、複数の関数を管理できます。

variables.tf
variables.tf
variable "database_name" {
  type = string
}
variable "schema_name" {
  type = string
}

variable "function_configs" {
  type = map(object({
    runtime_version = optional(string, "3.11")
    handler         = string
    return_type     = string
    template_vars   = optional(map(string), {})
    arguments = list(object({
      name = string
      type = string
    }))
    packages            = optional(list(string), [])
    is_secure           = optional(bool, false)
    null_input_behavior = optional(string, "CALLED ON NULL INPUT")
    return_behavior     = optional(string, "VOLATILE")
  }))
}
main.tf
main.tf
locals {
  database_name = upper(var.database_name)
  schema_name   = upper(var.schema_name)
}

resource "snowflake_function" "default" {
  for_each = var.function_configs

  name     = upper(each.key)
  language = "python"
  comment  = "Python UDF for '${each.key}'"

  database = local.database_name  # test_db
  schema   = local.schema_name    # test_schema

  dynamic "arguments" {
    for_each = each.value.arguments
    content {
      type = arguments.value.type
      name = arguments.value.name
    }
  }

  runtime_version = each.value.runtime_version
  handler         = each.value.handler
  return_type     = each.value.return_type

  statement = templatefile(
    "statements/src/${each.key}.py", 
    each.value.template_vars
  )

  is_secure           = each.value.is_secure
  null_input_behavior = each.value.null_input_behavior
  return_behavior     = each.value.return_behavior
}

プロパティとその設定方法について、少し補足します。

  • arguments ... 関数実行の引数となるカラム名・型を指定します。複数設定されうるため、Dynamic ブロックで実装しています。
  • statement ... 関数処理が記述されたハンドラースクリプトを設定します。ヒアドキュメントを使用して Terraform コード内にハードコードすることもできますが、保守性の観点から外部ファイルに切り出しています。file() 関数の引数にファイルパスを指定するか、もしくは templatefile() 関数を使用すると第二引数に指定した map オブジェクトの値をスクリプト内に埋め込むことができます。
  • return_type ... 戻り値型を設定します。UDF の場合は、VARCHARNUMBER(38,0) のような Data Type を指定します。UDTF の場合は、TABLE(VARCHAR) のように TABLE 関数を適用した型を指定します。

動作確認用テーブルの作成

UDF / UDTF 動作確認用のテーブルを作成し、適当なサンプルデータを投入します。
1~100 の整数を持つ NUM_COL 列 (NUMBER 型) に加算の関数を適用していきます。

モジュール呼び出し - UDF の場合

ハンドラースクリプト

UDF 用のハンドラースクリプトとして、以下のようなものを用意します。引数として受け取った 2 つの整数と、Terraform から設定した offset の値を加算するだけのシンプルな関数です。

[statements/src/] add_udf.py
def handler(v: int, x: int) -> int:
    offset = int("${offset}")
    return v + x + offset

terragrunt.hcl

モジュールを呼び出すための terragrunt.hcl は以下のようになります。handler には、スクリプトで実装したハンドラー関数名を指定します。また、ハンドラースクリプトのテンプレートに含まれる offset の値を template_vars で指定しています。

terragrunt.hcl
terragrunt.hcl
include "root" {
  path = find_in_parent_folders()
}

locals {
  common_vars  = yamldecode(file(find_in_parent_folders("common_vars.yaml")))
  account_vars = yamldecode(file(find_in_parent_folders("account_vars.yaml")))

  aws_account_id = local.account_vars.backend_aws.account_id
  account        = local.account_vars.account

  database_name = "test_db"
  schema_name   = "test_schema"

  language = "python"
  usage    = "default"
}

terraform {
  source = "${dirname(find_in_parent_folders())}/modules/${local.account.type}/function/${local.database_name}/${local.schema_name}//${local.language}/${local.usage}/"
}

inputs = {
  database_name = local.database_name
  schema_name   = local.schema_name

  function_configs = {
    add_udf = {
      handler = "handler"
      arguments = [
        { type = "NUMBER", name = "V" },
        { type = "NUMBER", name = "X" },
      ]
      return_type   = "NUMBER(38,0)"
      template_vars = { offset = 10 }
    }
  }
}

リソース作成 (Apply)

Terragrunt (または Terraform) で Apply を実行します。

Apply 実行結果
% export SNOWFLAKE_WAREHOUSE=DEFAULT_WH_XSMALL

% terragrunt apply
# ... 略 ...

Terraform will perform the following actions:

  # snowflake_function.default["add_udf"] will be created
  + resource "snowflake_function" "default" {
      + comment             = "Python UDF for 'add_udf'"
      + database            = "TEST_DB"
      + handler             = "handler"
      + id                  = (known after apply)
      + is_secure           = false
      + language            = "python"
      + name                = "ADD_UDF"
      + null_input_behavior = "CALLED ON NULL INPUT"
      + return_behavior     = "VOLATILE"
      + return_type         = "NUMBER(38,0)"
      + runtime_version     = "3.11"
      + schema              = "TEST_SCHEMA"
      + statement           = <<-EOT
            def handler(v: int, x: int) -> int:
                offset = int("10")
                return v + x + offset
        EOT

      + arguments {
          + name = "V"
          + type = "NUMBER"
        }
      + arguments {
          + name = "X"
          + type = "NUMBER"
        }
    }

Plan: 1 to add, 0 to change, 0 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: [yes]

snowflake_function.default["add_udf"]: Creating...
snowflake_function.default["add_udf"]: Creation complete after 3s [id="TEST_DB"."TEST_SCHEMA"."ADD_UDF"(NUMBER, NUMBER)]

Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

動作確認

動作確認として、作成した UDF を呼び出してみます。

test_udf.sql
-- Test UDF
SELECT 
    id, 
    num_col, 
    add_udf(num_col, 5) as result_col  -- UDF 呼び出し
FROM
    demo_pyfunc
;

[ offset = 10, x = 5 ] の条件で実行しているため、実行結果も NUM_COL カラムの値に 15 足された計算結果が得られていることを確認できます。

モジュール呼び出し - UDTF の場合

ハンドラースクリプト

UTDF 用のハンドラースクリプトとして、以下のようなものを用意します。処理内容は先に示した UDF のものと同様ですが、UDTF の場合は process() メソッドを持つクラスを定義する必要があります。

[statements/src/] add_udtf.py
class AddHandler:
    def __init__(self):
        self._offset = int("${offset}")

    def process(self, v: int, x: int):
        r = v + x + self._offset
        yield (r,)

terragrunt.hcl

モジュールを呼び出すための terragrunt.hcl は以下のようになります。handler には、スクリプトで実装したハンドラー用のクラス名を指定します。

terragrunt.hcl
terragrunt.hcl
# ... 略
inputs = {
  database_name = local.database_name
  schema_name   = local.schema_name

  function_configs = {
    add_udf = {
      # ... 略
    }
    add_udtf = {
      arguments = [
        { type = "NUMBER", name = "V" },
        { type = "NUMBER", name = "X" },
      ]
      handler       = "AddHandler"
      return_type   = "TABLE (R NUMBER)"
      template_vars = { offset = 20 }
    }
  }
}

Apply 実行結果は割愛します。

動作確認

動作確認として、作成した UDTF を呼び出してみます。

-- Test UDTF
SELECT 
    id, 
    num_col, 
    res.r
FROM
    demo_pyfunc, 
    TABLE(add_udtf(num_col, 3)) AS res  -- UDTF 呼び出し
;

[ offset = 20, x = 3 ] の条件で実行しているため、実行結果も NUM_COL カラムの値に 23 足された計算結果が得られていることを確認できます。

実装の説明は以上です。

さいごに

Snowflake UDF / UDTF を Terraform で実装する方法について書いてみました。
当社ではまだ実例がありませんが、ストアドプロシージャ (Stored Procedure) の実装などにも応用できるのかなと考えています。

今回は Terraform モジュールの実装方法に焦点を当ててみましたが、UDF / UDTF を使用した実際のユースケースなどに関しても別の機会に発信できたらと思います。

最後まで読んで頂き、ありがとうございました。

脚注
  1. 関数の概要 - Snowflake Documentation ↩︎

  2. ユーザー定義関数の概要 - Snowflake Documentation ↩︎

Snowflake Data Heroes

Discussion