💬

Amazon Aurora DSQLをRustから試してみました。

2025/01/19に公開

マルチリージョン対応の分散DBであるAurora DSQLに対して、Rustからデータの取得と登録を試してみました。

テーブル作成

-- One row per RSS entry, keyed by the entry URL.
-- Only `url` (primary key) and `category` are mandatory; the other columns accept NULL.
CREATE TABLE whatnewrsshistory (
    url           VARCHAR(255) PRIMARY KEY,
    notifier_name VARCHAR(255),
    category      VARCHAR(255) NOT NULL,
    title         VARCHAR(255)
);

カラム長を256bytesにしてテーブルを作成しようとしたところ、下記のエラーが発生した。

他にも、Aurora DSQLではサポートされていないPostgreSQLの機能があるので、下記を参照。
https://docs.aws.amazon.com/ja_jp/aurora-dsql/latest/userguide/working-with-postgresql-compatibility-unsupported-features.html

データ投入

認証トークンは、15分で切れるので、都度取得になりそう。

async fn put_rss(body_json: Json<RSSHistory>) -> (StatusCode) {

    let cfg = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let generator: AuthTokenGenerator = AuthTokenGenerator::new(
        Config::builder()
            .hostname("xxxxxxxxxxxxxxxxxxxxxxxxxx.dsql.us-east-2.on.aws")
            .build()
            .expect("cfg is valid"),
    );
    let token: aws_sdk_dsql::auth_token::AuthToken = generator.db_connect_admin_auth_token(&cfg).await.unwrap();
    println!("{token}");

    // Setup connections
    let connection_options = PgConnectOptions::new()
        .host("xxxxxxxxxxxxxxxxxxxxxxxxxx.dsql.us-east-2.on.aws")
        .port(5432)
        .database("postgres")
        .username("admin")
        .password(token.as_str())
        .ssl_mode(sqlx::postgres::PgSslMode::VerifyFull);

    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect_with(connection_options.clone())
        .await
        .unwrap();

    let rsshistory = sqlx::query(
        "INSERT INTO whatnewrsshistory (url, notifier_name, category, title) VALUES ($1, $2, $3, $4) RETURNING *"
    )
        .bind(body_json.url.clone())
        .bind(body_json.notifier_name.clone())
        .bind(body_json.category.clone())
        .bind(body_json.title.clone())
        .execute(&pool)
        .await;


    pool.close().await;

    return (
        StatusCode::OK
    )

}

データ取得

async fn get_rss(body_json: Json<Value>) -> (StatusCode, Json<RSSHistory>) {

    let cfg = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let generator = AuthTokenGenerator::new(
        Config::builder()
            .hostname("xxxxxxxxxxxxxxxxxxxxxxxxxx.dsql.us-east-2.on.aws")
            .build()
            .expect("cfg is valid"),
    );
    let token: aws_sdk_dsql::auth_token::AuthToken = generator.db_connect_admin_auth_token(&cfg).await.unwrap();
    println!("{token}");

    // Setup connections
    let connection_options = PgConnectOptions::new()
        .host("xxxxxxxxxxxxxxxxxxxxxxxxxx.dsql.us-east-2.on.aws")
        .port(5432)
        .database("postgres")
        .username("admin")
        .password(token.as_str())
        .ssl_mode(sqlx::postgres::PgSslMode::VerifyFull);

    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect_with(connection_options.clone())
        .await
        .unwrap();

    let rsshistory = sqlx::query_as::<_, RSSHistory>("SELECT * FROM whatnewrsshistory")
        .fetch_one(&pool)
        .await
        .unwrap();

    pool.close().await;

    return (
        StatusCode::OK,
        Json(rsshistory)
    )

}

データ取得時に使う構造体

#[derive(serde::Serialize, serde::Deserialize, sqlx::FromRow)]
struct RSSHistory {
    url: String,
    notifier_name: String,
    category: String,
    title: String

Tracing

API Testerで叩く際、2sほどかかっているので、トレーシングログを出力して確認。

async fn main() {


    global::set_text_map_propagator(XrayPropagator::default());

    // トレーシングログの出力設定を行っている。
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::fmt::layer()
                .with_line_number(true)
                .with_file(true)
        )
        .init();

PgPoolOptions::new()でflush処理?が2sほどかかっているようにみえました。

2025-01-19T00:47:00.424620Z DEBUG sqlx_postgres::options::pgpass: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/sqlx-postgres-0.8.3/src/options/pgpass.rs:48: `.pgpass` file not found path=/Users/hiruta/.pgpass
2025-01-19T00:47:00.425012Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:00.425394Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:00.589680Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:00.754082Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:00.918902Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:01.099740Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:01.262142Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:01.423943Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:01.697173Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:01.863315Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:02.057970Z TRACE hyper::proto::h1::conn: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/hyper-0.14.32/src/proto/h1/conn.rs:731: flushed({role=server}): State { reading: KeepAlive, writing: Init, keep_alive: Busy }
2025-01-19T00:47:02.248029Z DEBUG sqlx::query: /Users/hiruta/.cargo/registry/src/index.crates.io-6f17d22bba15001f/sqlx-core-0.8.3/src/logger.rs:143: summary="SELECT * FROM whatnewrsshistory" db.statement="" rows_affected=0 rows_returned=1 elapsed=384.904791ms elapsed_secs=0.384904791

Cargo.toml

# AWS SDK: shared configuration loading and Aurora DSQL auth token generation.
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-dsql = "1.1.0"

# Async runtime and HTTP server.
tokio = { version = "1", features = ["full"] }
axum = "0.6.0"

# PostgreSQL access over TLS.
sqlx = { version = "0.8.3", features = ["postgres", "runtime-tokio-native-tls"] }

# JSON (de)serialization.
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.64"

# Miscellaneous.
chrono = "0.4.39"
anyhow = "1.0.95"
tracing-subscriber = "0.3.19"

# NOTE(review): the `main` snippet calls `global::set_text_map_propagator` and
# `XrayPropagator`, which none of the crates listed here appear to provide
# (presumably `opentelemetry` / `opentelemetry-aws`) — confirm and add them.

Rust

https://aws.amazon.com/blogs/opensource/verify-the-safety-of-the-rust-standard-library/

Discussion