並行処理で値を共有する際のMutexとRwLockの違い
- Mutex: 読み取り書き込み問わず排他的に処理
- RwLock: 読み取りは並行に処理、書き込みは排他的
読み取りの処理が支配的な場合はRwLockを使ったほうが効率的な処理をしやすいですが、read(), write()を一つのスレッド内で使おうとするとデッドロックを起こしやすいコードになるなど、運用を少し慎重にする必要があります。
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let lock = Arc::new(RwLock::new(0));
let mut handles = vec![];
for _ in 0..5 {
let lock_clone = Arc::clone(&lock);
let handle = thread::spawn(move || {
let _read_guard = lock_clone.read().unwrap();
println!("Read lock acquired");
// ここで書き込みロックを取得しようとするとデッドロックが発生
let mut write_guard = lock_clone.write().unwrap();
println!("Write lock acquired");
*write_guard += 1;
println!("Value incremented");
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
RwLockは書き込みロックを行う場合、それ以前のすべてのread()が解決された後に実行されるようになります。
読み取りの値が解消されるのはスコープを抜けたときなため、read()した後に同一スコープ内でwrite()しようとすると永遠にread()による読み取りロックがドロップしないためデッドロックします。
※ちなみにですが、write()した後のread()はwrite()のロックガードがドロップされた時点で実行されるようになります。
例えば各スレッドがループ処理に入る前にread()で読み取りロックしてしまっていたりすると、永久に読み取りロックによって書き込みロックがブロックされることになったりします。
こういったデッドロックはエラーとして現れづらく、長大なコードをいざ実行して長時間経過するとなぜか発生する。なんてケースに遭遇することがあり、そうなると泣きたくなってきますね。
もし先ほどのコードを直すとするなら以下のようになるかもしれません。
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Instant;
fn main() {
let lock = Arc::new(RwLock::new(0));
let mut handles = vec![];
let start = Instant::now();
for _ in 0..5 {
let lock_clone = Arc::clone(&lock);
let handle = thread::spawn(move || {
{
let _read_guard = lock_clone.read().unwrap();
println!("Read lock acquired");
}
let mut write_guard = lock_clone.write().unwrap();
println!("Write lock acquired");
*write_guard += 1;
println!("Value incremented");
});
handles.push(handle);
}
// すべてのスレッドが終了するのを待つ
for handle in handles {
handle.join().unwrap();
}
let duration = start.elapsed();
println!("Total execution time: {:?}", duration);
}
読み取りのロックガードが所有権の関係で、スコープを抜けた時点でドロップする特徴を利用してデッドロックを回避しています。
Mutexの場合はそもそも読み取り、書き込みといったロックの区別はないため、同一スレッド内で読み取り書き込みをしたいと思った場合でも、特に所有権を意識する必要がなくシンプルに書けます。
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;
fn main() {
let lock = Arc::new(Mutex::new(0));
let mut handles = vec![];
let start = Instant::now();
for _ in 0..5 {
let lock_clone = Arc::clone(&lock);
let handle = thread::spawn(move || {
{
let mut guard = lock_clone.lock().unwrap();
println!("Lock acquired for reading and writing");
// ここで読み取りと書き込みを行う
let value = *guard;
println!("Read value: {}", value);
*guard += 1;
println!("Incremented value: {}", *guard);
} //
});
handles.push(handle);
}
// すべてのスレッドが終了するのを待つ
for handle in handles {
handle.join().unwrap();
}
let duration = start.elapsed();
println!("Total execution time: {:?}", duration);
}
とはいえ、読み取りが支配的な場合いちいち排他的なロックを取得してしまうとほかの全てのスレッドの処理を止めてしまうことになりかねないため、効率的な処理を行うならRwLockを使ったほうが無難に思います。
またそもそもの話になりますが、並行処理で共有する状態を管理していくなら個人的にはオブザーバーパターンでstd::sync::mpsc
やstd::sync::broadcast
を使って行くスタイルのほうが危なそうな場所が少なくできてよいような気がします。
Discussion