💡
snowflake-connectorを使用する
はじめに
snowflakeをpythonライブラリのsnowflake-connectorを使ってキーペア認証で操作をしてみました。
準備
インストール
python >= 3.8が条件です。
$ pip install snowflake-connector-python
バージョン管理ツールpoetryを使って仮想環境に使いたい場合は
$ pip install poetry
$ poetry new snowflake_python
$ poetry config virtualenvs.in-project true
$ poetry add snowflake-connector snowflake-sqlalchemy sqlalchemy
鍵の設定
key_genを使っている場合
鍵の生成
SnowflakeはMFA必須となっており、connectorを使用すると一回の実行でMFAで入力を何度も求められ面倒です。
このため鍵を生成して、登録することでこれを回避します。
ssh-keygen -t rsa -b 4096 -m PEM
出力されたファイルを"C:\Users\your_name\.ssh"へ保存をします。
your_nameは適宜あなたのユーザ名に読み替えてください。
このままの形式だと公開鍵をSnowflakeのユーザへ登録できないので形式を変換します。ssh-kengenではなくopensshを使っている場合は不要です。
鍵の形式を変換
ssh-keygen -e -f C:\Users\your_name\.ssh\key.pub -m pkcs8
opensshを使っている場合
ユーザに公開鍵を登録
your_userとyour_keyはあなたのユーザと鍵に書き換えてください。
alter user your_user set rsa_public_key=your_key;
環境変数を作成
.envを作成して以下を書き込む
SNOWFLAKE_USER=your_user
ACCOUNT=your_account #{組織名}-{アカウント名}
DATABASE=your_db
SCHEMA=your_schema
WAREHOUSE=your_wh
SNOWFLAKE_PRIVATE_KEY_PATH=your_key_path
実行
接続設定
import os
from snowflake.connector import connect
user = os.getenv("SNOWFLAKE_USER")
password = os.getenv("SNOWFLAKE_PASSWORD")
account = os.getenv("ACCOUNT")
warehouse = os.getenv("WAREHOUSE")
database = os.getenv("DATABASE")
schema = os.getenv("SCHEMA")
private_key_file = os.getenv("SNOWFLAKE_PRIVATE_KEY_PATH")
conn = connect(
user=user,
# password=PASSWORD, # パスワードで実行したい場合
private_key_file=private_key_file, # 今回はキーペアで
account=account,
warehouse=warehouse,
database=database,
schema=schema,
)
SQL実行
認証時に指定したデータベース、スキーマ配下にテーブルを作成してデータを挿入します。
# 実行オブジェクトを生成
cur = conn.cursor()
# テーブル作成
cur.execute(
"""
CREATE OR REPLACE TABLE PERSON(
FIRST_NAME VARCHAR(40),
SECOND_NAME VARCHAR(40)
);
"""
)
# データを挿入
cur.execute(
"INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Shinon", "Ogawa")
)
cur.execute(
"INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Mako", "Kawai")
)
cur.execute(
"INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Tsutsuji", "Higa")
)
cur.execute(
"INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Kurea", "Furutachi")
)
cur.execute(
"INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Hoshi", "Nana")
)
res = cur.execute("SELECT * FROM PERSON")
print(res.fetchall())
conn.close()
以下ではSELECTした結果を取得しています。
cur.executeで返されるのはカーソルオブジェクトであるため、res.fetchallで結果を出力しています。結果はタプルのリスト形式で返されます。
print(res.fetchall())
res = cur.execute("SELECT * FROM PERSON")
print(res.fetchall())
なお非同期で実行する場合は以下の通り
cur.execute_async('SELECT * FROM PERSON')
pandasを使った操作
読み込みと書き込みを行います。
上記の操作をした後以下のコードを実行してください。
import pandas as pd
from sqlalchemy import create_engine
# データを取得
cur.execute("SELECT * FROM PERSON")
data = cur.fetchmany(5)
# pandas形式にデータを変換
df = pd.DataFrame(data, columns=[desc[0] for desc in cur.description])
# データフレームへデータを書き込んで更新
new_row = pd.DataFrame([{"FIRST_NAME": "Mayumi", "SECOND_NAME": "Ota"}])
df = pd.concat([df, new_row], ignore_index=True)
# SQL操作をするためエンジンを作成
engine = create_engine(
f"snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}"
)
# Snowflakeのテーブルへデータを書き込む(テーブルがなければ作成)
df.to_sql(
"INARI_WOMEN_UNIVERSITY_CHARACTORS", engine, index=False, if_exists="replace"
)
conn.close()
上のSQLを実行するとデータが書き込まれていることが確認されます
補足
エンジンを作成する際にパスワードで認証しましたがキーペアもできます。
以下に方法を示します。
import pathlib
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake.sqlalchemy import URL
with open(pathlib.Path(private_key_file).expanduser(), "rb") as key:
p_key = serialization.load_pem_private_key(
key.read(),
password=None,
backend=default_backend(),
)
pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
engine = create_engine(
URL(
account=account,
user=user,
database=database,
schema=schema,
),
connect_args={
"private_key": pkb,
},
)
参考
Discussion