💡

snowflake-connectorを使用する

に公開

はじめに

snowflakeをpythonライブラリのsnowflake-connectorを使ってキーペア認証で操作をしてみました。

準備

インストール

python >= 3.8が条件です。

$ pip install snowflake-connector-python

バージョン管理ツールpoetryを使って仮想環境に使いたい場合は

$ pip install poetry
$ poetry new snowflake_python
$ poetry config virtualenvs.in-project true
$ poetry add snowflake-connector snowflake-sqlalchemy sqlalchemy

鍵の設定

key_genを使っている場合

鍵の生成
SnowflakeはMFA必須となっており、connectorを使用すると一回の実行でMFAで入力を何度も求められ面倒です。
このため鍵を生成して、登録することでこれを回避します。

ssh-keygen -t rsa -b 4096 -m PEM

出力されたファイルを"C:\Users\your_name\.ssh"へ保存をします。
your_nameは適宜あなたのユーザ名に読み替えてください。

このままの形式だと公開鍵をSnowflakeのユーザへ登録できないので形式を変換します。ssh-kengenではなくopensshを使っている場合は不要です。

鍵の形式を変換

ssh-keygen -e -f C:\Users\your_name\.ssh\key.pub -m pkcs8

opensshを使っている場合

https://docs.snowflake.com/ja/user-guide/key-pair-auth

ユーザに公開鍵を登録

your_userとyour_keyはあなたのユーザと鍵に書き換えてください。

alter user your_user set rsa_public_key=your_key;

環境変数を作成

.envを作成して以下を書き込む

SNOWFLAKE_USER=your_user
ACCOUNT=your_account #{組織名}-{アカウント名}
DATABASE=your_db
SCHEMA=your_schema
WAREHOUSE=your_wh
SNOWFLAKE_PRIVATE_KEY_PATH=your_key_path

実行

接続設定

import os

from snowflake.connector import connect

user = os.getenv("SNOWFLAKE_USER")
password = os.getenv("SNOWFLAKE_PASSWORD")
account = os.getenv("ACCOUNT")
warehouse = os.getenv("WAREHOUSE")
database = os.getenv("DATABASE")
schema = os.getenv("SCHEMA")
private_key_file = os.getenv("SNOWFLAKE_PRIVATE_KEY_PATH")

conn = connect(
    user=user,
    # password=PASSWORD, # パスワードで実行したい場合
    private_key_file=private_key_file,  # 今回はキーペアで
    account=account,
    warehouse=warehouse,
    database=database,
    schema=schema,
)

SQL実行

認証時に指定したデータベース、スキーマ配下にテーブルを作成してデータを挿入します。


# 実行オブジェクトを生成
cur = conn.cursor()

# テーブル作成
cur.execute(
    """
    CREATE OR REPLACE TABLE PERSON( 
    FIRST_NAME VARCHAR(40),
    SECOND_NAME VARCHAR(40)
    );
    """
)

# データを挿入
cur.execute(
    "INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Shinon", "Ogawa")
)
cur.execute(
    "INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Mako", "Kawai")
)
cur.execute(
    "INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Tsutsuji", "Higa")
)
cur.execute(
    "INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Kurea", "Furutachi")
)
cur.execute(
    "INSERT INTO PERSON (FIRST_NAME,SECOND_NAME) VALUES(%s,%s)", ("Hoshi", "Nana")
)

res = cur.execute("SELECT * FROM PERSON")
print(res.fetchall())


conn.close()

以下ではSELECTした結果を取得しています。
cur.executeで返されるのはカーソルオブジェクトであるため、res.fetchallで結果を出力しています。結果はタプルのリスト形式で返されます。

print(res.fetchall())
res = cur.execute("SELECT * FROM PERSON")
print(res.fetchall())

なお非同期で実行する場合は以下の通り

cur.execute_async('SELECT * FROM PERSON')

pandasを使った操作

読み込みと書き込みを行います。
上記の操作をした後以下のコードを実行してください。

import pandas as pd
from sqlalchemy import create_engine

# データを取得
cur.execute("SELECT * FROM PERSON")
data = cur.fetchmany(5)

# pandas形式にデータを変換
df = pd.DataFrame(data, columns=[desc[0] for desc in cur.description])

# データフレームへデータを書き込んで更新
new_row = pd.DataFrame([{"FIRST_NAME": "Mayumi", "SECOND_NAME": "Ota"}])
df = pd.concat([df, new_row], ignore_index=True)

# SQL操作をするためエンジンを作成
engine = create_engine(
    f"snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}"
)

# Snowflakeのテーブルへデータを書き込む(テーブルがなければ作成)
df.to_sql(
    "INARI_WOMEN_UNIVERSITY_CHARACTORS", engine, index=False, if_exists="replace"
) 

conn.close()

上のSQLを実行するとデータが書き込まれていることが確認されます

補足

エンジンを作成する際にパスワードで認証しましたがキーペアもできます。
以下に方法を示します。

import pathlib

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake.sqlalchemy import URL

with open(pathlib.Path(private_key_file).expanduser(), "rb") as key:
    p_key = serialization.load_pem_private_key(
        key.read(),
        password=None,
        backend=default_backend(),
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

engine = create_engine(
    URL(
        account=account,
        user=user,
        database=database,
        schema=schema,
    ),
    connect_args={
        "private_key": pkb,
    },
)

参考

https://docs.snowflake.com/ja/developer-guide/python-connector/python-connector
https://zenn.dev/iwatagumi/articles/5494e921717b01
https://docs.snowflake.com/en/developer-guide/python-connector/sqlalchemy#key-pair-authentication-support

Discussion