Serverless frameworkでサーバレスにSnowflakeを触ってみる
これは何?
この記事は Snowflake Advent Calendar 2021 4日目の記事です。
↑まだまだ記事書いてくれる方募集しているみたいなので是非ご参加を!
みなさんは、AWS LambdaからSnowflakeにクエリを投げたいと思うことたまにありませんか?
私は業務にてメトリクス系の通知等の運用用途でよくあったりします。
簡単にクエリを実行したいのであればSnowVillage村長の @foursue さんが書いてくださった Lambda から Snowflake にシュッとつなぎたい! が一番楽に利用できる方法かなと思っています!
今回は村長の記事から着想を得て、Serverless Frameworkを利用してLambdaからSnowflakeにシュッとつなげないかを試してみました。
やること
まずは、Serverless FrameworkでPython用Snowflakeコネクタを利用できるように諸々設定します。
serverless create --template aws-python3 --name snowflake-lambda
sls plugin install -n serverless-python-requirements
pipenv --python 3.8
pipenv shell
pipenv install "snowflake-connector-python[pandas]"
pipenv lock -r > requirements.txt
serverless-python-requirements
はPythonの外部モジュールをzip化してくれるプラグインです。
Serverless.yml
には以下を追記しておきます。
custom:
pythonRequirements:
dockerizePip: true
plugins:
- serverless-python-requirements
また、今回はSnowflakeのデータをPandas DataFrameに格納できるPythonコネクタを使用してみます。
実際にSnowflakeにクエリを投げてみます。
SnowflakeのWarehouseリソースを無駄に使用して1と2のレコードを生成して、それらのデータをDataframe化した上でLambda上で足し上げるというかなり謎なことをします😇
import os
import snowflake.connector
def main(event, context):
sf_account = os.environ['SF_ACCOUNT']
sf_user = os.environ['SF_USER']
sf_pw = os.environ['SF_PW']
conn = snowflake.connector.connect(
account = sf_account,
user = sf_user,
password = sf_pw
)
cur = conn.cursor()
try:
cur.execute("select NUM from (values (1),(2)) as t (NUM)")
df = cur.fetch_pandas_all()
finally:
cur.close()
conn.close()
response = {
"statusCode": 200,
"body": str(df['NUM'].sum())
}
print(response)
return response
やっつけなので、パスワードは環境変数から取るようにしていますが、業務利用するならSSMなりKMSなりで管理できるようにするのが良いかと思います。
コネクタ容量ありすぎ問題
ローカルでテスト実行してみます。
pipenv run npx sls invoke local --log -f snowflake-lambda
無事 1+2
が3であることを教えてもらえました。
{'statusCode': 200, 'body': '3'}
{
"statusCode": 200,
"body": "3"
}
謎のクエリがSnowflakeで動いたことも確認できました😇
ということでデプロイしてみます。
sls deploy
SnowflakeのPython connectorは容量がかなり大きいので容量エラーになります!
Resource handler returned message: "Unzipped size must be smaller than 262144000 bytes
そのため、 serverless-python-requirements
の設定に以下を追記して容量削減をしてみます。
参照:
custom:
pythonRequirements:
dockerizePip: true
zip: true
slim: true
plugins:
- serverless-python-requirements
あわせて、Lambdaの先頭にもこちらを追記します。
try:
import unzip_requirements
except ImportError:
pass
再度デプロイしてみます。
sls deploy
うまくデプロイできました!
eventsの設定を毎時実行しているので1時間に1回、私のクレジットカードを使って 1+2
が 3
であることを計算してくれてるんです!ありがてぇ!
使い所
Snowflakeのデータを利用したバッチ処理や単純なELT処理にも利用できると思いますし、メトリクス周りのテーブルを参照して日々の運用サマリーを作り込むみたいなことにも利用できると思います!
SlackのWebhookとかに流すと日々の運用も楽になるかもですね!
あとは、コード管理ができる点と、デプロイもワンライナーで実行できるのでかなり楽になったのかなとは思っています!
最後に
いよいよ来週の12/07(火)〜12/08(水)、Snowflakeのイベント『Snowday』が開催します!
国内外のカスタマー事例やSnowflakeの開発情報等が盛り沢山です!
個人的には毎年、世界の最新のデータエンジニアリングに関する潮流を追うには一番最適なイベントだと思うので、データに関連する方はチェックしていても良いのではと思っています。
12/08(水)の15:20〜15:50から私も出演しますので、お時間ありましたらそちらもどうぞよろしくお願いいたします!
Snowlfake データクラウドのユーザ会 SnowVillage のメンバーで運営しています。 Publication参加方法はこちらをご参照ください。 zenn.dev/dataheroes/articles/db5da0959b4bdd
Discussion