Open7
RDS to S3 to Snowflake Pipeline with Terraform
RDS のデータを定期的に S3 に吐き出し、Snowflake に連携するパイプラインを作りたい。技術選定のための実験。
なお、S3 から Snowflake に連携する部分は、S3 を外部ステージとして設定した上で Snowpipe を使ってやる想定。あとで Terraform で実装する。
RDS から S3 に出力するパイプラインはいろいろな構成を試してみたい。
参考
前準備
S3 と Snowflake の連携は以前行ったのを流用する。
RDS
実験用の RDS を用意する。
modules/rds の tf
main.tf
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
resource "aws_db_instance" "default" {
instance_class = "db.t3.micro"
engine = "postgres"
db_name = "demo_db"
skip_final_snapshot = true
allocated_storage = 10
identifier_prefix = "data-pipeline-demo"
db_subnet_group_name = aws_db_subnet_group.default.name
vpc_security_group_ids = [aws_security_group.default.id]
publicly_accessible = true
username = var.db_username
password = var.db_password
}
network.tf
data "aws_availability_zones" "available" {}
module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "5.8.1"
name = "data_pipeline_demo"
cidr = "10.0.0.0/16"
azs = data.aws_availability_zones.available.names
public_subnets = ["10.0.4.0/24", "10.0.5.0/24", "10.0.6.0/24"]
enable_dns_hostnames = true
enable_dns_support = true
}
resource "aws_db_subnet_group" "default" {
name = "data_pipeline_demo"
subnet_ids = module.vpc.public_subnets
tags = {
Name = "Demo"
}
}
resource "aws_security_group" "default" {
name = "data_pipeline_demo_rds"
vpc_id = module.vpc.vpc_id
ingress {
from_port = 5432
to_port = 5432
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 5432
to_port = 5432
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "demo_rds"
}
}
outputs.tf
output "address" {
value = aws_db_instance.default.address
description = "The address of the RDS instance"
}
output "port" {
value = aws_db_instance.default.port
description = "The port of the RDS instance"
}
variables.tf
variable "db_username" {
description = "The username for the database"
type = string
sensitive = true
}
variable "db_password" {
description = "The password for the database"
type = string
sensitive = true
}
作成して接続を確認できた。
$ psql -h $(terraform output -raw rds_address) -p $(terraform output -raw rds_port) -U $TF_VAR_db_username demo_db
Password for user demo:
psql (14.9 (Homebrew), server 16.2)
WARNING: psql major version 14, server major version 16.
Some psql features might not work.
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.
demo_db=>
参考
Data Pipeline に求めるもの
- 自動で定期実行される
- 失敗したら通知される
- 自動リトライできる
- 同じ入力で再実行したら同じ結果が得られる
- 上記の仕組みをあんまり頑張らずに実現できる
- (オプショナル)ジョブの依存関係を設定できる
参考になる記事。
RDS for PostgreSQL から S3 へのファイルエクスポート
ファイルエクスポートは SQL の実行とは違って DB への負荷が低いはず。
aws_s3
という拡張を PostgreSQL にインストールする必要があるらしい。
demo_db=> CREATE EXTENSION aws_s3 CASCADE;
NOTICE: installing required extension "aws_commons"
CREATE EXTENSION
demo_db=> \dx
List of installed extensions
Name | Version | Schema | Description
-------------+---------+------------+---------------------------------------------
aws_commons | 1.2 | public | Common data types across AWS services
aws_s3 | 1.1 | public | AWS S3 extension for importing data from S3
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
(3 rows)
S3 export サポート状況の確認。
$ aws rds describe-db-engine-versions --region ap-northeast-1 --engine postgres --engine-version 14.9 | grep s3Export
"s3Export",
(WIP)
RDS for PostgreSQL のスナップショットを S3 に吐き出す
Apache Parquet 形式で吐き出す。
DB スナップショットデータを Amazon S3 バケットにエクスポートできます。エクスポートプロセスはバックグラウンドで実行されるため、アクティブな DB インスタンスのパフォーマンスには影響しません。
リージョンとバージョンごとのスナップショットの対応状況はこれ。
Apatch Airflow
Python 書いて管理するの、今回は避けたい。
AWS DMS
異なるデータソース間の移行用のツールっぽいが、データレイクへのデータ出力にも使える?
AWS DMSは、リカバリのためにデータベースが生成するトランザクションログから変更を抽出します。このプロセスにより、ターゲットへのレプリケーションがほぼリアルタイムに行われ、レプリケーション監視の複雑さが軽減されます。
AWS DMS を動かすレプリケーションインスタンスが必要。