📧

Athenaの分析結果をスプレッドシートに変換しメール配信する仕組みを作った話

2024/03/08に公開

Athenaの分析結果のCSVをスプレッドシート形式でGoogle Driveに保存しメールで配信する仕組みをLambdaとStep Functionsを用いて構築する機会があったため、その実装例を 紹介します。

要件

要件として、大きく下記の2つがありました。

  • Athenaの分析結果をGoogleスプレッドシート(以下、スプシ)で格納する。
  • 格納したスプシの共有URLを本文に記載したメールを集計完了通知として配信する。

全体概要

下図が全体の概要図となります。

EventBridgeで毎日定時刻にStep Functionsのステートマシンを起動します。Step Functionsを構成しているLambda関数で、Athenaによるログ集計、Google Drive APIおよびGoogle Spread APIを用いて、S3に保存されている集計結果のCSVをスプシとしてGoogle Driveに保存、SESで集計完了メールの送付を行います。ステートマシン内で処理に失敗した場合は、Slackに通知するようにしています。

Step Functionsのステートマシン

Step Functionsのステートマシンのフロー図が下図となります。

EventBridgeからは、入力値として下記の値を渡しています。

{
  "tenant_id": 9999,
  "tenant_name": "テナント名",
  "folder_id": "{Googleドライブの共有フォルダのID}"
}

Googleドライブの共有フォルダのIDは、スプシを保存したいフォルダのURLのhttps://drive.google.com/drive/folders/{フォルダID}の末尾のIDを使用します。

ステートマシンのサンプルコード
{
  "StartAt": "AggregateLog",
  "States": {
    "AggregateLog": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:{aws_region}:{aws_account_id}:function:aggregate-log",
        "InvocationType": "RequestResponse",
        "Payload": {
          "tenant_id.$": "$$.Execution.Input.tenant_id",
          "tenant_name.$": "$$.Execution.Input.tenant_name"
        }
      },
      "ResultSelector": {
        "s3_key.$": "$.Payload.s3_key"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "NotifyAggregateFailed"
        }
      ],
      "Next": "TransferResultToGoogleDrive"
    },
    "TransferResultToGoogleDrive": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:{aws_region}:{aws_account_id}:function:transfer-result-to-google-drive",
        "InvocationType": "RequestResponse",
        "Payload": {
          "s3_key.$": "$.s3_key",
          "folder_id.$": "$$.Execution.Input.folder_id"
        }
      },
      "ResultSelector": {
        "spreadsheet_url.$": "$.Payload.spreadsheet_url",
        "log_exists.$": "$.Payload.log_exists"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "NotifyAggregateFailed"
        }
      ],
      "Next": "NotifyAggregateCompleted"
    },
    "NotifyAggregateCompleted": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:{aws_region}:{aws_account_id}:function:notify-aggregate-completed",
        "InvocationType": "RequestResponse",
        "Payload": {
          "spreadsheet_url.$": "$.spreadsheet_url",
          "tenant_name.$": "$$.Execution.Input.tenant_name",
          "log_exists.$": "$.log_exists"
        }
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "NotifyAggregateFailed"
        }
      ],
      "End": true
    },
    "NotifyAggregateFailed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:{aws_region}:{aws_account_id}:function:notify-aggregate-failed",
        "InvocationType": "RequestResponse",
        "Payload": {
          "tenant_name.$": "$$.Execution.Input.tenant_name",
          "execution_id.$": "$$.Execution.Id",
          "state_machine_name.$": "$$.StateMachine.Name"
        }
      },
      "End": true
    }
  }
}

ステートマシンの実行IDやステートマシン名をLambda関数の入力値に使いたかったため、コンテキストオブジェクトを使用しました。

{
    "execution_id.$": "$$.Execution.Id",
    "state_machine_name.$": "$$.StateMachine.Name"
}

https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/input-output-contextobject.html

Lambda関数

ステートマシンは、下記の4つのLambda関数で構成しています。

  • ① Athenaでログを集計する関数(AggregateLog)
  • ② 分析結果のCSVファイルをS3からGoogleドライブにスプシとして保存する関数(TransferResultToGoogleDrive)
  • ③ Amazon SES経由で集計結果完了メールを配信する関数(NotifyAggregateCompleted)
  • ④ クエリ失敗時にSlackに通知する関数(NotifyAggregateFailed)

関数①:Athena分析用のLambda関数

AWS SDK Rubyのstart_query_executionメソッドを使用しています。

https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Athena/Client.html#start_query_execution-instance_method

aggregate_log/function.rb
require 'json'
require 'aws-sdk-athena'

def handler(event:, context:)
  query_string = <<-QUERY
    -- 集計用のSQL
  QUERY

  client = Aws::Athena::Client.new({region: '{aws_region}'})
  resp = client.start_query_execution(
    {
      query_string: query_string,
      query_execution_context: {
        database: "{glueのデータベース名}",
        catalog: "AwsDataCatalog"
      },
      result_configuration: {
        output_location: 's3://{Athenaのクエリ結果の保存場所のプリフィックス}/'
      }
    }
  )
  puts "query_execution_id: #{resp.query_execution_id}"

  status = loop do
    sleep 1
    execution = client.get_query_execution(
    {
        query_execution_id: resp.query_execution_id
    })
    status =  execution.query_execution.status
    puts execution.query_execution
    break status if ['SUCCEEDED', 'FAILED'].include?(status.state)
  end

  if status.state == "FAILED"
    raise StandardError, 'ログの集計が失敗しました。手動で実行してください。'
  else
    puts "Executed at #{Time.now}."
  end

  {
    status: "success",
    s3_key: "#{resp.query_execution_id}.csv"
  }
end

デプロイ用でMakefileを作成しました。make deployを実行するとzipファイルの作成とLambda関数が更新できます。

aggregate_log/Makefile
deploy:
    zip -r function.zip function.rb
    aws lambda update-function-code --function-name aggregate-log --zip-file fileb://function.zip

関数②:分析結果のCSVファイルをS3からGoogleドライブにスプシとして保存する関数

Google Drive APIとGoogle Spreadsheet APIを使用するために、まず下記を行います。

  • GCPのサービスアカウントを作成
  • APIの有効化とサービスアカウントキーを発行
  • サービスアカウントキーのJSONをAWS Secrets Managerに登録
  • Googleドライブの共有フォルダの共有設定にGCPのサービスアカウントを追加

https://cloud.google.com/iam/docs/service-accounts-create?hl=ja
https://cloud.google.com/iam/docs/keys-create-delete?hl=ja

下記が関数のサンプルコードです。
Google::Apis::DriveV3::DriveServiceのcreate_fileメソッドでスプシを保存し、
Google::Apis::SheetsV4::SheetsServiceのbatch_get_spreadsheet_valuesメソッドでスプシの中の値のチェックを行っています。

https://googleapis.dev/ruby/google-api-client/latest/Google/Apis/DriveV3/DriveService.html#create_file-instance_method
https://googleapis.dev/ruby/google-api-client/latest/Google/Apis/SheetsV4/SheetsService.html#batch_get_spreadsheet_values-instance_method

transfer_result_to_google_drive/function.rb
require 'json'
require 'aws-sdk-secretsmanager'
require 'aws-sdk-s3'
require 'googleauth'
require 'google-apis-drive_v3'
require 'google-apis-sheets_v4'

module LambdaFunction
  class Handler
    CHECK_CELL_RANGE = "G:H" # 値をチェックしたいセル範囲

    def self.process(event:, context:)
      secretsmanager = Aws::SecretsManager::Client.new
      service_account_key_json = secretsmanager.get_secret_value(secret_id: "{GCPのサービスアカウントのクレデンシャルを保存したシークレットID}").secret_string
      folder_id = event["folder_id"]
      s3_key = event["s3_key"]
      local_file = File.join("/tmp", File.basename(s3_key))

      s3 = Aws::S3::Client.new
      s3_object = s3.get_object(
        response_target: local_file,
        bucket: "{Athenaの分析結果の保存場所のバケット名}",
        key: s3_key
      )

      authorizer = Google::Auth::ServiceAccountCredentials.make_creds(
        json_key_io: StringIO.new(service_account_key_json),
        scope: "https://www.googleapis.com/auth/drive"
      )
      authorizer.fetch_access_token!

      drive = Google::Apis::DriveV3::DriveService.new
      drive.authorization = authorizer

      metadata = Google::Apis::DriveV3::File.new(name: target_date, parents: [folder_id], mime_type: 'application/vnd.google-apps.spreadsheet')
      spreadsheet = drive.create_file(metadata, upload_source: local_file, supports_team_drives: true)

      sheet_service = Google::Apis::SheetsV4::SheetsService.new
      sheet_service.authorization = authorizer

      cell_values_to_check = sheet_service.batch_get_spreadsheet_values("#{spreadsheet.id}", ranges: CHECK_CELL_RANGE)
                                          .value_ranges
                                          .first
                                          .values[1..] # 1行目はヘッダーなので除外
                                          .uniq!
                                          &.flatten

      if cell_values_to_check.empty?
        log_exists = false     
      else
        log_exists = true
      end

      {
        status: "success",
        spreadsheet_url: "https://docs.google.com/spreadsheets/d/#{spreadsheet.id}",
        log_exists: log_exists
      }
    end
  end
end

この関数は、AWS SDK以外のgemも利用するため、コンテナイメージを作成しました。

transfer_result_to_google_drive/Dockerfile
FROM public.ecr.aws/lambda/ruby:3.2

COPY Gemfile Gemfile.lock ${LAMBDA_TASK_ROOT}/

RUN gem install bundler:2.4.20 && \
    bundle config set --local path 'vendor/bundle' && \
    bundle install

COPY function.rb ${LAMBDA_TASK_ROOT}/

CMD [ "function.LambdaFunction::Handler.process" ]
transfer_result_to_google_drive/Gemfile
# frozen_string_literal: true

source "https://rubygems.org"

gem 'aws-sdk-secretsmanager'
gem 'aws-sdk-s3'
gem 'google-apis-drive_v3'
gem 'google-apis-sheets_v4'
gem 'googleauth'

この関数もMakefileを作成して、make deployを実行するビルド、ECRへのプッシュ、Lambda関数の更新が行えるようにしました。

transfer_result_to_google_drive/Makefile
deploy:
	docker build --platform linux/arm64 -t transfer-result-to-google-drive:latest .
	aws ecr get-login-password --region {aws_region} | docker login --username AWS --password-stdin {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com
	docker tag transfer-result-to-google-drive:latest {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/transfer-result-to-google-drive:latest
	docker push {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/transfer-result-to-google-drive:latest
	aws lambda update-function-code --function-name transfer-result-to-google-drive --image-uri {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/transfer-result-to-google-drive:latest

関数③:Amazon SES経由で集計結果完了メールを配信する関数

https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/SES/Client.html#send_email-instance_method

require 'json'
require 'aws-sdk-ses'

def handler(event:, context:)
  source_address = ENV["SOURCE_ADDRESS"] # 配信元のメールアドレス
  to_address = ENV["TO_ADDRESS"] # 送信先のメールアドレス(To)
  cc_address = ENV["CC_ADDRESS"] # 送信先のメールアドレス(cc)
  spreadsheet_url = event["spreadsheet_url"]
  log_exists = event["log_exists"]

  subject = log_exists ? "【連絡】集計ファイルの格納完了" : "【連絡】ログデータがありませんでした"

  ses = Aws::SES::Client.new
  ses.send_email(
    destination: {
      to_addresses: [to_address],
      cc_addresses: [cc_address]
    },
    message: {
      subject: {
        charset: "UTF-8",
        data: "#{subject}"
      },
      body: {
        html: {
          charset: "UTF-8",
          data: "#{spreadsheet_url}"
        }
      }
    },
    source: "#{source_address}"
  )

  { status: "success" }
end

関数④:クエリ失敗時にSlackに通知する関数

下記の記事と同様の関数を実装しました。

https://zenn.dev/stmn_inc/articles/6869d707b3af70

まとめ

以上がLambdaとStep Functionsを用いて構築したAthenaの分析結果CSVをスプレッドシート形式でGoogle Driveに保存しメールで配信する仕組みになります。今後も、Step Functionsをより活用して複雑なワークフローの構築にも挑戦していこうと思います。

株式会社スタメン

Discussion