Open7

RDS to S3 to Snowflake Pipeline with Terraform

9sako69sako6

RDS のデータを定期的に S3 に吐き出し、Snowflake に連携するパイプラインを作りたい。技術選定のための実験。

なお、S3 から Snowflake に連携する部分は、S3 を外部ステージとして設定した上で Snowpipe を使ってやる想定。あとで Terraform で実装する。

RDS から S3 に出力するパイプラインはいろいろな構成を試してみたい。

参考

https://zenn.dev/9sako6/scraps/b061119f0c7ca7

9sako69sako6

前準備

S3 と Snowflake の連携は以前行ったのを流用する。
https://zenn.dev/9sako6/scraps/b061119f0c7ca7

RDS

実験用の RDS を用意する。

modules/rds の tf
main.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

resource "aws_db_instance" "default" {
  instance_class         = "db.t3.micro"
  engine                 = "postgres"
  db_name                = "demo_db"
  skip_final_snapshot    = true
  allocated_storage      = 10
  identifier_prefix      = "data-pipeline-demo"
  db_subnet_group_name   = aws_db_subnet_group.default.name
  vpc_security_group_ids = [aws_security_group.default.id]
  publicly_accessible    = true

  username = var.db_username
  password = var.db_password
}

network.tf
data "aws_availability_zones" "available" {}

module "vpc" {
  source               = "terraform-aws-modules/vpc/aws"
  version              = "5.8.1"
  name                 = "data_pipeline_demo"
  cidr                 = "10.0.0.0/16"
  azs                  = data.aws_availability_zones.available.names
  public_subnets       = ["10.0.4.0/24", "10.0.5.0/24", "10.0.6.0/24"]
  enable_dns_hostnames = true
  enable_dns_support   = true
}

resource "aws_db_subnet_group" "default" {
  name       = "data_pipeline_demo"
  subnet_ids = module.vpc.public_subnets
  tags = {
    Name = "Demo"
  }
}


resource "aws_security_group" "default" {
  name   = "data_pipeline_demo_rds"
  vpc_id = module.vpc.vpc_id

  ingress {
    from_port   = 5432
    to_port     = 5432
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  egress {
    from_port   = 5432
    to_port     = 5432
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name = "demo_rds"
  }
}

outputs.tf
output "address" {
  value       = aws_db_instance.default.address
  description = "The address of the RDS instance"
}

output "port" {
  value       = aws_db_instance.default.port
  description = "The port of the RDS instance"
}
variables.tf
variable "db_username" {
  description = "The username for the database"
  type        = string
  sensitive   = true
}

variable "db_password" {
  description = "The password for the database"
  type        = string
  sensitive   = true
}

作成して接続を確認できた。

$ psql -h $(terraform output -raw rds_address) -p $(terraform output -raw rds_port) -U $TF_VAR_db_username demo_db
Password for user demo: 
psql (14.9 (Homebrew), server 16.2)
WARNING: psql major version 14, server major version 16.
         Some psql features might not work.
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.

demo_db=>

参考

https://developer.hashicorp.com/terraform/tutorials/aws/aws-rds

9sako69sako6

Data Pipeline に求めるもの

  • 自動で定期実行される
  • 失敗したら通知される
  • 自動リトライできる
  • 同じ入力で再実行したら同じ結果が得られる
  • 上記の仕組みをあんまり頑張らずに実現できる
  • (オプショナル)ジョブの依存関係を設定できる

参考になる記事。

https://techblog.zozo.com/entry/batch-development-guideline

9sako69sako6

RDS for PostgreSQL から S3 へのファイルエクスポート

ファイルエクスポートは SQL の実行とは違って DB への負荷が低いはず。
https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/postgresql-s3-export.html

aws_s3 という拡張を PostgreSQL にインストールする必要があるらしい。

demo_db=> CREATE EXTENSION aws_s3 CASCADE;
NOTICE:  installing required extension "aws_commons"
CREATE EXTENSION
demo_db=>  \dx
                           List of installed extensions
    Name     | Version |   Schema   |                 Description                 
-------------+---------+------------+---------------------------------------------
 aws_commons | 1.2     | public     | Common data types across AWS services
 aws_s3      | 1.1     | public     | AWS S3 extension for importing data from S3
 plpgsql     | 1.0     | pg_catalog | PL/pgSQL procedural language
(3 rows)

S3 export サポート状況の確認。

$ aws rds describe-db-engine-versions --region ap-northeast-1 --engine postgres --engine-version 14.9 | grep s3Export
                "s3Export",

(WIP)

9sako69sako6

RDS for PostgreSQL のスナップショットを S3 に吐き出す

Apache Parquet 形式で吐き出す。

DB スナップショットデータを Amazon S3 バケットにエクスポートできます。エクスポートプロセスはバックグラウンドで実行されるため、アクティブな DB インスタンスのパフォーマンスには影響しません。

https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/USER_ExportSnapshot.html#USER_ExportSnapshot.Exporting

リージョンとバージョンごとのスナップショットの対応状況はこれ。

https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/Concepts.RDS_Fea_Regions_DB-eng.Feature.ExportSnapshotToS3.html#Concepts.RDS_Fea_Regions_DB-eng.Feature.ExportSnapshotToS3.pg

9sako69sako6

AWS DMS

異なるデータソース間の移行用のツールっぽいが、データレイクへのデータ出力にも使える?

AWS DMSは、リカバリのためにデータベースが生成するトランザクションログから変更を抽出します。このプロセスにより、ターゲットへのレプリケーションがほぼリアルタイムに行われ、レプリケーション監視の複雑さが軽減されます。

https://docs.aws.amazon.com/ja_jp/dms/latest/sbs/postgresql-s3datalake.html

AWS DMS を動かすレプリケーションインスタンスが必要。