⛷️

RDS AuroraからSnowflakeへfirehoseを使ってicebergテーブル連携する

2024/12/05に公開

はじめに

Amazon Data Firehoseでicebergテーブルへのexportがサポートされました。これを用いて、Snowflakeまで連携を行います。

https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/apache-iceberg-destination.html

コードはTerraformにして以下のリポジトリにアップロードしています。

https://github.com/ugmuka/snowflake-rds-iceberg-sample

準備

構成

以下のような構成でデータソースとしてaurora mysqlを立てます。

firehoseからprivate subnetへの接続が必要になるのでprivatelinkを用いたendpointを設定してあげる必要があります。

AWS PrivateLinkを用いたエンドポイントの作成

private subnetに配置されたDBに接続する場合、AWS PrivateLinkを使ってエンドポイントを作成する必要があります。endpointの向き先がNLBのみで、RDSのドメインへ名前解決を毎回してあげる必要があり、結構面倒です。本当は以下記事のようにSNSとlambdaを使ってRDSのリカバリ発生ごとにNLBの設定ipを変更してあげる必要がありますが今回はサボりました。

https://aws.amazon.com/jp/blogs/database/access-amazon-rds-across-vpcs-using-aws-privatelink-and-network-load-balancer/

nlbのtargetに関してはリカバリなどでipが変更されるため、terraformでは管理せずにdigコマンドなどでipを埋めておきます。

最後にprivatelinkにてfirehose.amazonaws.comからの接続を許可してあげます。

DBに接続して適当なテーブルを作成しておきます。

CREATE DATABASE IF NOT EXISTS example_db;

USE example_db;

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,          -- ユーザーID(自動インクリメント)
    username VARCHAR(50) NOT NULL,             -- ユーザー名(ユニーク)
    email VARCHAR(100) NOT NULL,               -- メールアドレス(ユニーク)
    password_hash VARCHAR(255) NOT NULL,       -- パスワード(ハッシュ化されたものを想定)
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 作成日時
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, -- 更新日時
    is_active BOOLEAN DEFAULT TRUE             -- アクティブフラグ
);

-- サンプルデータの挿入
INSERT INTO users (username, email, password_hash) VALUES
('john_doe', 'john.doe@example.com', 'hashed_password_123'),
('jane_smith', 'jane.smith@example.com', 'hashed_password_456');

構築

RDS→S3へのicebergテーブル連携

構築したprivatelinkに対してfirehoseで接続をしていきます。

ここまで来れば接続は割と簡単にできますが、watermarkテーブルを作成する必要があるため、DB側でテーブルの作成権限が必要、かつレプリカではなくマスターDBへの接続が必要になります。
正しく設定ができていれば以下のようにS3へparquetファイルや、メタデータのjsonが送られていることがわかります。

Snowflakeのicebergテーブル作成

AWSとの接続設定

S3に保存したicebergテーブルを読み込むには、external volumeとcatalog integrationが必要です。ほぼexternal stageと同じ手順で作成できますが、glue catalogへのアクセス権限が必要になります。詳細は以下のドキュメントを参照してください。

https://docs.snowflake.com/ja/user-guide/tables-iceberg-configure-external-volume

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:GetObjectVersion",
                "s3:DeleteObject",
                "s3:DeleteObjectVersion"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::${bucket_name}/*"
            ]
        },
        {
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "*"
                    ]
                }
            },
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::${bucket_name}"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetTable",
                "glue:GetTables"
            ],
            "Resource": [
                "arn:aws:glue:*:${account_id}:table/*/*",
                "arn:aws:glue:*:${account_id}:catalog",
                "arn:aws:glue:*:${account_id}:database/*"
            ]
    }
  ]
}

catalog integrationのみterraformで対応していないため手動で作成しました。

CREATE CATALOG INTEGRATION MUKAI_DEV_GLUE_INT
   CATALOG_SOURCE=GLUE
   CATALOG_NAMESPACE='example_db'
   TABLE_FORMAT=ICEBERG
   GLUE_AWS_ROLE_ARN='arn:aws:iam::1234567890:role/mukai-dev-snowflake-role'
   GLUE_CATALOG_ID='1234567890'
   GLUE_REGION='ap-northeast-1'
   ENABLED=TRUE;

テーブル作成

最後にcreate tableを行えばテーブル作成の完了です。

CREATE ICEBERG TABLE users
  EXTERNAL_VOLUME='MUKAI_DEV_S3_VOLUME'
  CATALOG='MUKAI_DEV_GLUE_INT'
  CATALOG_TABLE_NAME='users';

感想

privatelinkを作成する手間はあるものの、RDS側のネットワーク設定に手を入れる必要がないため、データ連携の選択肢としては将来的には主要なものの一つとして上がってきそうです(fivetranやtroccoでも現状同様の手順が必要になります)。

Discussion