Zenn
Open7

RDS to S3 to Snowflake Pipeline with Terraform

9sako69sako6

RDS のデータを定期的に S3 に吐き出し、Snowflake に連携するパイプラインを作りたい。技術選定のための実験。

なお、S3 から Snowflake に連携する部分は、S3 を外部ステージとして設定した上で Snowpipe を使ってやる想定。あとで Terraform で実装する。

RDS から S3 に出力するパイプラインはいろいろな構成を試してみたい。

参考

https://zenn.dev/9sako6/scraps/b061119f0c7ca7

9sako69sako6

前準備

S3 と Snowflake の連携は以前行ったのを流用する。
https://zenn.dev/9sako6/scraps/b061119f0c7ca7

RDS

実験用の RDS を用意する。

modules/rds の tf
main.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

resource "aws_db_instance" "default" {
  instance_class         = "db.t3.micro"
  engine                 = "postgres"
  db_name                = "demo_db"
  skip_final_snapshot    = true
  allocated_storage      = 10
  identifier_prefix      = "data-pipeline-demo"
  db_subnet_group_name   = aws_db_subnet_group.default.name
  vpc_security_group_ids = [aws_security_group.default.id]
  publicly_accessible    = true

  username = var.db_username
  password = var.db_password
}

network.tf
data "aws_availability_zones" "available" {}

module "vpc" {
  source               = "terraform-aws-modules/vpc/aws"
  version              = "5.8.1"
  name                 = "data_pipeline_demo"
  cidr                 = "10.0.0.0/16"
  azs                  = data.aws_availability_zones.available.names
  public_subnets       = ["10.0.4.0/24", "10.0.5.0/24", "10.0.6.0/24"]
  enable_dns_hostnames = true
  enable_dns_support   = true
}

resource "aws_db_subnet_group" "default" {
  name       = "data_pipeline_demo"
  subnet_ids = module.vpc.public_subnets
  tags = {
    Name = "Demo"
  }
}


resource "aws_security_group" "default" {
  name   = "data_pipeline_demo_rds"
  vpc_id = module.vpc.vpc_id

  ingress {
    from_port   = 5432
    to_port     = 5432
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  egress {
    from_port   = 5432
    to_port     = 5432
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name = "demo_rds"
  }
}

outputs.tf
output "address" {
  value       = aws_db_instance.default.address
  description = "The address of the RDS instance"
}

output "port" {
  value       = aws_db_instance.default.port
  description = "The port of the RDS instance"
}
variables.tf
variable "db_username" {
  description = "The username for the database"
  type        = string
  sensitive   = true
}

variable "db_password" {
  description = "The password for the database"
  type        = string
  sensitive   = true
}

作成して接続を確認できた。

$ psql -h $(terraform output -raw rds_address) -p $(terraform output -raw rds_port) -U $TF_VAR_db_username demo_db
Password for user demo: 
psql (14.9 (Homebrew), server 16.2)
WARNING: psql major version 14, server major version 16.
         Some psql features might not work.
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.

demo_db=>

参考

https://developer.hashicorp.com/terraform/tutorials/aws/aws-rds

9sako69sako6

Data Pipeline に求めるもの

  • 自動で定期実行される
  • 失敗したら通知される
  • 自動リトライできる
  • 同じ入力で再実行したら同じ結果が得られる
  • 上記の仕組みをあんまり頑張らずに実現できる
  • (オプショナル)ジョブの依存関係を設定できる

参考になる記事。

https://techblog.zozo.com/entry/batch-development-guideline

9sako69sako6

RDS for PostgreSQL から S3 へのファイルエクスポート

ファイルエクスポートは SQL の実行とは違って DB への負荷が低いはず。
https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/postgresql-s3-export.html

aws_s3 という拡張を PostgreSQL にインストールする必要があるらしい。

demo_db=> CREATE EXTENSION aws_s3 CASCADE;
NOTICE:  installing required extension "aws_commons"
CREATE EXTENSION
demo_db=>  \dx
                           List of installed extensions
    Name     | Version |   Schema   |                 Description                 
-------------+---------+------------+---------------------------------------------
 aws_commons | 1.2     | public     | Common data types across AWS services
 aws_s3      | 1.1     | public     | AWS S3 extension for importing data from S3
 plpgsql     | 1.0     | pg_catalog | PL/pgSQL procedural language
(3 rows)

S3 export サポート状況の確認。

$ aws rds describe-db-engine-versions --region ap-northeast-1 --engine postgres --engine-version 14.9 | grep s3Export
                "s3Export",

(WIP)

9sako69sako6

RDS for PostgreSQL のスナップショットを S3 に吐き出す

Apache Parquet 形式で吐き出す。

DB スナップショットデータを Amazon S3 バケットにエクスポートできます。エクスポートプロセスはバックグラウンドで実行されるため、アクティブな DB インスタンスのパフォーマンスには影響しません。

https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/USER_ExportSnapshot.html#USER_ExportSnapshot.Exporting

リージョンとバージョンごとのスナップショットの対応状況はこれ。

https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/Concepts.RDS_Fea_Regions_DB-eng.Feature.ExportSnapshotToS3.html#Concepts.RDS_Fea_Regions_DB-eng.Feature.ExportSnapshotToS3.pg

9sako69sako6

AWS DMS

異なるデータソース間の移行用のツールっぽいが、データレイクへのデータ出力にも使える?

AWS DMSは、リカバリのためにデータベースが生成するトランザクションログから変更を抽出します。このプロセスにより、ターゲットへのレプリケーションがほぼリアルタイムに行われ、レプリケーション監視の複雑さが軽減されます。

https://docs.aws.amazon.com/ja_jp/dms/latest/sbs/postgresql-s3datalake.html

AWS DMS を動かすレプリケーションインスタンスが必要。

ログインするとコメントできます