Rustにおけるスレッド間でのデータ共有とstd::thread::scope
TL;DR
基本的には std::thread::spawn
よりもRust 1.63で安定化された std::thread::scope
を使う方が良いと思います。
threadの寿命はstatic
#[derive(Debug, Clone)]
struct SharedData {
x: i32,
}
impl SharedData {
fn new(x: i32) -> Self {
Self { x }
}
}
fn main() {
use std::thread;
let data = SharedData::new(42);
let jh = thread::spawn(|| {
println!("spawned {:?}", data);
});
jh.join();
}
上のコードは次のようなコンパイルエラーになる。
error[E0373]: closure may outlive the current function, but it borrows `data`, which is owned by the current function
--> src/main.rs:14:28
|
14 | let jh = thread::spawn(|| {
| ^^ may outlive borrowed value `data`
15 | println!("spawned {:?}", data);
| ---- `data` is borrowed here
|
note: function requires argument type to outlive `'static`
--> src/main.rs:14:14
|
14 | let jh = thread::spawn(|| {
| ______________^
15 | | println!("spawned {:?}", data);
16 | | });
| |______^
help: to force the closure to take ownership of `data` (and any other referenced variables), use the `move` keyword
|
14 | let jh = thread::spawn(move || {
| ++++
For more information about this error, try `rustc --explain E0373`.
なぜかと言うとエラーメッセージにも書いてあるが spawn
したスレッドはそれを呼び出した関数(上のコードの場合main関数)が終わった後も生きている可能性があるため、static
ライフタイムを持つデータ以外の参照は出来ないためである。上のコードの場合、すぐにjoin
しているため実際にはmain
関数が終わる前にspawn
したスレッドは終了するのだが、コンパイラにはそれが分からないため上のようなエラーになる。
Arcを使ったスレッド間でのデータ共有
helpにある通り、クロージャーに move
をつければ data
の所有権がスレッド内に移るため、コンパイルが通るようになる。今回はスレッド間でデータを共有したいのでこの方法では上手く行かない。そこでArc
を使う。通常の変数はスコープを抜ける際にdropされるが、Arc
は参照カウントが0にならない限りデータはdropされない。つまり、mainスレッドとspwanしたスレッドにArc
で包んだデータを持たせると、両方のArc
がdropされて参照カウントが0になったときにdropされる。
#[derive(Debug, Clone)]
struct SharedData {
x: i32,
}
impl SharedData {
fn new(x: i32) -> Self {
Self { x }
}
}
fn main() {
use std::sync::Arc;
use std::thread;
let data = Arc::new(SharedData::new(42));
let data2 = Arc::clone(&data); // 同じデータを参照するArcを複製する
let jh = thread::spawn(move || {
println!("spawned {:?}", data2);
});
jh.join().unwrap();
println!("main {:?}", data);
}
実行結果
spawned SharedData { x: 42 }
main SharedData { x: 42 }
ちなみに、参照カウンタを持つものとしては Rc
(Reference Counted) もあるが、こちらは複数のスレッドから参照カウンタを(安全に)増減させることは出来ないため、複数のスレッドから参照カウンタを(安全に)増減させることが出来る Arc
(Atomically Reference Counted) を用いた。上のコードのArc
をRc
に置き換えた下のコードはコンパイルエラーとなる。
#[derive(Debug, Clone)]
struct SharedData {
x: i32,
}
impl SharedData {
fn new(x: i32) -> Self {
Self { x }
}
}
fn main() {
use std::rc::Rc;
use std::thread;
let data = Rc::new(SharedData::new(42));
let data2 = Rc::clone(&data); // 同じデータを参照するRcを複製する
let jh = thread::spawn(move || {
println!("spawned {:?}", data2);
});
jh.join().unwrap();
println!("main {:?}", data);
}
error[E0277]: `Rc<SharedData>` cannot be sent between threads safely
--> src/main.rs:16:14
|
16 | let jh = thread::spawn(move || {
| ______________^^^^^^^^^^^^^_-
| | |
| | `Rc<SharedData>` cannot be sent between threads safely
17 | | println!("spawned {:?}", data2);
18 | | });
| |_____- within this `[closure@src/main.rs:16:28: 18:6]`
|
= help: within `[closure@src/main.rs:16:28: 18:6]`, the trait `Send` is not implemented for `Rc<SharedData>`
= note: required because it appears within the type `[closure@src/main.rs:16:28: 18:6]`
note: required by a bound in `spawn`
For more information about this error, try `rustc --explain E0277`.
error: could not compile `playground` due to previous error
これはエラーメッセージにある通りRc
がSend
トレイトを(わざと)実装していないためである。
Mutexを使ったスレッド間での可変なデータ共有
さて、Arc
を使えば(不変な)データを共有出来ることが分かった。しかし、可変なデータを共有したい場合はどうすれば良いだろうか。
ここで、Rust安全性を支えている概念の一つであるshared XOR mutableについて思い出してみる。これが意味するところはあるデータに対して、不変な参照は複数個持てるし、可変な参照も持てるが、可変な参照を持っているときに(他の)不変or可変な参照は作れないし、不変な参照を持っているときは可変な参照は作れないというものである。このルールからすると複数のスレッドからデータを共有する際は不変でなければならないことになる。つまり、可変なデータを共有することは出来ない。
ただ、今回のように可変なデータを(安全に)共有したい場合もあるためMutex
が用意されている。
これは高々1つしか取れないロックを取れたスレッドだけがデータに対する読み書きが可能になるようにすることでデータを共有しつつも可変な参照が同時に2つ以上存在したり、可変な参照と不変な参照が同時に存在することを防いでくれる。ただし、Mutex
の場合は不変な参照を2つのスレッドが同時に持つことも防いでしまう。(可変な参照も取りたいし、不変な参照を複数のスレッドから同時に持ちたい場合には、RwLock
を使うと良い。)
#[derive(Debug, Clone)]
struct SharedData {
x: i32,
}
impl SharedData {
fn new(x: i32) -> Self {
Self { x }
}
fn inc(&mut self) {
self.x += 1;
}
}
fn main() {
use std::sync::{Arc, Mutex};
use std::thread;
let data = Arc::new(Mutex::new(SharedData::new(42)));
let data2 = Arc::clone(&data);
let jh = thread::spawn(move || {
let mut guard = data2.lock().expect("Couldn't get lock");
guard.inc();
println!("spawned {:?}", *guard);
});
jh.join().unwrap();
let mut guard = data.lock().expect("Couldn't get lock");
guard.inc();
println!("main {:?}", *guard);
}
実行結果
spawned SharedData { x: 43 }
main SharedData { x: 44 }
ちなみに共有したいデータの型によってはatomicで定義されている型を使うことも考えられる。実際、先程紹介したArc
の内部ではAtomicUsize
が使われている。ただし、使うメモリオーダーによっては意図しない結果を招くことがあるので注意して使うこと。
scoped threads
今まで色々と書いてきたが冒頭のコードが動かなかった原因は、threadの寿命がmainより短いにも関わらずコンパイラがそれを知らないためであった。この問題点を解消したのがscope
である。
このAPIは(stdでは)nightlyでしか使えなかったが、次の1.63.0で安定化することに決まった模様。(追記:1.63で安定化されました。)
ただし、今までのstableでも、rayonのscope
かcrossbeamのscope
を使えば(細かい挙動は違うが)似たようなことは出来た。
冒頭のコードをscope
を使うように書き換えたのが以下のコードになる。(Arc
を使っていないことに注意)
#[derive(Debug, Clone)]
struct SharedData {
x: i32,
}
impl SharedData {
fn new(x: i32) -> Self {
Self { x }
}
}
fn main() {
use std::thread;
let data = SharedData::new(42);
thread::scope(|s| {
s.spawn(|| {
println!("spawned {:?}", data);
});
});
}
実行結果
spawned SharedData { x: 42 }
さらに、Mutex
を使わずに可変借用も出来る。(もちろん同時に複数の可変借用を持つことは出来ないが)
#[derive(Debug, Clone)]
struct SharedData {
x: i32,
}
impl SharedData {
fn new(x: i32) -> Self {
Self { x }
}
fn inc(&mut self) {
self.x += 1;
}
}
fn main() {
use std::thread;
let mut data = SharedData::new(42);
thread::scope(|s| {
s.spawn(|| {
data.inc();
println!("spawned {:?}", data);
});
});
data.inc();
println!("main {:?}", data);
}
実行結果
spawned SharedData { x: 43 }
main SharedData { x: 44 }
終わりに
今までスレッドをすぐにjoin
するとしてもstatic
ライフタイムになっていたのが、thread::scope
を使うとスコープ内にライフタイムが収まってくれて便利です。
ただ、繰り返しスレッドを立ち上げてはすぐ終了するような使い方の場合、スレッドプールで実行するrayonのscope
を使うのが良いと思います。
std::sync
には今回紹介しなかったものの便利なものがあるのでどんなものがあるか一度見てみると良いかもしれません。
Discussion