lambda定期実行したい
行:アイテム(item)
列:アトリビュート(attribute)
パーティションキー:
ソートキー:指定は任意。
プライマリーキー:パーティションキー or パーティション and ソートの組み合わせ
=>パーティションキーは、DynamoDBにおいてユニークである必要はありません。しかし、パーティションキーとソートキーの組み合わせはユニークでなければならない。つまりプライマリーキーはユニーク。
ログ_1
serverlessframeworkさん
インタラクティブにserverlessframeworkのPJを作れる
$ npm i -g serverless
$ serverless plugin install -n serverless-offline
$ serverless plugin install -n serverless-python-requirements
pythonとかpoetryとか
$ cd /your/path/serverless-pj
$ asdf local python 3.12.5
$ asdf local poetry 1.8.3
$ poetry init
$ poetry add -D black mypy isort
-- 現在のPJを除く依存関係のパッケージをインストール
$ poetry install --no-root
nodeとかserverlessのプラグインとか
npmをつかう。
$ asdf local nodejs 20.16.0
$ npm init
$ npm i -D serverless
serverless frameworkのプラグインを入れていく
$ serverless plugin install -n serverless-offline
boto3演習
.envrc
に下記を設定
# AWS
export AWS_ACCESS_KEY_ID=xxxx
export AWS_SECRET_ACCESS_KEY=xxxx
export AWS_DEFAULT_REGION=ap-northeast-1
overridesは、プロジェクト内で特定のパッケージが依存している別のパッケージのバージョンを制御することが出来る。
この場合、overridesの設定で、serverless-offlineがserverlessパッケージに依存しているときに、使用されるserverlessのバージョンを$serverlessに設定している。これは、プロジェクト内で使用しているserverlessのバージョンに対応させるためのもの。
{
"name": "practice-alignment-serverless",
"version": "1.0.0",
"private": true,
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"fmt": "poetry run black . && poetry run isort .",
"fmt:check": "poetry run black . --check && poetry run isort . --check && sls package",
"offline": "sls offline start"
},
"devDependencies": {
"serverless": "^4.1.20",
"serverless-offline": "^14.0.0"
},
"overrides": {
"serverless-offline": {
"serverless": "$serverless"
}
}
}
serverless-frameworkでdyanamodb定義
service: practice-alignment-serverless
provider:
name: aws
runtime: python3.12
stage: dev
region: ap-northeast-1
iam:
role:
statements:
- Effect: Allow
Action:
- dynamodb:*
Resource: arn:aws:dynamodb:${env:AWS_DEFAULT_REGION}:${env:AWS_ACCOUNT_ID}:table/*
resources:
Resources:
ExampleTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ExampleTable
AttributeDefinitions:
- AttributeName: name
AttributeType: S
- AttributeName: deadline_unix_time
AttributeType: N
KeySchema:
- AttributeName: name
KeyType: HASH
- AttributeName: deadline_unix_time
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
BillingMode: PROVISIONED
functions:
hello:
handler: handler.hello
plugins:
- serverless-offline
- serverless-python-requirements
dynamodbには予約語があるから注意!!
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the Query operation: Invalid KeyConditionExpression: Attribute name is a reserved keyword; reserved keywo
rd: name
PAY_PER_REQUESTモードでは、読み取りおよび書き込みのスループットを事前に設定せず、実際のリクエスト数に基づいて課金される。
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
BillingModeをPROVISIONEDに変更すれば使えるよ。
npm install --save-dev serverless-iam-roles-per-function
iam:
role:
statements:
- Effect: Allow
Action:
- dynamodb:*
Resource: arn:aws:dynamodb:${AWS_DEFAULT_REGION}:${AWS_ACCOUNT_ID}:table/*
serverless remove
package:
patterns:
- "!**"
- src/**
最初に!**で全ファイルを除外し、次にsrc/**でsrcディレクトリ内のファイルをパッケージに含めるという指定を行っている。
stage
service: my-service
provider:
name: aws
runtime: nodejs14.x
stage: ${opt:stage, 'dev'} # ステージを指定。指定がなければ'dev'を使用
region: us-east-1
# 環境ごとの設定
custom:
tableName: ${self:service}-${self:provider.stage}-table
bucketName: ${self:service}-${self:provider.stage}-bucket
resources:
Resources:
MyDynamoDBTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:custom.tableName}
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
BillingMode: PAY_PER_REQUEST
MyS3Bucket:
Type: AWS::S3::Bucket
Properties:
BucketName: ${self:custom.bucketName}
functions:
hello:
handler: handler.hello
environment:
TABLE_NAME: ${self:custom.tableName}
BUCKET_NAME: ${self:custom.bucketName}
前提
DynamoDBとは
Amazon DynamoDB は、AWS が提供しているフルマネージドな NoSQLデータベースのサービスです。
※NoSQLというのはリレーショナルデータベース(RDBMS)以外のデータベースの総称です。
DynamoDBはkey-valueとドキュメントデータモデルをサポートしています。
例えば下記のようなRDBのテーブルがあったとします。
id | user_name | height |
---|---|---|
1 | John | 130 |
2 | Marry | 180 |
3 | Bob | null |
それをDynamoDBでは下記のように表現します。
[
{
"id": 1,
"user_name": "John",
"height": 130
},
{
"id": 2,
"user_name": "Marry",
"height": 180
},
{
"id": 3,
"user_name": "Bob"
}
]
「id」、「user_name」、「height」がアトリビュート/attributeにあたります。
また3つ目のアイテム/itemに「height」がないですが、これはDynamoDBがスキーマレスだからです。(レコードが所定のスキーマに適合していることを保証しない)
用語 | 説明 |
---|---|
アイテム(item) | RDBで言えばレコード |
アトリビュート | キーのこと、RDBで言えばカラム名 |
Amazon DynamoDB とは - Amazon DynamoDB
Serverless FrameworkでDynamoDBを定義
resources:
Resources:
ExampleTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ExampleTable
AttributeDefinitions:
- AttributeName: user_name
AttributeType: S
- AttributeName: deadline_unix_time
AttributeType: N
KeySchema:
- AttributeName: user_name
KeyType: HASH
- AttributeName: deadline_unix_time
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
BillingMode: PROVISIONED
KeyType: HASH
となっているのがパーティションキー。
KeyType: RANGE
となっているのがソートキー
BillingMode
は事前にどのくらいのアクセス量が来るかわからないなら PAY_PER_REQUEST
事前に分かっている・料金を制御したい場合はBillingMode
をデフォルトのPROVISIONED
にしてProvisionedThroughput
の設定を行う。(今回は)
DynamoDBにデータを入れたり出したり練習
boto3はAWS が公式で提供しているライブラリ。AWSのインフラをPythonコードを使って操作出来る。
from typing import Tuple
from datetime import datetime
import boto3
dynamodb_resource = boto3.resource("dynamodb")
table = dynamodb_resource.Table("ExampleTable")
def hello(event, context):
# create("熊本 はまち")
def create(user_name: str):
deadline_unix_time = int(datetime.now().timestamp())
table.put_item(Item={"user_name": user_name, "deadline_unix_time": deadline_unix_time})
def get(user_name: str, deadline_unix_time: int) -> Tuple[dict, dict]:
response = table.get_item(Key={"user_name": user_name, "deadline_unix_time": deadline_unix_time})
item = response.get("Item")
return item, response
def get_by_user_name(user_name: str) -> None:
response = table.query(
KeyConditionExpression=boto3.dynamodb.conditions.Key('user_name').eq(user_name),
ScanIndexForward=True # Trueの場合、昇順(過去から順)に並べ替え
)
# 取得したアイテム
items = response['Items']
# アイテムを出力
for item in items:
print(item)
deadline_unix_time = item.get("deadline_unix_time")
# decimal.Decimal
print(datetime.fromtimestamp(int(deadline_unix_time)))
def scan() -> dict:
response = table.scan()
return response.get("Items")
- Lambda関数のcontextオブジェクト
関数名、メモリサイズ、タイムアウトなど、関数の実行環境、リクエストの識別に関する情報、実行時間の制御:
Lambdaから操作
Lambda関数を作る
hoge
コンソールからLambda関数のテスト実行
AWS Management ConsoleでLambda関数を開く。
テストイベントを設定、下記のようなJSONデータを入力して「Test」ボタンをクリック
※テストイベントとはいえ、しっかりDBに反映されるので一応注意。
- POST(create)
{
"body": "{\"user_name\": \"exampleUserName\", \"task_description\": \"Complete the year-end report\"}"
}
- GET(fetch_by_user_name)
{
"queryStringParameters": {
"user_name": "exampleUserName"
}
}
TODO:EventBridgeをserverless frameworkでやってみる
Amazon EventBridgeは、何かが起きたときに自動で他の処理を起動するためのサービス。システム内の自動化やタスクの連携が得意。
Amazon API Gatewayは、ユーザーや他のアプリケーションがデータをやり取りするためのインターフェースを提供するサービス。APIを通じてデータの送受信を管理するのが得意。
TODO:AWS Systems Managerは、Parameter StoreとSecrets Managerを含む総称?
パラメーターストア設定
service: my-service
provider:
name: aws
runtime: python3.8
region: us-east-1
resources:
Resources:
MyParameter:
Type: AWS::SSM::Parameter
Properties:
Name: /my-service/DYNAMODB_TABLE
Type: String
Value: my-dynamodb-table
よみとり
provider:
name: aws
runtime: python3.8
environment:
DYNAMODB_TABLE: ${ssm:/my-service/DYNAMODB_TABLE} # Parameter Storeから取得
functions:
myFunction:
handler: handler.my_function
Secret Managerに設定した環境変数を読み込みたい
provider:
name: aws
runtime: python3.8
environment:
MY_SECRET: ${ssm-secure:/my-service/my-secret}
Event Bridgeのイベントパターン
source:イベントの発生元。今回ならEC2
detail-type:イベントの詳細。EC2のstate変化
detail:イベントの詳細の情報。stateがrunningになったら。
→EC2インスタンスが起動したというイベントのみマッチして、ターゲットへ。
{
"source": ["aws.ec2"],
"detail-type": ["EC2 Instance State-change Notification"],
"detail": {
"state": ["running"]
}
}
Whenever an Amazon EC2 instance changes state, Amazon EC2 (the event source) automatically sends that event to the default event bus.
EventBridge evaluates all events sent to the default event bus against the rule you've created.
イベント自体はルールとは別に常に送ってきてるっぽい。
$ poetry shell
$ sls offline start
定期実行でdynamodbに値をいれるやつ
- serverless.yml
いったん、定期実行が動かないようにenabled: false
にしている。
create:
handler: src/handler.create
events:
- eventBridge:
enabled: false
schedule: rate(3 minutes)
input:
body:
user_name: "example_kumamoto"
- handler.py
def create(event: dict, _: LambdaContext):
print("*" * 15) # debug
print("event", type(event)) # debug
print("*" * 15) # debug
print("event", event) # debug
print("*" * 15) # debug
print("event['body']", type(event["body"])) # debug
print("*" * 15) # debug
print("event['body']", event["body"]) # debug
# httpならこっち
# item = json.loads(event['body'])
# EventBridgeならこっち
item = event['body']
item["deadline_unix_time"] = int(datetime.now().timestamp())
table.put_item(Item=item)
return {
"statusCode": 201, # 201 Created
"body": json.dumps(item)
}