😀

API GatewayとKinesisを用いたリアルタイムログ転送

2021/09/28に公開

はじめに

今回はAPI GatewayとKinesis firehoseを用いて、イベントログのリアルタイム収集、処理、分析を行います。
APIGatewayでは、イベントログを受け取りKinesisへストリーミングデータとして転送します。
またその際にLambdaは使用せず、**サービス統合プロクシ(ReguestTemplate)**を使用することで、lambdaを経由せずノーコードでFirehoseへのリクエストを行うことが可能です。

またKinesis firehoseとGlueと連携することで、収集と同時にParquet変換や圧縮などもノーコード行うことが可能です。

以下は今回構築する構成図になります。

Callbackシステムフロー-Kinesis(ServiceProxy)構成.drawio (1).png

API Gateway

API GatewayはServerless FlameworkやSAMを用いて定義することが多いかもしれませんが、
今回は全体構成を全てCloudformationで一括管理するため使用しておりません。
この辺はSeverless FrameworkやSAMに慣れていると、Cloudformationによる定義は少し冗長に感じるかもしれません。

RestApi

以下がAPIGatewayに関するCfn定義です。
Statementではイベントログの送信元IPが固定のため、許可するIPを制限しています。
またexecute-api:/${ApiGatewayStage}/GET/*/mobile_attributionはエンドポイントを定義しています。
こちらはもちろん複数の定義も可能です。

  ## ------------- ApiGateway -------------
  ApiGatewayRestApi:
    Type: "AWS::ApiGateway::RestApi"
    Properties:
      Name: !Sub "${ProjectName}-${Env}-api"
      EndpointConfiguration:
        Types:
          - EDGE
      Policy:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal: "*"
            Action:
              - "execute-api:Invoke"
            Resource:
              - !Sub "execute-api:/${ApiGatewayStage}/GET/*/mobile_attribution"
            Condition:
              IpAddress:
                aws:SourceIp: !Split
                  - ","
                  - !Join
                    - ","
                    - - !Join [ ",", !Ref "OfficeIpAddressList" ]
                      - !Join [ ",", !Ref "VPNIpAddressList" ]

Resource

APIパスが/{app_token}/mobile_attributionなので、/{app_token}/mobile_attributionを以下のように階層関係で定義します。

  # ApiGateway Resource
  ApiGatewayResourceAppTokenVar:
    Type: "AWS::ApiGateway::Resource"
    Properties:
      ParentId: !Sub "${ApiGatewayRestApi.RootResourceId}"
      PathPart: "{app_token}"
      RestApiId: !Ref "ApiGatewayRestApi" #前述のAPIGateway定義
  # ApiGateway Resource        
  ApiGatewayResourceMobileAttributionVar:
    Type: "AWS::ApiGateway::Resource"
    Properties:
      ParentId: !Ref "ApiGatewayResourceAppTokenVar"
      PathPart: "mobile_attribution"
      RestApiId: !Ref "ApiGatewayRestApi" #前述のAPIGateway定義
    DependsOn:
      - ApiGatewayResourceAppTokenVar

Method

CallbackシステムAPIGateway.png

前述定義したAPI Gateway Resourceを用いてAPI Methodの定義を行います。
こちらではAWSサービスプロクシを用いてKinesis firehoseへのリクエストを定義します。
Lambdaを用いることでも実現可能ですが、メンテナンス工数を削減するためにノーコードでリクエストが可能なAWSサービスプロクシを選択しています。
 そしてRequestTemplateではどのようなデータをリクエストするか定義しています。
こちらでは、URLパラメータやパス、パスに定義されたapp_tokenなど全てをリクエスト情報に追加しています。

こちらの構文はVelocityTemplateLanguage(VTL)で定義されており、詳細はAPI Gateway mapping templateで確認することができます。

  # ApiGateway Method    
  ApiGatewayMethodMobileAttributionCheckVarGet:
    Type: "AWS::ApiGateway::Method"
    Properties:
      HttpMethod: GET
      RequestParameters: { }
      ResourceId: !Ref "ApiGatewayResourceMobileAttributionVar"
      RestApiId: !Ref "ApiGatewayRestApi"
      ApiKeyRequired: true
      AuthorizationType: NONE
      Integration:
        Type: AWS
        IntegrationHttpMethod: POST
        Uri: !Sub "arn:aws:apigateway:${AWS::Region}:firehose:action/PutRecord"
        Credentials: !GetAtt "ApiGatewayIntegrationRequestRole.Arn"
        PassthroughBehavior: WHEN_NO_TEMPLATES
        RequestTemplates:
          application/json:
            !Sub |
            #set($params = $input.params().get('querystring'))
            #set($paths = $input.params().get('path'))
            #set($data = "{
              #foreach($paramName in $params.keySet())
                ""$paramName"": ""$util.escapeJavaScript($params.get($paramName))"",
              #end
              ""app_token"": ""$util.escapeJavaScript($method.request.path.app_token)""
            }")
            {
                "DeliveryStreamName": "${FirehoseMobileAttribution}",
                "Record": {
                    "Data": "$util.base64Encode($data)"
                }
            }
        IntegrationResponses:
          - StatusCode: 200
      MethodResponses:
        - StatusCode: 200
          ResponseModels: { application/json: "Empty" }
    DependsOn:
      - ApiGatewayResourceMobileAttributionVar

Deployment

今までに定義したAPI Gatewayのdeployment定義を行います。
コンソール画面で設定変更後デプロイするかと思いますが、その際の定義となります。
アクセスログが必要な場合は、こちらでログフォーマットの設定なども行えます。

  # ApiGateway deployment
  ApiGatewayDeployment:
    Type: "AWS::ApiGateway::Deployment"
    Properties:
      RestApiId: !Ref "ApiGatewayRestApi"
      StageName: !Ref "ApiGatewayStage"
      StageDescription:
        Description: !Sub "Stage - ${Env}"
        LoggingLevel: INFO
        MetricsEnabled: False
        AccessLogSetting:
          DestinationArn: !GetAtt "ApiGatewayAccessLogGroup.Arn"
          Format: "{ \"requestId\":\"$context.requestId\", \"ip\": \"$context.identity.sourceIp\", \"caller\":\"$context.identity.caller\", \"user\":\"$context.identity.user\",\"requestTime\":\"$context.requestTime\", \"httpMethod\":\"$context.httpMethod\",\"resourcePath\":\"$context.resourcePath\", \"status\":\"$context.status\",\"protocol\":\"$context.protocol\", \"responseLength\":\"$context.responseLength\" }"
    DependsOn:
      - ApiGatewayResourceMobileAttributionVar
      - ApiGatewayMethodMobileAttributionCheckVarGet
      - ApiGatewayAccessLogGroup

ApiKey

前述のMethodApiKeyRequired: trueと定義していたと思いますが、そちらのApiKey定義となります。
以下のNameValueでヘッダーに渡すApiKeyのキー名と値を定義しています。

  ## ------------- Api key --------------
  ApiGatewayRestApiBaseKey:
    Type: AWS::ApiGateway::ApiKey
    Properties:
      Name: !Sub "${ProjectName}-${Env}-api-base-key" # Replacement
      Enabled: true
      Value: !FindInMap [ !Ref Env, ApiGateway, BaseApiKey ] # Replacement
      StageKeys:
        - RestApiId: !Ref "ApiGatewayRestApi"
          StageName: !Ref "ApiGatewayStage"
    DependsOn:
      - ApiGatewayDeployment

UsagePlan

スクリーンショット 2021-09-28 12.46.18.png

こちらはApiKeyを定義しているため必要な設定で、上記コンソール画面の使用量プランに相当します。
UsagePlan を対象のApiIdとStateを指定して紐付け、それをUsagePlanKeyのUsagePlanIdにセットします。

  # ApiGateway usage plan
  ApiGatewayRestApiBaseUsagePlan:
    Type: AWS::ApiGateway::UsagePlan
    Properties:
      ApiStages:
        - ApiId: !Ref "ApiGatewayRestApi"
          Stage: !Ref "ApiGatewayStage"
      UsagePlanName: !Sub "${ProjectName}-${Env}-usage-plan"
    DependsOn:
      - ApiGatewayRestApi
      - ApiGatewayDeployment
  # ApiGateway usage plan key
  ApiGatewayRestApiBaseUsagePlanKey:
    Type: 'AWS::ApiGateway::UsagePlanKey'
    Properties:
      KeyId: !Ref "ApiGatewayRestApiBaseKey"              # Replacement
      KeyType: "API_KEY"                                  # Replacement
      UsagePlanId: !Ref "ApiGatewayRestApiBaseUsagePlan"  # Replacement
    DependsOn:
      - ApiGatewayRestApiBaseUsagePlan

S3

こちらはFirehoseで処理したデータの保存先となるS3定義です。

  ## ------------- S3 -------------
  S3Firehose:
    Type: "AWS::S3::Bucket"
    DeletionPolicy: Retain # cfn delete時にS3Bucketを残す.
    Properties:
      BucketName: !FindInMap [ !Ref Env, S3, Bucket ] 
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

Glue

Callbackシステムフロー-Kinesis(ServiceProxy)構成.drawio (1)のコピー.png

Glueとはデータの分類、クリーニング、加工をデータストア間、データストリーム間で行う完全マネージドETLです。
以下はストリーミングETLを行うための定義となっており、
リアルタイムParque変換に必要で、テーブル定義やパーティション定義も行っております。
テーブル定義やパーティション定義は必要な項目に応じて定義してください。


  # Glue database
  GlueDatabase:
    Type: "AWS::Glue::Database"
    Properties:
      CatalogId: !Ref "AWS::AccountId" 
      DatabaseInput:
        Name: !FindInMap [ !Ref Env, Glue, Database ] 

  # Glue table
  GlueTable:
    Type: "AWS::Glue::Table"
    Properties:
      CatalogId: !Ref "AWS::AccountId"  
      DatabaseName: !Ref "GlueDatabase" 
      TableInput: 
        Name: !Ref "GlueTableNameMobileAttribution"
        TableType: EXTERNAL_TABLE
        Parameters:
          has_encrypted_data: true
          EXTERNAL: true
        PartitionKeys:
          - Name: year
            Type: string
          - Name: month
            Type: string
          - Name: day
            Type: string
        StorageDescriptor:
          Columns:
            - Name: event_token
              Type: string
            - Name: event_name
              Type: string
            - Name: app_name
              Type: string
            - Name: activity_kind
              Type: string
            - Name: created_at
              Type: timestamp
          Location: !Sub
            - "s3://${S3Bucket}/${S3Dir}"
            - S3Bucket: !Sub "{{resolve:ssm:${ProjectName}-${Env}-S3FirehoseBucketName}}"
              S3Dir: !Ref "S3DirMobileAttribution"
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          SerdeInfo:
            SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
            Parameters:
              serialization.format: '1'
          BucketColumns: [ ]
          SortColumns: [ ]

Kinesis Firehose

以下はFirehose定義となります。
ExtendedS3DestinationConfigurationでS3に関する定義を行います。
BucketArn で先ほど作成したS3バケットのArnを、Prefix ではGlueで定義したパーティションを定義します。ここが一致していないとエラーになるので注意してください。
Prefixの定義方法についてはAmazon S3 オブジェクトのカスタムプレフィックスで詳しく記載されているので参考にしていただければと思います。
 またCompressionFormatはRecordFormatを使用している場合自動でSnappy圧縮となるのでUNCOMPRESSEDを定義します。
 またバックアップが必要な場合は、S3BackupMode: Enabledとしてバックアップ先を指定する必要があります。

  ## ------------- Firehose -------------
  FirehoseMobileAttribution:
    Type: "AWS::KinesisFirehose::DeliveryStream"
    Properties:
      DeliveryStreamEncryptionConfigurationInput:
        KeyType: AWS_OWNED_CMK
      DeliveryStreamName: !FindInMap [ !Ref Env, Firehose, DeliveryStreamMobileAttribution ] # Replacement
      DeliveryStreamType: DirectPut                                                          # Replacement
      ExtendedS3DestinationConfiguration:
        RoleARN: !GetAtt FirehoseRole.Arn
        BucketARN: !Sub "{{resolve:ssm:${ProjectName}-${Env}-S3FirehoseArn}}"
        Prefix: !Sub "${S3DirMobileAttribution}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
        ErrorOutputPrefix: !Sub "${S3DirMobileAttribution}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/!{firehose:error-output-type}"
        BufferingHints:
          SizeInMBs: 64
          IntervalInSeconds: 60
        CompressionFormat: UNCOMPRESSED # Record Format変換を使用する場合, 自動的にSnappy圧縮が選択されるため指定しない
        EncryptionConfiguration:
          KMSEncryptionConfig:
            AWSKMSKeyARN: !Ref "S3KeyArn"
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref "FirehoseLogGroup"
          LogStreamName: !Ref "FirehoseLogStream"
        S3BackupMode: Enabled
        S3BackupConfiguration:
          BucketARN: !Sub "{{resolve:ssm:${ProjectName}-${Env}-S3FirehoseArn}}"
          RoleARN: !GetAtt "FirehoseRole.Arn"
          Prefix: !Sub "backup/${S3DirMobileAttribution}/"
        DataFormatConversionConfiguration:
          SchemaConfiguration:
            CatalogId: !Ref "AWS::AccountId"
            RoleARN: !GetAtt "FirehoseRole.Arn"
            DatabaseName: !Sub "{{resolve:ssm:${ProjectName}-${Env}-GlueDatabaseName}}"
            TableName: !Sub "{{resolve:ssm:${ProjectName}-${Env}-GlueTableName}}"
            Region: !Ref "AWS::Region"
            VersionId: LATEST
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe: { }
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: { }
          Enabled: true
    DependsOn:
      - FirehoseLogStream

最後に

いかがでしたでしょうか?
今回はイベントログのリアルタイム処理をユースケースとしていましたが、少し構成を変えると他のユースケースにも対応できるかと思います。
例えばストレージへの永続化が必要な場合、間にDynamoDBをはさみ、DynamoDB Streamsの変更データキャプチャをストリームデータとして転送するなどで、その他にも様々な活用方法があると思いますので、ぜひ活用してみてください。

参考文献

Discussion