🦀

Rustの並列処理で安全にログ出力する

こんにちは、エンジニアの澤田です。

昨年は Rust の quick-xml というライブラリを使ってみた記事 を書きましたが、Rust の勉強を兼ねて、今回は並列処理をやってみようと思います。
普段の業務で、外部のAPIへ並列でリクエストを送り、その際に受け取ったエラーメッセージなどをログに書き込む処理を行っていますが、並列処理でのログ出力を Rust でやってみようと思います!

※rustc と cargo はバージョン 1.83.0 を使用しています。

並列処理でログ出力を安全に行う

複数のスレッドが単一のリソースを変更する場合に、安全に変更する方法は大きく以下の2つがあります。

  • リソース側にロック機構を用意して、単一のスレッドによる変更のみ受け入れるようにする
  • スレッドを生成する側で交通整理して、単一のスレッドのみリソースの変更を行うようにする

今回は複数のスレッドから単一のログファイルへ出力しようと思いますが、ファイルへの書き込みにロックが掛からないことを想定して、後者のアプローチをやってみようと思います!

標準ライブラリを使って複数のスレッドを生成する

Rust にはスレッドを生成するための標準ライブラリが用意されており、追加でライブラリを導入しなくてもスレッドの生成を簡単に行うことができます。
今回は Rust の基本機能の勉強のため、外部ライブラリを使わずに標準ライブラリを使ってみようと思います。
なお、標準ライブラリのスレッドは1スレッドがOSの1スレッドに対応しています。

まずプロジェクト hello_concurrency を作成します。

$ cargo new hello_concurrency

そしてプロジェクト用のディレクトリ hello_concurrency 配下の src/main.rs に以下のコードを記述して、5つのスレッドを生成してみます。

src/main.rs
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        println!("あああ");
    });
    thread::spawn(|| {
        println!("いいい");
    });
    thread::spawn(|| {
        println!("ううう");
    });
    thread::spawn(|| {
        println!("えええ");
    });
    thread::spawn(|| {
        println!("おおお");
    });
    thread::sleep(Duration::from_micros(1));
}

そしてプロジェクトディレクトリ hello_concurrency 内で cargo run を実行すると、以下のように出力されます。

$ cargo run
   Compiling hello_concurrency v0.1.0 (/home/forcia/study/rust/hello_concurrency)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.20s
     Running `target/debug/hello_concurrency`
おおお
えええ
あああ

コードの記述順とは異なる順番で出力されているので、並列で処理されていることが分かりますね!
ただ、 "いいい" と "ううう" が出力されていません。
これは、メインスレッドから立ち上げたスレッドは独立して動くので、メインスレッドが先に終了する、ということがあるからですね。
thread::spawn を実行すると JoinHandle 型の値が返ってくるので、その join メソッドを呼び出すと、そのスレッドが実行完了するまで待つようになります。
返却結果に変数 handle1handle5 を割り当てて、それぞれ join メソッドを呼び出すように変更します。

src/main.rs
use std::thread;

fn main() {
    let handle1 = thread::spawn(|| {
        println!("あああ");
    });
    let handle2 = thread::spawn(|| {
        println!("いいい");
    });
    let handle3 = thread::spawn(|| {
        println!("ううう");
    });
    let handle4 = thread::spawn(|| {
        println!("えええ");
    });
    let handle5 = thread::spawn(|| {
        println!("おおお");
    });
    handle1.join().unwrap();
    handle2.join().unwrap();
    handle3.join().unwrap();
    handle4.join().unwrap();
    handle5.join().unwrap();
}

それでは実行してみましょう。

$ cargo run
   Compiling hello_concurrency v0.1.0 (/home/forcia/study/rust/hello_concurrency)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
     Running `target/debug/hello_concurrency`
えええ
あああ
おおお
ううう
いいい

無事全て実行されるようになりましたね!

スレッド間での所有権の移動

メインスレッドから各スレッドへメッセージを送り、各スレッドがそのメッセージをもとに何らかの処理を行い、ログを出力する流れを考えてみます。
ここではその流れを単純化して、メインスレッドから送られたメッセージを各スレッドがそのまま出力してみることにします。
しかしメインスレッドで宣言された変数をそのまま別のスレッドで扱おうとすると、スレッド内でその変数を使用している最中にメインスレッド側で drop されるなどして参照できなくなる可能性があるため、コンパイル時にエラーになります。

コンパイルエラーになるコード

use std::thread;

fn main() {
    let message = String::from("あああ");

    let handle = thread::spawn(|| {
        println!("{message}");
    });

    handle.join().unwrap();
}

エラーメッセージ

$ cargo run
   Compiling hello_concurrency v0.1.0 (/home/forcia/study/rust/hello_concurrency)
error[E0373]: closure may outlive the current function, but it borrows `message`, which is owned by the current function
  --> src/main.rs:18:36
   |
18 |         let handle = thread::spawn(|| {
   |                                    ^^ may outlive borrowed value `message`
19 |             println!("{message}");
   |                        ------- `message` is borrowed here
(..以下略..)

この問題を解消するために、 move キーワードを記述して所有権をスレッドのクロージャ内へ移す必要があります。

use std::thread;

fn main() {
    let message = String::from("あああ");

    let handle = thread::spawn(move || {
        println!("{message}");
    });

    handle.join().unwrap();
}

スレッド間でのメッセージの受け渡し

それでは、複数のスレッドが渡したメッセージを1つのスレッドが受け取って、ログを出力するのは1スレッドだけにする処理を実装してみましょう。
その処理を行うためにはスレッド間でメッセージをやり取りする必要がありますが、その処理も標準ライブラリが提供している std::sync::mpsc (multiple producer, single consumer の略) を使用することで簡単に行うことができます。

mpsc::channel 関数を実行すると送信側・受信側がセットになったタプルを受け取れるので、それぞれ変数 txrx を割り当てます。

let (tx, rx) = mpsc::channel();

送信側( tx ) は clone を呼び出して複製できるので、それを各スレッドへ移動すれば各スレッドで送信できるようになります。
メッセージのリストを vector にして、そのループを回しつつ txclone して、各スレッドへ移動します。

以下のコードでメッセージを送信するところまで実装できました。

src/main.rs
use std::thread;
use std::sync::mpsc;

fn main() {
    let message_vec = vec![
        String::from("あああ"),
        String::from("いいい"),
        String::from("ううう"),
        String::from("えええ"),
        String::from("おおお"),
        String::from("hoge"),
        String::from("piyo"),
        String::from("foobar")
    ];

    let (tx, _rx) = mpsc::channel();
    let mut handles = vec![];

    for message in message_vec {
        let tx = tx.clone();
        let handle = thread::spawn(move || {
            tx.send(message).unwrap();  // ここでメッセージを送信する
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

メッセージを受け取ってログ出力するスレッドを生成する

次は受信側を実装していきましょう!
rx.recv() を呼び出すとメッセージが送られてくるのを待つようになるので、それをループさせるとメッセージを待ち続けながら、受け取ったらそれを出力できます。

src/main.rs
    let logger = thread::spawn(move || loop {
        let received = rx.recv().unwrap();
        println!("{received}");
    });

先ほどのコードと組み合わせて実行してみます。

$ cargo run
   Compiling hello_concurrency v0.1.0 (/home/forcia/study/rust/hello_concurrency)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.35s
     Running `target/debug/hello_concurrency`
あああ
えええ
いいい
ううう
piyo
foobar
おおお
hoge

無事受け取ったメッセージを出力できました!

ログ出力用スレッドを終了するには

上記のコードでは永遠にメッセージを待ち続けてしまうので、一通り処理が完了したら終了させるようにします。
そこで、中身の無いメッセージ( None ) を送ったら終了させるようにしてみます。
中身があるものと無いものを両方表現できる型として Option<T> 型があるので、この型を使用するのがよさそうです。

src/main.rs
use std::thread;
use std::sync::mpsc;

fn main() {
    let message_vec = vec![
        String::from("あああ"),
        String::from("いいい"),
        String::from("ううう"),
        String::from("えええ"),
        String::from("おおお"),
        String::from("hoge"),
        String::from("piyo"),
        String::from("foobar")
    ];

    let (tx, rx) = mpsc::channel();
    let mut handles = vec![];

    for message in message_vec {
        let tx = tx.clone();
        let handle = thread::spawn(move || {
            tx.send(Some(message)).unwrap();  // メッセージを Some で包んで送る
        });
        handles.push(handle);
    }

    let logger = thread::spawn(move || loop {
        let received = rx.recv().unwrap();
        if received.is_none() {
            break;
        }
        println!("{:?}", received.unwrap());  // unwrap() してメッセージを取り出す
    });

    for handle in handles {
        handle.join().unwrap();
    }
    let _ = tx.send(None);
    logger.join().unwrap();
}

実行してみると、今度は終了するようになりました!

$ cargo run
   Compiling hello_concurrency v0.1.0 (/home/forcia/study/rust/hello_concurrency)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.29s
     Running `target/debug/hello_concurrency`
"foobar"
"いいい"
"hoge"
"piyo"
"ううう"
"えええ"
"おおお"
"あああ"
$

コンパイラは完璧ではない

先ほど所有権を移動していなくてエラーになったように、基本的に問題を引き起こす可能性のあるコードはコンパイル時にエラーになったり、warning が出て事前に気付くことができるようになっていますが、コンパイルは通るものの、実行時にエラーになるケースがありました。
以下のコードの通り、受信側でメッセージを1つだけ受け取るように変更してみます。

    let logger = thread::spawn(move || {
        let received = rx.recv().unwrap();
        println!("{:?}", received.unwrap());
    });

このコードを実行すると、以下のようなエラーになる場合があります。(運良くならない場合もあります)
エラーメッセージを見ると、送信側のスレッドは残っているものの、受信側が無くてエラーになっています。
このようなケースでは、プログラムを書く側でエラーハンドリングする必要があるようです。

$ cargo run
   Compiling hello_concurrency v0.1.0 (/home/forcia/study/rust/hello_concurrency)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.61s
     Running `target/debug/hello_concurrency`
"foobar"
thread '<unnamed>' panicked at src/main.rs:22:36:
called `Result::unwrap()` on an `Err` value: SendError { .. }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread '<unnamed>' panicked at src/main.rs:22:36:
called `Result::unwrap()` on an `Err` value: SendError { .. }
thread '<unnamed>' panicked at src/main.rs:22:36:
called `Result::unwrap()` on an `Err` value: SendError { .. }
thread '<unnamed>' panicked at src/main.rs:22:36:
called `Result::unwrap()` on an `Err` value: SendError { .. }
thread 'main' panicked at src/main.rs:33:23:
called `Result::unwrap()` on an `Err` value: Any { .. }

さいごに

今回の記事を書くにあたって The Rust Programming Language を参考にしたのですが、
Turning Our Single-Threaded Server into a Multithreaded Server に、 mpsc を使いつつ受信側をマルチスレッドにする方法が書かれていて、そんなこともできるんだ!と勉強になりました。
引き続き並列処理や Rust について勉強していこうと思います:)

この記事を書いた人

澤田 哲明
大手旅行会社でWebデザイナーとして勤務しつつプログラミングを学び、2012年にフォルシアに入社。
現在は事業推進部に所属して、福利厚生アウトソーシング会社などのシステム開発を担当。
先月結婚10年目を迎えました。妻と子供にはいつも感謝です!

FORCIA Tech Blog

Discussion