S3+Lambda+AI Servicesを用いた音声認識パイプラインを、CloudFormationで構築してみた
こんちわ。久々の投稿です。
最近リリースされた鋼の錬金術師モバイルこと”ハガモバ”ばっかりやってて勉強が疎かになっています。クオリティ高いんでオススメです。ガチャ確率低いけど。
さて、最近はAWSの資格取得&実務に役立つ知識を得る為、ちまちまとですがハンズオンに再度取り組んでいます。
それらをヒントに音声認識システムを構築してみたので紹介します。
また、今回CloudFormationを用いて構築しており、そちらをメインとして紹介します。
S3イベントトリガーによりLambda関数が起動しAI Servicesを呼び出す流れにしていますが、CloudFormationだとどういった記述になるのか?この辺りがポイントです。
システムの流れ
簡単ですが音源データを文章に変換し、文章に含まれる感情を分析します。
- S3バケットに音源ファイルがアップロードされると、それをトリガーにLambda関数が翻訳を行う
(S3 → Lambda → Transcribe
) - 1の結果をjsonファイルに出力し、S3バケットに格納
(Transcribe → S3
) - 2のアップロードをトリガーにLambda関数が起動し、感情分析を行ってCloudWatchに結果を出力
(S3 → Lambda → Comprehend
)
先に動作紹介から
本題の前に、まずは動作確認していきます。
今からアップロードするmp3音源では、以下文章を読み上げています。
>「こんにちは。これはAWS LambdaとS3、AI Servicesを組み合わせた音声認識パイプラインの動作確認用音源です。」
INPUT/OUTPUT用にバケットを2つ作成しています。では早速INPUTバケットにmp3音源をアップロード。
すると、Transcribeが動きました。ジョブ実行中...
ジョブが完了すると、OUTPUTバケットに文章化したファイルが格納されました。
中身はこんな感じです。
AIが、”あい”になってたりしますがまぁ一旦放置です。
最後に感情分析の結果を見てみます。CloudWatchに出力するようにしています。
感情の起伏の無い文章なので特に差はないはず。
Positive、Negativeそれぞれの度合いが数値化されています。
もう少し感情を含んだ文章だと面白かったかな〜...
以上です。
ここから本題です。
(本題)CloudFormationでの構築
全体コードは後回しで、CloudFormationで構築するために工夫した点を紹介します。主に3点です。
- pythonライブラリを用いてAI Servicesへリクエスト
- S3→Lambdaへのイベント通知連携
- スタック作成の順序
では解説していきます。
pythonライブラリを用いてAI Servicesへリクエスト
TranscribeFunc:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Ref TranscribeFuncName
Handler: index.lambda_handler ★Pythonの場合はコレ
Code:
ZipFile: |
import json
import urllib.parse
import boto3
import datetime
s3 = boto3.client('s3')
transcribe = boto3.client('transcribe')
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
try:
transcribe.start_transcription_job(
TranscriptionJobName= datetime.datetime.now().strftime("%Y%m%d%H%M%S") + '_Transcription',
LanguageCode='ja-JP',
Media={
'MediaFileUri': 'https://s3.ap-northeast-1.amazonaws.com/' + bucket + '/' + key
},
OutputBucketName='h4b-serverless-output-1111'
)
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
Role: !GetAtt TranscribeFuncRole.Arn
Runtime: python3.7
pythonライブラリboto3
のstart_transcription_job()を用いて、AWS Transcribeにリクエストしています。
メディアファイルと出力先バケットをパラメータにTrancribeジョブが実行されます。
Comprehendに対しても同様にboto3を用いてリクエストしています。
S3→Lambdaへのイベント通知連携
これをCloudFormationで定義するには、「バケットとLambda関数の紐付け」「S3イベント通知許可」の2点が必要です。
- バケットとLambda関数の紐付け
InputBucket:
Type: AWS::S3::Bucket
DependsOn: S3TranscribeNotice
Properties:
BucketName: !Ref InputBucketName
AccessControl: Private
NotificationConfiguration:
LambdaConfigurations:
- Event: "s3:ObjectCreated:*" ★対象のイベント
Filter:
S3Key:
Rules:
- Name: suffix
Value: .mp3 ★対象のファイル
Function: !GetAtt TranscribeFunc.Arn ★呼び出すLambda関数
NotificationConfiguration
で以下を定義しています。
・対象のイベント = createObject
・アップロードされたファイル = 拡張子がmp3
・呼び出すLambda関数 = TranscribeFunc
- イベント通知許可
S3TranscribeNotice:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt TranscribeFunc.Arn
Action: lambda:InvokeFunction
Principal: s3.amazonaws.com
SourceArn: !Join
- ""
- - "arn:aws:s3:::"
- !Ref InputBucketName
S3→Lambdaのイベント通知を許可するには、上記のようにAWS::Lambda::Permission
を定義する必要があります。
INPUTバケット内でアップロード(createObject)が行われた時、Lambda関数TranscribeFunc
に対してイベント通知を送ります。
スタック作成の順序
当たり前かもですが、イベント通知許可のAWS::Lambda::Permission
が生成された後に対象のバケットを生成するようにしています。
InputBucket:
Type: AWS::S3::Bucket
DependsOn: S3TranscribeNotice ★イベント通知生成後にこのバケットを生成
...
(おまけ)CloudWatch LogGroupとLambdaの紐付け
LogGroupName
にLambda関数のパスを指定すればOKです。
TranscribeFuncLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/lambda/${TranscribeFuncName}
RetentionInDays: 14
書いてないですがCloudWatch操作権限の付与も忘れずに。
まとめ 全体テンプレート
IAMロール権限は緩めです。
AWSTemplateFormatVersion: "2010-09-09"
Description: "AWS Hands-On For Lambda(Lambda/S3/Transcribe/Comprehend/Polly)."
Parameters:
InputBucketName:
Type: String
Default: h4b-serverless-input-1111
OutputBucketName:
Type: String
Default: h4b-serverless-output-1111
TranscribeFuncName:
Type: String
Default: transcribe-function
ComprehendFuncName:
Type: String
Default: comprehend-function
Resources:
TranscribeFunc:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Ref TranscribeFuncName
Handler: index.lambda_handler
Code:
ZipFile: |
import json
import urllib.parse
import boto3
import datetime
s3 = boto3.client('s3')
transcribe = boto3.client('transcribe')
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
try:
transcribe.start_transcription_job(
TranscriptionJobName= datetime.datetime.now().strftime("%Y%m%d%H%M%S") + '_Transcription',
LanguageCode='ja-JP',
Media={
'MediaFileUri': 'https://s3.ap-northeast-1.amazonaws.com/' + bucket + '/' + key
},
OutputBucketName='h4b-serverless-output-1111'
)
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
Role: !GetAtt TranscribeFuncRole.Arn
Runtime: python3.7
S3TranscribeNotice:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt TranscribeFunc.Arn
Action: lambda:InvokeFunction
Principal: s3.amazonaws.com
SourceArn: !Join
- ""
- - "arn:aws:s3:::"
- !Ref InputBucketName
ComprehendFunc:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Ref ComprehendFuncName
Handler: index.lambda_handler
Code:
ZipFile: |
import json
import urllib.parse
import boto3
s3 = boto3.client('s3')
comprehend = boto3.client('comprehend')
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
try:
response = s3.get_object(Bucket=bucket, Key=key)
body = json.load(response['Body'])
transcript = body['results']['transcripts'][0]['transcript']
sentiment_response = comprehend.batch_detect_sentiment(
TextList= [transcript],
LanguageCode='ja'
)
sentiment_score = sentiment_response.get('ResultList')
print(sentiment_score[0].get('SentimentScore'))
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
Role: !GetAtt ComprehendFuncRole.Arn
Runtime: python3.7
S3ComprehendNotice:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt ComprehendFunc.Arn
Action: lambda:InvokeFunction
Principal: s3.amazonaws.com
SourceArn: !Join
- ""
- - "arn:aws:s3:::"
- !Ref OutputBucketName
InputBucket:
Type: AWS::S3::Bucket
DependsOn: S3TranscribeNotice
Properties:
BucketName: !Ref InputBucketName
AccessControl: Private
NotificationConfiguration:
LambdaConfigurations:
- Event: "s3:ObjectCreated:*"
Filter:
S3Key:
Rules:
- Name: suffix
Value: .mp3
Function: !GetAtt TranscribeFunc.Arn
OutputBucket:
Type: AWS::S3::Bucket
DependsOn: S3ComprehendNotice
Properties:
BucketName: !Ref OutputBucketName
AccessControl: Private
NotificationConfiguration:
LambdaConfigurations:
- Event: "s3:ObjectCreated:*"
Filter:
S3Key:
Rules:
- Name: suffix
Value: .json
Function: !GetAtt ComprehendFunc.Arn
TranscribeFuncLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/lambda/${TranscribeFuncName}
RetentionInDays: 14
ComprehendFuncLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/lambda/${ComprehendFuncName}
RetentionInDays: 14
TranscribeFuncRole:
Type: AWS::IAM::Role
Properties:
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
- arn:aws:iam::aws:policy/AmazonTranscribeFullAccess
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
MaxSessionDuration: 3600
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action: "sts:AssumeRole"
Principal:
Service:
- s3.amazonaws.com
- lambda.amazonaws.com
- comprehend.amazonaws.com
- cloudwatch.amazonaws.com
Path: /service-role/
RoleName: !Sub ${TranscribeFuncName}-role
ComprehendFuncRole:
Type: AWS::IAM::Role
Properties:
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
- arn:aws:iam::aws:policy/ComprehendFullAccess
MaxSessionDuration: 3600
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action: "sts:AssumeRole"
Principal:
Service:
- s3.amazonaws.com
- lambda.amazonaws.com
- comprehend.amazonaws.com
- cloudwatch.amazonaws.com
Path: /service-role/
RoleName: !Sub ${ComprehendFuncName}-role
CloudFormationで、S3とLambda間の連携を行う時に参考になればと思います。
Discussion