📌
DynamoDB export -> Glue Crawler -> Athena ExecuteQueryを実行するLambdaを実験
最近AWSにハマっていて、Athenaを使ってみたかったので、DynamoDBのExportをcronで実行して、Glue Crawlerの実行からAthenaのExecuteQueryまでを自動で実行できるLambdaを作ってみました。
サンプルコード
こちらにLambda関数のhandlerとリソース構築に使ったterraformのコードを上げています。
注意点
- テスト的な実験
- cronは試してない
- 最初の1回しか回らない(crawler名、テーブル名)
- record数少なすぎると、クローラが上手く回らない?
Some files do not match the schema detected. Remove or exclude the following files from the crawler (truncated to first 200 files)
環境
- Node.js
- terraform(リソース構築)
コード解説
全体の流れ
lambda関数のeventのparameterにeventTypeを指定して実行
{ "eventType": "exportTable | report" }
- cronでDynamoDBのExportを実行(
eventType: exportTable
) - Glue Crawlerの作成・実行
- Athena Queryの実行(
eventType: report
)
※ Glue Crawlerの実行はS3 Eventのtriggerを使います
export async function handler(event: any): Promise<true> {
if (event.Records && event.Records.length > 0) {
// s3 eventでトリガーされる時はここに入る
const targetPath = await getTargetPath()
await createCrawler(targetPath)
await startCrawler()
return true
}
switch (event.eventType) {
case EVENT_TYPE.EXPORT:
await exportDynamoToS3()
break
case EVENT_TYPE.RUN_CRAWLER:
const targetPath = await getTargetPath()
await createCrawler(targetPath)
await startCrawler()
break
case EVENT_TYPE.REPORT:
const startQueryResult = await queryAthena(sql)
const result = await getQueryResult(startQueryResult)
console.log(JSON.stringify(result))
break
default:
break
}
return true
}
1. DynamoDBのExport
- cronで実行
1.1 まずはExport
const command = new ExportTableToPointInTimeCommand({ S3Bucket: TABLE_EXPORT_S3_BUCKET, TableArn: DYNAMO_TABLE_ARN })
const result = await dynamoClient.send(command)
1.2 Glue Crawlerを作成する際に指定するクロールするパス(DynamoDBのデータがExportされたS3のパス)をSSMに保存
const crawlerTargetPath = `${TABLE_EXPORT_S3_BUCKET}/AWSDynamoDB/${arnSplit[arnSplit.length - 1]}/data/`
const command = new PutParameterCommand({
Name: SSM_S3_EXPORT_ARN_PATH,
Value: crawlerTargetPath,
Overwrite: true
})
await ssmClient.send(command)
2. Crawlerの作成・実行
- Export用バケットにExportが完了されたことの検知として、
manifest-summary.json
が作成されたことをtriggerに実行
getTargetPath
)
2.1 まずは、↑で保存したパスを取得(2.2 クローラの作成
const command = new CreateCrawlerCommand({
Name: GLUE_CRAWLER_NAME,
Role: GLUE_CRAWLER_ROLE_ARN,
Targets: { S3Targets: [{ Path: targetPath }] },
DatabaseName: GLUE_DATABASE_NAME
})
await glueClient.send(command)
- ここで渡すIAMロールにわたすポリシーが結構苦労しました。(FullAccessを結構使ってしまいました)
data "aws_iam_policy" "glue_s3_full_access" {
arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
}
data "aws_iam_policy" "glue_cloudwatch_full_access" {
arn = "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
}
data "aws_iam_policy" "glue_glue_service_role" {
arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
}
2.3 クローラの実行
const command = new StartCrawlerCommand({
Name: GLUE_CRAWLER_NAME
})
await glueClient.send(command)
3. AthenaのQueryの実行
- eventBridgeを使って、CrawlerのStateが
Suceeded
したことを検知して実行
3.1 クエリの実行
const command = new StartQueryExecutionCommand({
QueryString,
ResultConfiguration: {
OutputLocation: `s3://${QUERY_RESULT_S3_BUCKET}`
},
QueryExecutionContext: {
Database: GLUE_DATABASE_NAME
}
})
return await athenaClient.send(command)
3.2 クエリの実行結果の取得
async function getQueryExecutionState(startQueryResult: StartQueryExecutionOutput): Promise<string | undefined> {
const command = new GetQueryExecutionCommand({
QueryExecutionId: startQueryResult.QueryExecutionId
})
const result = await athenaClient.send(command)
return result.QueryExecution?.Status?.State
}
async function getQueryResult(startQueryResult: StartQueryExecutionCommandOutput): Promise<GetQueryResultsCommandOutput> {
// クエリの実行が終わるまで待つ
let isPolling = true
while (isPolling) {
const state = await getQueryExecutionState(startQueryResult)
isPolling = state === 'RUNNING' || state === 'QUEUED'
await sleep()
}
// 実行結果の取得
const command = new GetQueryResultsCommand({
QueryExecutionId: startQueryResult.QueryExecutionId
})
return await athenaClient.send(command)
}
各種トリガーの設定
serverless.yamlのfunctions.XXX.eventsで定義
events:
- schedule:
rate: cron(0 10 * * ? *)
enabled: true
input:
eventType: exportTable
- s3:
bucket: ${self:service}-${self:provider.stage}-table-export
event: 's3:ObjectCreated:*'
existing: true
rules:
- suffix: manifest-summary.json ## 一つのファイルに限定しておかないと、何回もtriggerされる
- eventBridge:
pattern:
source:
- 'aws.glue'
detail-type:
- 'Glue Crawler State Change'
detail:
state:
- 'Succeeded'
input:
eventType: report
Discussion