S3 -> R2の移行をなるべく低コストで実現してみた
S3 -> R2の移行をなるべく低コストで実現してみた
どうも、DELTAの馬場です。
以前の記事では R2 Super Slurperを用いたS3からR2へのオブジェクト移行ツールについて触れました。
この記事の中では、ツールによる移行と実際にアーキテクチャを切り替えるまでの間に生成されるオブジェクトについて、対応が必要ということが課題として残ってしまっていました。
そして、当時は以下のように対応すればよいと考えていました。
- S3とR2両方にアップロードするようにアプリ側を編集
今回、アプリ側での改修無しで移行ができる方法を思いついたので、その方法を紹介します。
どのように実現するか
ざっくりしたイメージは以下の通りです。
オブジェクトの新規登録のS3イベントをトリガーにLambdaを実行しR2の同名バケット上にオブジェクトを送付することでオブジェクトストレージの移行までの同期を実現できるものと考えています。
やってみる
準備
- R2上にS3と同名のバケットを作成しておく。
- R2側でAPIトークンを発行しておく。
- Lambdaで利用するS3とSecrets Managerの権限を持ったRoleかPolicyを作成しておく。
AWS側の作業
AWS側の作業と書きましたが、事前準備以外でcloudflareを触ることは今回のケースではありません。
- Secrets ManagerにR2のトークンを保存する。
アクセスキー、シークレットキー、エンドポイントのURLがそれぞれ払い出されていると思います。
Lambdaにべた書きは流石にアレなので、SecretsManagerから取得するようにしていきます。
「新しいシークレットを保存する」から"その他のシークレットのタイプ"を選択し、それぞれキーとその値を入力していきます。
キーは後でLambdaの環境変数として使うのでわかりやすい形にしておいてください。
その後、シークレットの名前を好きにつけてあげます。
ここもLambdaの環境変数として使うので、長すぎない方がいいかなと思います。
- Lambdaの実行Roleを作成
予め作っておいてもいいですし、とりあえず新規で作成して後から権限足すでもいいかなと思います。
私の検証時は後から以下のような権限を足しました。
s3用(バケットを絞ったり、GetしかしないのでActionに制限を入れてもいいなと思ってます)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "s3:*",
"Resource": "*"
}
]
}
SecretsManager用(R2のアクセストークン用のシークレットだけに制限してもいいかも)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "secretsmanager:*",
"Resource": "*"
}
]
}
- Lambdaを作成する。
ビルドがめんどくさいのでPythonで書いてます。
コードは後述にあります。
とりあえず動けばいいの気持ちで書いているので目を瞑りながら読んでください。。。
ポイントは以下2点
- R2はS3互換なのでboto3でS3と同じように動かせる。
接続情報について注意が必要で、AZは "auto" を設定し、clientに "endpoint_url" を設定する必要があります。 - R2への接続情報の取得は環境変数に設定した値を用いて行う。
動き方としてはざっくり以下のフローとなります。
- S3のイベントを受け取ってS3のバケット名とアップされたオブジェクトのキー(パスとファイル名)を取得
- アップされたオブジェクトをいったん取得
- R2へ取得したオブジェクトをアップ
import json
import urllib.parse
import boto3
import os
import ast
import base64
from botocore.exceptions import ClientError
s3 = boto3.client('s3')
def get_all_secrets():
print("get secrets")
secret_name = os.environ['SECRETS_NAME']
region = os.environ['AWS_REGION']
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region
)
try:
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
print(e)
else:
if 'SecretString' in get_secret_value_response:
secret_data = get_secret_value_response['SecretString']
secrets = ast.literal_eval(secret_data)
return secrets
else:
decoded_binary_secrets = base64.b64decode(get_secret_value_response['SecretBinary'])
return decoded_binary_secrets
def upload_object_toR2(object_body,bucket,key):
print("upload s3 object to R2")
secrets = get_all_secrets()
r2_client = boto3.client('s3',
region_name ="auto",
aws_access_key_id = secrets[os.environ['r2_access_key']],
aws_secret_access_key = secrets[os.environ['r2_secret_access_key']],
endpoint_url = secrets[os.environ['r2_endpoint']])
try:
r2_client.put_object(Bucket=bucket, Key=key, Body=object_body)
print(f"Object {key} copied from S3 to R2")
return {
'statusCode': 200,
'body': json.dumps('Object copied successfully')
}
except Exception as e:
print(f"Error: {e}")
return {
'statusCode': 500,
'body': json.dumps('Error copying object')
}
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
try:
print("get s3 object")
target_object = s3.get_object(Bucket=bucket, Key=key)
upload_object_toR2(target_object['Body'].read(),bucket,key)
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
Lambdaのサンプルのコードをそのまま使う場合は、環境変数のキーと値を以下の通りに設定して下さい。
- SECRETS_NAME: R2の接続情報を保存したシークレットの名前
- r2_access_key: アクセスキーIDのシークレットキー
- r2_secret_access_key: シークレットアクセスキーのシークレットキー
- r2_endpoint: エンドポイントのURLのシークレットキー
- Lambdaのトリガーを設定する。
実際に動かしていくためにトリガーを設定していきます。
"設定タブ" から "トリガー" を選択し、"トリガーを追加" をクリックします。
とりあえず "ソースを選択" から "S3" を選択します。
"Bucket"に対象となるバケット名を入力します。
"Event types" を選択します。
今回はとりあえずオブジェクトがアップされたら同期させたいだけなので
"All object create events" のままにしておきます。
inputとoutputのバケット一緒にするとコスト大変なことになるからね!?
みたいな注意喚起にチェックを入れて "追加" をクリックするとトリガーが作成されます。
動かしてみる
fallocateで雑にダミーファイルを作成してS3にファイルをアップします。
その後Lambdaが動いてR2に同じファイルがいるか?を見ていこうと思います。
ちょこちょこ触っていたときに、「何回もファイル作るのとバケット直下以外の動きもみたいな」と思ってさっと作ったshellも置いておきます。
#!/bin/bash
BucketName=$1
fileNo=$2
if [ ! -d dir1/dir2 ]; then
mkdir dir1/dir2
fi
fallocate -l 10M 10M_dummy_from_fallocate_${fileNo}.txt
mv 10M_dummy_from_fallocate_${fileNo}.txt dir1/dir2/
aws s3 cp dir1/dir2/10M_dummy_from_fallocate_${fileNo}.txt s3://${BucketName}/dir1/dir2/
#aws s3 cp dir1/dir2/10M_dummy_from_fallocate_${fileNo}.txt s3://${BucketName}/dir1/dir2/ --profile $your_profile
ちょっと話がずれましたが、とりあえずs3にファイルを置いてLambdaが走りR2に同期されるのか?
を見ていきます。
-
まずはS3にファイルをアップ
少し見にくいですが上がってます。
ディレクトリのような構造も対応できるのか?まで見れるような形で配置してみています。 -
R2側にファイルが同じパスで配置されていました!
-
cloud watchのLog上でもうまく動いていそうでした。
-
もちろんバケット直下でも大丈夫です。
まとめ
今回の検証でS3イベントとLambdaを組み合わせることでアプリ側への改修無しでS3とR2間での同期を取れることが分かりました。
さらに、botoもほんの少し変えるだけでR2にアップロード先を変更できることも分かったので、アプリの移行自体も簡単そうであると判明しました。
問題になりそうなツールでの移行と実際にアプリを切り替えるまでの期間のオブジェクトの取扱いについては、R2 Super Slurperの実行中にでも今回のLambdaを有効化しておくことで、オブジェクトが消えてしまったり、サービス停止の時間がかなり短縮できることが分かりました。
We're hiring!
最後までお読みいただきありがとうございます。
現在DELTA では一緒に働いてくださる仲間を大募集中です!
ご興味をお持ちいただけましたら、お気軽にフォームからご連絡ください。
Discussion