Closed1

AWS Amplifyで作成した Lambda から DynamoDB の疎通確認を行う

ZUMAZUMA

amplify add function を実行します。

$ amplify add function
? Select which capability you want to add: (Use arrow keys)
❯ Lambda function (serverless function)
  Lambda layer (shared code & resource used across functions)

Lambda Function 名を決めます。今回は checkDynamoDBConnectionFunction と命名しました。

? Provide an AWS Lambda function name: checkDynamoDBConnectionFunction

言語は Python を選択します。

? Choose the runtime that you want to use:
  .NET 6
  Go
  Java
  NodeJS
❯ Python

次の設問は N を入力します。

Only one template found - using Hello World by default.

Available advanced settings:
- Resource access permissions
- Scheduled recurring invocation
- Lambda layers configuration
- Environment variables configuration
- Secret values configuration

? Do you want to configure advanced settings? (y/N)

最後の設問で Y を入力して Lambda Function の index.py を開きます。

? Do you want to edit the local lambda function now? Yes

コードを書く前の準備として、Lambda Function のルートディレクトリに移動し、boto3 をインストールします。

cd amplify/backend/function/checkDynamoDBConnectionFunction
pipenv install boto3

CRUD 操作をする Python コードは以下となります。

index.py
import json
import boto3
import time
from datetime import datetime, timezone

# Name of the DynamoDB table that every operation in this module targets.
TABLE_NAME = "Messages-t6ceffvesncxvgkpx6u6yus4zm-dev"
# Name of the index passed as IndexName by query(); its key condition is on
# the "lineUserId" attribute.
QUERY_INDEX_NAME = "byLineUserId"

# Low-level DynamoDB client, created once at import time and shared by all
# helper functions below.
dynamodb = boto3.client("dynamodb")

# Sample conversation used by handler() as test data. "pKey" is the value
# that put() stores in the item's "id" attribute.
messages = [
    {"pKey": "p0001", "lineUserId": "uid0001", "role": "user", "content": "What is your name?"},
    {"pKey": "p0002", "lineUserId": "uid0001", "role": "assistant", "content": "My name is ChatGPT."},
    {"pKey": "p0003", "lineUserId": "uid0001", "role": "user", "content": "Where are you from?"},
    {"pKey": "p0004", "lineUserId": "uid0001", "role": "assistant", "content": "I’m from the internet."},
    {"pKey": "p0005", "lineUserId": "uid0001", "role": "user", "content": "What can ChatGPT do?"},
    {"pKey": "p0006", "lineUserId": "uid0001", "role": "assistant", "content": "ChatGPT can chat with you about various topics."},
]


def handler(event, context):
    """Lambda entry point: run a DynamoDB connectivity check.

    Exercises scan, put, get, update, delete and query against ``TABLE_NAME``
    using the sample ``messages`` and prints the intermediate results. All
    sample items are deleted again before returning.

    Args:
        event: Invocation event; it is only printed.
        context: Lambda context object; unused.

    Returns:
        dict: ``statusCode`` 200 with a JSON body on success, or
        ``statusCode`` 500 if any step raised an exception.
    """
    print("received event:")

    print(event)
    try:
        # Timezone-aware timestamp so the stored ISO-8601 string carries an
        # explicit UTC offset instead of depending on the host's local zone.
        now = datetime.now(timezone.utc)

        print("--- scan ---")
        scan()

        print("--- put ---")
        message = messages[0]
        pKey = message["pKey"]
        put(pKey, message["lineUserId"], message["role"], message["content"], now)

        print("--- get ---")
        get(pKey)

        print("--- update ---")
        update(pKey, "It\"s a new question.", now)

        print("--- get ---")
        get(pKey)

        print("--- delete ---")
        delete(pKey)

        print("--- put items ---")
        for message in messages:
            # Wait between writes so consecutive createdAt values are at
            # least one second apart (presumably to give the query below a
            # well-defined order — confirm against the index's sort key).
            time.sleep(1)
            now = datetime.now(timezone.utc)
            put(message["pKey"], message["lineUserId"], message["role"], message["content"], now)

        print("--- scan ---")
        scan()

        print("--- query ---")
        items = query("uid0001")
        # query() asks for descending order; reverse the result and keep only
        # the role/content pair of each item.
        prompts = [
            {"role": item["role"]["S"], "content": item["content"]["S"]}
            for item in reversed(items)
        ]
        print("--- prompts ---")
        print(prompts)

        print("--- delete items ---")
        for message in messages:
            delete(message["pKey"])

        print("--- scan ---")
        scan()

        return {
            "statusCode": 200,
            "body": json.dumps("Successfully connected to DynamoDB")
        }
    except Exception as e:
        # Top-level boundary: report the failure to the caller as HTTP 500.
        print(e)
        return {
            "statusCode": 500,
            # Any of the operations above may have failed, not only a scan.
            "body": json.dumps("Error accessing DynamoDB table")
        }


def printArray(items):
    """Print the item count, then each item's index and attributes.

    Args:
        items: Sequence of dict-like items; every key/value pair of each
            item is printed on its own line.
    """
    print(f"length: {len(items)}")
    for position, entry in enumerate(items):
        print(f"index: {position}")
        for pair in entry.items():
            # Each pair is (attribute name, attribute value).
            print(*pair)

def scan():
    """Scan all of ``TABLE_NAME`` page by page and print every page.

    Keeps requesting pages until a response carries no
    ``LastEvaluatedKey``. Any exception from the client propagates to the
    caller.
    """
    request = {"TableName": TABLE_NAME}
    while True:
        page = dynamodb.scan(**request)
        printArray(page["Items"])

        resume_key = page.get("LastEvaluatedKey")
        if not resume_key:
            # No further pages: the scan is complete.
            return
        # Continue the next request from where this page stopped.
        request["ExclusiveStartKey"] = resume_key


def query(queryKey):
    """Query the ``QUERY_INDEX_NAME`` index for items of one LINE user.

    Args:
        queryKey: Value matched against the ``lineUserId`` attribute.

    Returns:
        list: The ``Items`` of the query response, which are also printed.
        Any exception from the client propagates to the caller.
    """
    response = dynamodb.query(
        TableName=TABLE_NAME,
        IndexName=QUERY_INDEX_NAME,
        KeyConditionExpression="#lineUserId = :lineUserId",
        ExpressionAttributeNames={"#lineUserId": "lineUserId"},
        ExpressionAttributeValues={":lineUserId": {"S": queryKey}},
        # Descending sort-key order (presumably createdAt — confirm against
        # the index definition).
        ScanIndexForward=False,
        # Fetch at most 20 items.
        Limit=20,
    )
    found = response["Items"]
    printArray(found)
    return found


def put(pKey, uid, role, content, now):
    """Write one message item to ``TABLE_NAME``.

    Args:
        pKey: Stored as the item's ``id`` attribute.
        uid: Stored as ``lineUserId``.
        role: Stored as ``role``.
        content: Stored as ``content``.
        now: Object whose ``isoformat()`` result is stored as ``createdAt``.

    Any exception from the client propagates to the caller.
    """
    record = {
        "id": {"S": pKey},
        "lineUserId": {"S": uid},
        "role": {"S": role},
        "content": {"S": content},
        "createdAt": {"S": now.isoformat()},
    }
    dynamodb.put_item(TableName=TABLE_NAME, Item=record)


def get(pKey):
    """Fetch the item whose ``id`` equals ``pKey`` and print it.

    A GetItem response contains no ``"Item"`` key when nothing matches, so a
    missing item is printed as ``None`` rather than raising ``KeyError``.

    Args:
        pKey: Value of the ``id`` key attribute to look up.

    Any exception from the client propagates to the caller.
    """
    response = dynamodb.get_item(
        TableName=TABLE_NAME,
        Key={"id": {"S": pKey}},
    )
    print(response.get("Item"))


def update(pKey, content, now):
    """Set ``content`` and ``updatedAt`` on the item whose ``id`` is ``pKey``.

    Args:
        pKey: Value of the ``id`` key attribute of the item to update.
        content: New value for the ``content`` attribute.
        now: Object whose ``isoformat()`` result is stored as ``updatedAt``.

    Any exception from the client propagates to the caller.
    """
    # Placeholders used by the update expression below.
    attribute_names = {
        "#content": "content",
        "#updatedAt": "updatedAt",
    }
    attribute_values = {
        ":content": {"S": content},
        ":updatedAt": {"S": now.isoformat()},
    }
    dynamodb.update_item(
        TableName=TABLE_NAME,
        Key={"id": {"S": pKey}},
        UpdateExpression="set #content = :content, #updatedAt = :updatedAt",
        ExpressionAttributeNames=attribute_names,
        ExpressionAttributeValues=attribute_values,
    )


def delete(pKey):
    """Delete the item whose ``id`` equals ``pKey`` from ``TABLE_NAME``.

    Args:
        pKey: Value of the ``id`` key attribute of the item to remove.

    Any exception from the client propagates to the caller.
    """
    target_key = {"id": {"S": pKey}}
    dynamodb.delete_item(TableName=TABLE_NAME, Key=target_key)
このスクラップは2023/03/17にクローズされました