🐬

Aurora MySQL の UPSERT イベントを Lambda で処理する

2022/10/05に公開

こんにちは。エンジニアチームの山岸 (@yamagishihrd) です。

今回は、Aurora MySQL DB テーブル上の UPSERT (INSERT / UPDATE) イベントをトリガーとして Lambda 関数を呼び出す方法について紹介します。実装だけでなく、具体的なユースケースについても最後に触れたいと思います。

機能の概要

機能の概要について、公式ドキュメントの記載を確認しておきます。

Aurora MySQL ネイティブ関数を使用した Lambda 関数の呼び出し

ネイティブ関数 lambda_sync および lambda_async を呼び出すことで、Aurora MySQL DB クラスターから AWS Lambda 関数を呼び出すことができます。このアプローチは、Aurora MySQL で実行しているデータベースを 他の AWS のサービスと統合するときに便利です。例えば、データベースの特定のテーブルに行が挿入されるたびに Amazon Simple Notification Service (Amazon SNS) を使用して通知を送信するような場合があります。

参考: Amazon Aurora MySQL DB クラスターからの Lambda 関数の呼び出し

実装

実装の手順について、「RDS データベース」「Lambda 関数」「MySQL テーブル」に分けて解説していきます。RDS テーブルや MySQL テーブルは既に存在している前提で、追加で必要になる設定等について記載します。

1. RDS データベース

1.1. IAM ロールの作成、およびアタッチ

RDS データベースが Lambda 関数を呼び出すための IAM ロールを作成し、これを DB クラスターにアタッチする必要があります。

許可するアクションは、最低限呼び出し対象の Lambda 関数に対する lambda:InvokeFunction だけあれば良いと思いますが、ここでは AWSLambdaRole マネージドポリシーをアタッチします。


Aurora 用 IAM ロールの許可ポリシー


Aurora 用 IAM ロールの信頼されたエンティティ

DB クラスターへのアタッチは、RDS のコンソール画面の [ 接続とセキュリティ ] - [ IAM ロールの管理 ] から、作成した IAM ロールを追加します。


RDS 用 IAMロールの追加

1.2. アウトバウンド接続の許可

RDS から Lambda など他の AWS サービスへのアクセスのため、サービスエンドポイントに対するアウトバウンド接続を許可する必要があります。[1]

DB インスタンスに関連づけられている VPC セキュリティグループのアウトバウンドルールに、すべての TCP 通信を許可するルールを追加します。


DB インスタンスの VPC セキュリティグループを編集


アウトバウンド接続の許可

2. Lambda 関数

関数名を test_function として、Lambda 関数を作成します。関数呼び出し時のペイロードとして、例えば以下の形式を想定し、コードを実装します。(後に説明する「テーブル上のトリガー設定」との整合を考慮します)

event.json
{
    "tablename": "sample_teble", 
    "id": "01ge4vek2knqsbc795tg6mxkfh", 
    "names": "hoge, piyo、Fuga"
}

以下、本記事に関係しそうな設定項目について触れておきます。

2.1. VPC 設定

環境によっては不要かもしれませんが、RDS データベースはプライベートサブネットで運用しているケースが多いと思うので、[ 設定 ] - [ VPC ] にて、同じサブネット内で起動するようサブネット ID およびセキュリティグループ ID を指定します。

2.2. アクセス権限

Lambda 側から、DB クラスターからのアクセスを許可するポリシーを設定します。SourceArn に DB クラスターの ARN を指定します。


Lambda 関数のポリシーステートメントの編集


DB クラスターからのアクションを許可するポリシーステートメント

3. MySQL テーブル

Aurora MySQL 側で Lambda 関数を呼び出すための Procedure と、特定のテーブル上で UPSERT イベントが生じた際に Procedure を実行する Trigger を作成します。

簡単ですが、以下のようなスキーマのテーブルを想定します。

mysql> DESC test_table;
--------------
DESC test_table
--------------

+--------------+--------------+------+-----+-------------------+-----------------------------+
| Field        | Type         | Null | Key | Default           | Extra                       |
+--------------+--------------+------+-----+-------------------+-----------------------------+
| id           | varchar(26)  | NO   | PRI | NULL              |                             |
| names        | varchar(120) | NO   |     | NULL              |                             |
| created_at   | datetime     | NO   |     | CURRENT_TIMESTAMP |                             |
| updated_at   | datetime     | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| deleted_at   | datetime     | YES  |     | NULL              |                             |
+--------------+--------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.01 sec)

3.1. Procedure の作成

まず Procedure です。BEGIN ~ END 句の中で CALL mysql.lambda_sync() を呼び出しています。第一引数に Lambda 関数 ARNを、第二引数に JSON ペイロードを与えます。Procedure への入力を設定しておくことで、UPSERT レコードの内容を動的に埋め込むことができます。

create_procedure.sql
DELIMITER $$
CREATE PROCEDURE IF NOT EXISTS procedure_test_table (
    IN tablename VARCHAR(80), 
    IN id VARCHAR(26), 
    IN names VARCHAR(120)
) LANGUAGE SQL
BEGIN
  CALL mysql.lambda_sync(
    'arn:aws:lambda:ap-northeast-1:000000000000:function:test-function',
     CONCAT('{"tablename":"', tablename,'", "id":"', id,'", "names":"', names, '"}'));
END
$$
DELIMITER ;

3.2. Trigger の作成

続いて Trigger です。UPSERT が生じたら、先に作成した Procedure を実行します。UPSERT レコードの内容は NEW.{カラム名} で取得できます。1 つの Trigger に INSERT と UPDATE 両方を設定することは出来ないようなので、それぞれ作成します。

create_trigger_insert.sql
DELIMITER $$
CREATE TRIGGER IF NOT EXISTS trigger_test_table_insert
  AFTER INSERT ON test_table
  FOR EACH ROW
BEGIN
  CALL procedure_test_table("test_table", NEW.id, NEW.names);
END
$$
DELIMITER ;
create_trigger_upsert.sql
DELIMITER $$
CREATE TRIGGER IF NOT EXISTS trigger_test_table_update
  AFTER UPDATE ON test_table
  FOR EACH ROW
BEGIN
  CALL procedure_test_table("test_table", NEW.id, NEW.names);
END
$$
DELIMITER ;

以上の手順で、test_table テーブル上で生じた UPSERT イベントをトリガーに Lambda 関数を呼び出せるようになりました。

トラブルシューティング

ネットワーク系のエラー (ERROR 1871, ERROR 1873, ERROR 1815) については公式ドキュメント [1:1] に記載があるので、そちらを確認して頂ければと思います。

それ以外のものとしては IAM ロール関連のエラーに遭遇しました。

ERROR 63996 (HY000): Lambda API returned error: Unrecognized Client. The security token included in the request is invalid.

筆者の場合、作成した IAM ロールを DB クラスターにアタッチした後、IAM ロールの削除・再作成しましたが、このとき DB クラスターからのデタッチ・最アタッチをしていなかったことが原因でした。一度 Aurora 用の IAM ロールを削除する際は、DB クラスターからも同じリソース名でもデタッチしましょう。

ユースケース例

最後にユースケース例というか、実際に本機能を利用しようとに思った背景についても触れておこうと思います。

プロダクト開発に利用している RDS データベースがあり、いくつかのテーブルは Google スプレッドシート上で入力されたデータを ECS 上にデプロイしたサービスを経由して投入しています。

手入力運用の関係でテーブルによっては表記が揺れていたり、1レコードに複数の値が入っています。これらのテーブルを正規化し、複数値を別レコードとして格納する関連テーブルを定義しました。この際、マスターテーブル側の UPSERT されたレコードを処理し、関連テーブルにデータ投入するアーキテクチャを検討しました。

データストア間の処理なのでバッチ的な ETL 処理でも良いかもしれませんが、ユースケースを考慮して UPSERT レコードのみをニアリアルタイムに処理する形にしました。この辺りは要件次第かと思います。


以上です。最後までご覧頂きありがとうございました。

脚注
  1. https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/AuroraMySQL.Integrating.Authorizing.Network.html ↩︎ ↩︎

Discussion