🧙

S3+Lambda+AI Servicesを用いた音声認識パイプラインを、CloudFormationで構築してみた

2022/08/30に公開

こんちわ。久々の投稿です。
最近リリースされた鋼の錬金術師モバイルこと”ハガモバ”ばっかりやってて勉強が疎かになっています。クオリティ高いんでオススメです。ガチャ確率低いけど。

さて、最近はAWSの資格取得&実務に役立つ知識を得る為、ちまちまとですがハンズオンに再度取り組んでいます。
それらをヒントに音声認識システムを構築してみたので紹介します。

また、今回CloudFormationを用いて構築しており、そちらをメインとして紹介します。
S3イベントトリガーによりLambda関数が起動しAI Servicesを呼び出す流れにしていますが、CloudFormationだとどういった記述になるのか?この辺りがポイントです。

システムの流れ

workflow

簡単ですが音源データを文章に変換し、文章に含まれる感情を分析します。

  1. S3バケットに音源ファイルがアップロードされると、それをトリガーにLambda関数が翻訳を行う
    S3 → Lambda → Transcribe
  2. 1の結果をjsonファイルに出力し、S3バケットに格納
    Transcribe → S3
  3. 2のアップロードをトリガーにLambda関数が起動し、感情分析を行ってCloudWatchに結果を出力
    S3 → Lambda → Comprehend

先に動作紹介から

本題の前に、まずは動作確認していきます。
今からアップロードするmp3音源では、以下文章を読み上げています。
>「こんにちは。これはAWS LambdaとS3、AI Servicesを組み合わせた音声認識パイプラインの動作確認用音源です。」

INPUT/OUTPUT用にバケットを2つ作成しています。では早速INPUTバケットにmp3音源をアップロード。
upload

すると、Transcribeが動きました。ジョブ実行中...
transcribeJob

ジョブが完了すると、OUTPUTバケットに文章化したファイルが格納されました。
output

中身はこんな感じです。
json
AIが、”あい”になってたりしますがまぁ一旦放置です。

最後に感情分析の結果を見てみます。CloudWatchに出力するようにしています。
感情の起伏の無い文章なので特に差はないはず。
comprehendResult
Positive、Negativeそれぞれの度合いが数値化されています。
もう少し感情を含んだ文章だと面白かったかな〜...

以上です。
ここから本題です。

(本題)CloudFormationでの構築

全体コードは後回しで、CloudFormationで構築するために工夫した点を紹介します。主に3点です。


  • pythonライブラリを用いてAI Servicesへリクエスト
  • S3→Lambdaへのイベント通知連携
  • スタック作成の順序

では解説していきます。

pythonライブラリを用いてAI Servicesへリクエスト

  TranscribeFunc:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Ref TranscribeFuncName
      Handler: index.lambda_handler                 ★Pythonの場合はコレ
      Code:
        ZipFile: |
          import json
          import urllib.parse
          import boto3
          import datetime

          s3 = boto3.client('s3')
          transcribe = boto3.client('transcribe')

          def lambda_handler(event, context):
              bucket = event['Records'][0]['s3']['bucket']['name']
              key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
              try:
                  transcribe.start_transcription_job(
                      TranscriptionJobName= datetime.datetime.now().strftime("%Y%m%d%H%M%S") + '_Transcription',
                      LanguageCode='ja-JP',
                      Media={
                          'MediaFileUri': 'https://s3.ap-northeast-1.amazonaws.com/' + bucket + '/' + key
                      },
                      OutputBucketName='h4b-serverless-output-1111'
                  )
              except Exception as e:
                  print(e)
                  print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
                  raise e
      Role: !GetAtt TranscribeFuncRole.Arn
      Runtime: python3.7

pythonライブラリboto3のstart_transcription_job()を用いて、AWS Transcribeにリクエストしています。
メディアファイルと出力先バケットをパラメータにTrancribeジョブが実行されます。
Comprehendに対しても同様にboto3を用いてリクエストしています。

S3→Lambdaへのイベント通知連携

これをCloudFormationで定義するには、「バケットとLambda関数の紐付け」「S3イベント通知許可」の2点が必要です。

  1. バケットとLambda関数の紐付け
  InputBucket:
    Type: AWS::S3::Bucket
    DependsOn: S3TranscribeNotice
    Properties:
      BucketName: !Ref InputBucketName
      AccessControl: Private
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: "s3:ObjectCreated:*"              ★対象のイベント
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: .mp3                      ★対象のファイル
            Function: !GetAtt TranscribeFunc.Arn     ★呼び出すLambda関数

NotificationConfigurationで以下を定義しています。

・対象のイベント              =    createObject
・アップロードされたファイル    =    拡張子がmp3
・呼び出すLambda関数          =    TranscribeFunc
  1. イベント通知許可
  S3TranscribeNotice:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt TranscribeFunc.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceArn: !Join 
                  - ""
                  - - "arn:aws:s3:::"
                    - !Ref InputBucketName

S3→Lambdaのイベント通知を許可するには、上記のようにAWS::Lambda::Permissionを定義する必要があります。
INPUTバケット内でアップロード(createObject)が行われた時、Lambda関数TranscribeFuncに対してイベント通知を送ります。

スタック作成の順序

当たり前かもですが、イベント通知許可のAWS::Lambda::Permissionが生成された後に対象のバケットを生成するようにしています。

  InputBucket:
    Type: AWS::S3::Bucket
    DependsOn: S3TranscribeNotice       ★イベント通知生成後にこのバケットを生成
    ...

(おまけ)CloudWatch LogGroupとLambdaの紐付け

LogGroupNameにLambda関数のパスを指定すればOKです。

  TranscribeFuncLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${TranscribeFuncName}
      RetentionInDays: 14

書いてないですがCloudWatch操作権限の付与も忘れずに。

まとめ 全体テンプレート

IAMロール権限は緩めです。

AWSTemplateFormatVersion: "2010-09-09"
Description: "AWS Hands-On For Lambda(Lambda/S3/Transcribe/Comprehend/Polly)."
Parameters:
  InputBucketName:
    Type: String
    Default: h4b-serverless-input-1111
  OutputBucketName:
    Type: String
    Default: h4b-serverless-output-1111
  TranscribeFuncName:
    Type: String
    Default: transcribe-function
  ComprehendFuncName:
    Type: String
    Default: comprehend-function

Resources:
  TranscribeFunc:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Ref TranscribeFuncName
      Handler: index.lambda_handler
      Code:
        ZipFile: |
          import json
          import urllib.parse
          import boto3
          import datetime

          s3 = boto3.client('s3')
          transcribe = boto3.client('transcribe')

          def lambda_handler(event, context):
              bucket = event['Records'][0]['s3']['bucket']['name']
              key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
              try:
                  transcribe.start_transcription_job(
                      TranscriptionJobName= datetime.datetime.now().strftime("%Y%m%d%H%M%S") + '_Transcription',
                      LanguageCode='ja-JP',
                      Media={
                          'MediaFileUri': 'https://s3.ap-northeast-1.amazonaws.com/' + bucket + '/' + key
                      },
                      OutputBucketName='h4b-serverless-output-1111'
                  )
              except Exception as e:
                  print(e)
                  print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
                  raise e
      Role: !GetAtt TranscribeFuncRole.Arn
      Runtime: python3.7

  S3TranscribeNotice:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt TranscribeFunc.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceArn: !Join 
                  - ""
                  - - "arn:aws:s3:::"
                    - !Ref InputBucketName

  ComprehendFunc:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Ref ComprehendFuncName
      Handler: index.lambda_handler
      Code:
        ZipFile: |
          import json
          import urllib.parse
          import boto3

          s3 = boto3.client('s3')
          comprehend = boto3.client('comprehend')

          def lambda_handler(event, context):
              bucket = event['Records'][0]['s3']['bucket']['name']
              key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
              try:
                response = s3.get_object(Bucket=bucket, Key=key)
                body = json.load(response['Body'])
                transcript = body['results']['transcripts'][0]['transcript']

                sentiment_response = comprehend.batch_detect_sentiment(
                  TextList= [transcript],
                  LanguageCode='ja'
                )

                sentiment_score = sentiment_response.get('ResultList')
                print(sentiment_score[0].get('SentimentScore'))
              except Exception as e:
                  print(e)
                  print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
                  raise e
      Role: !GetAtt ComprehendFuncRole.Arn
      Runtime: python3.7

  S3ComprehendNotice:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt ComprehendFunc.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceArn: !Join 
                  - ""
                  - - "arn:aws:s3:::"
                    - !Ref OutputBucketName

  InputBucket:
    Type: AWS::S3::Bucket
    DependsOn: S3TranscribeNotice
    Properties:
      BucketName: !Ref InputBucketName
      AccessControl: Private
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: "s3:ObjectCreated:*"
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: .mp3
            Function: !GetAtt TranscribeFunc.Arn

  OutputBucket:
    Type: AWS::S3::Bucket
    DependsOn: S3ComprehendNotice
    Properties:
      BucketName: !Ref OutputBucketName
      AccessControl: Private
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: "s3:ObjectCreated:*"
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: .json
            Function: !GetAtt ComprehendFunc.Arn


  TranscribeFuncLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${TranscribeFuncName}
      RetentionInDays: 14
  ComprehendFuncLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${ComprehendFuncName}
      RetentionInDays: 14

  TranscribeFuncRole:
    Type: AWS::IAM::Role
    Properties:
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonTranscribeFullAccess
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      MaxSessionDuration: 3600
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Action: "sts:AssumeRole"
            Principal:
              Service:
                - s3.amazonaws.com
                - lambda.amazonaws.com
                - comprehend.amazonaws.com
                - cloudwatch.amazonaws.com
      Path: /service-role/
      RoleName: !Sub ${TranscribeFuncName}-role

  ComprehendFuncRole:
    Type: AWS::IAM::Role
    Properties:
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
        - arn:aws:iam::aws:policy/ComprehendFullAccess
      MaxSessionDuration: 3600
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Action: "sts:AssumeRole"
            Principal:
              Service:
                - s3.amazonaws.com
                - lambda.amazonaws.com
                - comprehend.amazonaws.com
                - cloudwatch.amazonaws.com
      Path: /service-role/
      RoleName: !Sub ${ComprehendFuncName}-role

CloudFormationで、S3とLambda間の連携を行う時に参考になればと思います。


参考

AWS Hands-on for Beginners
Serverless #3

Discussion