🧵

Rustにおけるスレッド間でのデータ共有とstd::thread::scope

2022/07/28に公開

TL;DR

基本的には std::thread::spawn よりもRust 1.63で安定化された std::thread::scope を使う方が良いと思います。

threadの寿命はstatic

#[derive(Debug, Clone)]
struct SharedData {
    x: i32,
}
impl SharedData {
    fn new(x: i32) -> Self {
        Self { x }
    }
}

fn main() {
    use std::thread;
    let data = SharedData::new(42);
    let jh = thread::spawn(|| {
        println!("spawned {:?}", data);
    });
    jh.join();
}

上のコードは次のようなコンパイルエラーになる。

error[E0373]: closure may outlive the current function, but it borrows `data`, which is owned by the current function
  --> src/main.rs:14:28
   |
14 |     let jh = thread::spawn(|| {
   |                            ^^ may outlive borrowed value `data`
15 |         println!("spawned {:?}", data);
   |                                  ---- `data` is borrowed here
   |
note: function requires argument type to outlive `'static`
  --> src/main.rs:14:14
   |
14 |       let jh = thread::spawn(|| {
   |  ______________^
15 | |         println!("spawned {:?}", data);
16 | |     });
   | |______^
help: to force the closure to take ownership of `data` (and any other referenced variables), use the `move` keyword
   |
14 |     let jh = thread::spawn(move || {
   |                            ++++

For more information about this error, try `rustc --explain E0373`.

なぜかと言うとエラーメッセージにも書いてあるが spawn したスレッドはそれを呼び出した関数(上のコードの場合main関数)が終わった後も生きている可能性があるため、static ライフタイムを持つデータ以外の参照は出来ないためである。上のコードの場合、すぐにjoinしているため実際にはmain関数が終わる前にspawnしたスレッドは終了するのだが、コンパイラにはそれが分からないため上のようなエラーになる。

Arcを使ったスレッド間でのデータ共有

helpにある通り、クロージャーに move をつければ data の所有権がスレッド内に移るため、コンパイルが通るようになる。今回はスレッド間でデータを共有したいのでこの方法では上手く行かない。そこでArcを使う。通常の変数はスコープを抜ける際にdropされるが、Arcは参照カウントが0にならない限りデータはdropされない。つまり、mainスレッドとspwanしたスレッドにArcで包んだデータを持たせると、両方のArcがdropされて参照カウントが0になったときにdropされる。

#[derive(Debug, Clone)]
struct SharedData {
    x: i32,
}
impl SharedData {
    fn new(x: i32) -> Self {
        Self { x }
    }
}

fn main() {
    use std::sync::Arc;
    use std::thread;
    let data = Arc::new(SharedData::new(42));
    let data2 = Arc::clone(&data); // 同じデータを参照するArcを複製する
    let jh = thread::spawn(move || {
        println!("spawned {:?}", data2);
    });
    jh.join().unwrap();
    println!("main {:?}", data);
}

実行結果

spawned SharedData { x: 42 }
main SharedData { x: 42 }

ちなみに、参照カウンタを持つものとしては Rc(Reference Counted) もあるが、こちらは複数のスレッドから参照カウンタを(安全に)増減させることは出来ないため、複数のスレッドから参照カウンタを(安全に)増減させることが出来る Arc(Atomically Reference Counted) を用いた。上のコードのArcRcに置き換えた下のコードはコンパイルエラーとなる。

#[derive(Debug, Clone)]
struct SharedData {
    x: i32,
}
impl SharedData {
    fn new(x: i32) -> Self {
        Self { x }
    }
}

fn main() {
    use std::rc::Rc;
    use std::thread;
    let data = Rc::new(SharedData::new(42));
    let data2 = Rc::clone(&data); // 同じデータを参照するRcを複製する
    let jh = thread::spawn(move || {
        println!("spawned {:?}", data2);
    });
    jh.join().unwrap();
    println!("main {:?}", data);
}
error[E0277]: `Rc<SharedData>` cannot be sent between threads safely
   --> src/main.rs:16:14
    |
16  |       let jh = thread::spawn(move || {
    |  ______________^^^^^^^^^^^^^_-
    | |              |
    | |              `Rc<SharedData>` cannot be sent between threads safely
17  | |         println!("spawned {:?}", data2);
18  | |     });
    | |_____- within this `[closure@src/main.rs:16:28: 18:6]`
    |
    = help: within `[closure@src/main.rs:16:28: 18:6]`, the trait `Send` is not implemented for `Rc<SharedData>`
    = note: required because it appears within the type `[closure@src/main.rs:16:28: 18:6]`
note: required by a bound in `spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `playground` due to previous error

これはエラーメッセージにある通りRcSendトレイトを(わざと)実装していないためである。

Mutexを使ったスレッド間での可変なデータ共有

さて、Arcを使えば(不変な)データを共有出来ることが分かった。しかし、可変なデータを共有したい場合はどうすれば良いだろうか。

ここで、Rust安全性を支えている概念の一つであるshared XOR mutableについて思い出してみる。これが意味するところはあるデータに対して、不変な参照は複数個持てるし、可変な参照も持てるが、可変な参照を持っているときに(他の)不変or可変な参照は作れないし、不変な参照を持っているときは可変な参照は作れないというものである。このルールからすると複数のスレッドからデータを共有する際は不変でなければならないことになる。つまり、可変なデータを共有することは出来ない。

ただ、今回のように可変なデータを(安全に)共有したい場合もあるためMutexが用意されている。
これは高々1つしか取れないロックを取れたスレッドだけがデータに対する読み書きが可能になるようにすることでデータを共有しつつも可変な参照が同時に2つ以上存在したり、可変な参照と不変な参照が同時に存在することを防いでくれる。ただし、Mutexの場合は不変な参照を2つのスレッドが同時に持つことも防いでしまう。(可変な参照も取りたいし、不変な参照を複数のスレッドから同時に持ちたい場合には、RwLockを使うと良い。)

#[derive(Debug, Clone)]
struct SharedData {
    x: i32,
}
impl SharedData {
    fn new(x: i32) -> Self {
        Self { x }
    }
    fn inc(&mut self) {
        self.x += 1;
    }
}

fn main() {
    use std::sync::{Arc, Mutex};
    use std::thread;
    let data = Arc::new(Mutex::new(SharedData::new(42)));
    let data2 = Arc::clone(&data);
    let jh = thread::spawn(move || {
        let mut guard = data2.lock().expect("Couldn't get lock");
        guard.inc();
        println!("spawned {:?}", *guard);
    });
    jh.join().unwrap();
    let mut guard = data.lock().expect("Couldn't get lock");
    guard.inc();
    println!("main {:?}", *guard);
}

実行結果

spawned SharedData { x: 43 }
main SharedData { x: 44 }

ちなみに共有したいデータの型によってはatomicで定義されている型を使うことも考えられる。実際、先程紹介したArcの内部ではAtomicUsizeが使われている。ただし、使うメモリオーダーによっては意図しない結果を招くことがあるので注意して使うこと。

scoped threads

今まで色々と書いてきたが冒頭のコードが動かなかった原因は、threadの寿命がmainより短いにも関わらずコンパイラがそれを知らないためであった。この問題点を解消したのがscopeである。
このAPIは(stdでは)nightlyでしか使えなかったが、次の1.63.0で安定化することに決まった模様。(追記:1.63で安定化されました。)
ただし、今までのstableでも、rayonのscopecrossbeamのscopeを使えば(細かい挙動は違うが)似たようなことは出来た。

冒頭のコードをscopeを使うように書き換えたのが以下のコードになる。(Arcを使っていないことに注意)

#[derive(Debug, Clone)]
struct SharedData {
    x: i32,
}
impl SharedData {
    fn new(x: i32) -> Self {
        Self { x }
    }
}

fn main() {
    use std::thread;
    let data = SharedData::new(42);
    thread::scope(|s| {
        s.spawn(|| {
            println!("spawned {:?}", data);
        });
    });
}

実行結果

spawned SharedData { x: 42 }

さらに、Mutexを使わずに可変借用も出来る。(もちろん同時に複数の可変借用を持つことは出来ないが)

#[derive(Debug, Clone)]
struct SharedData {
    x: i32,
}
impl SharedData {
    fn new(x: i32) -> Self {
        Self { x }
    }
    fn inc(&mut self) {
        self.x += 1;
    }
}

fn main() {
    use std::thread;
    let mut data = SharedData::new(42);
    thread::scope(|s| {
        s.spawn(|| {
            data.inc();
            println!("spawned {:?}", data);
        });
    });
    data.inc();
    println!("main {:?}", data);
}

実行結果

spawned SharedData { x: 43 }
main SharedData { x: 44 }

終わりに

今までスレッドをすぐにjoinするとしてもstaticライフタイムになっていたのが、thread::scopeを使うとスコープ内にライフタイムが収まってくれて便利です。
ただ、繰り返しスレッドを立ち上げてはすぐ終了するような使い方の場合、スレッドプールで実行するrayonのscopeを使うのが良いと思います。

std::syncには今回紹介しなかったものの便利なものがあるのでどんなものがあるか一度見てみると良いかもしれません。

Discussion