👌

【続き】RustでLambdaを書いてsam local start-apiでLocalStackのDynamoDBを使う

に公開

はじめに

前回のあらすじ

sam local でAPI Gateway + Lambda のAPIサーバをローカル環境で起動し、dockerで立ち上げたLocalStackのDynamoDBにアクセスするといった内容。
前回の記事では、DynamoDB立ち上げ時にテーブルを作成し、APIでテーブル一覧を取得するところまでを実践しました。

https://zenn.dev/tsuruya/articles/d210d2829b0e96

今回やること

今回は、GETでデータ取得、POSTでデータ登録するようなAPIを実装し、よりアプリケーションに近づいた内容を実践したいと思います。

こちらにサンプルを用意しているので良かったら見てください。
https://github.com/tsuruya-jp/rust-samlocalapi-localstack

get_function.rsの改修

DynamoDBから取得したデータをjsonにして返したかったのでUserの構造体を追加しました。また、構造体をjsonにパースするのでserde_jsonをインポートしておきます。

あとは前回テーブルを取得していたlist_tables()からレコードを取得するscan()メソッドに変更し、取得したデータをVec<User>へ格納してレスポンスのボディへ含んで返却するだけになります。

get_function.rs
use aws_config::BehaviorVersion;
use aws_lambda_events::{apigw::{ApiGatewayProxyRequest, ApiGatewayProxyResponse}, encodings::Body, http::HeaderMap};
use aws_sdk_dynamodb::Client;
use lambda_runtime::{Error, LambdaEvent, run, service_fn};
use serde::{Deserialize, Serialize};
use serde_json::json;

#[derive(Deserialize, Serialize)]
struct User {
    user_id: String,
    name: String,
    email: String,
}

pub async fn handler(
    _event: LambdaEvent<ApiGatewayProxyRequest>,
) -> Result<ApiGatewayProxyResponse, Error> {

    let config = aws_config::defaults(BehaviorVersion::latest())
        .endpoint_url("http://localstack:4566")
        .load()
        .await;

    let client = Client::new(&config);

    let res = client
        .scan()
        .table_name("users")
        .send()
        .await;

    let mut headers = HeaderMap::new();
    headers.insert("content-type", "application/json".parse().unwrap());

    let resp = match res {
        Ok(resp) => {
            tracing::info!("Found {} items", resp.items().len());
            let mut users: Vec<User> = Vec::new();

            for item in resp.items() {
                if let (Some(user_id), Some(name), Some(email)) = (
                    item.get("user_id").and_then(|v| v.as_s().ok()),
                    item.get("name").and_then(|v| v.as_s().ok()),
                    item.get("email").and_then(|v| v.as_s().ok()),
                ) {
                    users.push(User {
                        user_id: user_id.clone(),
                        name: name.clone(),
                        email: email.clone(),
                    });
                }
            }

            let body = json!({
                "users": users,
                "count": users.len()
            }).to_string();

            ApiGatewayProxyResponse {
                status_code: 200,
                multi_value_headers: headers.clone(),
                is_base64_encoded: false,
                body: Some(Body::Text(body)),
                headers,
            }
        }
        Err(err) => {
            tracing::error!("Failed to scan users table: {err:?}");

            let error_body = json!({
                "error": "Failed to get users",
                "message": err.to_string()
            }).to_string();

            ApiGatewayProxyResponse {
                status_code: 500,
                multi_value_headers: headers.clone(),
                is_base64_encoded: false,
                body: Some(Body::Text(error_body)),
                headers,
            }
        }
    };

    Ok(resp)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt().json()
        .with_max_level(tracing::Level::INFO)
        .with_current_span(false)
        .with_ansi(false)
        .without_time()
        .with_target(false)
        .init();

    run(service_fn(handler)).await
}

post_function.rsの改修

get_function.rs同様にUser構造体を定義しjsonをパースするようにします。

各パラメータをDynamoDBのput_item()で登録するという流れです。返却するボディはメッセージと登録したuserそのままを返します。

post_function.rs
use aws_config::BehaviorVersion;
use aws_lambda_events::{apigw::{ApiGatewayProxyRequest, ApiGatewayProxyResponse}, encodings::Body, http::HeaderMap};
use aws_sdk_dynamodb::Client;
use lambda_runtime::{Error, LambdaEvent, run, service_fn};
use serde::{Deserialize, Serialize};
use serde_json::json;

#[derive(Deserialize, Serialize)]
struct User {
    user_id: String,
    name: String,
    email: String,
}

pub async fn handler(
    event: LambdaEvent<ApiGatewayProxyRequest>,
) -> Result<ApiGatewayProxyResponse, Error> {

    let config = aws_config::defaults(BehaviorVersion::latest())
        .endpoint_url("http://localstack:4566")
        .load()
        .await;

    let client = Client::new(&config);

    let mut headers = HeaderMap::new();
    headers.insert("content-type", "application/json".parse().unwrap());

    let body = event.payload.body.as_ref();

    let resp = match body {
        Some(body_str) => {
            match serde_json::from_str::<User>(body_str) {
                Ok(user) => {
                    let result = client
                        .put_item()
                        .table_name("users")
                        .item("user_id", aws_sdk_dynamodb::types::AttributeValue::S(user.user_id.clone()))
                        .item("name", aws_sdk_dynamodb::types::AttributeValue::S(user.name.clone()))
                        .item("email", aws_sdk_dynamodb::types::AttributeValue::S(user.email.clone()))
                        .send()
                        .await;

                    match result {
                        Ok(_) => {
                            let response_body = json!({
                                "message": "User created successfully",
                                "user": user
                            }).to_string();

                            ApiGatewayProxyResponse {
                                status_code: 201,
                                multi_value_headers: headers.clone(),
                                is_base64_encoded: false,
                                body: Some(Body::Text(response_body)),
                                headers,
                            }
                        }
                        Err(err) => {
                            eprintln!("Failed to put item: {err:?}");

                            let error_body = json!({
                                "error": "Failed to create user",
                                "message": err.to_string()
                            }).to_string();

                            ApiGatewayProxyResponse {
                                status_code: 500,
                                multi_value_headers: headers.clone(),
                                is_base64_encoded: false,
                                body: Some(Body::Text(error_body)),
                                headers,
                            }
                        }
                    }
                }
                Err(err) => {
                    eprintln!("Failed to parse JSON: {err:?}");

                    let error_body = json!({
                        "error": "Invalid request body",
                        "message": err.to_string()
                    }).to_string();

                    ApiGatewayProxyResponse {
                        status_code: 400,
                        multi_value_headers: headers.clone(),
                        is_base64_encoded: false,
                        body: Some(Body::Text(error_body)),
                        headers,
                    }
                }
            }
        }
        None => {
            let error_body = json!({
                "error": "Request body is required"
            }).to_string();

            ApiGatewayProxyResponse {
                status_code: 400,
                multi_value_headers: headers.clone(),
                is_base64_encoded: false,
                body: Some(Body::Text(error_body)),
                headers,
            }
        }
    };

    Ok(resp)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    run(service_fn(handler)).await
}

実行する

  1. ビルドしてローカルサーバを起動する

    sam build
    sam local start-api --docker-network <DOCKER NETWORK ID>
    
  2. POST

    まずはユーザーを登録します。

    • リクエスト
    curl --location 'http://127.0.0.1:3000/' \
    --header 'Content-Type: application/json' \
    --data-raw '{
    "user_id": "001",
    "name": "田中太郎",
    "email": "tanaka@example.com"
    }'
    
    • レスポンス
    {
        "message": "User created successfully",
        "user": {
            "email": "tanaka@example.com",
            "name": "田中太郎",
            "user_id": "001"
        }
    }
    
  3. GET

    今登録したユーザーを取得します。

    • リクエスト
    curl --location 'http://127.0.0.1:3000/'
    
    • レスポンス
    {
        "count": 1,
        "users": [
            {
                "email": "tanaka@example.com",
                "name": "田中太郎",
                "user_id": "001"
            }
        ]
    }
    

実装のポイント

エラーハンドリング

今回の実装では、以下のようなエラーケースに対応しています。

  1. DynamoDBアクセスエラー (500 Internal Server Error)

    • DynamoDBへの接続に失敗した場合
    • scan/put_itemの実行に失敗した場合
  2. リクエストボディのバリデーションエラー (400 Bad Request)

    • リクエストボディが空の場合
    • JSON形式が不正な場合
    • 必須フィールドが欠けている場合

これにより、適切なHTTPステータスコードとエラーメッセージを返すことができます。

DynamoDBのデータ型変換

DynamoDBから取得したデータはAttributeValue型で返ってくるため、Rustの構造体に変換する必要があります。

item.get("user_id").and_then(|v| v.as_s().ok())

この処理では:

  • get()でAttributeValueを取得
  • as_s()で文字列型に変換を試みる
  • ok()でResult型をOption型に変換

複数のフィールドを同時にチェックするため、タプルパターンマッチングを使用しています。

LocalStackとの通信

LocalStackはDockerコンテナで起動しているため、Lambda関数からアクセスする際はendpoint_urlを指定する必要があります。

let config = aws_config::defaults(BehaviorVersion::latest())
    .endpoint_url("http://localstack:4566")
    .load()
    .await;

localstackというホスト名は、--docker-networkオプションで同じネットワークに接続することで名前解決が可能になります。

実際の開発での応用例

この構成は以下のような場合に有効です。

1. ローカル開発環境の構築

  • インターネット接続不要でDynamoDBを使った開発が可能
  • AWS利用料金を気にせず開発・テストができる
  • チーム全体で統一された開発環境を構築できる
  • テストデータの準備と削除が容易

2. 複数のAWSサービスとの統合テスト

  • LocalStackはDynamoDB以外にもS3、SNS、SQSなど様々なAWSサービスをサポート
  • ローカル環境で複雑なマイクロサービス構成のテストが可能

ハマりポイントと対処法

開発中に遭遇しやすい問題と解決策をまとめます。

1. Dockerネットワークの設定

問題: Lambda関数からLocalStackに接続できない

対処法:

# LocalStackのネットワークIDを確認
docker network ls

# sam local start-apiで同じネットワークを指定
sam local start-api --docker-network <NETWORK_ID>

2. エンドポイントURLの指定

問題: localhost:4566でアクセスしようとしてエラーになる

対処法: Lambda関数もDockerコンテナで実行されるため、localhostではなくlocalstackというホスト名を使用する必要があります。

3. 環境変数での設定切り替え

問題: 本番環境とローカル環境でエンドポイントを切り替えたい

対処法: 環境変数を使って動的に切り替える実装例。

let endpoint_url = std::env::var("DYNAMODB_ENDPOINT")
    .ok();

let mut config_loader = aws_config::defaults(BehaviorVersion::latest());

if let Some(url) = endpoint_url {
    config_loader = config_loader.endpoint_url(url);
}

let config = config_loader.load().await;

まとめ

今回は前回の記事に引き続き、Rust + SAM + LocalStackを使ってより実践的なAPIを実装しました。

実装したポイント:

  • DynamoDBへのデータ登録(POST)
  • DynamoDBからのデータ取得(GET)
  • エラーハンドリング
  • JSON形式でのリクエスト/レスポンス処理

この構成を使うことで、AWSへデプロイせずにローカル環境で開発・テストが可能になります。

ぜひ参考にして、Rustでのサーバーレス開発を楽しんでください!

GitHubで編集を提案

Discussion