Rust | SQLx で transaction & commit / rollbackを実装する
Transaction in sqlx
The Rust SQL Toolkit である SQLx でトランザクションを実装していきます。
データベースの準備
注文伝票とその明細行を表すテーブルを作成します。
order_slip_details.order_slip_id
には、order_slips.id
との外部キー制約を設けています。
create table if not exists order_slips (
id serial primary key,
title varchar not null,
created_at timestamp not null default now(),
updated_at timestamp not null default now()
);
create table if not exists order_slip_details (
id serial primary key,
order_slip_id integer,
product_name varchar not null,
product_count integer not null,
created_at timestamp not null default now(),
updated_at timestamp not null default now(),
foreign key (order_slip_id) references order_slips(id)
);
クレートと .env の準備
非同期処理ランタイムには tokio を使います。
cargo add sqlx --features "postgres runtime-tokio-native-tls chrono"
cargo add tokio --features=full
cargo add dotenv
データベースの接続 URL を定義しておきます。
DATABASE_URL=postgres://postgres:postgres@localhost:5432/sample_db
実装サンプル
文房具の注文するというケースを想定して実装してみました。
ポイントは、各関数の引数に tx: &mut Transaction<'_, Postgres>,
を持っているところでしょうか。
生成した同一のトランザクションを引き回しています。
そして、最後に commit
を実行しています。
トランザクションが out-of-scope になる前に commit
もしくは rollback
のどちらも呼び出されなかった場合、
ロールバックが呼び出される仕様になっています。[1]
use sqlx::postgres::PgPoolOptions;
use sqlx::types::chrono::NaiveDateTime;
use sqlx::{Postgres, Transaction};
use std::env;
#[tokio::main]
async fn main() {
// 環境変数を読み取り、URLを生成
dotenv::dotenv().expect("Failed to read .env file");
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
// DBコネクションを生成
let db = PgPoolOptions::new()
.max_connections(10)
.connect(&database_url)
.await
.unwrap_or_else(|_| panic!("Cannot connect to the database"));
// トランザクションを生成する
let mut tx = db.begin().await.expect("transaction error.");
// 注文伝票データを作成する
match create_order_slip(&mut tx, "注文書").await {
Ok(os) => {
// 明細行データを生成
let details = vec![
NewOrderSlipDetail {
order_slip_id: os.id,
product_name: "えんぴつ".to_string(),
product_count: 10,
},
NewOrderSlipDetail {
order_slip_id: os.id,
product_name: "消しゴム".to_string(),
product_count: 3,
},
NewOrderSlipDetail {
order_slip_id: os.id,
product_name: "ボールペン".to_string(),
product_count: 5,
},
];
// 注文伝票に紐づく明細行データを作成する
match create_order_slip_details(&mut tx, details).await {
Ok(_) => println!("done."),
Err(e) => println!("{:?}", e),
}
}
Err(e) => println!("{:?}", e),
}
// コミット
// ここで rollback() を指定することも可能
let _ = tx.commit().await.unwrap_or_else(|e| {
println!("{:?}", e);
});
}
#[derive(Debug)]
struct OrderSlip {
id: i32,
title: String,
created_at: NaiveDateTime,
updated_at: NaiveDateTime,
}
#[derive(Debug)]
struct OrderSlipDetail {
id: i32,
order_slip_id: i32,
product_name: String,
product_count: i32,
created_at: NaiveDateTime,
updated_at: NaiveDateTime,
}
struct NewOrderSlipDetail {
order_slip_id: i32,
product_name: String,
product_count: i32,
}
async fn create_order_slip(
tx: &mut Transaction<'_, Postgres>,
title: &str,
) -> Result<OrderSlip, sqlx::Error> {
sqlx::query_as!(
OrderSlip,
r#"
insert into
order_slips (title)
values
($1)
returning *
"#,
title.to_string()
)
.fetch_one(&mut **tx)
.await
}
async fn create_order_slip_details(
tx: &mut Transaction<'_, Postgres>,
details: Vec<NewOrderSlipDetail>,
) -> Result<(), sqlx::Error> {
for d in details {
let _ = sqlx::query!(
r#"
insert into
order_slip_details (order_slip_id, product_name, product_count)
values
($1, $2, $3)
"#,
d.order_slip_id,
d.product_name,
d.product_count
)
.execute(&mut **tx)
.await?;
}
Ok(())
}
実行し、テーブルを確認すると、以下のような状態になっていると思います。
order_slips
id | title | created_at | updated_at |
---|---|---|---|
1 | 注文書 | 2023-09-23 12:36:29.557 | 2023-09-23 12:36:29.557 |
order_slip_details
id | order_slip_id | product_name | count | created_at | updated_at |
---|---|---|---|---|---|
1 | 1 | えんぴつ | 10 | 2023-09-23 12:36:29.557 | 2023-09-23 12:36:29.557 |
2 | 1 | 消しゴム | 3 | 2023-09-23 12:36:29.557 | 2023-09-23 12:36:29.557 |
3 | 1 | ボールペン | 5 | 2023-09-23 12:36:29.557 | 2023-09-23 12:36:29.557 |
ソースコードに細工をして、無理やりエラーを発生させてみます。
async fn create_order_slip_details(
tx: &mut Transaction<'_, Postgres>,
details: Vec<NewOrderSlipDetail>,
) -> Result<(), sqlx::Error> {
for d in details {
let _ = sqlx::query!(
r#"
insert into
order_slip_details (order_slip_id, product_name, count)
values
($1, $2, $3)
"#,
+ d.order_slip_id,
- 1000, // 存在しないID
d.product_name,
d.count
)
.execute(&mut **tx)
.await?;
}
Ok(())
}
存在しない注文伝票IDのため、外部キー制約違反でエラーが発生します。
Database(PgDatabaseError { severity: Error, code: "23503", message: "insert or update on table \"order_slip_details\" violates foreign key constraint \"order_slip_details_order_slip_id_fkey\"", detail: Some("Key (order_slip_id)=(1000) is not present in table \"order_slips\"."), hint: None, position: None, where: None, schema: Some("public"), table: Some("order_slip_details"), column: None, data_type: None, constraint: Some("order_slip_details_order_slip_id_fkey"), file: Some("ri_triggers.c"), line: Some(2528), routine: Some("ri_ReportViolation") })
ソースコードを元に戻して、もう一度実行しテーブルの状態を確認します。
order_slips
id | title | created_at | updated_at |
---|---|---|---|
1 | 注文書 | 2023-09-23 12:36:29.557 | 2023-09-23 12:36:29.557 |
3 | 注文書 | 2023-09-23 12:43:16.697 | 2023-09-23 12:43:16.697 |
order_slip_details
id | order_slip_id | product_name | count | created_at | updated_at |
---|---|---|---|---|---|
1 | 1 | えんぴつ | 10 | 2023-09-23 12:36:29.557 | 2023-09-23 12:36:29.557 |
2 | 1 | 消しゴム | 3 | 2023-09-23 12:36:29.557 | 2023-09-23 12:36:29.557 |
3 | 1 | ボールペン | 5 | 2023-09-23 12:36:29.557 | 2023-09-23 12:36:29.557 |
5 | 3 | えんぴつ | 10 | 2023-09-23 12:43:16.697 | 2023-09-23 12:43:16.697 |
6 | 3 | 消しゴム | 3 | 2023-09-23 12:43:16.697 | 2023-09-23 12:43:16.697 |
7 | 3 | ボールペン | 5 | 2023-09-23 12:43:16.697 | 2023-09-23 12:43:16.697 |
order_slips の id や order_slip_details の id が歯抜けになっていることが分かります。
正常にトランザクション・コミット・ロールバックが機能していることが分かりました!
めでたし、めでたし👏✨
まとめ
公式の Examples を参考に実装することができました😘
Discussion