🦀

Rustのスレッドとチャネルと共有メモリの話

2022/06/26に公開約11,900字2件のコメント

この記事ではRustのスレッドとチャネルについてご紹介します。最後に共有メモリについても少しだけ触れます。対象としてはgolangを書ける程度の中級者レベルを想定しています。

公式ドキュメントについて

Rustの公式ドキュメントはとても良く書けており英語の勉強にもなります。プログラミング・チュートリアルは圧巻ですのでぜひ原文で読んでみてください。

スレッド

RustのスレッドはOSスレッド(1:1 threading)になります。GolangやJavaはグリーンスレッドです。

This model where a language calls the operating system APIs to create threads is sometimes called 1:1, meaning one operating system thread per one language thread. The Rust standard library only provides an implementation of 1:1 threading

また、スレッドとは別のコンセプトにノンブロッキングasync/await呼び出しをサポートするライブラリtokioもあります。スレッドとは別の話になりますので混同しないでください。

Rustでスレッドを生成するには、thread::spawn関数を使用します。実行したいコードをクロージャで渡します。go funcに似ています。spawnという用語はLinuxのプロセス生成でも使用します。魚の産卵の意味もあります。初耳の方は魚がスポーンと卵を産卵すると覚えましょう。

thread::spawn(|| {
    println!("hello"); // 別スレッドで処理される。
});
go func() { // Goはグリーンスレッドであり、OSスレッドではない。
    log.Println("hello")
}()

To create a new thread, we call the thread::spawn function and pass it a closure containing the code we want to run in the new thread.

チャネル

Golangのchannelのように、スレッド間で値を受け渡しするためのFIFOキューのようなものです。チャネルは非同期(Sender)同期(SyncSender)の2種類があります。

A Sender or SyncSender is used to send data to a Receiver. Both senders are clone-able (multi-producer) such that many threads can send simultaneously to one receiver (single-consumer).

非同期の説明

An asynchronous, infinitely buffered channel. The channel function will return a (Sender, Receiver) tuple where all sends will be asynchronous (they never block). The channel conceptually has an infinite buffer.

同期の説明

A synchronous, bounded channel. The sync_channel function will return a (SyncSender, Receiver) tuple where the storage for pending messages is a pre-allocated buffer of a fixed size. All sends will be synchronous by blocking until there is buffer space available. Note that a bound of 0 is allowed, causing the channel to become a “rendezvous” channel where each sender atomically hands off a message to a receiver.

非同期チャネル

まずはよく使う非同期チャネルの方から見てみましょう。チャネルの生成にはmpsc::channel()を使用します。golangとは異なり、送信用txと受信用rxの1方向のエンドポイントをタプルで返します。子スレッドを生成し、送信用のエンドポイントを渡します。メインスレッドで受信用のエンドポイントから送信データを受け取ります。
注意点として、クロージャからメインスレッドのデータtxをキャプチャする必要があるため、moveを使用して所有権の移動consumeをします。

async.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel(); // 非同期チャネルを生成

    thread::spawn(move || {
        tx.send(42).unwrap(); // (非同期)送信
        println!("Done immediately!"); // すぐにこの行に処理を移します
    });
    
    thread::sleep(Duration::from_secs(3)); // メインスレッドの処理を少し遅らせる

    println!("got {}", rx.recv().unwrap()); // 受信
}

実行結果です。

Done immediately!
got 42

デバッガーで両方のprintln!にブレークポイントを設定しますと、子スレッドで先にストップするのがわかります。つまり、送信がブロックされないことがポイントです。

同期チャネル

同期チャネルにはsync_channelバッファのサイズを0にすることで同期通信になります。golangとよく似ていますね。

sync.rs
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = sync_channel::<i32>(0); // バッファサイズを0にする
    thread::spawn(move || {
        tx.send(53).unwrap(); // 同期送信。受け取ってもらうまで待ちます。
        println!("Done after 3sec!"); // すぐには処理されません
    });

    thread::sleep(Duration::from_secs(3));

    println!("got {}", rx.recv().unwrap());
}

実行結果です。

got 53
Done after 3sec!

デバッガーで確認するとメインスレッドで先にストップします。つまり、送信がブロックされることがポイントです。

ここでバッファサイズを1以上に指定すると非同期チャネルのように送信がブロックされないんです。

buffer-channel
    let (tx, rx) = sync_channel::<i32>(1);

どちらのチャネルでもrecv()は同期になります。つまり、チャネルにデータが送信されるまで待ち続けます。非同期に受信するためにはtry_recv()を使用します。

送信したビットコインはもうありません

次のコードはコンパイルエラーになります。

    thread::spawn(move || {
        let bitcoin = String::from("1ビットコイン");
        tx.send(bitcoin).unwrap();
        println!("{}を送信しました。", bitcoin); <- bitcoinはmoveしてます!
    });

bitcoinはコピーができないし、すでにtx.send(bitcoin)で所有者がメインスレッドになっているからです。

error[E0382]: borrow of moved value: `bitcoin`
  --> src/concurrency2/async2.rs:12:32
   |
10 |         let bitcoin = String::from("1ビットコイン");
   |             ------- move occurs because `bitcoin` has type `std::string::String`, which does not implement the `Copy` trait
11 |         tx.send(bitcoin).unwrap();
   |                 ------- value moved here
12 |         println!("{}を送信しました。", bitcoin);
   |                                        ^^^^^^^ value borrowed here after move

ビットコインを複製すればOKです。

tx.send(bitcoin.clone()).unwrap();

Rustがなぜこんな面倒なことになっているかと言えば、送信した後に子スレッド側でbitcoinの値を1億ビットコインと更新した場合、メインスレッド側のOSスレッドのスケジューリングによって結果が変わってしまうからです。このようなマルチスレッドがらみの悩ましいバグをコンパイル時に検出するといいうのがRustの開発者が目指したことになります。

共有チャネル

複数スレッドでチャネルを共有することも可能です。moveでチャネルの所有権をクロージャーに移動するため、複数のスレッドにチャネルを渡すにはclone()したものを渡す必要があります。また出力を綺麗に出すためにI/Oをフラッシュしています。

shared-channel
use std::io::Write;
use std::sync::mpsc;
use std::time::Duration;
use std::{io, thread};

fn main() {
    // Create a shared channel that can be sent along from many threads.
    let (tx, rx) = mpsc::channel();
    for i in 0..10 {
        let tx = tx.clone(); // クローンする
        thread::spawn(move || {
            tx.send(i).unwrap();
            println!("sent>{}", i);
            io::stdout().flush().unwrap();
        });
    }

    println!("----------------------------");
    io::stdout().flush().unwrap();

    for _ in 0..10 {
         let v = rx.recv().unwrap();
         println!("recv>{}", v);
    }
}

またrxイテレータで処理することも可能です。

    for v in rx.iter().take(10) {
        println!("recv>{}", v);
    }

もしくはチャネルををアンバウンドするのでも良いです。drop()するとgolangのclose()のような動きをします。

    drop(tx);

    while let Ok(v) = rx.recv() {
        println!("{v}");
    }

join()で子スレッドの終了を待つことができます。

join
fn main() {
    let mut handles: Vec<thread::JoinHandle<()>> = Vec::new();

    let (tx, rx) = mpsc::channel();
    for i in 0..10 {
        let tx = tx.clone();
        let handle = thread::spawn(move || {
	// 同じなので省略
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap(); // ランデブー
    }

    println!("----------------------------");
    io::stdout().flush().unwrap();
    
    // 以下同じなので省略

出力が綺麗になります。

実行結果
sent>1
sent>5
sent>6
sent>0
sent>7
sent>3
sent>2
sent>8
sent>4
ret>0
sent>9
ret>1
ret>2
ret>3
ret>4
ret>5
ret>6
ret>7
ret>8
ret>9
----------------------------
0
2
3
1
4
5
6
7
8
9

Process finished with exit code 0

JoinHandle経由でスレッドから値を返すことができます。スレッドで重い処理をさせ、join()で処理結果を回収します。

use std::thread;

fn main() {
    let mut handles: Vec<thread::JoinHandle<i64>> = Vec::new();

    for i in 0i64..10 {
        let handle = thread::spawn(move || {
            let ans = i.pow(10); // 重い計算のつもり
            ans
        });
        handles.push(handle);
    }

    for handle in handles {
        let ret = handle.join().unwrap();
        println!("ret>{}", ret);
    }
}

受信側が複数スレッド

複数のスレッドでチャネルを受信recv()する例です。受信側のエンドポイントを排他制御Arc::new(Mutex::new(rx))する必要があります。これについては次の共有メモリで説明しますのでここでは軽く流してください。

use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

fn worker(id: u64, shared_rx: Arc<Mutex<mpsc::Receiver<i64>>>) {
    // スレッドを生成し、無限ループします
    thread::spawn(move || loop {
        let mut n = 0; // チャネルから受け取った値を保持
        match shared_rx.lock() { // 同期
            Ok(rx) => {
                n = rx.recv().unwrap(); // 同期
            } // ロック解放
            Err(e) => println!("{:?}", e),
        };

        // ロックを取得しチャネルからrecvしないとココにはこない。
	
        thread::sleep(Duration::from_secs(3)); // 重たい演算処理のつもり
        println!("worker>{} {}^10={}", id, n, ans);
	// タスクの完了。loop処理の先頭に戻り再度ロック待ちします。
    });
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let shared_rx = Arc::new(Mutex::new(rx));

    for i in 1..5 {
        // 受信側のワーカースレッドを4つ生成
        worker(i, shared_rx.clone());
    }

    for i in 10..14 {
        tx.send(i).unwrap(); // 送信します
    }

    thread::sleep(Duration::from_secs(60));
}

このパターンはよく出てきますので覚えておくとよいでしょう。

let shared = Arc::new(Mutex::new(共有変数)); // 排他制御
ループ {
    let shared = shared.clone(); // クローン
    spawn(move || { // スポーン、所有権をクロージャに移動
        // sharedを更新
    });
}

注意点として重い計算はロックを解放してから実行する必要があります。次のコードの場合、ロックを掴んだまま重い計算をしているためその間は他のスレッドがロックを取得できません。
従って、逐次処理と同じことになります。3秒x4スレッド=12秒

        match shared_rx.lock() {
            Ok(rx) => { // ロックを取得
                n = rx.recv().unwrap();
                thread::sleep(Duration::from_secs(3)); // ダメ
                let ans = n.pow(10);
            } // ロック解放
            Err(e) => println!("{:?}", e),
        };

共有メモリ

golangでは否定していますが、チャネルによるメッセージパッシングの他に、共有メモリを使用したスレッド間通信も可能です。

Do not communicate by sharing memory; instead, share memory by communicating.

Rustの共有メモリはArc::new(Mutex::new())で生成し、lock()でロックを取得します。ロックを取得するまで次の処理には進みません。
Mutexとは排他制御mutual exclusionの意味です。Arcとはatomically reference countedの意味になります。つまり、スレッドセーフなアトミック変数を生成します。

ソースコードは公式そのままです。コメントを日本語で記述します。

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // 共有変数 i32
    let counter: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
    // 子スレッドのリストを保持
    let mut handles: Vec<thread::JoinHandle<()>> = Vec::new();

    for _ in 0..10 { // 子スレッドを10個生成
        // 子スレッドに所有権を渡すためにクローンしとく
        let counter = Arc::clone(&counter);
        // スレッドを生成
        let handle = thread::spawn(move || {
            // counterのロックを取得
            let mut num = counter.lock().unwrap();

            *num += 1; // 共有変数を更新する
        }); // ロッグは自動解放され、別の子スレッドが動き出す
        handles.push(handle);
    }

    // ランデブー
    for handle in handles {
        handle.join().unwrap(); // 子スレッドの終了を待つ
    }

    println!("Result: {}", *counter.lock().unwrap());
}

golangで共有メモリを推奨していないのはロックにはデッドロックのリスクがあるためです。2つの共有変数を2つのスレッドがそれぞれ片方だけ持ってしまうと、両方のスレッドが相手のロックの解放を永久に待つためです。(英文の方がクリアでしょう。)

Mutex<T> comes with the risk of creating deadlocks. These occur when an operation needs to lock two resources and two threads have each acquired one of the locks, causing them to wait for each other forever.

CPUバウンドとI/Oバウンド

一般的にアプリケーションはCPUとネットワークのどちらをより多く使用するかでCPUバウンド or I/Oバウンドに分けられます。
Webアプリケーションの場合は外部のAPIやデータベースを利用するためI/Oボトルネックになりやすいです。その一方で、機械学習やバッチ処理などはCPU/GPUでの演算処理がメインになるためCPU/GPUがボトルネックになります。
この記事で紹介したマルチスレッドはCPUボトルネックなアプリケーションを、マルチコア上でデータ分割しパラレルに処理することで高速化を行います。一方でI/Oバウンドの場合は、Node.jsのようなasync/awaitなどを使用したノンブロッキングな手法を使う必要があります。
rustでもtokio[1]を使用することでマルチコア上でパラレルに動作するasync/awaitのプログラミングが可能です。A multi-threaded runtime for executing asynchronous code
またgolangの場合は、超軽量なグリーンスレッドでその違いをあまり意識せず書けてしまったりするわけですが、rustの場合は使い分けに注意が必要です。この辺りも、tokioの原文を読むとよくわかると思います。

参考

https://zenn.dev/tfutada/articles/4dbb9659bb8102

単純なデータ分割でのパラレル処理はrayonが便利です。

https://github.com/rayon-rs/rayon

cross-beamというグリーンスレッドのライブラリもあります。

https://rust-lang-nursery.github.io/rust-cookbook/concurrency/threads.html

Rust for Rustaceansの著者であり、RustチュートリアルのYouTuberとして有名なjonhoo
内容は難しいですが英語の発音が綺麗で分かりやすいです。

https://www.youtube.com/watch?v=b4mS5UPHh20
脚注
  1. Tasks are the unit of execution managed by the scheduler. Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do. The spawned task may be executed on the same thread as where it was spawned, or it may execute on a different runtime thread. The task can also be moved between threads after being spawned. ↩︎

Discussion

重箱の隅ですが、

GolangやJavaはグリーンスレッドです。

Java(のメジャーな実装系)は、OSスレッドという認識だったのですが、いかがでしょうか?

https://ja.wikipedia.org/wiki/グリーンスレッド

channelの扱い方のパターンがまとまっていて、理解が深まりました。ありがとうございます~

なんと!私のJavaの知識が何10年も昔でしたので。ご指摘ありがとうございました。

ログインするとコメントできます