Athenaの分析結果をスプレッドシートに変換しメール配信する仕組みを作った話
Athenaの分析結果のCSVをスプレッドシート形式でGoogle Driveに保存しメールで配信する仕組みをLambdaとStep Functionsを用いて構築する機会があったため、その実装例を 紹介します。
要件
要件として、大きく下記の2つがありました。
- Athenaの分析結果をGoogleスプレッドシート(以下、スプシ)で格納する。
- 格納したスプシの共有URLを本文に記載したメールを集計完了通知として配信する。
全体概要
下図が全体の概要図となります。
EventBridgeで毎日定時刻にStep Functionsのステートマシンを起動します。Step Functionsを構成しているLambda関数で、Athenaによるログ集計、Google Drive APIおよびGoogle Spread APIを用いて、S3に保存されている集計結果のCSVをスプシとしてGoogle Driveに保存、SESで集計完了メールの送付を行います。ステートマシン内で処理に失敗した場合は、Slackに通知するようにしています。
Step Functionsのステートマシン
Step Functionsのステートマシンのフロー図が下図となります。
EventBridgeからは、入力値として下記の値を渡しています。
{
"tenant_id": 9999,
"tenant_name": "テナント名",
"folder_id": "{Googleドライブの共有フォルダのID}"
}
Googleドライブの共有フォルダのIDは、スプシを保存したいフォルダのURLのhttps://drive.google.com/drive/folders/{フォルダID}
の末尾のIDを使用します。
ステートマシンのサンプルコード
{
"StartAt": "AggregateLog",
"States": {
"AggregateLog": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:{aws_region}:{aws_account_id}:function:aggregate-log",
"InvocationType": "RequestResponse",
"Payload": {
"tenant_id.$": "$$.Execution.Input.tenant_id",
"tenant_name.$": "$$.Execution.Input.tenant_name"
}
},
"ResultSelector": {
"s3_key.$": "$.Payload.s3_key"
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "NotifyAggregateFailed"
}
],
"Next": "TransferResultToGoogleDrive"
},
"TransferResultToGoogleDrive": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:{aws_region}:{aws_account_id}:function:transfer-result-to-google-drive",
"InvocationType": "RequestResponse",
"Payload": {
"s3_key.$": "$.s3_key",
"folder_id.$": "$$.Execution.Input.folder_id"
}
},
"ResultSelector": {
"spreadsheet_url.$": "$.Payload.spreadsheet_url",
"log_exists.$": "$.Payload.log_exists"
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "NotifyAggregateFailed"
}
],
"Next": "NotifyAggregateCompleted"
},
"NotifyAggregateCompleted": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:{aws_region}:{aws_account_id}:function:notify-aggregate-completed",
"InvocationType": "RequestResponse",
"Payload": {
"spreadsheet_url.$": "$.spreadsheet_url",
"tenant_name.$": "$$.Execution.Input.tenant_name",
"log_exists.$": "$.log_exists"
}
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "NotifyAggregateFailed"
}
],
"End": true
},
"NotifyAggregateFailed": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:{aws_region}:{aws_account_id}:function:notify-aggregate-failed",
"InvocationType": "RequestResponse",
"Payload": {
"tenant_name.$": "$$.Execution.Input.tenant_name",
"execution_id.$": "$$.Execution.Id",
"state_machine_name.$": "$$.StateMachine.Name"
}
},
"End": true
}
}
}
ステートマシンの実行IDやステートマシン名をLambda関数の入力値に使いたかったため、コンテキストオブジェクトを使用しました。
{
"execution_id.$": "$$.Execution.Id",
"state_machine_name.$": "$$.StateMachine.Name"
}
Lambda関数
ステートマシンは、下記の4つのLambda関数で構成しています。
- ① Athenaでログを集計する関数(AggregateLog)
- ② 分析結果のCSVファイルをS3からGoogleドライブにスプシとして保存する関数(TransferResultToGoogleDrive)
- ③ Amazon SES経由で集計結果完了メールを配信する関数(NotifyAggregateCompleted)
- ④ クエリ失敗時にSlackに通知する関数(NotifyAggregateFailed)
関数①:Athena分析用のLambda関数
AWS SDK Rubyのstart_query_execution
メソッドを使用しています。
require 'json'
require 'aws-sdk-athena'
def handler(event:, context:)
query_string = <<-QUERY
-- 集計用のSQL
QUERY
client = Aws::Athena::Client.new({region: '{aws_region}'})
resp = client.start_query_execution(
{
query_string: query_string,
query_execution_context: {
database: "{glueのデータベース名}",
catalog: "AwsDataCatalog"
},
result_configuration: {
output_location: 's3://{Athenaのクエリ結果の保存場所のプリフィックス}/'
}
}
)
puts "query_execution_id: #{resp.query_execution_id}"
status = loop do
sleep 1
execution = client.get_query_execution(
{
query_execution_id: resp.query_execution_id
})
status = execution.query_execution.status
puts execution.query_execution
break status if ['SUCCEEDED', 'FAILED'].include?(status.state)
end
if status.state == "FAILED"
raise StandardError, 'ログの集計が失敗しました。手動で実行してください。'
else
puts "Executed at #{Time.now}."
end
{
status: "success",
s3_key: "#{resp.query_execution_id}.csv"
}
end
デプロイ用でMakefileを作成しました。make deploy
を実行するとzipファイルの作成とLambda関数が更新できます。
deploy:
zip -r function.zip function.rb
aws lambda update-function-code --function-name aggregate-log --zip-file fileb://function.zip
関数②:分析結果のCSVファイルをS3からGoogleドライブにスプシとして保存する関数
Google Drive APIとGoogle Spreadsheet APIを使用するために、まず下記を行います。
- GCPのサービスアカウントを作成
- APIの有効化とサービスアカウントキーを発行
- サービスアカウントキーのJSONをAWS Secrets Managerに登録
- Googleドライブの共有フォルダの共有設定にGCPのサービスアカウントを追加
下記が関数のサンプルコードです。
Google::Apis::DriveV3::DriveServiceのcreate_file
メソッドでスプシを保存し、
Google::Apis::SheetsV4::SheetsServiceのbatch_get_spreadsheet_values
メソッドでスプシの中の値のチェックを行っています。
require 'json'
require 'aws-sdk-secretsmanager'
require 'aws-sdk-s3'
require 'googleauth'
require 'google-apis-drive_v3'
require 'google-apis-sheets_v4'
module LambdaFunction
class Handler
CHECK_CELL_RANGE = "G:H" # 値をチェックしたいセル範囲
def self.process(event:, context:)
secretsmanager = Aws::SecretsManager::Client.new
service_account_key_json = secretsmanager.get_secret_value(secret_id: "{GCPのサービスアカウントのクレデンシャルを保存したシークレットID}").secret_string
folder_id = event["folder_id"]
s3_key = event["s3_key"]
local_file = File.join("/tmp", File.basename(s3_key))
s3 = Aws::S3::Client.new
s3_object = s3.get_object(
response_target: local_file,
bucket: "{Athenaの分析結果の保存場所のバケット名}",
key: s3_key
)
authorizer = Google::Auth::ServiceAccountCredentials.make_creds(
json_key_io: StringIO.new(service_account_key_json),
scope: "https://www.googleapis.com/auth/drive"
)
authorizer.fetch_access_token!
drive = Google::Apis::DriveV3::DriveService.new
drive.authorization = authorizer
metadata = Google::Apis::DriveV3::File.new(name: target_date, parents: [folder_id], mime_type: 'application/vnd.google-apps.spreadsheet')
spreadsheet = drive.create_file(metadata, upload_source: local_file, supports_team_drives: true)
sheet_service = Google::Apis::SheetsV4::SheetsService.new
sheet_service.authorization = authorizer
cell_values_to_check = sheet_service.batch_get_spreadsheet_values("#{spreadsheet.id}", ranges: CHECK_CELL_RANGE)
.value_ranges
.first
.values[1..] # 1行目はヘッダーなので除外
.uniq!
&.flatten
if cell_values_to_check.empty?
log_exists = false
else
log_exists = true
end
{
status: "success",
spreadsheet_url: "https://docs.google.com/spreadsheets/d/#{spreadsheet.id}",
log_exists: log_exists
}
end
end
end
この関数は、AWS SDK以外のgemも利用するため、コンテナイメージを作成しました。
FROM public.ecr.aws/lambda/ruby:3.2
COPY Gemfile Gemfile.lock ${LAMBDA_TASK_ROOT}/
RUN gem install bundler:2.4.20 && \
bundle config set --local path 'vendor/bundle' && \
bundle install
COPY function.rb ${LAMBDA_TASK_ROOT}/
CMD [ "function.LambdaFunction::Handler.process" ]
# frozen_string_literal: true
source "https://rubygems.org"
gem 'aws-sdk-secretsmanager'
gem 'aws-sdk-s3'
gem 'google-apis-drive_v3'
gem 'google-apis-sheets_v4'
gem 'googleauth'
この関数もMakefileを作成して、make deploy
を実行するビルド、ECRへのプッシュ、Lambda関数の更新が行えるようにしました。
deploy:
docker build --platform linux/arm64 -t transfer-result-to-google-drive:latest .
aws ecr get-login-password --region {aws_region} | docker login --username AWS --password-stdin {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com
docker tag transfer-result-to-google-drive:latest {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/transfer-result-to-google-drive:latest
docker push {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/transfer-result-to-google-drive:latest
aws lambda update-function-code --function-name transfer-result-to-google-drive --image-uri {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/transfer-result-to-google-drive:latest
関数③:Amazon SES経由で集計結果完了メールを配信する関数
require 'json'
require 'aws-sdk-ses'
def handler(event:, context:)
source_address = ENV["SOURCE_ADDRESS"] # 配信元のメールアドレス
to_address = ENV["TO_ADDRESS"] # 送信先のメールアドレス(To)
cc_address = ENV["CC_ADDRESS"] # 送信先のメールアドレス(cc)
spreadsheet_url = event["spreadsheet_url"]
log_exists = event["log_exists"]
subject = log_exists ? "【連絡】集計ファイルの格納完了" : "【連絡】ログデータがありませんでした"
ses = Aws::SES::Client.new
ses.send_email(
destination: {
to_addresses: [to_address],
cc_addresses: [cc_address]
},
message: {
subject: {
charset: "UTF-8",
data: "#{subject}"
},
body: {
html: {
charset: "UTF-8",
data: "#{spreadsheet_url}"
}
}
},
source: "#{source_address}"
)
{ status: "success" }
end
関数④:クエリ失敗時にSlackに通知する関数
下記の記事と同様の関数を実装しました。
まとめ
以上がLambdaとStep Functionsを用いて構築したAthenaの分析結果CSVをスプレッドシート形式でGoogle Driveに保存しメールで配信する仕組みになります。今後も、Step Functionsをより活用して複雑なワークフローの構築にも挑戦していこうと思います。
Discussion