😇

Serverless frameworkでサーバレスにSnowflakeを触ってみる

4 min read

これは何?

この記事は Snowflake Advent Calendar 2021 4日目の記事です。

↑まだまだ記事書いてくれる方募集しているみたいなので是非ご参加を!

みなさんは、AWS LambdaからSnowflakeにクエリを投げたいと思うことたまにありませんか?
私は業務にてメトリクス系の通知等の運用用途でよくあったりします。

簡単にクエリを実行したいのであればSnowVillage村長の @foursue さんが書いてくださった Lambda から Snowflake にシュッとつなぎたい! が一番楽に利用できる方法かなと思っています!

https://qiita.com/foursue/items/394f39786693ee362fef

今回は村長の記事から着想を得て、Serverless Frameworkを利用してLambdaからSnowflakeにシュッとつなげないかを試してみました。

やること

まずは、Serverless FrameworkでPython用Snowflakeコネクタを利用できるように諸々設定します。

serverless create --template aws-python3 --name snowflake-lambda
sls plugin install -n serverless-python-requirements
pipenv --python 3.8
pipenv shell
pipenv install "snowflake-connector-python[pandas]"
pipenv lock -r > requirements.txt

serverless-python-requirements はPythonの外部モジュールをzip化してくれるプラグインです。

Serverless.yml には以下を追記しておきます。

Serverless.yml
custom:
  pythonRequirements:
    dockerizePip: true

plugins:
  - serverless-python-requirements

また、今回はSnowflakeのデータをPandas DataFrameに格納できるPythonコネクタを使用してみます。

https://docs.snowflake.com/ja/user-guide/python-connector-pandas.html

実際にSnowflakeにクエリを投げてみます。

SnowflakeのWarehouseリソースを無駄に使用して1と2のレコードを生成して、それらのデータをDataframe化した上でLambda上で足し上げるというかなり謎なことをします😇

handler.py
import os
import snowflake.connector


def main(event, context):
    sf_account = os.environ['SF_ACCOUNT']
    sf_user = os.environ['SF_USER']
    sf_pw = os.environ['SF_PW']
    conn = snowflake.connector.connect(
        account = sf_account,
        user = sf_user,
        password = sf_pw
    )
    cur = conn.cursor()
    try:
        cur.execute("select NUM from (values (1),(2)) as t (NUM)")
        df = cur.fetch_pandas_all()
    finally:
        cur.close()
        conn.close()

    response = {
        "statusCode": 200,
        "body": str(df['NUM'].sum())
    }
    print(response)

    return response

やっつけなので、パスワードは環境変数から取るようにしていますが、業務利用するならSSMなりKMSなりで管理できるようにするのが良いかと思います。

コネクタ容量ありすぎ問題

ローカルでテスト実行してみます。

pipenv run npx sls invoke local --log -f snowflake-lambda 

無事 1+2 が3であることを教えてもらえました。

{'statusCode': 200, 'body': '3'}
{
    "statusCode": 200,
    "body": "3"
}

謎のクエリがSnowflakeで動いたことも確認できました😇

ということでデプロイしてみます。

sls deploy

SnowflakeのPython connectorは容量がかなり大きいので容量エラーになります!

Resource handler returned message: "Unzipped size must be smaller than 262144000 bytes

そのため、 serverless-python-requirements の設定に以下を追記して容量削減をしてみます。

参照:

https://zenn.dev/ryo_kawamata/articles/python-exclude-package-on-serverless-framework
Serverless.yml
custom:
  pythonRequirements:
    dockerizePip: true
    zip: true
    slim: true

plugins:
  - serverless-python-requirements

あわせて、Lambdaの先頭にもこちらを追記します。

handler.py
try:
  import unzip_requirements
except ImportError:
  pass

再度デプロイしてみます。

sls deploy

うまくデプロイできました!

eventsの設定を毎時実行しているので1時間に1回、私のクレジットカードを使って 1+23 であることを計算してくれてるんです!ありがてぇ!

使い所

Snowflakeのデータを利用したバッチ処理や単純なELT処理にも利用できると思いますし、メトリクス周りのテーブルを参照して日々の運用サマリーを作り込むみたいなことにも利用できると思います!
SlackのWebhookとかに流すと日々の運用も楽になるかもですね!

あとは、コード管理ができる点と、デプロイもワンライナーで実行できるのでかなり楽になったのかなとは思っています!

最後に

いよいよ来週の12/07(火)〜12/08(水)、Snowflakeのイベント『Snowday』が開催します!

https://www.snowflake.com/snowday-japan/?utm_source=japan&utm_medium=email&utm_campaign=SnowVillage

国内外のカスタマー事例やSnowflakeの開発情報等が盛り沢山です!
個人的には毎年、世界の最新のデータエンジニアリングに関する潮流を追うには一番最適なイベントだと思うので、データに関連する方はチェックしていても良いのではと思っています。

12/08(水)の15:20〜15:50から私も出演しますので、お時間ありましたらそちらもどうぞよろしくお願いいたします!

Discussion

ログインするとコメントできます