❄️

Snowflake で暗号化したデータをアプリ側で復号する

2022/09/29に公開

Snowflake では外部と暗号化されたデータをやり取りする手段として、ENCRYPT_RAW 関数と DECRYPT_RAW 関数が用意されています。

https://docs.snowflake.com/ja/sql-reference/functions/encrypt_raw.html

https://docs.snowflake.com/ja/sql-reference/functions/decrypt_raw.html

下記の記事では、Snowflake 上で ENCRYPT_RAW 関数を使って暗号化したデータを Oracle, MySQL, PostgreSQL で復号しました:
https://zenn.dev/indigo13love/articles/f07cfa6f91dca6

…が、AES-256-CBC に対応した暗号化/復号機能があればデータベースでなくても復号できるので、今回は Python と OpenSSL コマンドで復号していきます。

Snowflake で暗号化した結果を Python で復号する

Python では PyCryptodome ライブラリを使って復号していきます。

具体的には、以下のような手順で検証してみます。

  1. 32-byte (256-bit) のランダムなキーを生成する (AES-256 想定のため)
  2. Snowflake の ENCRYPT_RAW 関数で暗号化し、暗号化テキストと初期化ベクタ (IV) を取り出す
  3. 取り出した暗号化テキストと IV を PyCryptodome に渡して復号する

これを順番に実装していきます。

まず下記の記事と同じように SnowSQL の情報を使って Snowflake に接続します。

import snowflake.connector
import os
import configparser

home_dir = os.environ.get("HOME")
snowsql_config_path = f"{home_dir}/.snowsql/config"

config = configparser.ConfigParser()
config.read(snowsql_config_path)

conn = snowflake.connector.connect(
	user = config["connections"]["username"],
	password = config["connections"]["password"],
	account = config["connections"]["accountname"],
)

次に、検証のための平文 (foo bar baz) を用意しつつ、PyCryptodome の Crypto.Random.get_random_bytes 関数で 32-byte (256-bit) の暗号化キーを生成します。

plaintext = "foo bar baz"
key = get_random_bytes(32)

その後、ENCRYPT_RAW 関数で暗号化するクエリを用意し、平文と暗号化キーをバインディングしつつ Snowflake に投げます。

Snowflake の ENCRYPT_RAW 関数は暗号化キーの鍵長を自動判別するので、モードを 'AES-CBC' にしつつ 32-byte の暗号化キーを渡すと、自動的に AES-256-CBC で暗号化される形になります。

query = "select encrypt_raw(hex_encode(%s)::binary, %s::binary, null, null, 'AES-CBC')"

rs = conn.cursor().execute(query, (plaintext, key)).fetchone()

ENCRYPT_RAW 関数の結果は、暗号化テキスト (ciphertext) と IV (iv) をフィールドとして持つ JSON 文字列を返してくるので json.loads 関数で JSON オブジェクトに変換し、暗号化テキストと IV を取り出します。

res = json.loads(rs[0])

encrypted = res["ciphertext"]
iv = res["iv"]

あとは取り出した暗号化テキストと IV を PyCryptodome で復号するだけです。

ここで Snowflake から返ってきた暗号化テキストと IV は 16 進数文字列なので、bytearray.fromhex 関数で bytearray 型に変換しつつ渡していきます。

暗号化キーは Crypto.Random.get_random_bytes 関数で Python 側で生成したキーで bytes 型なので、特に変換なしで渡すことができます。

aes = AES.new(key, AES.MODE_CBC, iv=bytearray.fromhex(iv))
decrypted_raw = aes.decrypt(bytearray.fromhex(encrypted))

ここで decrypt 関数は bytes 型の復号されたテキストを返してきますが、CBC の特性上 8-byte 区切りになるまでパディングされている状態になっています。

print(decrypted_raw)
# b'foo bar baz\x05\x05\x05\x05\x05'

今回の例では、foo bar baz が 11-byte なので、16-byte になるまで 5-byte 分パディングされており、またバイト数と同じ数 (\x05) を使ってパディングされているので、おそらく PKCS#7 でパディングされています。

そのため、元の平文を取り出すためには、

  1. パディングを取り除く
  2. bytes 型から str 型に変換する

という手順を取る必要があります。

PyCryptodome には unpad というブロック長を渡せばいい感じにパディングを取り除いてくれる関数があるので、これでパディングを取り除いてから decode で str 型に変換していきます。

decrypted = unpad(decrypted_raw, 8).decode('utf-8')

まとめると、下記のような感じになります。

import snowflake.connector
import os
import configparser
import json
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import unpad

home_dir = os.environ.get("HOME")
snowsql_config_path = f"{home_dir}/.snowsql/config"

config = configparser.ConfigParser()
config.read(snowsql_config_path)

conn = snowflake.connector.connect(
	user = config["connections"]["username"],
	password = config["connections"]["password"],
	account = config["connections"]["accountname"],
)

plaintext = "foo bar baz"
key = get_random_bytes(32)
query = "select encrypt_raw(hex_encode(%s)::binary, %s::binary, null, null, 'AES-CBC')"

print("Plaintext: %s" % plaintext)
print("Key: %s" % key.hex())

rs = conn.cursor().execute(query, (plaintext, key)).fetchone()
res = json.loads(rs[0])

encrypted = res["ciphertext"]
iv = res["iv"]

print("Encrypted: %s" % encrypted)
print("IV: %s" % iv)

aes = AES.new(key, AES.MODE_CBC, iv=bytearray.fromhex(iv))
decrypted_raw = aes.decrypt(bytearray.fromhex(encrypted))
decrypted = unpad(decrypted_raw, 8).decode('utf-8')

print("Decrypted: %s" % decrypted)

このスクリプトを実行すると下記のようになり、Snowflake の ENCRYPT_RAW 関数が PyCryptodome の decrypt 関数で正しく復号できていることがわかります。

$ pipenv run python decrypt.py
Plaintext: foo bar baz
Key: 852dd63b4690f90bfce2d7a3d549ee0f8a273e7d18e269217d6f0da4213a2d0f
Encrypted: 23EEC8DE42A408052CF98E7AF4AA15C3
IV: 3EA8505E87E2B8FD635863DE45C3BD4D
Decrypted: foo bar baz

Snowflake で暗号化した結果を openssl コマンドで復号する

次に、同じように Snowflake の ENCRYPT_RAW 関数で暗号化したテキストを openssl enc コマンドで復号していきます。

まず 32-byte の暗号化キーを openssl rand で生成します。

$ openssl rand -hex 32
635af014ff2cfffcd9d61a562d83e1c92e9c60df5bddacecd5ee6a3c43c7cbb9

そして SnowSQL で Snowflake に接続し、この暗号化キーを使って ENCRYPT_RAW 関数で暗号化します。

$ snowsql -q "select encrypt_raw(hex_encode('foo bar baz')::binary, '635af014ff2cfffcd9d61a562d83e1c92e9c60df5bddacecd5ee6a3c43c7cbb9'::binary, null, null, 'AES-CBC') e"
* SnowSQL * v1.2.23
Type SQL statements or !help
+-----------------------------------------------------+
| E                                                   |
|-----------------------------------------------------|
| {                                                   |
|   "ciphertext": "07CEFBC28A0CBEF4236D5ECACB3CEE94", |
|   "iv": "6D3CC37FB9293040464B86F68C27D628"          |
| }                                                   |
+-----------------------------------------------------+
1 Row(s) produced. Time Elapsed: 1.189s
Goodbye!

あとは openssl enc コマンドで復号するだけですが、今回は下記のようなオプションを指定して実行します。

  • -d ... 暗号化ではなく復号する
  • -aes-256-cbc ... AES-256-CBC で復号する
  • -nosalt ... salt なし
  • -K ... 暗号化キー (16 進数文字列)
  • -iv ... IV (16 進数文字列)

ここで、暗号化キーと IV は Snowflake から返ってきたままの 16 進数文字列で渡すことができますが、暗号化テキストはバイナリに変換する必要があるので xxd -r -p でバイナリに変換してから渡します。

まとめると、下記のようになり、元の平文が正しく復号できていることがわかります。

$ echo 07CEFBC28A0CBEF4236D5ECACB3CEE94|xxd -r -p|openssl enc -d -aes-256-cbc -nosalt -K 635af014ff2cfffcd9d61a562d83e1c92e9c60df5bddacecd5ee6a3c43c7cbb9 -iv 6D3CC37FB9293040464B86F68C27D628
foo bar baz

まとめ

今回の例では、自分で暗号化したテキストを自分で復号する自作自演検証でしたが、暗号化キーと IV がしっかり保管されていれば、別のアプリから暗号化されたデータでも復号することができます。

AES-CBC に対応した暗号化/復号ライブラリさえあれば何の言語でも同じことができるし、だいたい使い方も同じような感じなはずなので、いろいろ応用が効くと思います。

Discussion