❄️

Snowflakeのクエリをテストする

2021/12/19に公開

この記事はSnowflake Advent Calendar 2021の15日目の記事です(遅れてすみません)。

はじめに

大規模なデータを取り扱うときは、Snowflakeに一旦取り込んで処理を行う所謂ELTの形でデータ処理を行うことが近年増えているかと思います。

今回はsnowflake-python-connectorを使って処理を行うためのSQLをテストしてみます。

前提条件

ある売上テーブルSALESから集計を行ってサマリテーブルSALES_SUMMARYにinsertを行うことを考えます。クエリは以下のようになります。このクエリをquery.sqlのような形で置いておきます。

query.sql
insert into sales_summary (date, amount)
select
  date,
  sum(amount) as amount
from
(
  select
    id,
    amount,
    to_date(timestamp) as date
  from
    sales
) as date
group by
  date;

こちらのSQLをテストすることを考えてみます。
SALES, SALES_SUMMARYのDDLは以下になります。

create table sales
(
    id integer,
    amount integer,
    timestamp timestamp
);

create table sales_summary
(
    date date,
    amount integer
);

ユニットテストを行う

テストケースを準備

テストケースはcsv等で準備するとgitで管理しやすいかと思います。
今回は以下のようなテストケースを準備します。

test_sales.csv
ID,AMOUNT,TIMESTAMP
1,100,2021-01-01 0:00:00
1,100,2021-01-01 1:00:00
1,100,2021-01-02 0:00:00
2,100,2021-01-01 0:00:00

これらを集計すると以下のような期待結果が得られると考えられます。期待結果も同様にcsvにしておきます。

expect_summary_sales.csv
DATE,AMOUNT
2021-01-01,300
2021-01-02,100

テスト実行

snowflakeのpythonコネクタではselect文の結果をpandas.DataFrameに入れることができます。これを利用して、以下のようなテストコードでユニットテストを行うことができます。別途テスト用のDBやschemaに接続してテーブルが空の状態でテストを行うようにしましょう。

test_sql.py
import snowflake.connector
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
from pandas._testing import assert_frame_equal
import unittest

class TestQuery(unittest.TestCase):
    
    @classmethod
    def setUpClass(self):

        self.con = snowflake.connector.connect(
            user='...',
            password='...',
            account='...',
            warehouse='...',
            database='...',
            schema='...',
            role='...'
        )

        self.cur = self.con.cursor()

    def test_query(self):

        with open('query.sql') as f:
            sql = f.read()

        # write snowflake
        test_df = pd.read_csv('test_sales.csv')
        write_pandas(self.con, test_df, 'SALES')

        # query execute
        self.cur.execute(sql)

        # fetch
        self.cur.execute("select * from sales_summary;")
        result_df = self.cur.fetch_pandas_all()

        # truncate
        self.cur.execute("truncate table sales;")
        self.cur.execute("truncate table sales_summary;")

        # test
        expect_df = pd.read_csv('expect_summary_sales.csv')
        assert_frame_equal(result_df.astype('str'), expect_df.astype('str'))

こちらを実行すると、

$ python -m unittest test_sql.py
.
----------------------------------------------------------------------
Ran 1 test in 5.679s

OK

テストが通っていることがわかります。

参考

PythonコネクタでのPandas DataFrames の使用

ELTクエリのデプロイ

こちらのクエリをsnowflakeにデプロイしたいと思います。今回はtaskを用いることを想定します。デプロイにはterraformを用います。terraformは外部ファイルが読み込めるため、先ほどのSQLスクリプトをそのまま読みこんでterraformでデプロイが行えます。こちらはデプロイ先を本番環境に指定します。

task.tf
resource "snowflake_task" "etl_task" {

  database  = "..."
  schema    = "..."
  warehouse = "..."

  name          = "etl_task"
  schedule      = "10 minutes"
  sql_statement = file("query.sql")
}

terraform planを行うと、

$ terraform plan

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
  + create

Terraform will perform the following actions:

  # snowflake_task.etl_task will be created
  + resource "snowflake_task" "etl_task" {
      + database      = "..."
      + enabled       = false
      + id            = (known after apply)
      + name          = "etl_task"
      + schedule      = "10 minutes"
      + schema        = "..."
      + sql_statement = <<-EOT
            insert into sales_summary (date, amount)
            select
              date,
              sum(amount) as amount
            from
            (
              select
                id,
                amount,
                to_date(timestamp) as date
              from
                sales
            ) as date
            group by
              date;
        EOT
      + warehouse     = "..."
    }

Plan: 1 to add, 0 to change, 0 to destroy.

先ほどのSQLスクリプトがデプロイされることがわかります。

注意点

クエリを実際に実行するため、テストした分だけ課金される

Snowflakeにはエミュレーターがないため、上記の方法だと実際にクエリが発行されて課金が発生します。CI/CDなどに組み込む場合は実行トリガーを工夫する必要があります。dockerでpostgresを立ち上げてそちらで実行するなどの方法も考えられますが、snowflakeでしか使えないクエリがある場合は注意です。

クエリが複雑になった場合、すべてのケースを網羅できない

今回の方法は1つのクエリ全体をテストするため、クエリが長大になった場合はテストケースも膨大な量になります。ETL実行時間に余裕がある場合は中間テーブルをいくつか用意してtaskを小分けに分解するのがよいかと思います。

おわりに

今回はsnowflakeのみの機能でテストを行う方法を考えてみました。dbtなどはクエリのテスト機能もあってイケてるらしいので今度使ってみようかと思います。

Discussion