🐘

UPDATE/DELETEと並行実行 (一人自作RDBMS Advent Calendar 2025 8日目)

に公開

この記事は「一人自作RDBMS Advent Calendar 2025」8日目の記事です。

本日の実装はGitHubにあります。昨日からの差分は以下のコマンドで確認できます。

git diff --no-index day07 day08

今日のゴール

UPDATE/DELETE文の実装と、並行実行への対応を行います。今週からトランザクションの実装に入りますが、その準備としてこれらを実装しておきます。今日は新しい概念というよりは、これまでの実装の延長と下準備的な内容です。

UPDATE/DELETEの実装方針

UPDATE/DELETEの実装方法にはいくつかのアプローチがあります。今回は、来週あたりにMVCC(Multi-Version Concurrency Control)を実装する予定があるため、MVCCと親和性の高いシンプルな方式を採用しました。

  • DELETE: スロットのlengthを0にマークする(ソフトデリート)
  • UPDATE: DELETE + INSERTで実現

この方式は物理削除と比べてシンプルで、MVCCでは「古いバージョンを残して新しいバージョンを追加する」という考え方になるため、今回の実装もその発想に近いものになっています。

DELETE: ソフトデリート

pub fn delete(&mut self, slot_id: u16) -> Result<()> {
    if slot_id >= self.tuple_count() {
        bail!("slot {} does not exist", slot_id);
    }
    let (offset, length) = self.get_slot(slot_id);
    if length == 0 {
        bail!("slot {} is already deleted", slot_id);
    }
    // Mark as deleted by setting length to 0
    self.set_slot(slot_id, offset, 0);
    Ok(())
}

get_tuple()length == 0のスロットをNoneとして返すため、SeqScanは削除済みタプルを自動的にスキップします。

UPDATE: DELETE + INSERT

// Delete old tuple
let page_arc = self.bpm.lock().unwrap().fetch_page_mut(rid.page_id)?;
let mut page_guard = page_arc.write().unwrap();
page_guard.delete(rid.slot_id)?;
drop(page_guard);
self.bpm.lock().unwrap().unpin_page(rid.page_id, true)?;

// Insert new tuple
let tuple_data = serialize_tuple(&new_values);
Self::insert_tuple(&self.bpm, &tuple_data)?;

Row ID (RID)

DELETE/UPDATEでは対象タプルの物理位置を知る必要があるため、Ridを導入しました。

pub struct Rid {
    pub page_id: u32,
    pub slot_id: u16,
}

pub struct Tuple {
    pub values: Vec<Value>,
    pub rid: Option<Rid>,
}

SeqScanがタプルを返す際にRIDを付与し、DeleteExecutor/UpdateExecutorがそれを使って対象を特定します。

並行実行への対応

Rustの所有権と並行処理

並行処理を導入すると、複数スレッドからBufferPoolManagerにアクセスする必要が出てきます。しかしRustの所有権システムでは、可変参照(&mut)は同時に一つしか存在できません。

day07までの実装:

pub struct SeqScanExecutor<'a> {
    bpm: &'a mut BufferPoolManager,  // 可変参照
    // ...
}

この形式では、複数のExecutorやスレッドでBufferPoolManagerを共有できません。

Arc: 参照カウント方式のスマートポインタ

Arc(Atomic Reference Counted)は、複数の所有者間でデータを共有するためのスマートポインタです。

let bpm = Arc::new(BufferPoolManager::new(disk_manager));

// Arc::clone()で参照カウントを増やす(データはコピーされない)
let bpm_clone = Arc::clone(&bpm);

Arcは参照カウントをスレッドセーフに管理し、最後の参照がなくなった時点でデータを解放します。

Mutex: 排他制御

Arcだけではデータを共有できますが、複数スレッドから同時に書き込むとデータ競合が起きます。実際、Arc<BufferPoolManager>を複数スレッドで使おうとすると、Rustコンパイラが「BufferPoolManagerSyncトレイトを実装していないのでスレッド間で安全に共有できない」とエラーを出します[1]

Mutexは排他制御を提供し、同時に一つのスレッドだけがデータにアクセスできることを保証します。Mutex<T>TSendであればSyncを実装するため、Mutexでラップすることでコンパイラに「このデータは排他制御されているから安全に共有できる」と伝えられます。

let bpm = Arc::new(Mutex::new(BufferPoolManager::new(disk_manager)));

// lock()でロックを取得し、MutexGuardを得る
let mut guard = bpm.lock().unwrap();
guard.fetch_page(page_id)?;
// guardがスコープを抜けると自動的にロック解放

lock()MutexGuardを返します。このGuardはスコープを抜けると自動的にロックを解放するため、ロックの解放忘れを防げます。これはRAII(Resource Acquisition Is Initialization)と呼ばれるパターンで、リソースの取得をオブジェクトの初期化時に、解放を破棄時に行うことで、リソース管理を安全に行う手法です[2]

RwLock: 読み書きを区別するロック

BufferPoolManagerが返すPageも共有する必要があります。day07では&mut Pageを返していましたが、day08ではArc<RwLock<Page>>を返すように変更しました。

// day07
pub fn fetch_page(&mut self, page_id: u32) -> Result<&mut Page>

// day08
pub fn fetch_page(&mut self, page_id: u32) -> Result<Arc<RwLock<Page>>>

RwLockは読み取りと書き込みを区別します:

  • 読み取りロック(read()): 複数のスレッドが同時に取得可能
  • 書き込みロック(write()): 排他的に一つのスレッドのみ取得可能

SELECTは読み取りのみなので複数同時実行可能、INSERT/UPDATE/DELETEは書き込みなので排他的に実行、という使い分けができます。

// 読み取り
let page_guard = page_arc.read().unwrap();
let tuple_data = page_guard.get_tuple(slot_id);
drop(page_guard);  // 明示的に解放(スコープを抜けても自動解放されるが、早めに解放したい場合)

// 書き込み
let mut page_guard = page_arc.write().unwrap();
page_guard.delete(slot_id)?;
drop(page_guard);

SeqScanExecutorでの適用

これらを組み合わせたSeqScanExecutorの実装:

pub struct SeqScanExecutor<'a> {
    bpm: Arc<Mutex<BufferPoolManager>>,
    // ...
}

fn next(&mut self) -> Result<Option<Tuple>> {
    // 1. BPMのロックを取得してページを取得、すぐにロック解放
    let page_arc = self.bpm.lock().unwrap().fetch_page(self.current_page_id)?;

    // 2. ページの読み取りロックを取得(BPMはロックされていない)
    let page_guard = page_arc.read().unwrap();
    let tuple_data = page_guard.get_tuple(self.current_slot_id);
    // ...
    drop(page_guard);

    // 3. 再度BPMをロックしてunpin
    self.bpm.lock().unwrap().unpin_page(self.current_page_id, false)?;
    // ...
}

ポイントは、BPMのロックを長時間保持しないことです。fetch_pageunpin_pageの呼び出し時のみBPMをロックし、ページの読み取り中は解放しておくことで、他のスレッドもBPMにアクセスできます。

マルチスレッド接続

thread::spawnによる並行処理

コネクションごとにスレッドを生成します。

for stream in listener.incoming() {
    match stream {
        Ok(stream) => {
            let conn = Connection::new(stream);
            let catalog = Arc::clone(&self.catalog);
            let bpm = Arc::clone(&self.bpm);

            thread::spawn(move || {
                Self::handle_client(conn, catalog, bpm)
            });
        }
        // ...
    }
}

Arc::clone()で参照を複製し、moveクロージャで新しいスレッドに所有権を移動します。

Instanceの変更

pub struct Instance {
    catalog: Arc<Catalog>,
    bpm: Arc<Mutex<BufferPoolManager>>,
}

CatalogとBufferPoolManagerをArcでラップし、複数スレッドで共有できるようにしました。

動作確認

INSERT INTO users VALUES (1, 'Alice');
INSERT INTO users VALUES (2, 'Bob');
INSERT INTO users VALUES (3, 'Charlie');

SELECT * FROM users;
 id |  name
----+---------
  1 | Alice
  2 | Bob
  3 | Charlie
(3 rows)

DELETE FROM users WHERE id = 2;
DELETE 1

SELECT * FROM users;
 id |  name
----+---------
  1 | Alice
  3 | Charlie
(2 rows)

UPDATE users SET name = 'Alice Updated' WHERE id = 1;
UPDATE 1

SELECT * FROM users;
 id |     name
----+---------------
  3 | Charlie
  1 | Alice Updated
(2 rows)

UPDATEの結果でid=1の行が末尾に移動しているのは、DELETE + INSERTで実装しているためです。リレーショナルモデルでは行に順序の概念がないため、これは正常な動作です。

まとめと次回予告

今日はトランザクション実装への準備として、UPDATE/DELETEの実装と、Rustの並行処理プリミティブ(Arc、Mutex、RwLock)を使った並行実行への対応を行いました。

明日からはトランザクションの実装に入ります。まずはCOMMIT/ROLLBACKの簡易実装から始めます。

脚注
  1. Rustではスレッド安全性をSendSyncという2つのトレイトで表現します。Sendは所有権を別スレッドに移動できること、Syncは参照を複数スレッドで共有できることを意味します。詳しくはThe Rust Programming Language - Extensible Concurrency with the Sync and Send Traitsを参照してください。 ↩︎

  2. RAII - Rust By Example ↩︎

GitHubで編集を提案
株式会社primeNumber

Discussion