ストアドプロシージャを使ってSnowflakeのクエリの結果をスプレッドシートに出力する
こんにちは!Septeni Japan 株式会社のエンジニアの大志万といいます。
Snowflakeを使っていると、クエリしたデータをスプレッドシートに出力して共有したいことが度々あります。
今回はSnowflakeのストアドプロシージャを作成して、クエリの結果をスプレッドシートに出力する方法を紹介します。
ストアドプロシージャとは
Snowflakeのストアドプロシージャは、Snowflake内にサーバーサイドで実行可能なコードをまとめ、SQL実行を含む複雑な処理をひとつの手順として管理できる仕組みです。2024年12月現在で、Java・Javascript・Python・Scala・SQLが実行可能です。
Snowflake内でコードを実行できる別の方法としてはUDF(ユーザー定義関数)もあるのですが、
UDFが基本的に1つの値を返すシンプルな処理に特化しているのに対し、ストアドプロシージャはトランザクション制御や複数ステートメント実行、条件分岐・ループなどを利用したより高度なロジックを記述し、実行することができるという点で違いがあります。
詳しい比較はSnowflakeの公式ドキュメントをご確認ください。
作成方法
Secretの作成
CREATE OR REPLACE SECRET SHEET_CREDS
TYPE = GENERIC_STRING
SECRET_STRING = '{"refresh_token": xxxxx, .... "expiry": ~~~~}'
;
上記コマンドでGoogle API にアクセスするための認証情報をSecretに登録します。今回は、OAuth2.0の認証情報が記載されているJSONファイルの内容を記述しています。
ちなみに認証情報は下記の方法で取得可能です。
- GoogleCloudのコンソールからOAuth クライアントIDを作成してクライアントシークレットの記載されたJSONファイルをダウンロードし、
credentials.json
という名前に変更して保存します。 - 次に、credentials.jsonと同じディレクトリで下記のPythonを実行するとブラウザでGoogleアカウント画面が出てきます。アカウントの認証とスプレッドシートへのアクセス認可を行うと、
authorized_user_file.json
として認証情報が出力されます。
import gspread
gspread.oauth(
authorized_user_filename='authorized_user_file.json',
credentials_filename='credentials.json',
)
外部アクセス統合の作成
CREATE OR REPLACE NETWORK RULE SHEETS_RULE
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('sheets.googleapis.com','oauth2.googleapis.com')
;
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION SPREADSHEET_ACCESS
ALLOWED_NETWORK_RULES = (SHEETS_RULE)
ALLOWED_AUTHENTICATION_SECRETS = (SHEET_CREDS)
ENABLED = true;
;
Snowflakeから外向きのネットワークルールを作成し、許可するホストを指定します。
Secretの登録とネットワークルールの作成が完了したら、上記のコマンドを実行して、外部アクセス統合を作成します。
ストアドプロシージャの作成
CREATE OR REPLACE PROCEDURE QUERY_TO_SHEET(query string, ss_id string, sheet_name string)
RETURNS string
LANGUAGE python
RUNTIME_VERSION = 3.11
HANDLER = 'run'
EXTERNAL_ACCESS_INTEGRATIONS = (SPREADSHEET_ACCESS)
PACKAGES = ('snowflake-snowpark-python', 'gspread', 'gspread-dataframe')
SECRETS = ('cred' = SHEET_CREDS)
AS
$$
import json
import gspread
import _snowflake
from gspread_dataframe import set_with_dataframe
def run(session, query: str, ss_id: str, sheet_name: str):
df = session.sql(query).to_pandas()
cred = _snowflake.get_generic_secret_string('cred')
gc, _ = gspread.oauth_from_dict(
authorized_user_info=json.loads(
cred
)
)
ss = gc.open_by_key(ss_id)
sheet = ss.worksheet(sheet_name)
set_with_dataframe(sheet, df)
return 'Export completed to Google Sheet'
$$
;
$$で区切られた部分にPythonのスクリプトを記述しています。
run関数の中で、クエリを実行し、その結果をPandasのDataFrameに変換して、gspreadを使ってスプレッドシートに出力する処理を行っています。
注意するポイントとして、gspread, gspread_dataframe等の外部パッケージは、PACKAGESに指定しておく必要があります。
また指定できる外部パッケージは Anaconda Snowflakeチャネル一覧のみになっていて、それ以外の外部パッケージを使いたい場合はSnowflakeのステージに外部ライブラリをアップロードし、そこからインポートを行う必要があります。
実行してみた
作成したストアドプロシージャはCALLコマンドで実行可能です。
今回はPACKAGESに指定可能なSnowparkパッケージバージョンを出力してみます。
CALL QUERY_TO_SHEET(
'
SELECT
*
FROM
information_schema.packages
','<<スプレッドシートID>>','<<シート名>>'
)
無事にスプレッドシートにクエリの結果が出力されたことを確認できました!
まとめ
今回はストアドプロシージャを使って、クエリの結果をスプレッドシートに出力する方法を紹介しました。
もし、定期的な出力を行いたい場合は、SnowflakeのCREATE TASK機能からストアドプロシージャを実行することで実現可能です。
Snowflakeは単なるDWHではなく、今回紹介したように様々なことができます。
今後もいろいろ試してみて、また良さそうな機能があったら記事にしたいと思います。
最後までお読みいただき、ありがとうございました!
質問やフィードバックがありましたら、ぜひコメント欄で教えてください。
参考
-
https://zenn.dev/dataheroes/articles/slack-notification
- ストアドプロシージャを作成する手順の参考に使用
Discussion