API GatewayとKinesisを用いたリアルタイムログ転送
はじめに
今回はAPI GatewayとKinesis firehoseを用いて、イベントログのリアルタイム収集、処理、分析を行います。
APIGatewayでは、イベントログを受け取りKinesisへストリーミングデータとして転送します。
またその際にLambdaは使用せず、**サービス統合プロクシ(ReguestTemplate)**を使用することで、lambdaを経由せずノーコードでFirehoseへのリクエストを行うことが可能です。
またKinesis firehoseとGlueと連携することで、収集と同時にParquet変換や圧縮などもノーコード行うことが可能です。
以下は今回構築する構成図になります。
API Gateway
API GatewayはServerless FlameworkやSAMを用いて定義することが多いかもしれませんが、
今回は全体構成を全てCloudformationで一括管理するため使用しておりません。
この辺はSeverless FrameworkやSAMに慣れていると、Cloudformationによる定義は少し冗長に感じるかもしれません。
RestApi
以下がAPIGatewayに関するCfn定義です。
Statement
ではイベントログの送信元IPが固定のため、許可するIPを制限しています。
またexecute-api:/${ApiGatewayStage}/GET/*/mobile_attribution
はエンドポイントを定義しています。
こちらはもちろん複数の定義も可能です。
## ------------- ApiGateway -------------
ApiGatewayRestApi:
Type: "AWS::ApiGateway::RestApi"
Properties:
Name: !Sub "${ProjectName}-${Env}-api"
EndpointConfiguration:
Types:
- EDGE
Policy:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal: "*"
Action:
- "execute-api:Invoke"
Resource:
- !Sub "execute-api:/${ApiGatewayStage}/GET/*/mobile_attribution"
Condition:
IpAddress:
aws:SourceIp: !Split
- ","
- !Join
- ","
- - !Join [ ",", !Ref "OfficeIpAddressList" ]
- !Join [ ",", !Ref "VPNIpAddressList" ]
Resource
APIパスが/{app_token}/mobile_attribution
なので、/{app_token}
と/mobile_attribution
を以下のように階層関係で定義します。
# ApiGateway Resource
ApiGatewayResourceAppTokenVar:
Type: "AWS::ApiGateway::Resource"
Properties:
ParentId: !Sub "${ApiGatewayRestApi.RootResourceId}"
PathPart: "{app_token}"
RestApiId: !Ref "ApiGatewayRestApi" #前述のAPIGateway定義
# ApiGateway Resource
ApiGatewayResourceMobileAttributionVar:
Type: "AWS::ApiGateway::Resource"
Properties:
ParentId: !Ref "ApiGatewayResourceAppTokenVar"
PathPart: "mobile_attribution"
RestApiId: !Ref "ApiGatewayRestApi" #前述のAPIGateway定義
DependsOn:
- ApiGatewayResourceAppTokenVar
Method
前述定義したAPI Gateway Resourceを用いてAPI Methodの定義を行います。
こちらではAWSサービスプロクシを用いてKinesis firehoseへのリクエストを定義します。
Lambdaを用いることでも実現可能ですが、メンテナンス工数を削減するためにノーコードでリクエストが可能なAWSサービスプロクシを選択しています。
そしてRequestTemplate
ではどのようなデータをリクエストするか定義しています。
こちらでは、URLパラメータやパス、パスに定義されたapp_tokenなど全てをリクエスト情報に追加しています。
こちらの構文はVelocityTemplateLanguage(VTL)で定義されており、詳細はAPI Gateway mapping templateで確認することができます。
# ApiGateway Method
ApiGatewayMethodMobileAttributionCheckVarGet:
Type: "AWS::ApiGateway::Method"
Properties:
HttpMethod: GET
RequestParameters: { }
ResourceId: !Ref "ApiGatewayResourceMobileAttributionVar"
RestApiId: !Ref "ApiGatewayRestApi"
ApiKeyRequired: true
AuthorizationType: NONE
Integration:
Type: AWS
IntegrationHttpMethod: POST
Uri: !Sub "arn:aws:apigateway:${AWS::Region}:firehose:action/PutRecord"
Credentials: !GetAtt "ApiGatewayIntegrationRequestRole.Arn"
PassthroughBehavior: WHEN_NO_TEMPLATES
RequestTemplates:
application/json:
!Sub |
#set($params = $input.params().get('querystring'))
#set($paths = $input.params().get('path'))
#set($data = "{
#foreach($paramName in $params.keySet())
""$paramName"": ""$util.escapeJavaScript($params.get($paramName))"",
#end
""app_token"": ""$util.escapeJavaScript($method.request.path.app_token)""
}")
{
"DeliveryStreamName": "${FirehoseMobileAttribution}",
"Record": {
"Data": "$util.base64Encode($data)"
}
}
IntegrationResponses:
- StatusCode: 200
MethodResponses:
- StatusCode: 200
ResponseModels: { application/json: "Empty" }
DependsOn:
- ApiGatewayResourceMobileAttributionVar
Deployment
今までに定義したAPI Gatewayのdeployment
定義を行います。
コンソール画面で設定変更後デプロイするかと思いますが、その際の定義となります。
アクセスログが必要な場合は、こちらでログフォーマットの設定なども行えます。
# ApiGateway deployment
ApiGatewayDeployment:
Type: "AWS::ApiGateway::Deployment"
Properties:
RestApiId: !Ref "ApiGatewayRestApi"
StageName: !Ref "ApiGatewayStage"
StageDescription:
Description: !Sub "Stage - ${Env}"
LoggingLevel: INFO
MetricsEnabled: False
AccessLogSetting:
DestinationArn: !GetAtt "ApiGatewayAccessLogGroup.Arn"
Format: "{ \"requestId\":\"$context.requestId\", \"ip\": \"$context.identity.sourceIp\", \"caller\":\"$context.identity.caller\", \"user\":\"$context.identity.user\",\"requestTime\":\"$context.requestTime\", \"httpMethod\":\"$context.httpMethod\",\"resourcePath\":\"$context.resourcePath\", \"status\":\"$context.status\",\"protocol\":\"$context.protocol\", \"responseLength\":\"$context.responseLength\" }"
DependsOn:
- ApiGatewayResourceMobileAttributionVar
- ApiGatewayMethodMobileAttributionCheckVarGet
- ApiGatewayAccessLogGroup
ApiKey
前述のMethod
で ApiKeyRequired: true
と定義していたと思いますが、そちらのApiKey定義となります。
以下のName
とValue
でヘッダーに渡すApiKeyのキー名と値を定義しています。
## ------------- Api key --------------
ApiGatewayRestApiBaseKey:
Type: AWS::ApiGateway::ApiKey
Properties:
Name: !Sub "${ProjectName}-${Env}-api-base-key" # Replacement
Enabled: true
Value: !FindInMap [ !Ref Env, ApiGateway, BaseApiKey ] # Replacement
StageKeys:
- RestApiId: !Ref "ApiGatewayRestApi"
StageName: !Ref "ApiGatewayStage"
DependsOn:
- ApiGatewayDeployment
UsagePlan
こちらはApiKeyを定義しているため必要な設定で、上記コンソール画面の使用量プランに相当します。
UsagePlan
を対象のApiIdとStateを指定して紐付け、それをUsagePlanKey
のUsagePlanIdにセットします。
# ApiGateway usage plan
ApiGatewayRestApiBaseUsagePlan:
Type: AWS::ApiGateway::UsagePlan
Properties:
ApiStages:
- ApiId: !Ref "ApiGatewayRestApi"
Stage: !Ref "ApiGatewayStage"
UsagePlanName: !Sub "${ProjectName}-${Env}-usage-plan"
DependsOn:
- ApiGatewayRestApi
- ApiGatewayDeployment
# ApiGateway usage plan key
ApiGatewayRestApiBaseUsagePlanKey:
Type: 'AWS::ApiGateway::UsagePlanKey'
Properties:
KeyId: !Ref "ApiGatewayRestApiBaseKey" # Replacement
KeyType: "API_KEY" # Replacement
UsagePlanId: !Ref "ApiGatewayRestApiBaseUsagePlan" # Replacement
DependsOn:
- ApiGatewayRestApiBaseUsagePlan
S3
こちらはFirehoseで処理したデータの保存先となるS3定義です。
## ------------- S3 -------------
S3Firehose:
Type: "AWS::S3::Bucket"
DeletionPolicy: Retain # cfn delete時にS3Bucketを残す.
Properties:
BucketName: !FindInMap [ !Ref Env, S3, Bucket ]
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
Glue
Glueとはデータの分類、クリーニング、加工をデータストア間、データストリーム間で行う完全マネージドETLです。
以下はストリーミングETLを行うための定義となっており、
リアルタイムParque変換に必要で、テーブル定義やパーティション定義も行っております。
テーブル定義やパーティション定義は必要な項目に応じて定義してください。
# Glue database
GlueDatabase:
Type: "AWS::Glue::Database"
Properties:
CatalogId: !Ref "AWS::AccountId"
DatabaseInput:
Name: !FindInMap [ !Ref Env, Glue, Database ]
# Glue table
GlueTable:
Type: "AWS::Glue::Table"
Properties:
CatalogId: !Ref "AWS::AccountId"
DatabaseName: !Ref "GlueDatabase"
TableInput:
Name: !Ref "GlueTableNameMobileAttribution"
TableType: EXTERNAL_TABLE
Parameters:
has_encrypted_data: true
EXTERNAL: true
PartitionKeys:
- Name: year
Type: string
- Name: month
Type: string
- Name: day
Type: string
StorageDescriptor:
Columns:
- Name: event_token
Type: string
- Name: event_name
Type: string
- Name: app_name
Type: string
- Name: activity_kind
Type: string
- Name: created_at
Type: timestamp
Location: !Sub
- "s3://${S3Bucket}/${S3Dir}"
- S3Bucket: !Sub "{{resolve:ssm:${ProjectName}-${Env}-S3FirehoseBucketName}}"
S3Dir: !Ref "S3DirMobileAttribution"
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
SerdeInfo:
SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
Parameters:
serialization.format: '1'
BucketColumns: [ ]
SortColumns: [ ]
Kinesis Firehose
以下はFirehose定義となります。
ExtendedS3DestinationConfiguration
でS3に関する定義を行います。
BucketArn
で先ほど作成したS3バケットのArnを、Prefix
ではGlueで定義したパーティションを定義します。ここが一致していないとエラーになるので注意してください。
Prefixの定義方法についてはAmazon S3 オブジェクトのカスタムプレフィックスで詳しく記載されているので参考にしていただければと思います。
またCompressionFormat
はRecordFormatを使用している場合自動でSnappy圧縮となるのでUNCOMPRESSEDを定義します。
またバックアップが必要な場合は、S3BackupMode: Enabled
としてバックアップ先を指定する必要があります。
## ------------- Firehose -------------
FirehoseMobileAttribution:
Type: "AWS::KinesisFirehose::DeliveryStream"
Properties:
DeliveryStreamEncryptionConfigurationInput:
KeyType: AWS_OWNED_CMK
DeliveryStreamName: !FindInMap [ !Ref Env, Firehose, DeliveryStreamMobileAttribution ] # Replacement
DeliveryStreamType: DirectPut # Replacement
ExtendedS3DestinationConfiguration:
RoleARN: !GetAtt FirehoseRole.Arn
BucketARN: !Sub "{{resolve:ssm:${ProjectName}-${Env}-S3FirehoseArn}}"
Prefix: !Sub "${S3DirMobileAttribution}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
ErrorOutputPrefix: !Sub "${S3DirMobileAttribution}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/!{firehose:error-output-type}"
BufferingHints:
SizeInMBs: 64
IntervalInSeconds: 60
CompressionFormat: UNCOMPRESSED # Record Format変換を使用する場合, 自動的にSnappy圧縮が選択されるため指定しない
EncryptionConfiguration:
KMSEncryptionConfig:
AWSKMSKeyARN: !Ref "S3KeyArn"
CloudWatchLoggingOptions:
Enabled: true
LogGroupName: !Ref "FirehoseLogGroup"
LogStreamName: !Ref "FirehoseLogStream"
S3BackupMode: Enabled
S3BackupConfiguration:
BucketARN: !Sub "{{resolve:ssm:${ProjectName}-${Env}-S3FirehoseArn}}"
RoleARN: !GetAtt "FirehoseRole.Arn"
Prefix: !Sub "backup/${S3DirMobileAttribution}/"
DataFormatConversionConfiguration:
SchemaConfiguration:
CatalogId: !Ref "AWS::AccountId"
RoleARN: !GetAtt "FirehoseRole.Arn"
DatabaseName: !Sub "{{resolve:ssm:${ProjectName}-${Env}-GlueDatabaseName}}"
TableName: !Sub "{{resolve:ssm:${ProjectName}-${Env}-GlueTableName}}"
Region: !Ref "AWS::Region"
VersionId: LATEST
InputFormatConfiguration:
Deserializer:
OpenXJsonSerDe: { }
OutputFormatConfiguration:
Serializer:
ParquetSerDe: { }
Enabled: true
DependsOn:
- FirehoseLogStream
最後に
いかがでしたでしょうか?
今回はイベントログのリアルタイム処理をユースケースとしていましたが、少し構成を変えると他のユースケースにも対応できるかと思います。
例えばストレージへの永続化が必要な場合、間にDynamoDBをはさみ、DynamoDB Streamsの変更データキャプチャをストリームデータとして転送するなどで、その他にも様々な活用方法があると思いますので、ぜひ活用してみてください。
Discussion