👾

Rust | Arc<RwLock<T>> と Arc<Mutex<T>> で並行処理の読み取りと書き込みを実装する

2024/09/30に公開

Rc と Arc

よく似たような文脈で登場することがあるので、
Arc に触れる前に Rc にも触れておきたいと思います!!!

いずれも、データを複数の所有者で共有したい場合に使用するという共通点があります。

std::rc::Rc

RcReference Counted の略で、シングルスレッドの参照カウントポインタです。

シングルスレッド内でのみ使用可能という制限がありますが、メモリ使用量は Arc よりも少ないという特徴があります。

デフォルトでは Rc で包まれたデータは読み取り専用です。

また、循環参照を作成することが可能ですが、メモリリークの原因となる可能性があるので注意が必要です。

use std::rc::Rc;

struct User {
    name: String,
    age: u32,
}

fn main() {
    // Rc を使用してユーザーデータを作成
    let user = Rc::new(User {
        name: String::from("Alice"),
        age: 30,
    });

    // Rc のクローンを作成(参照カウントが増加)
    let user_clone1 = user.clone();
    let user_clone2 = user.clone();

    // 全ての参照から数値を読み取る
    println!("元の参照: {} は {} 歳です", user.name, user.age);
    // -> 元の参照: Alice は 30 歳です

    println!("クローン1: {} は {} 歳です", user_clone1.name, user_clone1.age);
    // -> クローン1: Alice は 30 歳です

    println!("クローン2: {} は {} 歳です", user_clone2.name, user_clone2.age);
    // -> クローン2: Alice は 30 歳です

    // 参照カウントを確認
    println!("参照カウント: {}", Rc::strong_count(&user));
    // -> 参照カウント: 3

    // クローンをドロップ
    drop(user_clone1);
    println!("クローン1をドロップした後の参照カウント: {}", Rc::strong_count(&user));
    // -> クローン1をドロップした後の参照カウント: 2

    // スコープを抜けると、残りの参照も自動的にドロップされる
}

std::sync::Arc

ArcAtomically Reference Counted の略で、スレッドセーフな参照カウントポインタです。

Rc とは異なり、複数のスレッド間でデータを安全に共有できます🥳✨

デフォルトでは Arc で包まれたデータは読み取り専用のため、変更を可能にするために後述する RwLockMutex と併用することが多いです!

use std::sync::Arc;
use std::thread;

struct User {
    name: String,
    age: u32,
}

fn main() {
    // Arc を使用してユーザーデータを作成
    let user = Arc::new(User {
        name: String::from("Alice"),
        age: 30,
    });

    // Rc のクローンを作成(参照カウントが増加)
    let user_clone1 = user.clone();
    let user_clone2 = user.clone();

    // 全ての参照から数値を読み取る
    println!("元の参照: {} は {} 歳です", user.name, user.age);
    // -> 元の参照: Alice は 30 歳です

    let thread_for_user_clone1 = thread::spawn(move || {
        println!("クローン1: {} は {} 歳です", user_clone1.name, user_clone1.age);
        // -> クローン1: Alice は 30 歳です
    });

    let thread_for_user_clone2 = thread::spawn(move || {
        println!("クローン2: {} は {} 歳です", user_clone2.name, user_clone2.age);
        // -> クローン2: Alice は 30 歳です
    });

    thread_for_user_clone1.join().unwrap();
    thread_for_user_clone2.join().unwrap();
}

RwLock と Mutex

RwLockMutex には、以下の違いがあります。

  • RwLock:読み取りは並行に処理、書き込みは排他的に処理
  • Mutex:読み取り書き込み問わず排他的に処理

std::sync::RwLock

RwLockreader-writer lock の意味で、ロック時に 1 度に複数の読み取りまたは最大 1 つの書き込みが許可されます。

サンプルコードの主な特徴は以下の通りです。

  • Arc を使用することで、複数のスレッド間でデータを安全に共有できます。
  • RwLock により、複数の読み取りか単一の書き込みを同時に行うことができます。
  • 読み取り操作には read() メソッドを、書き込み操作には write() メソッドを使用してロックを行います。

このパターンは、複数のスレッドから共有データへのアクセスが必要で、読み取り操作が書き込み操作よりも頻繁に行われる場合に特に有用です。

use std::sync::{Arc, RwLock};
use std::thread;

struct User {
    name: String,
    age: u32,
}

fn main() {
    // Arc<RwLock<T>>を使用してデータを初期化
    let user = Arc::new(RwLock::new(User {
        name: String::from("Alice"),
        age: 30,
    }));

    // スレッドのハンドルを保持するベクター
    let mut handles = vec![];

    // 10個の読み取りスレッドを作成
    for i in 0..10 {
        let data_clone = Arc::clone(&user);
        let handle = thread::spawn(move || {
            // データの読み取り
            let shared = data_clone.read().unwrap();
            println!("読み取りスレッド {}: {} は {} 歳です", i, shared.name, shared.age);
        });
        handles.push(handle);
    }

    // 5個の書き込みスレッドを作成
    for i in 0..5 {
        let data_clone = Arc::clone(&user);
        let handle = thread::spawn(move || {
            // データの書き込み
            let mut shared = data_clone.write().unwrap();
            shared.age += 1;
            shared.name = format!("Alice({})", i);

            println!("書き込みスレッド {}: カウンターを更新しました", i);
        });
        handles.push(handle);
    }


    // すべてのスレッドが終了するのを待つ
    for handle in handles {
        handle.join().unwrap();
    }

    // 最終的な状態を確認
    let final_state = user.read().unwrap();
    println!("最終状態: {} は {} 歳です", final_state.name, final_state.age);
}

実行結果の一例です。

読み取りスレッド 1: Alice は 30 歳です
読み取りスレッド 5: Alice は 30 歳です
読み取りスレッド 3: Alice は 30 歳です
読み取りスレッド 8: Alice は 30 歳です
読み取りスレッド 0: Alice は 30 歳です
読み取りスレッド 4: Alice は 30 歳です
読み取りスレッド 9: Alice は 30 歳です
読み取りスレッド 2: Alice は 30 歳です
書き込みスレッド 0: カウンターを更新しました
書き込みスレッド 1: カウンターを更新しました
読み取りスレッド 6: Alice(1) は 32 歳です
書き込みスレッド 2: カウンターを更新しました
書き込みスレッド 4: カウンターを更新しました
読み取りスレッド 7: Alice(4) は 34 歳です
書き込みスレッド 3: カウンターを更新しました
最終状態: Alice(3) は 35 歳です
別の実行結果

並行で処理されるため、実行ごとに結果は異なります。

読み取りスレッド 0: Alice は 30 歳です
読み取りスレッド 1: Alice は 30 歳です
読み取りスレッド 4: Alice は 30 歳です
読み取りスレッド 2: Alice は 30 歳です
読み取りスレッド 5: Alice は 30 歳です
読み取りスレッド 3: Alice は 30 歳です
読み取りスレッド 6: Alice は 30 歳です
読み取りスレッド 7: Alice は 30 歳です
読み取りスレッド 8: Alice は 30 歳です
読み取りスレッド 9: Alice は 30 歳です
書き込みスレッド 0: カウンターを更新しました
書き込みスレッド 1: カウンターを更新しました
書き込みスレッド 2: カウンターを更新しました
書き込みスレッド 3: カウンターを更新しました
書き込みスレッド 4: カウンターを更新しました
最終状態: Alice(4) は 35 歳です

RwLock のユースケース

RwLock は axum/examples でも登場します。

axum の State で利用する際に使用されているケースが多いです。

examples/key-value-store/src/main.rs

type SharedState = Arc<RwLock<AppState>>;

#[derive(Default)]
struct AppState {
    db: HashMap<String, Bytes>,
}

std::sync::Mutex

Mutex は共有データの保護に役立つ相互排他プリミティブです。

RwLock とは異なり、読み取り書き込み問わず排他的に処理します。

サンプルコードの主な特徴は以下の通りです。

  • Arc を使用することで、複数のスレッド間でデータを安全に共有できます。
  • Mutex により、一度に1つのスレッドだけがデータにアクセスできるようになります。
  • データにアクセスするには lock() メソッドを使用し、MutexGuard を取得します。
  • MutexGuard がスコープを抜けると自動的にロックが解放されます。

このパターンは、複数のスレッドから共有データへの排他的なアクセスが必要な場合に適しています。

ただし、読み取り操作が頻繁で書き込みが少ない場合は、前述の Arc<RwLock<T>> の方が適している可能性があります。

use std::sync::{Arc, Mutex};
use std::thread;

struct User {
    name: String,
    age: u32,
}

fn main() {
    // Arc<Mutex<T>>を使用してデータを初期化
    let user = Arc::new(Mutex::new(User {
        name: String::from("Alice"),
        age: 30,
    }));

    // スレッドのハンドルを保持するベクター
    let mut handles = vec![];

    // 20個のスレッドを作成
    for i in 0..20 {
        let data_clone = Arc::clone(&user);
        let handle = thread::spawn(move || {
            // データの書き込み
            let mut shared = data_clone.lock().unwrap();
            shared.age += 1;
            shared.name = format!("Alice({})", i);

            println!("スレッド {}: カウンターを更新しました", i);

            // ロックは自動的に解放される
        });
        handles.push(handle);
    }

    // すべてのスレッドが終了するのを待つ
    for handle in handles {
        handle.join().unwrap();
    }

    // 最終的な状態を確認
    let final_state = user.lock().unwrap();
    println!("最終状態: {} は {} 歳です", final_state.name, final_state.age);
}

実行結果の一例です。

スレッド 0: カウンターを更新しました
スレッド 1: カウンターを更新しました
スレッド 4: カウンターを更新しました
スレッド 5: カウンターを更新しました
スレッド 2: カウンターを更新しました
スレッド 3: カウンターを更新しました
スレッド 6: カウンターを更新しました
スレッド 7: カウンターを更新しました
スレッド 8: カウンターを更新しました
スレッド 9: カウンターを更新しました
スレッド 10: カウンターを更新しました
スレッド 12: カウンターを更新しました
スレッド 11: カウンターを更新しました
スレッド 14: カウンターを更新しました
スレッド 15: カウンターを更新しました
スレッド 16: カウンターを更新しました
スレッド 17: カウンターを更新しました
スレッド 18: カウンターを更新しました
スレッド 19: カウンターを更新しました
スレッド 13: カウンターを更新しました
最終状態: Alice(13) は 50 歳です
別の実行結果

並行で処理されるため、実行ごとに結果は異なります。

スレッド 0: カウンターを更新しました
スレッド 5: カウンターを更新しました
スレッド 6: カウンターを更新しました
スレッド 7: カウンターを更新しました
スレッド 8: カウンターを更新しました
スレッド 9: カウンターを更新しました
スレッド 10: カウンターを更新しました
スレッド 11: カウンターを更新しました
スレッド 12: カウンターを更新しました
スレッド 13: カウンターを更新しました
スレッド 14: カウンターを更新しました
スレッド 15: カウンターを更新しました
スレッド 16: カウンターを更新しました
スレッド 17: カウンターを更新しました
スレッド 1: カウンターを更新しました
スレッド 2: カウンターを更新しました
スレッド 3: カウンターを更新しました
スレッド 18: カウンターを更新しました
スレッド 19: カウンターを更新しました
スレッド 4: カウンターを更新しました
最終状態: Alice(4) は 50 歳です

Mutex のユースケース

axum/examples では RwLock でしたが、自分は State で利用する際に Mutex を使用しています。

読み書きのどちらも排他制御することでデータの整合性を保ちたいということで、Mutex で実装してみました。

mocks/src/server/state.rs

use crate::storage::Storage;
use std::sync::{Arc, Mutex};

pub type SharedState = Arc<Mutex<AppState>>;

pub struct AppState {
    pub storage: Storage,
}

impl AppState {
    pub fn new(storage: Storage) -> SharedState {
        Arc::new(Mutex::new(AppState { storage }))
    }
}

まとめ

Rust で並行処理を行う上で重要となってくる Arc<RwLock<T>>Arc<Mutex<T>> について触れてみました!

個人的には以下のように使い分けかなと思います🤔✨

  • データの読み取りが多い -> RwLock
  • データの書き込みが多い -> Mutex
  • パフォーマンス重視 -> RwLock
  • 安全性重視 -> Mutex

参考

Discussion