【続き】RustでLambdaを書いてsam local start-apiでLocalStackのDynamoDBを使う
はじめに
前回のあらすじ
sam local でAPI Gateway + Lambda のAPIサーバをローカル環境で起動し、dockerで立ち上げたLocalStackのDynamoDBにアクセスするといった内容。
前回の記事では、DynamoDB立ち上げ時にテーブルを作成し、APIでテーブル一覧を取得するところまでを実践しました。
今回やること
今回は、GETでデータ取得、POSTでデータ登録するようなAPIを実装し、よりアプリケーションに近づいた内容を実践したいと思います。
こちらにサンプルを用意しているので良かったら見てください。
get_function.rs
の改修
DynamoDBから取得したデータをjsonにして返したかったのでUserの構造体を追加しました。また、構造体をjsonにパースするのでserde_jsonをインポートしておきます。
あとは前回テーブルを取得していたlist_tables()
からレコードを取得するscan()
メソッドに変更し、取得したデータをVec<User>
へ格納してレスポンスのボディへ含んで返却するだけになります。
use aws_config::BehaviorVersion;
use aws_lambda_events::{apigw::{ApiGatewayProxyRequest, ApiGatewayProxyResponse}, encodings::Body, http::HeaderMap};
use aws_sdk_dynamodb::Client;
use lambda_runtime::{Error, LambdaEvent, run, service_fn};
use serde::{Deserialize, Serialize};
use serde_json::json;
#[derive(Deserialize, Serialize)]
struct User {
user_id: String,
name: String,
email: String,
}
pub async fn handler(
_event: LambdaEvent<ApiGatewayProxyRequest>,
) -> Result<ApiGatewayProxyResponse, Error> {
let config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url("http://localstack:4566")
.load()
.await;
let client = Client::new(&config);
let res = client
.scan()
.table_name("users")
.send()
.await;
let mut headers = HeaderMap::new();
headers.insert("content-type", "application/json".parse().unwrap());
let resp = match res {
Ok(resp) => {
tracing::info!("Found {} items", resp.items().len());
let mut users: Vec<User> = Vec::new();
for item in resp.items() {
if let (Some(user_id), Some(name), Some(email)) = (
item.get("user_id").and_then(|v| v.as_s().ok()),
item.get("name").and_then(|v| v.as_s().ok()),
item.get("email").and_then(|v| v.as_s().ok()),
) {
users.push(User {
user_id: user_id.clone(),
name: name.clone(),
email: email.clone(),
});
}
}
let body = json!({
"users": users,
"count": users.len()
}).to_string();
ApiGatewayProxyResponse {
status_code: 200,
multi_value_headers: headers.clone(),
is_base64_encoded: false,
body: Some(Body::Text(body)),
headers,
}
}
Err(err) => {
tracing::error!("Failed to scan users table: {err:?}");
let error_body = json!({
"error": "Failed to get users",
"message": err.to_string()
}).to_string();
ApiGatewayProxyResponse {
status_code: 500,
multi_value_headers: headers.clone(),
is_base64_encoded: false,
body: Some(Body::Text(error_body)),
headers,
}
}
};
Ok(resp)
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt().json()
.with_max_level(tracing::Level::INFO)
.with_current_span(false)
.with_ansi(false)
.without_time()
.with_target(false)
.init();
run(service_fn(handler)).await
}
post_function.rs
の改修
get_function.rs同様にUser構造体を定義しjsonをパースするようにします。
各パラメータをDynamoDBのput_item()
で登録するという流れです。返却するボディはメッセージと登録したuserそのままを返します。
use aws_config::BehaviorVersion;
use aws_lambda_events::{apigw::{ApiGatewayProxyRequest, ApiGatewayProxyResponse}, encodings::Body, http::HeaderMap};
use aws_sdk_dynamodb::Client;
use lambda_runtime::{Error, LambdaEvent, run, service_fn};
use serde::{Deserialize, Serialize};
use serde_json::json;
#[derive(Deserialize, Serialize)]
struct User {
user_id: String,
name: String,
email: String,
}
pub async fn handler(
event: LambdaEvent<ApiGatewayProxyRequest>,
) -> Result<ApiGatewayProxyResponse, Error> {
let config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url("http://localstack:4566")
.load()
.await;
let client = Client::new(&config);
let mut headers = HeaderMap::new();
headers.insert("content-type", "application/json".parse().unwrap());
let body = event.payload.body.as_ref();
let resp = match body {
Some(body_str) => {
match serde_json::from_str::<User>(body_str) {
Ok(user) => {
let result = client
.put_item()
.table_name("users")
.item("user_id", aws_sdk_dynamodb::types::AttributeValue::S(user.user_id.clone()))
.item("name", aws_sdk_dynamodb::types::AttributeValue::S(user.name.clone()))
.item("email", aws_sdk_dynamodb::types::AttributeValue::S(user.email.clone()))
.send()
.await;
match result {
Ok(_) => {
let response_body = json!({
"message": "User created successfully",
"user": user
}).to_string();
ApiGatewayProxyResponse {
status_code: 201,
multi_value_headers: headers.clone(),
is_base64_encoded: false,
body: Some(Body::Text(response_body)),
headers,
}
}
Err(err) => {
eprintln!("Failed to put item: {err:?}");
let error_body = json!({
"error": "Failed to create user",
"message": err.to_string()
}).to_string();
ApiGatewayProxyResponse {
status_code: 500,
multi_value_headers: headers.clone(),
is_base64_encoded: false,
body: Some(Body::Text(error_body)),
headers,
}
}
}
}
Err(err) => {
eprintln!("Failed to parse JSON: {err:?}");
let error_body = json!({
"error": "Invalid request body",
"message": err.to_string()
}).to_string();
ApiGatewayProxyResponse {
status_code: 400,
multi_value_headers: headers.clone(),
is_base64_encoded: false,
body: Some(Body::Text(error_body)),
headers,
}
}
}
}
None => {
let error_body = json!({
"error": "Request body is required"
}).to_string();
ApiGatewayProxyResponse {
status_code: 400,
multi_value_headers: headers.clone(),
is_base64_encoded: false,
body: Some(Body::Text(error_body)),
headers,
}
}
};
Ok(resp)
}
#[tokio::main]
async fn main() -> Result<(), Error> {
run(service_fn(handler)).await
}
実行する
-
ビルドしてローカルサーバを起動する
sam build sam local start-api --docker-network <DOCKER NETWORK ID>
-
POST
まずはユーザーを登録します。
- リクエスト
curl --location 'http://127.0.0.1:3000/' \ --header 'Content-Type: application/json' \ --data-raw '{ "user_id": "001", "name": "田中太郎", "email": "tanaka@example.com" }'
- レスポンス
{ "message": "User created successfully", "user": { "email": "tanaka@example.com", "name": "田中太郎", "user_id": "001" } }
-
GET
今登録したユーザーを取得します。
- リクエスト
curl --location 'http://127.0.0.1:3000/'
- レスポンス
{ "count": 1, "users": [ { "email": "tanaka@example.com", "name": "田中太郎", "user_id": "001" } ] }
実装のポイント
エラーハンドリング
今回の実装では、以下のようなエラーケースに対応しています。
-
DynamoDBアクセスエラー (500 Internal Server Error)
- DynamoDBへの接続に失敗した場合
- scan/put_itemの実行に失敗した場合
-
リクエストボディのバリデーションエラー (400 Bad Request)
- リクエストボディが空の場合
- JSON形式が不正な場合
- 必須フィールドが欠けている場合
これにより、適切なHTTPステータスコードとエラーメッセージを返すことができます。
DynamoDBのデータ型変換
DynamoDBから取得したデータはAttributeValue
型で返ってくるため、Rustの構造体に変換する必要があります。
item.get("user_id").and_then(|v| v.as_s().ok())
この処理では:
-
get()
でAttributeValueを取得 -
as_s()
で文字列型に変換を試みる -
ok()
でResult型をOption型に変換
複数のフィールドを同時にチェックするため、タプルパターンマッチングを使用しています。
LocalStackとの通信
LocalStackはDockerコンテナで起動しているため、Lambda関数からアクセスする際はendpoint_url
を指定する必要があります。
let config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url("http://localstack:4566")
.load()
.await;
localstack
というホスト名は、--docker-network
オプションで同じネットワークに接続することで名前解決が可能になります。
実際の開発での応用例
この構成は以下のような場合に有効です。
1. ローカル開発環境の構築
- インターネット接続不要でDynamoDBを使った開発が可能
- AWS利用料金を気にせず開発・テストができる
- チーム全体で統一された開発環境を構築できる
- テストデータの準備と削除が容易
2. 複数のAWSサービスとの統合テスト
- LocalStackはDynamoDB以外にもS3、SNS、SQSなど様々なAWSサービスをサポート
- ローカル環境で複雑なマイクロサービス構成のテストが可能
ハマりポイントと対処法
開発中に遭遇しやすい問題と解決策をまとめます。
1. Dockerネットワークの設定
問題: Lambda関数からLocalStackに接続できない
対処法:
# LocalStackのネットワークIDを確認
docker network ls
# sam local start-apiで同じネットワークを指定
sam local start-api --docker-network <NETWORK_ID>
2. エンドポイントURLの指定
問題: localhost:4566
でアクセスしようとしてエラーになる
対処法: Lambda関数もDockerコンテナで実行されるため、localhost
ではなくlocalstack
というホスト名を使用する必要があります。
3. 環境変数での設定切り替え
問題: 本番環境とローカル環境でエンドポイントを切り替えたい
対処法: 環境変数を使って動的に切り替える実装例。
let endpoint_url = std::env::var("DYNAMODB_ENDPOINT")
.ok();
let mut config_loader = aws_config::defaults(BehaviorVersion::latest());
if let Some(url) = endpoint_url {
config_loader = config_loader.endpoint_url(url);
}
let config = config_loader.load().await;
まとめ
今回は前回の記事に引き続き、Rust + SAM + LocalStackを使ってより実践的なAPIを実装しました。
実装したポイント:
- DynamoDBへのデータ登録(POST)
- DynamoDBからのデータ取得(GET)
- エラーハンドリング
- JSON形式でのリクエスト/レスポンス処理
この構成を使うことで、AWSへデプロイせずにローカル環境で開発・テストが可能になります。
ぜひ参考にして、Rustでのサーバーレス開発を楽しんでください!
Discussion