【CFn/API Gateway/DynamoDB/Lambda】API呼び出しでDynamoDBを操作する<実装編>
1.はじめに
今回はAPI呼び出しでDynamoDBを操作するLambda関数を実装したいと思います。
実装物の機能イメージは以下の通りです。
- Curlコマンドまたは画面からAPIを呼び出す
- API Gatewayを経由してLambda関数を実行する
- Lambda関数上でDynamoDBを操作しレコードを挿入する
また、各リソース(API Gateway/DynamoDB/Lambda)をテンプレート化してCloudFormationでデプロイできるようにします。
この記事は実装編・デプロイ編・フロント編の3部構成となっています。
- 実装編:PythonでのLambda関数実装が中心
- デプロイ編:「template.yaml」の記述方法・デプロイ後の動作確認方法が中心
- フロント編(未執筆):HTMLで画面を作成してAPIを呼び出す方法について
今回は実装編ということでPythonでのLambda関数実装を行っていきます。
本記事で実装するものは以下リポジトリに上げています。
2.準備
本記事では以下を前提とします。
- AWS環境があることを前提とします
- Lambda関数の実装はPythonを使用します
- 【実装編では任意】AWS SAMの環境構築を終えていること
- 【実装編では任意】AWS SAM CLI利用の準備
■ ディレクトリ構成
本記事では以下のディレクトリ構成で進めます。
親ディレクトリ
┣ samconfig.toml ← デプロイ編で作成
┣ template.yaml ← デプロイ編で作成
┣ common_layer
┃ ┗ requirements.txt ← 実装編で作成
┗ dynamo_lambda_function
┗ app.py ← 実装編で作成
┗ lambda_function.py ← 実装編で作成
■ ライブラリ
ここでは実装の前提となるライブラリについて説明します。
今回使用するライブラリは以下の通りです。
※★の付いているライブラリはLambdaの実装で必須級のライブラリです。
ライブラリ | 説明 |
---|---|
★json | Lambda関数上でレスポンス操作を行う都合上、必須と言えるライブラリです |
★aws-lambda-powertools | Lambdaでの実装を支援するライブラリです |
★boto3 | AWSの各種サービスを操作するためのライブラリです |
os | template.yamlから環境変数を取得するために使用します |
uuid | DynamoDBの一意のキーを生成するために使用します |
requirements.txtの作成
Lambda関数上で使用するライブラリの準備を行います。
紹介したライブラリを「common_layer」配下に「requirements.txt」として定義します。
※「json」「os」「uuid」はデフォルトで導入済みのため不要です。
aws-lambda-powertools==2.42.0 ; python_version >= "3.11" and python_version < "3.12"
boto3==1.34.151 ; python_version >= "3.11" and python_version < "3.12"
3.実装物の説明
それでは実装の説明を行います。
今回は以下の実装を行います。
■ src/dynamo_lambda_function/lambda_function.py
lambda_function.pyはDynamoDBを操作するためのソースコードです。
役割としては、API呼び出し時のリクエストに応じた処理を実施します。
例えば、POSTならDynamoDBへ新規登録、PUTならDynamoDBのレコード更新といった感じです。
src/dynamo_lambda_function/lambda_function.py
import json
import os
import uuid
import boto3
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler.api_gateway import Response, Router
logger = Logger()
router = Router()
@router.post("/data/<user_id>")
def post_data(user_id):
# boto3クライアントを定義
dynamo_client = boto3.client("dynamodb")
# 登録先のテーブル名をtemplate.yamlから取得
table_name = os.getenv("DYNAMO_TABLE_NAME")
# API呼び出し時のbody内容を取得
body = router.current_event.json_body
logger.info(body)
# DynamoDBに登録するdynamo_idを発番
dynamo_id = str(uuid.uuid4())
# DynamoDBの登録先テーブルと登録データを定義
params = {
"TableName": table_name,
"Item": {
"dynamo_id": {"S": dynamo_id},
"user_id": {"S": user_id},
"name": {"S": body["name"]},
},
}
try:
# DynamoDBへの登録処理
data = dynamo_client.put_item(**params)
# 正常時はステータスコードを200とし処理結果を返す
return Response(
status_code=200,
body=json.dumps(data),
content_type="application/json"
)
except Exception as e:
# エラー時はステータスコードを500としエラーメッセージを返す
return Response(
status_code=500,
body={"error": str(e)},
content_type="application/json",
)
■ src/dynamo_lambda_function/app.py
app.pyはAPI Gateway経由で呼び出されます。
役割としては、API呼び出し時のリクエストに応じて、後続の処理を割り振る役割を担います。
例えば、
API呼び出し(POST) → API Gateway → app.py → lambda_function.py
という順番で呼び出されます。
コード全体としては以下の通りです。
src/dynamo_lambda_function/app.py
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from lambda_function import router as dynamo_router
logger = Logger()
# API Gatewayのリクエストを処理するためのアプリケーションを定義
app = APIGatewayRestResolver()
# 外部定義されたルーター (dynamo_router) をアプリケーションに追加
app.include_router(dynamo_router)
@logger.inject_lambda_context(log_event=True)
def lambda_handler(event, context):
# API Gatewayからのリクエストを解決して応答を返す
return app.resolve(event, context)
4.実装
■ lambda_function.pyの実装
手順1:必要なライブラリのインポート
はじめに必要なライブラリをインポートします。
import json
import os
import uuid
import boto3
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler.api_gateway import Response, Router
手順2:loggerの定義
次に「aws_lambda_powertools」からインポートした「logger」を定義します。
「logger」を定義することでLambda関数のCloudWatchログにログ出力 できます。
logger = Logger()
手順3:routerの定義
次にルータを定義します。
ルータを定義することでAPI呼び出し時のメソッドに応じた処理を定義することができます。
router = Router()
@router.post("/data/<user_id>")
def post_data(user_id):
例えば、Lambda関数のメソッドの手前に@router.post("/data/<user_id>")
とデコレータを記述することで以下のような処理が定義できます。
- API呼び出しURL:
https://APIGatewayのエンドポイント/dev/data/00001
- @router.post:API呼び出し時のメソッドに応じた処理の定義
- <user_id>:APIに渡すことができるパラメータ
手順4:メインメソッドの定義
メインメソッドの実装に入ります。
まず、DynamoDBを操作するための以下準備を行います。
- DynamoDB操作のためのboto3クライアントを定義する
- テーブル名をtemplate.yamlから取得する
- API呼び出し時のbody内容を取得
@router.post("/data/<user_id>")
def post_data(user_id):
# boto3クライアントを定義
dynamo_client = boto3.client("dynamodb")
# 登録先のテーブル名をtemplate.yamlから取得
table_name = os.getenv("DYNAMO_TABLE_NAME")
# API呼び出し時のbody内容を取得
body = router.current_event.json_body
logger.info(body)
続いて、DynamoDBに登録するためのレコード情報を定義します。
# DynamoDBに登録するdynamo_idを発番
dynamo_id = str(uuid.uuid4())
# DynamoDBの登録先テーブルと登録データを定義
params = {
"TableName": table_name,
"Item": {
"dynamo_id": {"S": dynamo_id},
"user_id": {"S": user_id},
"name": {"S": body["name"]},
},
}
今回は以下のようなテーブル構造としてレコードを登録します。
dynamo_id | user_id | name |
---|---|---|
一意の値を持つ主キーです uuidライブラリを使って生成 |
API呼び出し時のURLとして渡されたパラメータです@router.post("/data/<user_id>")
|
API呼び出し時の「body」として与えられるパラメータです ※デプロイ編で説明します |
最後にレコードの挿入を行う部分です。
挿入処理部分はdata = dynamo_client.put_item(**params)
の一行です。
- 正常終了した場合、レスポンスとしてステータスコード200を返します。
- 異常終了した場合、レスポンスとしてステータスコード500を返します。
try:
# DynamoDBへの登録処理
data = dynamo_client.put_item(**params)
# 正常時はステータスコードを200とし処理結果を返す
return Response(
status_code=200,
body=json.dumps(data),
content_type="application/json"
)
except Exception as e:
# エラー時はステータスコードを500としエラーメッセージを返す
return Response(
status_code=500,
body={"error": str(e)},
content_type="application/json",
)
■ app.pyの実装
それでは、「app.py」の実装を実施します。
手順1:必要なライブラリのインポート
はじめに必要なライブラリをインポートします。
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from lambda_function import router as dynamo_router
手順2:loggerの定義
次に「aws_lambda_powertools」からインポートした「logger」を定義します。
「logger」を定義することでLambda関数のCloudWatchログにログ出力 できます。
logger = Logger()
手順3:外部ルーターの定義
ここでは、「src/dynamo_lambda_function/lambda_function.py」を外部ルータとして追加します。
「app.include_router(dynamo_router)」とすることで、API呼び出し時のメソッドに対応して「lambda_function.py」を呼び出すことができます。
# API Gatewayのリクエストを処理するためのアプリケーションを定義
app = APIGatewayRestResolver()
# 外部定義されたルーター (dynamo_router) をアプリケーションに追加
app.include_router(dynamo_router)
手順4:メインメソッドの定義
最後にメインメソッドです。
この部分はいわゆる決まり文句のような記述方法となります。
@logger.inject_lambda_context(log_event=True)
def lambda_handler(event, context):
# API Gatewayからのリクエストを解決して応答を返す
return app.resolve(event, context)
5.おわりに
今回の実装編では、API Gateway経由でDynamoDBを操作するためのLambda関数を実装しました。
次回のデプロイ編では、API Gateway・DynamoDB・Lambdaを「template.yaml」上で定義し、CloudFormationから一括デプロイします。
そして、PowerShellからAPIを呼び出してDynamoDBにレコードが登録されることを確認します。
デプロイ編で目指す内容
デプロイ編で目指す内容としては、以下手順でAPIの呼び出すことを目指します。
1.PowerShellからAPIを呼び出す
$ Invoke-WebRequest -Uri "https://APIゲートウェイのエンドポイント/dev/data/00001" -Method Post -Headers @{"Content-Type"="application/json";"x-api-key"="APIキー"} -Body '{"name": "is0383kk"}'
2.呼び出し成功時のレスポンスを確認する
StatusCode : 200
StatusDescription : OK
Content : {"ResponseMetadata": {"RequestId": "ダミー", "HTTPStatusCode": 200, "HTTPHeaders": {"server": "Server", "date": "Thu, 26 Sep 2024 15:05:53 GMT",
"content-...
RawContent : HTTP/1.1 200 OK
Connection: keep-alive
x-amzn-RequestId: ダミー
x-amz-apigw-id: ダミー
X-Amzn-Trace-Id: ダミー
Forms : {}
Headers : {[Connection, keep-alive], [x-amzn-RequestId, ダミー], [x-amz-apigw-id, ダミー], [X-Amzn-Trace-Id, ダミー]}
Images : {}
InputFields : {}
Links : {}
ParsedHtml : System.__ComObject
RawContentLength : 414
3.DynamoDBに挿入されたレコードを確認する
DynamoDBを確認すると以下のようにレコードが挿入されていることも確認できます。
Discussion