Snowflakeのクエリをテストする
この記事はSnowflake Advent Calendar 2021の15日目の記事です(遅れてすみません)。
はじめに
大規模なデータを取り扱うときは、Snowflakeに一旦取り込んで処理を行う所謂ELTの形でデータ処理を行うことが近年増えているかと思います。
今回はsnowflake-python-connectorを使って処理を行うためのSQLをテストしてみます。
前提条件
ある売上テーブルSALES
から集計を行ってサマリテーブルSALES_SUMMARY
にinsertを行うことを考えます。クエリは以下のようになります。このクエリをquery.sql
のような形で置いておきます。
insert into sales_summary (date, amount)
select
date,
sum(amount) as amount
from
(
select
id,
amount,
to_date(timestamp) as date
from
sales
) as date
group by
date;
こちらのSQLをテストすることを考えてみます。
SALES
, SALES_SUMMARY
のDDLは以下になります。
create table sales
(
id integer,
amount integer,
timestamp timestamp
);
create table sales_summary
(
date date,
amount integer
);
ユニットテストを行う
テストケースを準備
テストケースはcsv等で準備するとgitで管理しやすいかと思います。
今回は以下のようなテストケースを準備します。
ID,AMOUNT,TIMESTAMP
1,100,2021-01-01 0:00:00
1,100,2021-01-01 1:00:00
1,100,2021-01-02 0:00:00
2,100,2021-01-01 0:00:00
これらを集計すると以下のような期待結果が得られると考えられます。期待結果も同様にcsvにしておきます。
DATE,AMOUNT
2021-01-01,300
2021-01-02,100
テスト実行
snowflakeのpythonコネクタではselect文の結果をpandas.DataFrameに入れることができます。これを利用して、以下のようなテストコードでユニットテストを行うことができます。別途テスト用のDBやschemaに接続してテーブルが空の状態でテストを行うようにしましょう。
import snowflake.connector
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
from pandas._testing import assert_frame_equal
import unittest
class TestQuery(unittest.TestCase):
@classmethod
def setUpClass(self):
self.con = snowflake.connector.connect(
user='...',
password='...',
account='...',
warehouse='...',
database='...',
schema='...',
role='...'
)
self.cur = self.con.cursor()
def test_query(self):
with open('query.sql') as f:
sql = f.read()
# write snowflake
test_df = pd.read_csv('test_sales.csv')
write_pandas(self.con, test_df, 'SALES')
# query execute
self.cur.execute(sql)
# fetch
self.cur.execute("select * from sales_summary;")
result_df = self.cur.fetch_pandas_all()
# truncate
self.cur.execute("truncate table sales;")
self.cur.execute("truncate table sales_summary;")
# test
expect_df = pd.read_csv('expect_summary_sales.csv')
assert_frame_equal(result_df.astype('str'), expect_df.astype('str'))
こちらを実行すると、
$ python -m unittest test_sql.py
.
----------------------------------------------------------------------
Ran 1 test in 5.679s
OK
テストが通っていることがわかります。
参考
PythonコネクタでのPandas DataFrames の使用
ELTクエリのデプロイ
こちらのクエリをsnowflakeにデプロイしたいと思います。今回はtaskを用いることを想定します。デプロイにはterraformを用います。terraformは外部ファイルが読み込めるため、先ほどのSQLスクリプトをそのまま読みこんでterraformでデプロイが行えます。こちらはデプロイ先を本番環境に指定します。
resource "snowflake_task" "etl_task" {
database = "..."
schema = "..."
warehouse = "..."
name = "etl_task"
schedule = "10 minutes"
sql_statement = file("query.sql")
}
terraform plan
を行うと、
$ terraform plan
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
+ create
Terraform will perform the following actions:
# snowflake_task.etl_task will be created
+ resource "snowflake_task" "etl_task" {
+ database = "..."
+ enabled = false
+ id = (known after apply)
+ name = "etl_task"
+ schedule = "10 minutes"
+ schema = "..."
+ sql_statement = <<-EOT
insert into sales_summary (date, amount)
select
date,
sum(amount) as amount
from
(
select
id,
amount,
to_date(timestamp) as date
from
sales
) as date
group by
date;
EOT
+ warehouse = "..."
}
Plan: 1 to add, 0 to change, 0 to destroy.
先ほどのSQLスクリプトがデプロイされることがわかります。
注意点
クエリを実際に実行するため、テストした分だけ課金される
Snowflakeにはエミュレーターがないため、上記の方法だと実際にクエリが発行されて課金が発生します。CI/CDなどに組み込む場合は実行トリガーを工夫する必要があります。dockerでpostgresを立ち上げてそちらで実行するなどの方法も考えられますが、snowflakeでしか使えないクエリがある場合は注意です。
クエリが複雑になった場合、すべてのケースを網羅できない
今回の方法は1つのクエリ全体をテストするため、クエリが長大になった場合はテストケースも膨大な量になります。ETL実行時間に余裕がある場合は中間テーブルをいくつか用意してtaskを小分けに分解するのがよいかと思います。
おわりに
今回はsnowflakeのみの機能でテストを行う方法を考えてみました。dbtなどはクエリのテスト機能もあってイケてるらしいので今度使ってみようかと思います。
Discussion