Embulk v0.11をAWS Lambda上で動かす
Embulkは入力・出力プラグインの組み合わせで柔軟にデータ転送ができて便利ですよね。
AWS上でEmbulkを動かす場合、既存のEC2やECSに相乗りしない場合は以下の理由からLambda上で動かすのが良いのではと思いました。
- 1回の転送時間をLambdaの最大起動時間である15分以内に収めらる。
- データ転送量が増えた場合は分割してLambda関数の呼び出し回数を増やすことで対応可能。
- ステートを持つ必要がない。
- 用途的にコールドスタート時のレイテンシが問題にならない。
- GPUなど特殊なハードウェアは不要。
- コストが安い。
そこでEmbulkをLambdaで動かす例を作ってみました。
作成した例の概要
- 2023年7月時点の最新版であるEmbulk v0.11.0を使用しています。
- RDSからRedshiftにデータ転送する例になっています。
- Lambda関数のハンドラはGoで書いています。Embulkはこのハンドラから起動しています。
Embulk v0.11 について
Embulk v0.11は、以前の安定版であるv0.9から非互換の変更が複数あるようです。
主なものには以下のものがあります。
- Rubyで書かれたプラグインを動かすためのJRubyが同梱されなくなったので、自分でインストールする必要があります。
使用するJRubyは標準添付ライブラリが同梱された jruby-complete-x.x.x.x.jar です。 - embulk.propertiesというJava properties形式のファイルが導入されました。
このファイルでEmbulkのグローバルな設定を行います。JRubyのパスもこのファイルに設定します。 - JRubyを使用するにはembulk, msgpack Gemをインストールする必要があります。
BundlerやLiquidを使用する場合はそれらのGemのインストールも必要です。
詳細はEmbulkのメンテナーのMIKURUBEさんが書かれたこちらの記事を参照してください。
Embulk v0.11 がまもなく出ます
デプロイ手順
-
src/main.go
を目的にあった形に編集してください。 -
以下のコマンドでDockerイメージをビルドします。
docker build -t embulk .
M1やM2などApple Siliconでビルドする場合は、Gemのインストール処理が不安定になるようです。
その場合は最新のDocker Desktopにアップデートして、以下のオプションを有効にしてみてください。-
Settings > Features in development
✅ Use Rosetta for x86/amd64 emulation on Apple Silicon
-
-
Amazon ECRのリポジトリにDockerイメージをプッシュします。
詳細は以下のページを参照してください。
Docker イメージをプッシュする -
Lambda関数を作成します。
-
Lambda関数を設定します。
- Amazon ECRのリポジトリにプッシュしたDockerイメージをデプロイします。
- メモリ割当量を512MB以上に増やします(デフォルトの128MBだとJRubyの起動に失敗しました)。
- タイムアウト時間を十分な長さに増やします。
- アクセス権限を適切に設定します(詳細は後述します)。
- VPC、サブネット、セキュリティーグループを適切に設定します。
Lambda関数のアクセス権限について
Lambda関数の実行ロールに、必要なアクセス権限を設定する必要があります。
最低でもログ出力のために以下のポリシーが必要です(デフォルトで設定されています)。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:<your-region>:<your-account>:*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:<your-region>:<your-account>:log-group:/aws/lambda/<your-lambda-function>:*"
]
}
]
}
VPC内のリソース(RDSやRedshift等)にアクセスするためには、以下のポリシーも必要です。
{
"Effect": "Allow",
"Action": [
"ec2:CreateNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DeleteNetworkInterface"
],
"Resource": "*"
}
この例のように embulk-output-redshift プラグインを使用する場合は、一旦S3に出力してからLOADするので以下のポリシーも必要です。
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<your-backet>",
"arn:aws:s3:::<your-backet>/*"
]
}
DBパスワードなどのパラメータをAWS SSM パラメーターストアから取得する場合は、以下のポリシーも必要です。
{
"Effect": "Allow",
"Action": [
"ssm:GetParameter",
"kms:Decrypt"
],
"Resource": "*"
}
auth_method / aws_auth_method に何を指定するか?
AWSへの認証が必要な以下のようなプラグインを使用する場合、configファイルの auth_method / aws_auth_method の項目で指定する認証方法には env
を指定してください。
- embulk-input-s3
- embulk-output-s3
- embulk-output-redshift
env
を指定することにより、プラグインが AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN などの環境変数を参照してくれるので、Lambda関数の実行ロールに割り当てたアクセス権限が適用されます。
詳細は以下のページを参照してください。
Dockerfile
Embulkが正式にサポートするJavaはJava 8のみなので、ベースイメージはaws-lambda-java:8.al2 を使用しています。
Embulk実行用のGoバイナリはコードを修正するたびにビルドすることになるので、マルチステージビルドを使用してはじめににEmbulkをセットアップすることによりキャッシュが効くようにしています。
- ステージ1
- Embulk, JRuby, Gemのインストール
- 設定ファイルのコピー
- ステージ2
- Lambda関数ハンドラ(Goバイナリ)のビルド
- ステージ3
- 最終イメージの作成
- ステージ1, 2から必要な生成物をコピー
# ステージ1: Embulk, JRuby, 各種Gemのインストール
FROM amazon/aws-lambda-java:8.al2 AS java-builder
# EmbulkとEmbulk Gemのバージョンは同一にする必要があるで変数として定義する。
ARG embulk_version=0.11.0
COPY embulk.properties /embulk/
RUN mkdir -p /embulk/bin/
RUN yum install -y wget
RUN wget -O /embulk/bin/embulk https://dl.embulk.org/embulk-${embulk_version}.jar
RUN wget -O /embulk/bin/jruby https://repo1.maven.org/maven2/org/jruby/jruby-complete/9.3.10.0/jruby-complete-9.3.10.0.jar
# JRubyを使用するにはembulk, msgpack Gemのインストールが必要。
# configファイルの中でLiquidテンプレートを使用する場合は、liquid gemも必要。
RUN java -jar /embulk/bin/embulk -X embulk_home=/embulk \
gem install \
embulk:${embulk_version} \
msgpack \
liquid \
embulk-input-mysql \
embulk-output-redshift -N
COPY config/* /embulk/config/
# ステージ2: Lambda関数ハンドラ(Goバイナリ)のビルド
FROM golang:1.20-alpine AS go-builder
COPY src/* /go/src/
WORKDIR /go/src
RUN go mod download
RUN GOOS=linux GOARCH=amd64 go build -ldflags="-s -w -buildid=" -trimpath -o /lambda
# ステージ3: 最終イメージの作成
# Java 8を使用する(それ以外のバージョンのJavaは正式サポートされていない)。
FROM amazon/aws-lambda-java:8.al2
COPY /embulk /embulk
COPY /lambda /lambda
WORKDIR /
ENTRYPOINT [ "/lambda" ]
Terraform
Terraformで環境構築する場合、tfファイルは以下のようなコードになります。
# ====== ECR ======
resource "aws_ecr_repository" "repo_for_lambda" {
name = "repo_for_lambda"
image_tag_mutability = "MUTABLE"
}
# ====== IAM Role ======
resource "aws_iam_role" "iam_role_for_lambda" {
name = "iam_role_for_lambda"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}
resource "aws_iam_role_policy" "iam_role_policy_for_lambda" {
name = "iam_role_policy_for_lambda"
role = aws_iam_role.iam_role_for_lambda.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = [
"ssm:GetParameter",
"kms:Decrypt"
]
Effect = "Allow"
Resource = "*"
},
{
Action = [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
]
Effect = "Allow"
Resource = [
aws_s3_bucket.bucket_for_lambda.arn,
"${aws_s3_bucket.bucket_for_lambda.arn}/*"
]
}
]
})
}
resource "aws_iam_role_policy_attachment" "iam_role_policy_attachment_for_lambda" {
role = aws_iam_role.xxxxx.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}
# ====== Security Group ======
resource "aws_security_group" "sg_for_lambda" {
name = "sg_for_lambda"
vpc_id = aws_vpc.main.id
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
# ====== Lambda ======
resource "aws_lambda_function" "lambda_for_embulk" {
function_name = "lambda_for_embulk"
package_type = "Image"
image_uri = "${aws_ecr_repository.repo_for_lambda.repository_url}:latest"
role = aws_iam_role.iam_role_for_lambda.arn
publish = true
memory_size = 512
timeout = 600
vpc_config {
subnet_ids = [aws_subnet.private_subnet.id]
security_group_ids = [aws_security_group.sg_for_lambda.id]
}
}
# ====== S3 ======
resource "aws_s3_bucket" "bucket_for_lambda" {
bucket = "bucket_for_lambda"
}
Discussion