🔍

【RDS】リードレプリカ作成に伴うAuroraエンドポイントの選定【技術検証】

2023/03/30に公開

概要

  1. 本検証は大量のLambdaが同時にAuroraヘアクセスする(但し読み取りのみ)場合、Aurora側ではどのようなエンドポイントや設定を用いて対応すれば最適かを検証する。本記事はその時の検証記録の備忘録である。
  2. 読み取りのみのアクセスをする場合、リードレプリカを増設し読み取り専用エンドポイントにLambdaの指し先を設定するのが最適。特段の設定もせずに負荷分散を行ってくれる。
  3. RDS Proxyはコネクションの取り回しなどをしてくれるが、Auroraの場合は特定のインスタンスに紐付ける設定ができない。AuroraやRDSの多重アクセスに関しては公式Documentを読んで仕様を確認するのがいい。(「予備調査+検証予想」の章を参照のこと)

検証主題

リードレプリカの新規作成にあたって接続先となるエンドポイントの選定を行う。

背景

一度に大量の一般ユーザー向け通知を行うシステム基盤を開発していた際、何十万ユーザーのアドレス等を取得するため大量のLambdaを同時起動しRDSヘReadアクセスをする必要が出てきた。RDS Proxyを使うと大量のLambdaの間でもコネクションを持ち回して同時接続数をバーストせずにサービスを実現できることが発表されたが、読み取り専用エンドポイントも読み取りに限定すれば似たような機能を持っていることが調査中に判明。どちらが今回のシステム要件に合致するのかを調べる必要が出てきた。

結論

  1. RDS Proxyエンドポイントに関して、リードレプリカインスタンスのみを紐づける設定は不可能。したがって今回採用はできない。
  2. 特段設定をしなくとも、多重アクセス先に読み取り専用エンドポイントをアタッチすれば負荷分散とコネクション管理をしてくれる。
  3. 当初の予定ではRDS Proxyを使えるカスタムエンドポイントを新規作成し、通知基盤のLambdaはそこにアクセスするという方式で設計を行っていたが、この設計をAurora読み取り専用エンドポイントで接続するに変更する。

予備調査+検証予想

予備調査

  • Auroraには4つのエンドポイントがあり、このうちの一つを選択してLambdaの接続先とする必要がある。
  • このうち、読み取り専用エンドポイントで接続すると、プライマリーインスタンスは無視してn(1 <= n)個以上のリードレプリカに対し自動で負荷分散しながら接続するようになっている。公式ドキュメントには以下のように書いてある。

Aurora DB クラスターのリーダーエンドポイントは、その DB クラスターへの読み取り専用接続を負荷分散します。リーダーエンドポイントは、クエリなどの読み取りオペレーションで使用します。これらのステートメントを読み取り専用の Aurora レプリカで処理することにより、このエンドポイントはプライマリインスタンスのオーバーヘッドを削減します。また、クラスター内の Aurora レプリカの数に合わせて、同時 SELECT クエリを処理する容量をスケールすることもできます。Aurora DB クラスターごとに 1 つのリーダーエンドポイントがあります。

クラスターに 1 つ以上の Aurora レプリカが含まれている場合、リーダーエンドポイントは Aurora レプリカ間で各接続リクエストを負荷分散します。その場合、そのセッションでは SELECT などの読み取り専用ステートメントのみを実行できます。クラスターにプライマリインスタンスのみが含まれていて、Aurora レプリカが含まれていない場合、リーダーエンドポイントはプライマリインスタンスに接続します。その場合は、エンドポイントを介して書き込みオペレ―ションを実行できます。

参考資料:LambdaからRDSにアクセスする(28th July 2020)より抜粋した資料

$ aws rds register-db-proxy-targets \
     --db-proxy-name $DB_PROXY_NAME \
     --db-instance-identifiers $NEW_DB_INSTANCE_ID

An error occurred (InvalidParameterValue) when calling the RegisterDBProxyTargets operation: Database engine OSCAR 5.6.mysql_aurora.1.22.2 for DB Instance dev-rds-for-famm-user-getter-test is not supported. Register the DB cluster instead of DB instance, or use another DB instance that is supported.

検証予想

以上の予備調査から、以下の二つの仮説が浮上する。

  1. RDS Proxyエンドポイントに関して、リードレプリカインスタンスのみを紐づける設定は不可能。したがって今回採用はできないのではないか。
  2. 特段設定をしなくとも、多重アクセス先に読み取り専用エンドポイントをアタッチすれば負荷分散とコネクション管理をしてくれるのではないか。

※なお、このうち1は上記予備調査より証明された。採用はできない。

検証手続き

  1. DBからユーザー情報をとってくるシンプルなLambdaを作る
  2. リードレプリカ1個に紐づいている状態で読み取り専用エンドポイントに向けてLambdaを同時起動。徐々に起動回数を増やしていく(100発→1000発→10000発)
  3. DB接続カウント及びCPU使用率、メモリのスパイク状況を観察
  4. CloudWatchLogsのフィルタを使って「POSTした総数=ログレベルinfoでuser_idを示しているログの総数」であることを確認する。(これが成立すれば処理落ちしたプロセスは一つもない事になる)

検証結果

サマリー

  1. 10000件でやった結果、Lambda側でエラーは検知されず、
  2. RDSのモニタリングでは、試験実行時間中にリードレプリカ二台の読み込みレイテンシーとネットワーク受信スループットの値が上昇したが、それ以外のパラメータは大して変わらなかった。またmasterのモニタリングも怪しいスパイクは検知されなかった。
  3. したがって、読み取りオンリーのエンドポイントに接続するだけで、負荷分散できていると判断した。

検証ソース

配信Lambdaの検証ソース

  1. ログはこのように出現する。{"family_id":"2453","level":"info","msg":"This is user_ids : 3208,3209","time":"2021-01-22T09:10:33Z"}
    1. level=infoで出現するログは一通りしかないので、検索でこれが引っかかればそのfamily_idに対するprocessは成功したと同義とする
    2. コネクションで落ちる、クエリ発行時に落ちるなど要所でlevel=errorになるようにlogを吐いている。level=errorで検索して出てくれば、何かしらの理由で処理落ちしたものとカウントする。
package main

import (
	"context"
	"database/sql"
	"fmt"
	"os"
	"encoding/json"
	_ "github.com/go-sql-driver/mysql"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"strconv"
	"strings"
	log "github.com/sirupsen/logrus"
)

type Request struct {
	FamilyID string `json:"family_id"`
	TestCase string `json:"test_case"`
}

// 取得するレコード一行のデータ形式を構造体で定義する
type UserData struct {
	FamilyID		int
	UserID          int
	Email 			string
	UserDeviceToken string
}

type MySQLEnvVal struct {
	DBMS 		string
	USER 		string
	PASS 		string
	PROTOCOL 	string
	DBNAME 		string
	CONNECT 	string
}

func GetDBConnectionConfig() MySQLEnvVal {
	mysqlEnvVal := MySQLEnvVal{
		DBMS:     "mysql",
		USER:     os.Getenv("user"),
		PASS:     os.Getenv("password"),
		PROTOCOL: os.Getenv("protocol"),
		DBNAME:   os.Getenv("dbname"),
		CONNECT:  "",
	}

	mysqlEnvVal.CONNECT = mysqlEnvVal.USER + ":" + mysqlEnvVal.PASS + "@" + mysqlEnvVal.PROTOCOL + "/" + mysqlEnvVal.DBNAME + "?charset=utf8&parseTime=true&loc=Asia%2FTokyo"

	return mysqlEnvVal
}

func ConvertInputDataToStruct(inputs string) (*Request, error) {
	var req Request
	err := json.Unmarshal([]byte(inputs), &req)
	if err != nil {
		return nil, err
	}
	return &req, nil
}

func Handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
	log.SetFormatter(&log.JSONFormatter{})
	mysqlEnvVal := GetDBConnectionConfig()

	req, err := ConvertInputDataToStruct(request.Body)
	if err != nil {
		return events.APIGatewayProxyResponse{
			Body: err.Error(),
			StatusCode: 500,
		}, err
	}
	logger := log.WithFields(log.Fields{"family_id": req.FamilyID, "test_case": req.TestCase})

	conn, err := sql.Open(mysqlEnvVal.DBMS, mysqlEnvVal.CONNECT)
	defer conn.Close()

	if err != nil {
		logger.Error("Fail to connect db" + err.Error())
		return events.APIGatewayProxyResponse{
			Body: err.Error(),
			StatusCode: 500,
		}, nil
	}

	// 接続確認
	err = conn.Ping()
	defer conn.Close()
	if err != nil {
		logger.Error("Failed to connect rds : %s", err.Error())
		return events.APIGatewayProxyResponse{
			Body: err.Error(),
			StatusCode: 500,
		}, err
	}

	// DBからレコードを抽出
	// join句の条件に絞り込み条件を埋め込むことで、結合前に絞り込みを行う
	// https://atsuizo.hatenadiary.jp/entry/2016/12/12/163921
	rows, err := conn.Query("select t2.family_id, t1.user_id, t2.email, device_token from user_device_tokens as t1 inner join `user` as t2 on t1.user_id = t2.user_id and t2.family_id = ?;", req.FamilyID)
	defer conn.Close()

	if err != nil {
		logger.Error("Fail to query from db: " + err.Error())
		return events.APIGatewayProxyResponse{
			Body: err.Error(),
			StatusCode: 500,
		}, err
	}

	// データを構造体へ変換
	var UserDatas []UserData
	var UserDataStrings []string
	defer conn.Close()
	for rows.Next() {
		var tmpUserData UserData
		err := rows.Scan(&tmpUserData.FamilyID, &tmpUserData.UserID, &tmpUserData.Email, &tmpUserData.UserDeviceToken)
		if err != nil {
			logger.Error("Fail to scan records " + err.Error())
		}
		UserDatas = append(UserDatas, tmpUserData)
	}
	// 確認のための出力 + 型変換
	for _, userData := range UserDatas {
		userIDstring := strconv.Itoa(userData.UserID)
		appendText := "{user_id: " + userIDstring + " email: " + userData.Email + " device_token: " + userData.UserDeviceToken
		UserDataStrings = append(UserDataStrings, appendText)
	}

	userDatastring := strings.Join(UserDataStrings, ",")

	logger.Info(fmt.Sprintf("This is user_ids : %s", userDatastring))

	return events.APIGatewayProxyResponse{
		Body: "Success",
		StatusCode: 200,
	}, nil

}

func main() {
	lambda.Start(Handler)
}

検証結果をCloudWatchLogsでクエリする式はこちら。これでゼロ件だった場合、全てが正常に処理されている。

{ $.test_case = slot_100 && $.level = error }

Discussion