📌

DynamoDB export -> Glue Crawler -> Athena ExecuteQueryを実行するLambdaを実験

2022/03/06に公開

最近AWSにハマっていて、Athenaを使ってみたかったので、DynamoDBのExportをcronで実行して、Glue Crawlerの実行からAthenaのExecuteQueryまでを自動で実行できるLambdaを作ってみました。

サンプルコード

こちらにLambda関数のhandlerとリソース構築に使ったterraformのコードを上げています。
https://github.com/dl10yr/aws-practice/tree/main/dynamo-s3-glue-athena-lambda

注意点

  • テスト的な実験
  • cronは試してない
  • 最初の1回しか回らない(crawler名、テーブル名)
  • record数少なすぎると、クローラが上手く回らない?
    • Some files do not match the schema detected. Remove or exclude the following files from the crawler (truncated to first 200 files)

環境

  • Node.js
  • terraform(リソース構築)

コード解説

全体の流れ

lambda関数のeventのparameterにeventTypeを指定して実行

{ "eventType": "exportTable | report" }
  1. cronでDynamoDBのExportを実行(eventType: exportTable
  2. Glue Crawlerの作成・実行
  3. Athena Queryの実行(eventType: report

※ Glue Crawlerの実行はS3 Eventのtriggerを使います

export async function handler(event: any): Promise<true> {
  if (event.Records && event.Records.length > 0) {
    // s3 eventでトリガーされる時はここに入る
    const targetPath = await getTargetPath()
    await createCrawler(targetPath)
    await startCrawler()
    return true
  }

  switch (event.eventType) {
    case EVENT_TYPE.EXPORT:
      await exportDynamoToS3()
      break
    case EVENT_TYPE.RUN_CRAWLER:
      const targetPath = await getTargetPath()
      await createCrawler(targetPath)
      await startCrawler()
      break
    case EVENT_TYPE.REPORT:
      const startQueryResult = await queryAthena(sql)
      const result = await getQueryResult(startQueryResult)
      console.log(JSON.stringify(result))
      break
    default:
      break
  }
  return true
}

1. DynamoDBのExport

  • cronで実行

1.1 まずはExport

  const command = new ExportTableToPointInTimeCommand({ S3Bucket: TABLE_EXPORT_S3_BUCKET, TableArn: DYNAMO_TABLE_ARN })
  const result = await dynamoClient.send(command)

1.2 Glue Crawlerを作成する際に指定するクロールするパス(DynamoDBのデータがExportされたS3のパス)をSSMに保存

  const crawlerTargetPath = `${TABLE_EXPORT_S3_BUCKET}/AWSDynamoDB/${arnSplit[arnSplit.length - 1]}/data/`
  const command = new PutParameterCommand({
    Name: SSM_S3_EXPORT_ARN_PATH,
    Value: crawlerTargetPath,
    Overwrite: true
  })
  await ssmClient.send(command)

2. Crawlerの作成・実行

  • Export用バケットにExportが完了されたことの検知として、manifest-summary.jsonが作成されたことをtriggerに実行

2.1 まずは、↑で保存したパスを取得(getTargetPath

2.2 クローラの作成

  const command = new CreateCrawlerCommand({
    Name: GLUE_CRAWLER_NAME,
    Role: GLUE_CRAWLER_ROLE_ARN,
    Targets: { S3Targets: [{ Path: targetPath }] },
    DatabaseName: GLUE_DATABASE_NAME
  })
  await glueClient.send(command)
  • ここで渡すIAMロールにわたすポリシーが結構苦労しました。(FullAccessを結構使ってしまいました)
data "aws_iam_policy" "glue_s3_full_access" {
  arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
}

data "aws_iam_policy" "glue_cloudwatch_full_access" {
  arn = "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
}

data "aws_iam_policy" "glue_glue_service_role" {
  arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
}

2.3 クローラの実行

  const command = new StartCrawlerCommand({
    Name: GLUE_CRAWLER_NAME
  })
  await glueClient.send(command)

3. AthenaのQueryの実行

  • eventBridgeを使って、CrawlerのStateがSuceededしたことを検知して実行

3.1 クエリの実行

  const command = new StartQueryExecutionCommand({
    QueryString,
    ResultConfiguration: {
      OutputLocation: `s3://${QUERY_RESULT_S3_BUCKET}`
    },
    QueryExecutionContext: {
      Database: GLUE_DATABASE_NAME
    }
  })
  return await athenaClient.send(command)

3.2 クエリの実行結果の取得

async function getQueryExecutionState(startQueryResult: StartQueryExecutionOutput): Promise<string | undefined> {
  const command = new GetQueryExecutionCommand({
    QueryExecutionId: startQueryResult.QueryExecutionId
  })
  const result = await athenaClient.send(command)
  return result.QueryExecution?.Status?.State
}

async function getQueryResult(startQueryResult: StartQueryExecutionCommandOutput): Promise<GetQueryResultsCommandOutput> {
  // クエリの実行が終わるまで待つ
  let isPolling = true
  while (isPolling) {
    const state = await getQueryExecutionState(startQueryResult)
    isPolling = state === 'RUNNING' || state === 'QUEUED'
    await sleep()
  }

  // 実行結果の取得
  const command = new GetQueryResultsCommand({
    QueryExecutionId: startQueryResult.QueryExecutionId
  })
  return await athenaClient.send(command)
}

各種トリガーの設定

serverless.yamlのfunctions.XXX.eventsで定義

    events:
      - schedule:
          rate: cron(0 10 * * ? *)
          enabled: true
          input:
            eventType: exportTable
      - s3:
          bucket: ${self:service}-${self:provider.stage}-table-export
          event: 's3:ObjectCreated:*'
          existing: true
          rules:
            - suffix: manifest-summary.json ## 一つのファイルに限定しておかないと、何回もtriggerされる
      - eventBridge:
          pattern:
            source:
              - 'aws.glue'
            detail-type:
              - 'Glue Crawler State Change'
            detail:
              state:
                - 'Succeeded'
          input:
            eventType: report

References

Discussion