Open27

lambda定期実行したい

Kumamoto-HamachiKumamoto-Hamachi

行:アイテム(item)
列:アトリビュート(attribute)

パーティションキー:
ソートキー:指定は任意。
プライマリーキー:パーティションキー or パーティション and ソートの組み合わせ
=>パーティションキーは、DynamoDBにおいてユニークである必要はありません。しかし、パーティションキーとソートキーの組み合わせはユニークでなければならない。つまりプライマリーキーはユニーク。
https://zenn.dev/issy/articles/zenn-dynamodb-overview

Kumamoto-HamachiKumamoto-Hamachi

ログ_1

serverlessframeworkさん

インタラクティブにserverlessframeworkのPJを作れる

$ npm i -g serverless
$ serverless plugin install -n serverless-offline
$ serverless plugin install -n serverless-python-requirements

pythonとかpoetryとか

$ cd /your/path/serverless-pj
$ asdf local python 3.12.5
$ asdf local poetry 1.8.3
$ poetry init
$ poetry add -D black mypy isort
-- 現在のPJを除く依存関係のパッケージをインストール
$ poetry install --no-root

nodeとかserverlessのプラグインとか

npmをつかう。

$ asdf local nodejs 20.16.0
$ npm init
$ npm i -D serverless

serverless frameworkのプラグインを入れていく

$ serverless plugin install -n serverless-offline

boto3演習

.envrcに下記を設定

# AWS
export AWS_ACCESS_KEY_ID=xxxx
export AWS_SECRET_ACCESS_KEY=xxxx
export AWS_DEFAULT_REGION=ap-northeast-1
Kumamoto-HamachiKumamoto-Hamachi

overridesは、プロジェクト内で特定のパッケージが依存している別のパッケージのバージョンを制御することが出来る。

この場合、overridesの設定で、serverless-offlineがserverlessパッケージに依存しているときに、使用されるserverlessのバージョンを$serverlessに設定している。これは、プロジェクト内で使用しているserverlessのバージョンに対応させるためのもの。

{
  "name": "practice-alignment-serverless",
  "version": "1.0.0",
  "private": true,
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "fmt": "poetry run black . && poetry run isort .",
    "fmt:check": "poetry run black . --check && poetry run isort . --check && sls package",
    "offline": "sls offline start"
  },
  "devDependencies": {
    "serverless": "^4.1.20",
    "serverless-offline": "^14.0.0"
  },
  "overrides": {
    "serverless-offline": {
      "serverless": "$serverless"
    }
  }
}
Kumamoto-HamachiKumamoto-Hamachi

serverless-frameworkでdyanamodb定義

service: practice-alignment-serverless

provider:
  name: aws
  runtime: python3.12
  stage: dev
  region: ap-northeast-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - dynamodb:*
          Resource: arn:aws:dynamodb:${env:AWS_DEFAULT_REGION}:${env:AWS_ACCOUNT_ID}:table/*

resources:
  Resources:
    ExampleTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ExampleTable
        AttributeDefinitions:
          - AttributeName: name
            AttributeType: S
          - AttributeName: deadline_unix_time
            AttributeType: N
        KeySchema:
          - AttributeName: name
            KeyType: HASH
          - AttributeName: deadline_unix_time
            KeyType: RANGE
        ProvisionedThroughput:
            ReadCapacityUnits: 1
            WriteCapacityUnits: 1
        BillingMode: PROVISIONED

functions:
  hello:
    handler: handler.hello

plugins:
  - serverless-offline
  - serverless-python-requirements

dynamodbには予約語があるから注意!!

botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the Query operation: Invalid KeyConditionExpression: Attribute name is a reserved keyword; reserved keywo
rd: name

Kumamoto-HamachiKumamoto-Hamachi

PAY_PER_REQUESTモードでは、読み取りおよび書き込みのスループットを事前に設定せず、実際のリクエスト数に基づいて課金される。

        ProvisionedThroughput:
            ReadCapacityUnits: 1
            WriteCapacityUnits: 1

BillingModeをPROVISIONEDに変更すれば使えるよ。

Kumamoto-HamachiKumamoto-Hamachi

serverless remove

package:
  patterns:
    - "!**"
    - src/**

最初に!**で全ファイルを除外し、次にsrc/**でsrcディレクトリ内のファイルをパッケージに含めるという指定を行っている。

Kumamoto-HamachiKumamoto-Hamachi

stage

service: my-service

provider:
  name: aws
  runtime: nodejs14.x
  stage: ${opt:stage, 'dev'}  # ステージを指定。指定がなければ'dev'を使用
  region: us-east-1

# 環境ごとの設定
custom:
  tableName: ${self:service}-${self:provider.stage}-table
  bucketName: ${self:service}-${self:provider.stage}-bucket

resources:
  Resources:
    MyDynamoDBTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:custom.tableName}
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST

    MyS3Bucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:custom.bucketName}

functions:
  hello:
    handler: handler.hello
    environment:
      TABLE_NAME: ${self:custom.tableName}
      BUCKET_NAME: ${self:custom.bucketName}

Kumamoto-HamachiKumamoto-Hamachi

前提

DynamoDBとは

Amazon DynamoDB は、AWS が提供しているフルマネージドな NoSQLデータベースのサービスです。
※NoSQLというのはリレーショナルデータベース(RDBMS)以外のデータベースの総称です。

DynamoDBはkey-valueとドキュメントデータモデルをサポートしています。

例えば下記のようなRDBのテーブルがあったとします。

id user_name height
1 John 130
2 Marry 180
3 Bob null

それをDynamoDBでは下記のように表現します。

[
  {
    "id": 1,
    "user_name": "John",
    "height": 130
  },
  {
    "id": 2,
    "user_name": "Marry",
    "height": 180
  },
  {
    "id": 3,
    "user_name": "Bob"
  }
]

「id」、「user_name」、「height」がアトリビュート/attributeにあたります。
また3つ目のアイテム/itemに「height」がないですが、これはDynamoDBがスキーマレスだからです。(レコードが所定のスキーマに適合していることを保証しない)

用語 説明
アイテム(item) RDBで言えばレコード
アトリビュート キーのこと、RDBで言えばカラム名

Amazon DynamoDB とは - Amazon DynamoDB

Serverless FrameworkでDynamoDBを定義

resources:
  Resources:
    ExampleTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ExampleTable
        AttributeDefinitions:
          - AttributeName: user_name
            AttributeType: S
          - AttributeName: deadline_unix_time
            AttributeType: N
        KeySchema:
          - AttributeName: user_name
            KeyType: HASH
          - AttributeName: deadline_unix_time
            KeyType: RANGE
        ProvisionedThroughput:
            ReadCapacityUnits: 1
            WriteCapacityUnits: 1
        BillingMode: PROVISIONED

KeyType: HASHとなっているのがパーティションキー。
KeyType: RANGEとなっているのがソートキー
BillingModeは事前にどのくらいのアクセス量が来るかわからないなら PAY_PER_REQUEST

事前に分かっている・料金を制御したい場合はBillingModeをデフォルトのPROVISIONEDにしてProvisionedThroughputの設定を行う。(今回は)

DynamoDBにデータを入れたり出したり練習

boto3はAWS が公式で提供しているライブラリ。AWSのインフラをPythonコードを使って操作出来る。

from typing import Tuple
from datetime import datetime
import boto3

dynamodb_resource = boto3.resource("dynamodb")
table = dynamodb_resource.Table("ExampleTable")


def hello(event, context):
    # create("熊本 はまち")


def create(user_name: str):
    deadline_unix_time = int(datetime.now().timestamp())
    table.put_item(Item={"user_name": user_name, "deadline_unix_time": deadline_unix_time})


def get(user_name: str, deadline_unix_time: int) -> Tuple[dict, dict]:
    response = table.get_item(Key={"user_name": user_name, "deadline_unix_time": deadline_unix_time})
    item = response.get("Item")
    return item, response


def get_by_user_name(user_name: str) -> None:
    response = table.query(
        KeyConditionExpression=boto3.dynamodb.conditions.Key('user_name').eq(user_name),
        ScanIndexForward=True  # Trueの場合、昇順(過去から順)に並べ替え
    )
    # 取得したアイテム
    items = response['Items']
    # アイテムを出力
    for item in items:
        print(item)
        deadline_unix_time = item.get("deadline_unix_time")
        # decimal.Decimal
        print(datetime.fromtimestamp(int(deadline_unix_time)))


def scan() -> dict:
    response = table.scan()
    return response.get("Items")

Kumamoto-HamachiKumamoto-Hamachi
  • Lambda関数のcontextオブジェクト
    関数名、メモリサイズ、タイムアウトなど、関数の実行環境、リクエストの識別に関する情報、実行時間の制御:
Kumamoto-HamachiKumamoto-Hamachi

Lambdaから操作

Lambda関数を作る

hoge

コンソールからLambda関数のテスト実行

AWS Management ConsoleでLambda関数を開く。
テストイベントを設定、下記のようなJSONデータを入力して「Test」ボタンをクリック
※テストイベントとはいえ、しっかりDBに反映されるので一応注意。

  • POST(create)
{
  "body": "{\"user_name\": \"exampleUserName\", \"task_description\": \"Complete the year-end report\"}"
}
  • GET(fetch_by_user_name)
{
  "queryStringParameters": {
    "user_name": "exampleUserName"
  }
}
Kumamoto-HamachiKumamoto-Hamachi

TODO:EventBridgeをserverless frameworkでやってみる

Amazon EventBridgeは、何かが起きたときに自動で他の処理を起動するためのサービス。システム内の自動化やタスクの連携が得意。
Amazon API Gatewayは、ユーザーや他のアプリケーションがデータをやり取りするためのインターフェースを提供するサービス。APIを通じてデータの送受信を管理するのが得意。

Kumamoto-HamachiKumamoto-Hamachi

TODO:AWS Systems Managerは、Parameter StoreとSecrets Managerを含む総称?

パラメーターストア設定

service: my-service

provider:
  name: aws
  runtime: python3.8
  region: us-east-1

resources:
  Resources:
    MyParameter:
      Type: AWS::SSM::Parameter
      Properties:
        Name: /my-service/DYNAMODB_TABLE
        Type: String
        Value: my-dynamodb-table

よみとり

provider:
  name: aws
  runtime: python3.8
  environment:
    DYNAMODB_TABLE: ${ssm:/my-service/DYNAMODB_TABLE}  # Parameter Storeから取得

functions:
  myFunction:
    handler: handler.my_function

Secret Managerに設定した環境変数を読み込みたい

provider:
  name: aws
  runtime: python3.8
  environment:
    MY_SECRET: ${ssm-secure:/my-service/my-secret}
Kumamoto-HamachiKumamoto-Hamachi

Event Bridgeのイベントパターン

source:イベントの発生元。今回ならEC2
detail-type:イベントの詳細。EC2のstate変化
detail:イベントの詳細の情報。stateがrunningになったら。
→EC2インスタンスが起動したというイベントのみマッチして、ターゲットへ。

{
  "source": ["aws.ec2"],
  "detail-type": ["EC2 Instance State-change Notification"],
    "detail": {
      "state": ["running"]
  }
}
  1. Whenever an Amazon EC2 instance changes state, Amazon EC2 (the event source) automatically sends that event to the default event bus.

  2. EventBridge evaluates all events sent to the default event bus against the rule you've created.

イベント自体はルールとは別に常に送ってきてるっぽい。

https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html

Kumamoto-HamachiKumamoto-Hamachi

定期実行でdynamodbに値をいれるやつ

  • serverless.yml

いったん、定期実行が動かないようにenabled: falseにしている。

  create:
    handler: src/handler.create
    events:
      - eventBridge:
          enabled: false
          schedule: rate(3 minutes)
          input:
            body:
              user_name: "example_kumamoto"

  • handler.py
def create(event: dict, _: LambdaContext):
    print("*" * 15)  # debug
    print("event", type(event))  # debug
    print("*" * 15)  # debug
    print("event", event)  # debug
    print("*" * 15)  # debug
    print("event['body']", type(event["body"]))  # debug
    print("*" * 15)  # debug
    print("event['body']", event["body"])  # debug
    # httpならこっち
    # item = json.loads(event['body'])
    # EventBridgeならこっち
    item = event['body']
    item["deadline_unix_time"] = int(datetime.now().timestamp())
    table.put_item(Item=item)
    return {
        "statusCode": 201,  # 201 Created
        "body": json.dumps(item)
    }