💪

Rustの並行処理入門:Fearless Concurrency

に公開

表紙

並行プログラムとは、複数のタスクを実行するプログラム(あるいはマルチタスクのように見えるもの)を指します。つまり、2 つ以上のタスクが時間的に重なり合いながら交互に実行されることを意味します。これらのタスクは「スレッド」と呼ばれる最小の処理単位によって実行されます。

その背後で実際にマルチタスク(並列)処理が行われているわけではなく、スレッド同士が人間には知覚できない速度で高速にコンテキストを切り替えているだけです。このような錯覚に依存している現代のアプリケーションは多く、たとえばサーバーがリクエストを処理しつつ他のリクエストを待機する場合などがそれにあたります。

スレッド間でデータを共有する際にはさまざまな問題が発生する可能性があります。最もよくある 2 つの問題は、競合状態(レースコンディション)とデッドロックです。

Rust の所有権システムおよび型安全システムは、メモリ安全性および並行処理の問題を解決するための強力なツール群です。所有権と型チェックによって、多くのエラーは実行時ではなくコンパイル時に検出されます。つまり、開発中にバグを修正することができ、本番環境にデプロイしてからバグを直す必要がありません。

一度コードがコンパイルできれば、そのコードはマルチスレッド環境でも正しく動作することが保証され、他の言語でよく見られるような追跡困難なバグは発生しません。これこそが Rust の「恐れ知らずの並行性(fearless concurrency)」なのです。

スレッドモデル

マルチスレッドプログラミングのリスク

ほとんどの現代のオペレーティングシステムでは、実行中のプログラムコードは「プロセス(process)」内で動作しており、OS は複数のプロセスを管理します。プログラムの内部にも、同時に動作する複数の独立した部分が存在することができ、これらの独立部分は「スレッド(threads)」と呼ばれます。

プログラム内の計算を複数のスレッドに分割することで、複数の作業を同時に行うことができるため、パフォーマンスの向上が見込めます。しかし、その一方で複雑さが増します。スレッドは同時に実行されるため、それぞれのスレッドのコード実行順序を事前に保証することができません。そのため、以下のような問題が発生する可能性があります:

  • 競合状態(Race conditions):複数のスレッドがデータやリソースに対して不一致な順序でアクセスする
  • デッドロック(Deadlocks):2 つのスレッドが互いに相手のリソースの解放を待ち続け、実行が止まってしまう
  • 特定の状況でのみ発生し、再現・修正が困難なバグ

プログラミング言語によって、スレッドの実装方法は異なります。多くのオペレーティングシステムは新しいスレッドを作成するための API を提供しています。言語がこの OS の API を使ってスレッドを作成するモデルは「1:1 モデル」とも呼ばれ、1 つの OS スレッドが 1 つの言語スレッドに対応します。

Rust の標準ライブラリは、この 1:1 のスレッドモデルのみを提供しています。

spawn を使って新しいスレッドを作成する

use std::thread;
use std::time::Duration;

fn main() {
    let thread = thread::spawn(|| {
        for i in 1..10 {
            println!("this is thread {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for k in 1..5 {
        println!("this is main {}", k);
        thread::sleep(Duration::from_millis(1));
    }
}

出力例:

this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5

ここでは、メインスレッドが 5 回ループを実行した後に終了し、それに伴い新しいスレッドも終了しています。新しいスレッドは 10 回ループするように定義されていますが、実際には 5 回までしか実行されていません。つまり、メインスレッドが終了すると、新しいスレッドも完了していようがいまいが終了します。

新しいスレッドが完了してからメインスレッドを終了させたい場合は、JoinHandle を使用します:

use std::thread;
use std::time::Duration;

fn main() {
    let handler = thread::spawn(||{
        for i in 1..10 {
            println!("this is thread {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for k in 1..5 {
        println!("this is main {}", k);
        thread::sleep(Duration::from_millis(1));
    }

    handler.join().unwrap(); // 新しいスレッドの終了を待機
}

出力例:

this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5
this is thread 6
this is thread 7
this is thread 8
this is thread 9

thread::spawn の戻り値は JoinHandle です。JoinHandle は所有権を持つ値で、join メソッドを呼ぶことで、そのスレッドの終了を待つことができます。join の呼び出しによって現在のスレッドはブロックされ、対象スレッドが終了するまで処理を停止します。

スレッドと move クロージャ

move クロージャを使用することで、メインスレッドの変数の所有権をクロージャに移動させることができます:

use std::thread;

fn main() {
    let v = vec![2,4,5];
    // move によって変数 v の所有権をクロージャに移動
    let thread = thread::spawn( move || {
        println!("v is {:?}", v);
    });
}

出力:

v is [2, 4, 5]

Rust は、変数 v の所有権を新しく作成したスレッドに移動させます。これにより、メインスレッドでは変数 v をもう使用できなくなります(たとえば drop したりできません)。この仕組みによって、Rust は新しいスレッド内での v の安全性を保証できます。

もし move キーワードを使用しなかった場合、以下のようにコンパイルエラーが発生します:

$ cargo run
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`

Rust の所有権ルールが、またもや我々を助けてくれました!

メッセージパッシング(メッセージの送受信)

Rust において、メッセージパッシングによる並行処理を実現する主なツールは「チャネル(channel)」です。これは Rust 標準ライブラリが提供する並行処理の概念であり、チャネルはまるで水の流れのようなもので、ゴムのアヒルや小舟を入れると下流に流れていくイメージです。

チャネルは以下の 2 つの要素から構成されます:

  • 送信側(transmitter)
  • 受信側(receiver)

送信者または受信者のいずれかが破棄(drop)されると、チャネルはクローズされたと見なされます。

Rust では標準ライブラリ std::sync::mpsc を使ってチャネルを実現します。mpsc とは “Multiple Producer, Single Consumer”(複数生産者、単一消費者)の略です。

補足:チャネルは送信者と受信者の数に応じて、以下のように分類されます:

  • SPSC:Single-Producer Single-Consumer(単一生産者・単一消費者)→ アトミック操作のみで実装可能
  • SPMC:Single-Producer Multiple-Consumer(単一生産者・複数消費者)→ 消費側でロックが必要
  • MPSC:Multiple-Producer Single-Consumer(複数生産者・単一消費者)→ 生産側でロックが必要
  • MPMC:Multiple-Producer Multiple-Consumer(複数生産者・複数消費者)

スレッド間でメッセージを送る

use std::thread;
use std::sync::mpsc;

fn main() {
    // 通常、tx(transmitter)と rx(receiver)は送信者と受信者の略として用いられる
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        tx.send("hello").unwrap();
    });

    let msg = rx.recv().unwrap(); // ブロックしてメッセージを待つ
    println!("message is {}", msg);
}

出力:

message is hello

受信側には 2 つの便利なメソッドがあります:recvtry_recv。ここでは recv(receive の略)を使用しています。このメソッドはチャネルから値を受信するまでスレッドの実行をブロックします。値が送られると、recvResult<T, E> 型でそれを返します。送信側がすでにクローズされている場合は、Err を返します。

一方、try_recv はブロックせず、即座に Result<T, E> を返します。Ok の場合は値を含み、Err の場合は「まだ値がない」ことを意味します。

以下は try_recv を使った例ですが、非ブロッキングであるため、新しいスレッドが値を送信する前にメインスレッドが受信しようとするとエラーになります:

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        tx.send("hello").unwrap();
    });

    let msg = rx.try_recv().unwrap();
    println!("message is {}", msg);
}

エラー出力:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Empty'

複数の値を送信して、受信側の待機を観察する

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap(); // 各文字列を個別に送信
            thread::sleep(Duration::from_secs(1)); // 1秒間スリープ
        }
    });

    // メインスレッドでは、rx をイテレータとして使用し、各メッセージを受信
    for received in rx {
        println!("Got: {}", received);
    }
}

出力例(1 秒間隔で 1 行ずつ表示):

Got: hi
Got: from
Got: the
Got: thread

この例では、スレッド内のループが 1 秒ごとに文字列を送信しており、メインスレッドは for ループを使って rx(受信側)からそれを受け取っています。rx はイテレータとして機能し、チャネルがクローズされるまで受信を続けます。

send をクローンして複数の生産者を作成する

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone(); // 送信者 tx をクローン

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap(); // クローンした tx1 から送信
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap(); // 元の tx から送信
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

出力の例:(実行ごとに順序は異なる。OS のスレッドスケジューリングに依存)

Got: hi
Got: more
Got: from
Got: messages
Got: the
Got: for
Got: thread
Got: you

この例では、2 つのスレッドがそれぞれ異なる文字列のベクタを使って、同じ rx(受信者)にメッセージを送信しています。tx.clone() によって 2 つの送信者を生成し、1 つのチャネルに複数のスレッドから値を送信できるようにしています。

共有状態

「共有状態」や「共有データ」とは、複数のスレッドが同時に同じメモリ領域にアクセスすることを指します。Rust では**ミューテックス(mutex)**と呼ばれるロックを使って、共有メモリ型の並行処理を安全に行うことができます。

ミューテックスは一度に一つのスレッドのみがデータにアクセス可能

ミューテックス(mutex)は “mutual exclusion”(相互排他)の略で、任意の時点で 1 つのスレッドだけがデータにアクセスできることを保証する仕組みです。データにアクセスするには、スレッドはまずロック(lock)を取得する必要があります。

Rust の標準ライブラリ std::sync::Mutex を使ってミューテックスを利用できます:

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);
    {
        let mut num = m.lock().unwrap(); // ロックを取得
        *num = 10; // データを書き換え
        println!("num is {}", num);
    }
    println!("m is {:?}", m);
}

出力:

num is 10
m is Mutex { data: 10 }

lock() メソッドを使ってミューテックスのロックを取得すると、スレッドはロックされている間、その内部データに排他的にアクセスできます。lock() はブロッキング操作であり、ロックが利用可能になるまでスレッドを停止させます。

ミューテックスはスマートポインタでもあり、正確には MutexGuard というスマートポインタを返します。このポインタは Deref を実装しており、中のデータへの参照として扱えます。また、MutexGuard はスコープを抜けると自動的にロックを解放する Drop を実装しています。

複数のスレッド間で Mutex を共有する

複数のスレッド間で値を共有したい場合、複数の所有者が同じ値を共有する必要があります。Rust ではそのために**Arc(原子参照カウント)**というスマートポインタを使います。

Rc はシングルスレッド専用の参照カウントポインタなので、マルチスレッド環境では使えません。スレッド間で共有するには、スレッド安全な Arc を使う必要があります。

以下は、ArcMutex を包み、複数のスレッド間でカウンターを共有する例です:

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter); // Arc をクローンしてスレッドへ渡す
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap(); // 各スレッドの終了を待つ
    }

    println!("Result: {}", *counter.lock().unwrap());
}

出力:

Result: 10

補足まとめ

  • Rc<T> / RefCell<T>シングルスレッド向け
  • Arc<T> / Mutex<T>マルチスレッド向け

なお、ミューテックスの使用は**デッドロック(deadlock)**のリスクも伴います。たとえば、2 つのリソースのロックを取得しようとして、異なるスレッドがそれぞれ別のリソースをロックして待ち続けると、永遠に処理が進まなくなる可能性があります。

SendSync に基づくスレッド安全性

Rust における並行処理関連の機能は、主に標準ライブラリによって提供されています。しかし、SendSync という 2 つのトレイトは、言語自体に組み込まれた並行性の概念です。これらは std::marker モジュールに定義されています。

SendSync の役割

SendSync は、Rust における安全な並行処理を実現するための最も重要な基盤であり、マーカートレイト(marker trait) です。マーカートレイトとは、振る舞いを定義せずに型に印を付けるためだけに使われるトレイトのことです。

  • Send を実装している型は、スレッド間で安全に所有権を移動(ムーブ) できます。
  • Sync を実装している型は、スレッド間で安全に共有(参照) できます。

重要な法則:

T の参照 &TSend を実装していれば、TSync である

SendSync を実装している型

Rust では、ほとんどの型はデフォルトで SendSync を実装しています。つまり、ある構造体などの複合型が Send / Sync を自動的に実装する条件は:

  • その内部のすべてのフィールドが Send / Sync を実装していること

逆に、フィールドの中に一つでも Send または Sync を持たない型がある場合、その構造体全体も Send / Sync を持ちません。

簡単にまとめると:

  • Send:スレッド間で所有権をムーブできる
  • Sync:スレッド間で安全に参照を共有できる

以下は代表的な Send / Sync を実装していない型の例

  • 生ポインタ(raw pointers)
  • Cell / RefCell
  • Rc(スレッド非対応の参照カウント)

これらの型はスレッドセーフでないため、並行環境で使用しようとするとコンパイルエラーになります。

自作型に Send / Sync を実装するには?

  • Rust では、必要に応じてユーザー定義型に Send / Sync を手動で実装することもできますが、これは unsafe(アンセーフ)コードが必須 です。
  • 安全性の保証をすべて自分で担うことになるため、非常に慎重に行う必要があります。
  • また、ラップ用の新しい型(newtype)を使って、一部の型に Send / Sync を与える方法もあります。

⚠️ 注意:

  • CellRefCellSync を実装していません(内部に UnsafeCell を使っており、これ自体が Sync ではないため)
  • RcSendSync も実装していません(内部の参照カウントがスレッド安全ではないため)
  • 生ポインタ(*const T*mut T)も Send / Sync を実装していません(安全保証がないため)

つまり:

手動で SendSync を実装することは可能ですが、とても危険 なので、通常は避けるべきです。実装者が慎重に unsafe コードを使ってスレッド安全性を保たなければなりません。

まとめ

Rust は async/awaitマルチスレッド という 2 種類の並行モデルをサポートしています。マルチスレッド並行モデルを使いこなすには、Rust のスレッド管理、同期、スレッド安全性に関する知識が必要です。

たとえば:

  • スレッド間のメッセージ通信には channel(チャネル)
  • スレッド間での状態共有には MutexArc
  • コンパイル時の型システムや借用チェッカーがデータ競合や無効な参照を防止

さらに:

  • SendSync トレイトが、マルチスレッド環境での安全な所有権移動と参照共有を保証してくれます。

最後にポイントを再確認:

  • スレッドモデル:競合状態やデッドロック、再現困難なバグへの対処が必要
  • メッセージパッシングchannel を使ったスレッド間通信
  • 共有状態Mutex + Arc による共有アクセス
  • スレッド安全SendSync による安全性保証

私たちはLeapcell、Rustプロジェクトのホスティングの最適解です。

Leapcell

Leapcellは、Webホスティング、非同期タスク、Redis向けの次世代サーバーレスプラットフォームです:

複数言語サポート

  • Node.js、Python、Go、Rustで開発できます。

無制限のプロジェクトデプロイ

  • 使用量に応じて料金を支払い、リクエストがなければ料金は発生しません。

比類のないコスト効率

  • 使用量に応じた支払い、アイドル時間は課金されません。
  • 例: $25で6.94Mリクエスト、平均応答時間60ms。

洗練された開発者体験

  • 直感的なUIで簡単に設定できます。
  • 完全自動化されたCI/CDパイプラインとGitOps統合。
  • 実行可能なインサイトのためのリアルタイムのメトリクスとログ。

簡単なスケーラビリティと高パフォーマンス

  • 高い同時実行性を容易に処理するためのオートスケーリング。
  • ゼロ運用オーバーヘッド — 構築に集中できます。

ドキュメントで詳細を確認!

Try Leapcell

Xでフォローする:@LeapcellHQ


ブログでこの記事を読む

Discussion