🎮

Snowpipe によるファイル取り込みを AWS Lambda から動かしてみる

2024/07/16に公開

当社では現在、データ分析基盤の更改に向けて Snowflake の技術検証を進めています。主要なデータ発生源である AWS 環境からのデータ取り込みもそのうちの1つです。

今回は、AWS Lambda を起点に Snowpipe によるファイル取り込みを実行する方法について検証してみたので、その内容をご紹介したいと思います。

AWS 環境から Snowflake へのデータ取り込み

AWS 環境からのデータ取り込みの方法としては、Snowflake とのストレージ統合が設定された S3 バケットにファイルを配置して、S3 イベント通知 を使用して自動的に Snowpipe をトリガーし、Snowflake 環境にロードするというのが一般的な構成の 1 つのようです。[1]

もちろんこの方法でも良いのですが、S3 イベント通知による Auto-Ingest よりも、AWS 側のワークロードから明示的に Pipe を実行できる方が運用の柔軟性が高いような気がします。調べたところ、snowflake-ingest という Python パッケージを使用して、Lambda 関数から Pipe 実行する方法 [2] が紹介されていたので、今回はこれを試してみたいと思います。

アーキテクチャ

以下に示すアーキテクチャを実装していきます。

概要は以下の通りです。

  • Snowflake のスキーマ (TEST_DB.TEST_SCHEMA) 内にロード先テーブルと Pipe オブジェクトを作成します。Pipe のロード元は S3 外部ステージ (DEFAULT) です。
  • Pipe 操作用にロール (SNOWPIPE_TEST) を作成します。Pipe 操作に必要な権限を付与した上で、このロールに Pipe の所有権を譲渡します。
  • Lambda 関数が Snowflake 環境にアクセスするためのユーザー (AWS_PIPE_TESTUSER) を作成します。ユーザーは SNOWPIPE_TEST ロールが持つ権限を引き受けられるようにします。
  • S3 にロード対象のファイルが配置された状態で、Lambda 関数を実行します。認証に必要な情報を Secrets Manager から取得し、Pipe を実行してファイルを取り込みます。

使用する Snowflake リソース一覧

本記事で使用する Snowflake リソースの一覧を以下に示します。

Object Type Object Name Qualified Name
Database TEST_DB TEST_DB
Schema TEST_SCHEMA TEST_DB.TEST_SCHEMA
Stage DEFAULT TEST_DB.TEST_SCHEMA.DEFAULT
Table DIFF_EXAMPLE TEST_DB.TEST_SCHEMA.DIFF_EXAMPLE
Pipe DIFF_EXAMPLE_PIPE TEST_DB.TEST_SCHEMA.DIFF_EXAMPLE_PIPE
File Format MY_PARQUET_SNAPPY TEST_DB.TEST_SCHEMA.MY_PARQUET_SNAPPY
Role SNOWPIPE_TEST -
User AWS_PIPE_TESTUSER -

準備 ① 各種スキーマオブジェクトの作成

外部ステージの作成

Pipe オブジェクトがファイル読み込み元として参照する、S3 外部ステージ [4] を作成します。

locals {
  database_name = "TEST_DB"
  schema_name   = "TEST_SCHEMA"

  url = {
    s3_bucket = "your-example-bucket"
    s3_prefix = "integrations/${lower(local.database_name)}/${lower(local.schema_name)}/"
  }
}

resource "snowflake_stage" "default" {
  name     = upper("default")
  database = local.database_name
  schema   = local.schema_name

  storage_integration = var.storage_integration.name
  url                 = "s3://${local.url.s3_bucket}/${local.url.s3_prefix}"

  depends_on = [snowflake_schema.default]
}

File Format の作成

今回 S3 からロードするファイル形式 (Parquet 形式, Snappy 圧縮) に合わせて、File Format オブジェクトを作成します。

resource "snowflake_file_format" "default" {
  name     = "MY_PARQUET_SNAPPY"
  database = local.database_name
  schema   = local.schema_name

  format_type = "PARQUET"
  compression = "SNAPPY"

  depends_on = [snowflake_schema.default]
}

Pipe オブジェクト、および Pipe 操作用ロールの作成

Pipe オブジェクト

Pipe オブジェクト、およびロード先となるテーブルを作成します。(Terraform の場合、COPY STATEMENT は Qualified Name で指定しないとエラーになるようです)

resource "snowflake_table" "diff_example" {
  name     = upper("diff_example")
  database = local.database_name
  schema   = local.schema_name

  column {
    name = "v"
    type = "VARIANT"
  }
}

resource "snowflake_pipe" "diff_example" {
  name     = upper("diff_example_pipe")
  database = local.database_name
  schema   = local.schema_name

  auto_ingest    = false
  copy_statement = <<EOT
  COPY INTO ${local.database_name}.${local.schema_name}.DIFF_EXAMPLE
  FROM @${local.database_name}.${local.schema_name}.DEFAULT/diff/example/ 
  FILE_FORMAT = ${local.database_name}.${local.schema_name}.MY_PARQUET_SNAPPY
  EOT
  depends_on = [snowflake_table.diff_example]
}

Pipe 操作用ロール

Pipe 操作用ロール SNOWPIPE_TEST を作成します。
Terraform コードは割愛しますが、以下の権限が付与されていれば十分そうでした。

Object Type Object Name Object Type Plural Privileges Object Level
DATABASE TEST_DB - USAGE Account
SCHEMA TEST_SCHEMA - USAGE Database
STAGE TEST_SCHEMA - USAGE, READ Schema
TABLE * TABLES INSERT, SELECT Schema
PIPE * PIPES MONITOR, OPERATE Schema
FILE FORMAT * FILE FORMATS USAGE Schema

所有権の譲渡

Pipe オブジェクトの所有権は、Pipe 操作用ロールに譲渡されている必要があります。
公式ドキュメント [5] には SQL による設定方法が記載されていますが、Terraform の場合は以下のようになります。

resource "snowflake_grant_ownership" "diff_example_pipe" {
  account_role_name   = "SNOWPIPE_TEST"
  outbound_privileges = "COPY"
  on {
    object_type = "PIPE"
    object_name = "${local.database_name}.${local.schema_name}.DIFF_EXAMPLE_PIPE"
  }
  depends_on = [snowflake_pipe.diff_example]
}

準備 ② 認証周りの設定

キーペアの作成

公式ドキュメント [6] を参考に、Snowflake への認証用のキーペアを作成します。

秘密鍵として、暗号化バージョンか非暗号化バージョンを選択できますが、多層防御の観点から暗号化バージョンで作成してみたいと思います。(暗号化・復号化に必要となるパスフレーズとして、本記事では XN9d#FgajT2Y3G@V を使用するものとします)

% openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

Enter Encryption Password: [XN9d#FgajT2Y3G@V]↵
Verifying - Enter Encryption Password: [XN9d#FgajT2Y3G@V]↵

上記を実行すると、rsa_key.p8 というファイル名で秘密鍵が生成されます。

次に、この秘密鍵に対応する公開鍵を生成します。秘密鍵を生成した際のパスフレーズの入力を求められます。実行すると、rsa_key.pub というファイル名で公開鍵が生成されます。

% openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Enter pass phrase for rsa_key.p8: [XN9d#FgajT2Y3G@V]↵
writing RSA key

秘密鍵の管理

秘密鍵の管理方法について考えます。今回の場合、認証元は Lambda 関数になるので、Lambda 関数に秘密鍵の情報を渡す必要があります。しかし、秘密鍵をソースコードに配置してしまうのは、たとえ暗号化されていたとしてもセキュリティ上あまり良くありません。

AWS の場合、機密情報の管理は AWS Secrets ManagerAWS Systems Manager Parameter Store を使用することが一般的なので、今回は AWS Secrets Manager を使用してみます。Lambda 関数からは、Secrets Manager シークレットを読み取る形で秘密鍵を取得します。

こうすることで、必要な IAM 権限を持つプリンシパルにのみ参照・更新権限を付与することが可能になります。Secrets Manager シークレット側にもリソースポリシーも設定することで、より安全に管理できます。

sops を用いた機密リソースの IaC 管理

sops (Secrets OPerationS) は、Mozilla が公開している暗号化ツールです。認証情報のような機密性の高い情報を暗号化した状態で保存できます。以前の記事 でご紹介した Terragrunt との相性も良く、以下のエントリで詳しく扱っているので良ければ併せてご覧ください。

https://zenn.dev/simpleform/articles/20230123-01-secrets-with-sops-and-terragrunt

秘密鍵の場合、複数行になるため YAML 形式で保存する場合は以下のように編集します。

sops secret.yml
secret.yml [decrypted]
ACCOUNT: yourorgname-account_name
HOST: yourorgname-account_name.snowflakecomputing.com
USER: AWS_PIPE_TESTUSER
ENCRYPTED_PRIVATE_KEY: |-
  -----BEGIN ENCRYPTED PRIVATE KEY-----
  MIIFJDBWBgkqhkiG9w0BBQ0wSTAxBgkqhkiG9w0BBQwwJAQQusp62iFRJW7+This
  +Hj5ZgICCAAwDAYIKoZIhvcNAgkFADAUBggqhkiG9w0DBwQIJPmKsRWEzwEEg+Is
  ......
  ZECYY+ZjXxKbhOVFz8QuE7dS+FakeKey
  -----END ENCRYPTED PRIVATE KEY-----

ユーザーの作成

Lambda 関数が認証する Snowflake ユーザーを作成します。公式ドキュメントでは SQL で設定する方法が紹介されていますが、ここでは Terraform で作成してみます。

variable "rsa_public_keys" {
  type = object({
    primary   = optional(string, null)
    secondary = optional(string, null)
  })
}

resource "snowflake_user" "test" {
  login_name   = "AWS_PIPE_TESTUSER"
  name         = "AWS_PIPE_TESTUSER"
  default_role = "SNOWPIPE_TEST"

  rsa_public_key   = var.rsa_public_keys.primary
  rsa_public_key_2 = var.rsa_public_keys.secondary
}

# 対象の Pipe を操作するのに必要な権限を持つロールに、作成したユーザーを所属させる
resource "snowflake_grant_account_role" "grant_to_testuser" {
  role_name = "SNOWPIPE_TEST"
  user_name = "AWS_PIPE_TESTUSER"
}

var.rsa_public_keys.primary には、公開鍵の内容を一行に変換したものを使用します。例えば、以下のような公開鍵であれば、MIIBIjAN ... rwIDAQAB を改行なしで指定します。

rsa_key.pub
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAmycifqPfigDna+ThisIs
... 略 ...
/s9EcapvWSwxW4LCjGvijO59ofVcxYXpPM9Fa0icmmP2GnNIvV+FakePublicKey
rwIDAQAB
-----END PUBLIC KEY-----

実装

ハンドラスクリプトのサンプルコード全体を以下に示します。
公式ドキュメント [3:1] のものをベースにしていますが、秘密鍵を Secrets Manager から取得する部分などは少し変更を加えています。

サンプルコード
handler.py
import os
import boto3
from aws_lambda_powertools import Logger

from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile

from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend

from .modules.utils import get_secret, get_object_keys

logger = Logger()
s3_client = boto3.client('s3')

BUCKET_NAME = os.environ["BUCKET_NAME"]
SNOW_DATABASE_NAME = os.environ["SNOW_DATABASE_NAME"]
SNOW_SCHEMA_NAME = os.environ["SNOW_SCHEMA_NAME"]


def create_ingest_manager(qualified_pipe_name: str) -> SimpleIngestManager:
    secret = get_secret(os.environ["SECRET_NAME"])

    encrypted_private_key = secret["ENCRYPTED_PRIVATE_KEY"]
    private_key_obj = load_pem_private_key(
        encrypted_private_key.encode(),
        os.environ["PRIVATE_KEY_PASSPHRASE"].encode(),
        default_backend(),
    )
    private_key_text = private_key_obj.private_bytes(
        encoding=Encoding.PEM,
        format=PrivateFormat.PKCS8,
        encryption_algorithm=NoEncryption(),
    ).decode("utf-8")

    ingest_manager = SimpleIngestManager(
        account=secret["ACCOUNT"],
        host=secret["HOST"],
        user=secret["USER"],
        pipe=qualified_pipe_name,
        private_key=private_key_text,
    )
    return ingest_manager


@logger.inject_lambda_context(log_event=True)
def handler(event, context):
    table_name = event["TableName"]
    table_type = event["TableType"]
    execution_name = event["ExecutionName"]

    pipe_name = f"{table_type}_{table_name}_pipe"
    qualified_pipe_name = f'{SNOW_DATABASE_NAME}.{SNOW_SCHEMA_NAME}.{pipe_name}'
    qualified_pipe_name = qualified_pipe_name.upper()
    logger.info(qualified_pipe_name)

    ingest_manager = create_ingest_manager(qualified_pipe_name)

    pipe_path = os.path.join(
        "integrations",
        SNOW_DATABASE_NAME,
        SNOW_SCHEMA_NAME,
        table_type,
        table_name,
    )
    s3_prefix = os.path.join(pipe_path, f"execution_name={execution_name}")
    object_keys = get_object_keys(bucket_name=BUCKET_NAME, prefix=s3_prefix)
    filename_list = [os.path.basename(key) for key in object_keys]

    staged_file_list = []
    for filename in filename_list:
        staged_file_path = os.path.join(f"execution_name={execution_name}", filename)
        staged_file_list.append(StagedFile(staged_file_path, None))

    try:
        response = ingest_manager.ingest_files(staged_file_list)
        logger.info(response)

        return {
            "statusCode": 200,
            "body": response,
        }

    except Exception as e:
        logger.exception(e)
        return {
            "statusCode": 500,
            "body": str(e),
        }

以下にポイントを絞って解説します。

SimpleIngestManager オブジェクトの作成

Secrets Manager に保存したシークレット情報を取得し、SimpleIngestManager オブジェクトを作成します。パスフレーズは Lambda 関数の環境変数から取得しています。

def create_ingest_manager(qualified_pipe_name: str) -> SimpleIngestManager:
    secret = get_secret(os.environ["SECRET_NAME"])

    encrypted_private_key = secret["ENCRYPTED_PRIVATE_KEY"]
    private_key_obj = load_pem_private_key(
        encrypted_private_key.encode(),
        os.environ["PRIVATE_KEY_PASSPHRASE"].encode(),
        default_backend(),
    )
    private_key_text = private_key_obj.private_bytes(
        encoding=Encoding.PEM,
        format=PrivateFormat.PKCS8,
        encryption_algorithm=NoEncryption(),
    ).decode("utf-8")

    ingest_manager = SimpleIngestManager(
        account=secret["ACCOUNT"],
        host=secret["HOST"],
        user=secret["USER"],
        pipe=qualified_pipe_name,
        private_key=private_key_text,
    )
    return ingest_manager

ファイル取り込み - .ingest_files()

取り込み対象の S3 オブジェクト一式を、StagedFile (名前付きタプル) のリストとして生成し、SimpleIngestManager.ingest_files() メソッドで Pipe での取り込みを実行します。

@logger.inject_lambda_context(log_event=True)
def handler(event, context):
    # ...(略)...

    staged_file_list = []
    for filename in filename_list:
        staged_file_path = os.path.join(f"execution_name={execution_name}", filename)
        staged_file_list.append(StagedFile(staged_file_path, None))

    try:
        response = ingest_manager.ingest_files(staged_file_list)
        logger.info(response)
        # ...(略)...

    except Exception as e:
        logger.exception(e)
        # ...(略)...

ここで、StagedFile に指定する staged_file_path は、Pipe で定義されたパスからの相対パスで指定します。

例えば、以下のような S3 フォルダ構成であった場合、Pipe で定義されたパスは .../example/ までなので、execution_name=default/... からのオブジェクト相対パスを指定します。

s3://your-example-bucket/
└── integrations/ 
    └── test_db/                ... 【ストレージ統合】
        └── test_schema/        ... 【外部ステージ】
            └── diff/
                └── example/    ... 【Pipeで定義されたパス】
                    └── execution_name=default/
                        ├── part-00000-xxxxxxxx.snappy.parquet
                        ├── part-00001-xxxxxxxx.snappy.parquet
                        └── part-00002-xxxxxxxx.snappy.parquet

動作確認

以下のようなテストイベントで、実際に Lambda 関数を動かしてみます。

TestEvent.json
{
  "TableType": "diff",
  "TableName": "example",
  "ExecutionName": "default"
}

実行が成功すると、以下のようなレスポンスが返ってきます。

ロード先テーブルのレコード数をカウントし、データが投入されていることを確認できます。

落穂拾い

トラブルシューティング

Lambda 関数のコードを動かす際に遭遇したエラーと、その対処方法について記しておきます。
(Snowpipe の動作に関するトラブルシューティングは公式ドキュメント [7] をご確認ください)

JWT token is invalid

IngestResponseError: Http Error: 401, Vender Code: xxxxxx, 
Message: JWT token is invalid. [xxxxxxxx]

認証が上手くいっていない場合に発生するエラーです。
秘密鍵の処理が適切か、指定しているアカウント名・ユーザー名などが正しいかを確認します。

Specified object does not exist or not authorized

IngestResponseError: Http Error: 404, Vender Code: xxxxxx, 
Message: Specified object does not exist or not authorized. Pipe not found

Pipe オブジェクトが存在しないか、権限が不足している場合に発生するエラーです。
指定している Pipe 名 (Qualified Name) が正しい場合は、ロールがリソースに対する十分な権限を持っているかを確認します。

COPY_HISTORY ビュー

Snowpipe によるファイル取り込みは、COPY_HISTORY ビュー で 14 日間その履歴が保持されます。Snowpipe はファイル複製などによる不要な再取り込みを防ぐため、(異なる Etag を持っていたとしても)同じ名前のファイルに対しては取り込みを実行しません。[8]

  • 既にロードされたファイルと同じ名前のステージングされたファイルは、新しい行の追加や、ファイルのエラーの修正などの変更があっても無視されます。
  • TRUNCATE TABLE コマンドを使用してテーブルを切り捨てても、メタデータをロードしている Snowpipe ファイルは削除されません。

この挙動は、今回実施した snowflake-ingest による取り込みの場合でも同様のようだったので、運用する際は注意しておく必要がありそうです。(ちなみに COPY INTO ステートメントで FORCE = TRUE を設定すると、履歴の状態に関係なく取り込みが実行されます)

Pipe の外形監視

Lambda 関数側の実行で正常なレスポンスが返ってきた場合であっても、Snowflake 側リソースの設定不備でファイルが取り込まれておらず、Pipe も正常 (= RUNNING) ではない状態になっている、といったことがありました。

これまで少し触ってみた感触だと結構簡単に Pipe がエラー状態になってしまうので、Pipe 自体に対する外形監視も必要になりそうだなと感じました。この辺りは今後、以下のような記事を参考に実装を検討したいと思います。

https://datumstudio.jp/blog/1130_snowflake_16/

さいごに

Snowpipe を Lambda 関数から動かす方法について書いてみました。
目新しいものは特にないかもしれませんが、秘密鍵に関する扱いなど参考になれば幸いです。

今回の記事では、Parquet ファイルの内容を VARIANT 型カラムにロードするところまでしか出来ていないので、スキーマも含めて再現する方法についても今後アウトプットできたらと思います。

最後まで読んで頂き、ありがとうございました。

参考

脚注
  1. Amazon S3用Snowpipeの自動化 - Snowflake Documentation ↩︎

  2. オプション2: AWS LambdaによるSnowpipeの自動化 - Snowflake Documentation ↩︎

  3. オプション1: Amazon S3にアクセスするためのSnowflakeストレージ統合の構成 - Snowflake Documentation ↩︎ ↩︎

  4. 外部ステージの作成 - Snowflake Documentation ↩︎

  5. パイプの所有権の譲渡 - Snowflake Documentation ↩︎

  6. キーペア認証とキーペアローテーション - Snowflake Documentation ↩︎

  7. Snowpipeのトラブルシューティング - Snowflake Documentation ↩︎

  8. 変更済みデータをリロードできません、変更済みデータが意図せずにロードされました - Snowflake Documentation ↩︎

SimpleForm Tech Blog

Discussion