🚀

[AWS]S3へのCSVデータ追加をトリガーとしてAuroraデータベースに自動反映させる②

2022/12/13に公開


はじめに

ご覧いただきありがとうございます。阿河です。

前回は「S3にファイルをアップロード⇒SQS⇒DynamoDBと連携を行いながらLambdaを実行(S3からデータを読み取る)」までを実装しました。

今回は「LambdaでAmazon Aurora PostgreSQLデータベースに情報反映させる」部分を実装していきます。

対象者

  • AWSを運用中の方
  • 日々のデータ管理を楽にしたいと思っている方

概要

※前回記事

  1. S3にデータを用意する
  2. S3イベント通知およびSQSとの連携
  3. LambdaとDynamoDBの連携
  4. S3からのデータ読み込み部分の実装

※本記事の対象

  1. Auroraデータベース作成/テーブル作成
  2. Secrets Managerの設定
  3. Lambda関数(PostgreSQLへの書き込み)
  4. 検証

5. Auroraデータベース作成/テーブル作成

事前準備

  • 踏み台EC2インスタンスの作成(パブリックサブネット)
  • サブネットグループ(プライベートサブネット)
  • DB用セキュリティグループ(EC2/Lambdaのセキュリティグループからのインバウンド通信を許可、TCPポート: 5432)

Auroraデータベースを作成

RDSのページで、Amazon Aurora PostgreSQLデータベースを作成します。

エンジンタイプ: Amazon Aurora
エディション: Amazon Aurora PostgreSQL互換エディション
テンプレート: 開発/テスト
DBクラスター識別子: 任意の名前
マスターユーザー名: postgres
マスターパスワード: 任意のパスワード
DBインスタンスクラス: db.t3.medium
マルチAZ配置: Auroraレプリカを作成しない
ネットワークタイプ: IPv4
VPC/DBサブネットグループ: 作成済みのVPCとサブネットグループを紐づけ
セキュリティグループ: 作成済みのセキュリティグループを紐づけ
パブリックアクセス: なし
データベースポート: 5432
最初のデータベース名: mydb

今回はRDS ProxyやIAM DB認証は使いません。
必要に応じて設定下さい。

Auroraデータベース内にテーブルを作成

踏み台EC2にログインして、下記のコマンドでAuroraデータベースに接続します。

psql --host=xxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com --port=5432 --username=postgres --password --dbname=mydb

--hostには、Auroraインスタンスのエンドポイントを指定しています。
コマンドを入力すると、パスワードを求められます。

mydb=> \c mydb
Password: 
psql (10.21, server 13.7)
WARNING: psql major version 10, server major version 13.
         Some psql features might not work.
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
You are now connected to database "mydb" as user "postgres".
                                                      ^
mydb=> CREATE TABLE "branch_table"(id integer PRIMARY KEY, branch varchar(100), customerID
 integer, product varchar(100));
CREATE TABLE
                            
mydb=> INSERT INTO branch_table(id,branch,customerID,product) VALUES (1, 'branch_A', 11678
734, 'productA');
INSERT 0 1
mydb=> SELECT * FROM branch_table;
 id |  branch  | customerid | product  
----+----------+------------+----------
  1 | branch_A |   11678734 | productA

「id」「branch」「customerID」「product」カラムを持つテーブルを作成しました。

6. Secrets Managerの設定

LambdaからAmazon Aurora PostgreSQLデータベースに接続するにあたり、データベースの認証情報をコードに埋め込むのはセキュリティ面で懸念があります。

よってSecrets Managerを利用します。

  • シークレットの種類: RDSデータベースの認証情報
  • ユーザー名/パスワード: RDSの認証情報を入力
  • シークレットがアクセスするRDSデータベース: 先ほど作成したRDS
  • シークレット名: 任意の名前
  • 自動ローテーション: 無効

上記の設定でシークレットを作成。

それぞれのデータベースに関わる情報がキー/バリューで保存されました。

LambdaコードでSecrets Managerを利用する

  • Lambdaのロールに下記ポリシーを追加します。
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": "*"
        }
    ]
}
  • コードの変更

Secrets Managerのページに、各プログラミング言語ごとのサンプルコードが載っています。

試しにLambdaのコードで、シンプルにシークレットの値を取得してみます。

import boto3
import json
import pandas as pd
from botocore.exceptions import ClientError


def get_secret():

    secret_name = "postgres-secrets"
    region_name = "ap-northeast-1"

    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    
    secret = get_secret_value_response['SecretString']
    
    return secret
    
    
def lambda_handler(event, context):
    
    print(get_secret())
    
{"username":"postgres","password":"xxxxxxxxx","engine":"postgres","host":"xxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com","port":5432,"dbClusterIdentifier":"xxxxxxxxxx"}

問題なくシークレットの値が取得できました。

次のセクションで、PostgreSQLへアクセスするコードと含めて実装します。

7. Lambda関数(PostgreSQLへの書き込み)

  • aws-psycopg2を追加

今回はLambda関数用のZIPファイルを作って用意します。
ローカルやCloud9、CloudShell等で、下記の作業を行います。

Psycopg2を取得

$ git clone https://github.com/jkehler/awslambda-psycopg2.git

アップロード用フォルダの準備

$ mkdir -p pfile/package 
$ touch pfile/lambda_function.py 
$ mv awslambda-psycopg2/psycopg2-3.8 pfile/package/psycopg2

lambda.function.pyの編集(Lambdaアップロード後は、コードの編集が必要です)

import psycopg2

def lambda_handler(event,context):
    
    print("Hello World")

ZIPファイル化

$ zip -r ../psyco.zip . 
$ zip -r ./psyco.zip lambda_function.py

Lambda関数のコードタブページのコードブロック右上の「アップロード元」から「zipファイル」を選択。ZIPファイルをアップロード。

lambda.function.pyに、次のコードを記述します。

  • コードの修正
import boto3
import json
import pandas as pd
from botocore.exceptions import ClientError
import psycopg2
import psycopg2.extras

s3 = boto3.client('s3')
dynamo = boto3.resource('dynamodb')
dynamo_table = dynamo.Table('postgre-dynamo')

def get_secret():

    secret_name = "postgres-secrets"
    region_name = "ap-northeast-1"

    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    
    secret = get_secret_value_response['SecretString']
    sc = json.loads(secret)
    
    return sc
    
    
def file_read(bucket, filename):
    
    obj = s3.get_object(Bucket=bucket, Key=filename)
    df = pd.read_csv(obj['Body'], header=None)
    df_len = len(df)
    
    title_list = []
    year_list = []
    product_list = []
    total_list = []
    
    
    for i in range(df_len):
        title_list.append(df.iloc[i, 0])
        year_list.append(df.iloc[i, 1])
        product_list.append(df.iloc[i, 2])
        
    total_list.append(title_list)
    total_list.append(year_list)
    total_list.append(product_list)
    total_list.append(df_len)
    
    return total_list


def lambda_handler(event, context):
    
    secrets = get_secret()
    
    ENDPOINT= secrets['host']
    PORT= secrets['port'] 
    USR= secrets['username']
    PASS= secrets['password']
    
    sqs_event = event['Records'][0]['body']
    ev = json.loads(sqs_event)
    bucket = ev['Records'][0]['s3']['bucket']['name']
    filename = ev['Records'][0]['s3']['object']['key']

    try:
        add_item = {
            'file_name': filename.split('/')[3],
            'status': 'exist'
        }

        response = dynamo_table.put_item(
            Item=add_item,
            ConditionExpression='attribute_not_exists(file_name)'
        )
	
        csv_datalist = file_read(bucket, filename)
    
    
        connection = psycopg2.connect(host=ENDPOINT, dbname='mydb', user=USR, password=PASS)
        
        cur = connection.cursor()
        results = cur.execute('SELECT COUNT(*) FROM branch_table')
        
        # (count,)
        results = cur.fetchone()
        # (count)
        count = list(results)[0]
        print(count)
        
    
        for num in range(csv_datalist[3]):
            
            cur.execute("INSERT INTO branch_table(id, branch, customerID, product) VALUES('%s', '%s', '%s', '%s')"%(count + num, csv_datalist[0][num], csv_datalist[1][num], csv_datalist[2][num] ))
    
        connection.commit()
        cur.close()
        connection.close()
        
        return { 'statusCode': 200 }
        
        
    except Exception:
        
        print('already exist')   

大まかな処理の流れは以下の通りです。

・Secrets Managerから認証用のシークレット情報を取得
・SQSキューからイベント情報取得
・取得したファイル情報が既にDynamoDBに保存されていない場合のみ、次の処理に移る
・DynamoDBにファイル情報を格納
・該当ファイルの元データをS3バケットから取得
・psycopg2を使って、Amazon Aurora PostgreSQLデータベースと接続を行う
・テーブルの現状の行数を取得する(INSERTをする際に、idの値を計算する必要があるため)
・CSVファイルの情報を、Amazon Aurora PostgreSQLデータベースのテーブルにINSERTする
・最後まで処理がいったら、ステータスコード200を返す

8. 検証

ではS3にファイルをアップロードしてみましょう。

テーブルを確認します。

S3にアップロードしたCSVファイルのデータが、Amazon Aurora PostgreSQLデータベースにINSERTされました。

DynamoDBにもデータが格納されています。これにより重複処理は行われません。
念のためS3に同じファイルをアップロードし直しました。

already exist

CloudWatch Logsに、上記のログが出力されました。

テーブルに追加のINSERTは行われません。
S3ファイルアップロードの、Amazon Aurora PostgreSQL自動反映化は成功しているようです。

さらに別のCSVファイルを追加したところ、テーブルが更新されました。

idの値も問題なくインクリメントされています。

さいごに

以上、長くなりましたが「S3にCSVをアップロード⇒SQS⇒Lambda⇒AWS Aurora PostgreSQL」へのパイプラインができました。

少しでも誰かのお役に立てれば幸いです。
御覧頂き ありがとうございます!

MEGAZONE株式会社 Tech Blog

Discussion