9️⃣
Redshift Serverless の Data API を使用して、S3 にある CSVファイルのデータを取り込む
Redshift Serverless の Data API を使用して、S3 にある CSVファイルのデータを取り込むためのメモ
やったこと
Redshift Serverless の NameSpace に S3 アクセス用のIAMポリシーをアタッチする
- S3 に保存されている CSV にアクセスするため、S3 バケットへの LIST アクセスとバケットオブジェクトへの GET アクセスを許可するポリシーを持つロールをアタッチする
Redshift Serverless に Schema, Table を作成する
- Table は COPY する CSV のカラムをもとにと同じカラム名と対応可能な型を定義する
テーブル への認証情報を作成する
- 今回は IAM ロールでの認証 を採用
- 実行元となる Lambda 用のユーザの権限を設定する
- User を登録する
- ユーザ名: IAMR:<Lambda Function にアタッチしたIAMロール名>
CREATE USER "IAMR:{Lambda Function にアタッチしたIAMロール名}" PASSWORD DISABLE;
- ユーザ名: IAMR:<Lambda Function にアタッチしたIAMロール名>
- Group を登録する
CREATE GROUP lambda;
- Group に Schema, Table へのアクセス権限を設定する
GRANT USAGE, CREATE ON SCHEMA dev TO GROUP lambda;
GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA dev TO GROUP lambda;
- User を登録する
Lambda Function にIAMロールをアタッチする
- AmazonRedshiftFullAccess と、
redshift:GetClusterCredentials
を許可したポリシーを持つ IAM ロールをアタッチする-
redshift:GetClusterCredentials
は Redshift の認証情報を取得するために必要
-
Lambda Function のコードを実装する
- AWS SDK の Redshift Data API サービスの execute_statement メソッドを使用する
- AWS SDK
- WorkgroupName, Database, Sql を指定する
- DBUser は指定しない、Serverlessの場合、DBUser を指定すると、実行時にエラーとなる
- DBUser は自動で、IAMR:<Lambda Function にアタッチしたIAMロール名> が指定される
- Sql に COPY 文を指定する
- IAM_ROLE には Redshift にアタッチした S3 アクセスが許可された IAM ロールを指定する
- COPY 文の例
COPY users FROM 's3://{bucket名}/{key}' DELIMITER ',' TIMEFORMAT 'YYYY-MM-DD HH:MI:SS' IAM_ROLE 'arn:aws:iam::{アカウントID}:role/{Redshift にアタッチしたロール名}';
- COPY 文の例
実装
- Redshift Serverless にCOPY する実装サンプル
- S3 の PUT イベントを受け取り、PUTされたファイルを Redshift に COPY する
Lambda Function のコード
import urllib.parse
import boto3
import os
sql="COPY users FROM 's3://{}/{}' DELIMITER ',' TIMEFORMAT 'YYYY-MM-DD HH:MI:SS' IAM_ROLE {}"
def handler(event, context):
redshift_workgroup = os.environ['REDSHIFT_WORKGROUP_NAME'] # Redshift Serverless の Workgroup 名
redshift_db = os.environ['REDSHIFT_DATABASE_NAME'] # Redshift Serverless の Database 名
redshift_iam_role = os.environ['REDSHIFT_IAM_ROLE_NAME'] # Redshift の NameSpace にアタッチした S3 アクセスが許可された IAM ロール名
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote(event['Records'][0]['s3']['object']['key'])
copy_sql = sql.format(bucket, key, redshift_iam_role)
client = boto3.client('redshift-data')
response = client.execute_statement(
WorkgroupName=redshift_workgroup,
Database=redshift_db,
Sql=copy_sql
)
return()
Discussion