💭

AWSの機械学習サービスを使用してみた

に公開

はじめに

Cloudtechさんの動画を参考にし、以下のアーキテクチャ作ってみました。
作成を通じて、勉強になったポイントなどをまとめていきたいと思います。
※各番号については後述します。

対象

・サーバやネットワーク関連の基礎知識を習得されている方
・今回使用するAWSサービスについて、基礎知識をお持ちの方

使用したAWSサービス

・Aamazon API Gateway
・AWS Lambda
・Amazon SQS
・Amazon SES
・Amazon S3
・Amazon DynamoDB
・Amazon Rekognition
・Amazon Comprehend
・AWS Step Functions

シナリオ

①ユーザーが口コミ投稿サイト経由で、以下をAPI Gatewayに送信する
・氏名:必須
・口コミのテキスト:必須
・メールアドレス:必須
・画像:任意
※今回作成したのは、バックエンドのみで、ユーザーが直接操作するアプリケーションは作成しておりません。そのため、動作確認はローカルPCからのPowershellコマンドにて、実施しています。

②「uploadReview」関数が、必須項目が入力されているかチェックし、問題なければ画像をS3に保存し、それ以外はDynamoDBに保存する。また後続の処理で使用する「id(DynamoDBのアイテムの識別子)」をSQSにJSON形式で渡す。

③SQSが後続の「executeStepFunction」に「id」を渡す。
※関数の間にSQSが挟まることで疎結合化しています。
これにより両者の依存関係を分離することができ、またユーザは後続の分析が終わるのを待つことなく完了通知を受け取ることができます。

④「executeStepFunction」が後続のAWS Step Functionsに「id」を渡す。

⑤AWS Step Functionsが以下の関数を順番に起動させる。
reviewEmotionCheck
→Amazon Comprehendを用いて口コミを分析し、「POSITIVE」「NEGATIVE」「NEUTRAL」「MIXED」のいずれかをDynamoDBに新しく「sentiment」というキーを作成し、値として格納する。

imageComplianceCheck
→S3にアップロードされた画像に対して、Amazon Rekognitionを用いて分析を行い、DynamoDBに新たに「isInappropriate」というキーを作成し、TrueかFalseを格納する。

sendThanksMail
→DynamoDBに保存されているユーザーのメールアドレスに対して、「sentiment」に格納されているバリューに基づいて、お礼のメールを送信する。

なお条件分岐は以下のようになっています。

if sentiment == "POSITIVE":
 message = f"頂いたご意見をスタッフ一同励みとして、今後も{user_name}様に安心してご利用いただけるよう努めてまいります"
elif sentiment == "NEGATIVE":
 message = f"{user_name}様より頂戴したご意見は真摯に受け止め、今後のサービスの改善に役立てたいと考えております。"
else :
 message = f"{user_name}様より頂戴したご意見は社内にてしっかりご確認させていただきます!"  

ゴール

①ユーザーが入力した口コミに基づいて、そのユーザーに対してお礼メールが自動送信される

②ユーザーが送信した画像が、不適切でないかどうか判断し、その結果がDynamoDBに保存される

各サービスの具体的な設定

ユーザー入力

今回のアーキテクチャでは、パラメータの受け渡しはJSON形式で行われます。
そのため、必要なパラメータを、以下のように画像も含めローカルPCからJSON形式でAPI Gatewayに渡します。

# API GatewayのURLを指定
$API_URL = "https://<作成したAPI GatewayのURL>"

# 画像ファイル(JPEG)を[IO.File]のReadAllBytesメソッドで、ファイルの内容をバイト配列として# 読み出し、そのあと[Convert]のToBase64Stringメソッドでバイト配列をBase64にエンコード
$IMAGE_FILE_PATH = "<ローカルPCに保存してある画像>.png"
$ENCODED_IMAGE = [Convert]::ToBase64String([IO.File]::ReadAllBytes($IMAGE_FILE_PATH))

# 追加のデータ
$MAIL_ADDRESS = '<動作確認用のメールアドレス>'
$USER_NAME = 'テスト次郎'
$REVIEW_TEXT = '皆さんに丁寧に対応していただけました'

# 必要なパラメータをハッシュテーブルで作成し、そのあとJSONに変換
# ※そのままJSONで記載することもできますが、ハッシュテーブル→JSONとしています。
$body = @{
    imageFile = $ENCODED_IMAGE
    mailAddress = $MAIL_ADDRESS
    userName = $USER_NAME
    reviewText = $REVIEW_TEXT
} | ConvertTo-Json

# HTTPリクエストを、API Gatewayに対して送信
Invoke-RestMethod -Uri $API_URL -Method Post -ContentType 'application/json; charset=utf-8' -Body $body

API Gateway

APIタイプはREST APIで作成し、エンドポイントタイプはリージョンとします。
JSON形式のパラメータを、そのまま後続のLambda関数に渡すだけなので、
POSTメソッドが呼び出されたときに、uploadReviewが呼び出されるように設定します。

Lambda(uploadReview)

以下のようなコードを記述しました。
※なお当方のPythonスキルは基礎文法がギリギリわかっているレベルなので、
「最低限AWSサービスを動かせているコード」となります。
また、以下コードでBoto3の「Client」と「Resource」両者を使用しています。
違いについては以下をご参考ください。
https://dev.classmethod.jp/articles/boto3-client-api-and-resource-api/

lambda_handler()はeventをPython辞書型で受け取り、必要に応じてJSONに変換しています。
使い分けとしては
・Python辞書型:プログラム内でのデータ操作
・JSON:データの保存や通信
という感じです。

最後に、Amazon DynamoDBやAmazonS3のPython(SDK)における利用方法は下記を参照ください。

DynamoDB
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/service-resource/index.html#DynamoDB.ServiceResource
S3
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#s3

以下コードです。

import json
import boto3
import uuid
import base64

def lambda_handler(event, context):
    # 1.必須フィールドの確認(imageFileは任意なので含めない)
    required_fields = ['reviewText', 'userName', 'mailAddress']
    for field in required_fields:
        if field not in event:
            return {
                'statusCode': 400,
                'body': json.dumps(f'{field} does not exist')
            }
    
    # 2.入力パラメータの取得。
    # Pythonのdict.get()メソッドは、キーが存在しない場合にエラーを出さずNoneを返す
    reviewText = event.get("reviewText")
    userName = event.get("userName")
    mailAddress = event.get("mailAddress")
    imageFile = event.get("imageFile")  # 任意項目
    
    # 3.DynamoDBのパーティションキーであるidの生成(uuidを取得)
    item_id = str(uuid.uuid4())
    
    # 4.S3とDynamoDBリソースの初期化
    s3 = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('<作成したテーブル>')
    
    # 5.画像のアップロード処理(画像がある場合のみ)
    # 画像がeventから取得できた場合のみif以下の処理を行い、そうでない場合は
    # image_url,imageFileともに「None」がかえる

    image_url = None
    if imageFile:
        try:
            # Base64デコード
            image_data = base64.b64decode(imageFile)
            
            # S3にアップロード
            file_name = f"images/{item_id}.jpg" 
            s3.put_object(
                Bucket='<作成したバケット>',
                Key=file_name,
                Body=image_data,
                ContentType='image/jpeg' 
            )
            
            # S3のURLを生成
            image_url = f"s3://<作成したバケット>/{file_name}"
            
        except Exception as e:
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error uploading image to S3: {str(e)}')
            }
    
    # 6.DynamoDBに更新するitemの内容を辞書で定義
    item = {
        'id': item_id,
        'reviewText': reviewText,
        'userName': userName,
        'mailAddress': mailAddress
    }
    
    # image_urlがNoneでない場合のみDynamoDBの指定したテーブルにキーと値を格納
    if image_url:
        item['imageUrl'] = image_url
    
    try:
        # 7.DynamoDBにデータを保存
        table.put_item(Item=item)
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error saving item to DynamoDB: {str(e)}')
        }

    # リソースの初期化後、sqs.send_message()に必要な引数を定義
    sqs = boto3.client('sqs')
    queue_url = 'https://sqs.ap-northeast-1.amazonaws.com/545009837499/testQueue'
    message = {
            "id": item_id
        }

    sqs_response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message)
        )
    print(f"SQS Response: {json.dumps(sqs_response)}")  # CloudWatchログに出力
    
    # 8.ステータスコード200(正常終了)を返す
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'uploaded saved successfully!',
            'id': item_id,
            'imageUrl': image_url if image_url else None
        })
    }

DynamoDB

DynamoDBですが、こちらは適当にテーブルを作り、パーティションキーのみidで作成すれば問題ありません。

S3

S3に関しても、DynamoDBと同様に適当なバケットを作成し、配下にimagesという名のフォルダを作成できればOKです。

SQS

適当な名前でキューを作成します。今回は標準キューで作成しました。
そのあとAWS Lambda 関数をトリガーで、後続の関数を選択します。

Lambda(executeStepFunction)

以下のようなコードを記述しました。
少し戸惑ったのが、SQSから連携されるidを取得するためには以下のようにJSON文字列を取得し、Python辞書型に変換する必要があります。
Python辞書型でeventに連携されることが多いので、要注意です。

body = event['Records'][0]['body']
body_json = json.loads(body)

以下コード全体です。

import json
import boto3

def lambda_handler(event, context):
    # Step Functions クライアントの初期化
    stepfunctions = boto3.client('stepfunctions')
    
    try:
        # イベントからIDを取得
        body = event['Records'][0]['body']
        body_json = json.loads(body)
    
        # idを返す
        id = body_json['id']
        
        if not id:
            return{
                'statusCode': 404,
                'body': json.dumps('id not found')
            }

        # Step Functions の実行開始
        response = stepfunctions.start_execution(
            stateMachineArn='<ステートマシンのARNを指定>', 
            input=json.dumps({
                'id': id  # Step Functions に渡すパラメータ
            })
        )
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Step Functions execution started successfully',
                'executionArn': response['executionArn']
            })
        }
        
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e)
            })
        }

StepFunction

次に、3つの関数を実行するStepFunctionを作成します。
なお、StepFunctionについての当方の理解は、以下ハンズオンを一回実施した程度です。

https://pages.awscloud.com/JAPAN-event-OE-Hands-on-for-Beginners-StepFunctions-2022-reg-event.html?trk=aws_introduction_page

単純に三つの関数を実行するのみですので、シンプルなワークフローになっています。

以下コードです。

{
  "QueryLanguage": "JSONata",
  "Comment": "A description of my state machine",
  "StartAt": "Lambda Invoke -ReviewEmotionCheck",
  "States": {
    "Lambda Invoke -ReviewEmotionCheck": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Output": "{% $states.result.Payload %}",
      "Arguments": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:545009837499:function:reviewEmotionCheck:$LATEST",
        "Payload": "{% $states.input %}"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2,
          "JitterStrategy": "FULL"
        }
      ],
      "Next": "Lambda Invoke - imageComplianceCheck"
    },
    "Lambda Invoke - imageComplianceCheck": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Output": "{% $states.result.Payload %}",
      "Arguments": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:545009837499:function:imageComplianceCheck:$LATEST",
        "Payload": "{% $states.input %}"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2,
          "JitterStrategy": "FULL"
        }
      ],
      "Next": "Lambda Invoke - sendThanksMail"
    },
    "Lambda Invoke - sendThanksMail": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Output": "{% $states.result.Payload %}",
      "Arguments": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:545009837499:function:sendThanksMail:$LATEST",
        "Payload": "{% $states.input %}"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2,
          "JitterStrategy": "FULL"
        }
      ],
      "End": true
    }
  },
  "TimeoutSeconds": 30
}

Lambda(reviewEmotionCheck)

では、StepFunctionで実行するコードについて順番に説明します。
ここで初めて機械学習サービスであるAmazon Comprehendを使用します。
なお、Amazon comprehendのPython(SDK)における利用方法の詳細は以下をご確認ください。
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/comprehend/client/detect_sentiment.html

以下コードです。

import json
import boto3

def lambda_handler(event, context):
    # 1. 入力パラメータの取得
    item_id = event.get('id')
    
    # 2. 入力パラメータが空白の場合は、エラーとする
    if not item_id:
        return {
            'statusCode': 400,
            'body': json.dumps('id does not exist')
        }

    # 3. DynamoDBリソースの初期化
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('reviews')

    try:
        # 4. DynamoDBからデータを取得
        response = table.get_item(
            Key={'id': item_id}
        )
        item = response.get('Item', {})
        reviewText = item.get("reviewText")
        
        if not reviewText:
            return {
                'statusCode': 404,
                'body': json.dumps('reviewText not found')
            }
        
        # 5. Comprehendで感情分析を実行
        comprehend = boto3.client('comprehend')
        sentiment_response = comprehend.detect_sentiment(
            Text=reviewText,
            LanguageCode="ja"
        )

        # 6. 既存のitemに感情分析結果を追加して更新
        # DynamoDBに辞書型で新しいキーと値を追加
        item['sentiment'] = sentiment_response.get("Sentiment")
        table.put_item(Item=item)

        return {
            'statusCode': 200,
            'body': json.dumps('Successfully updated item'),
            'id':item_id
        }
    
    except Exception as e:
        # 7. エラーが発生した場合、ステータスコード500(内部サーバエラー)を返す
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }

Lambda(imageComplianceCheck)

次はAmazon Rekognitionを用いて、不適切な画像であるかどうか確認を行います。
なお、Amazon rekognitionのPython(SDK)における利用方法は下記を参照ください。

https://boto3.amazonaws.com/v1/documentation/api/1.26.86/reference/services/rekognition/client/detect_moderation_labels.html
以下コードです。

import json
import boto3
import re

def lambda_handler(event, context):
    # 1. 入力パラメータのチェック
    if "id" not in event:
        return {
            'statusCode': 400,
            'body': json.dumps('id does not exist')
        }
    
    # 2. 入力パラメータの取得
    item_id = event["id"]
    
    # 3. DynamoDBリソースの初期化
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('reviews')
    rekognition = boto3.client('rekognition')
    
    try:
        # 4. DynamoDBからデータを取得
        response = table.get_item(
            Key={'id': item_id}
        )
        item = response.get('Item')
        
        if not item:
            return {
                'statusCode': 404,
                'body': json.dumps('Item not found')
            }

        # imageUrlが存在するか確認
        if 'imageUrl' not in item:
            return {
                'statusCode': 400,
                'body': json.dumps('Image URL not found in item')
            }

        # S3 URLからバケット名とキーを抽出
        # 例: s3://<作成したバケット名>/images/xxx.jpg
        image_url = item['imageUrl']
        match = re.match(r's3://([^/]+)/(.+)', image_url)
        
        if not match:
            return {
                'statusCode': 400,
                'body': json.dumps('Invalid S3 URL format')
            }
        
        bucket = match.group(1)  # <作成したバケット名>
        key = match.group(2)     # images/xxx.jpg

        try:
            detectresponse = rekognition.detect_moderation_labels(
                Image={
                    'S3Object': {
                        'Bucket': bucket,
                        'Name': key
                    }
                },
                MinConfidence=70
            )
     # inappropriateの中身が0だった場合はFalseと記載し、そうでない場合はTrueと記載する
            inappropriate = detectresponse['ModerationLabels']
            table.update_item(
                Key={'id': item_id},
                UpdateExpression="SET isInappropriate = :result", 
                ExpressionAttributeValues={':result': len(inappropriate) > 0}
            )
        except Exception as e:
            print(f"Error processing image: {str(e)}")

    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error show item from DynamoDB: {str(e)}')
        }
    
    return {
            'statusCode': 200,
            'body': json.dumps('Successfully checked the image'),
            'id': item_id
        }

Lambda(sendThanksMail)

最後に、お礼メールを送信するコードとなります。
Amazon SESのPython(SDK)における利用方法は下記を参照ください。
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ses/client/send_email.html
以下コードです。

import json
import boto3
import os

def lambda_handler(event, context):
    # 1. 入力パラメータの取得
    item_id = event.get('id')
    
    # 2. 入力パラメータが空白の場合は、エラーとする
    if not item_id:
        return {
            'statusCode': 400,
            'body': json.dumps('id does not exist')
        }

    # 3. DynamoDBリソースの初期化
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('reviews')

    try:
        # 4. DynamoDBからデータを取得
        response = table.get_item(
            Key={'id': item_id}
        )
        item = response.get('Item', {})
        user_name = item.get("userName")
        mail_address = item.get("mailAddress")
        sentiment = item.get("sentiment")

        required_fields = ['userName', 'mailAddress', 'sentiment']
        
        for field in required_fields:
            if field not in item:
                return {
                    'statusCode': 400,
                    'body': json.dumps(f'{field} does not exist')
                }

        ses_client = boto3.client('ses', region_name='ap-northeast-1')
        # 環境変数からメール設定を取得
        from_mail_address = os.environ['FROM_MAIL_ADDRESS']

        if sentiment == "POSITIVE":
            message = f"頂いたご意見をスタッフ一同励みとして、今後も{user_name}様に安心してご利用いただけるよう努めてまいります"
        elif sentiment == "NEGATIVE":
            message = f"{user_name}様より頂戴したご意見は真摯に受け止め、今後のサービスの改善に役立てたいと考えております。"
        else :
            message = f"{user_name}様より頂戴したご意見は社内にてしっかりご確認させていただきます!"
            

        # メールを送信
        response = ses_client.send_email(
            Destination={
                'ToAddresses': [
                    mail_address
                ],
            },
            Message={
                'Body': {
                    'Text': {
                        'Charset': "UTF-8",
                        'Data': message,
                    },
                },
                'Subject': {
                    'Charset': "UTF-8",
                    'Data': "口コミのご投稿ありがとうございました",
                },
            },
            Source=from_mail_address,
        )
    # メール送信成功後
        return {
            'statusCode': 200,
            'body': json.dumps('Email sent successfully')
        }

    except Exception as e:
        # 7. エラーが発生した場合、ステータスコード500(内部サーバエラー)を返す
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }

SES

メールを送信するにあたって、送信元ドメインの検証が必要です。
DKIMという仕組みを用いて、受信メールサーバ側で転送中のメールが改ざんされていないことを検証します。
詳細については以下をご参考ください。
https://dev.classmethod.jp/articles/ses-dkim-auth/

終わりに

今回初めて機械学習サービスを触ってみたのですが、SDKを使用すると思った以上に簡単に使えるな。という印象でした。
引き続き様々なAWSサービスに触れていきたいと思います。

Discussion