🦀

Rustでパフォーマンス計測しながら汎用的な爆速並列処理を実装してみた

2024/01/02に公開

はじめに

Rustという爆速の言語を使って、並列で集計や計算処理を爆速でできないだろうか?そしてできればそれは汎用的な処理にしたい。そんな妄想をある程度具現化したのが、今回投稿になります。

最終的な結果としては10億件のデータの集計、計算処理が10秒ほどで実行できました。

並列処理はパフォーマンス計測を行い、uptimeやvmstatだけでなく、perfなども使用してOSのシステムコールまで計測して、実用性や性能改善について探っていきます。

実装 -> パフォーマンス計測 -> 性能改善、または、最適化 -> パフォーマンス計測 -> ...というようなプロセスでの実装になります。

それでは最初は単純な汎用計算処理の並列化から行っていきます。

単純な並列処理

汎用処理のインターフェイス

まず汎用処理のインターフェイスとして、引数に関数、配列、分割数を指定する形にします。

pub fn execute<F, T>(func: F, data: Vec<T>, chunk_num: usize, max_threads: usize) -> Result<(), Box<dyn Error>>
    where
        F: Fn(Vec<T>) -> i32 + Send + Copy + 'static,
        T: Send + Clone + 'static,
{}

これを呼び出し側は以下のような関数、データ、分割数を指定して呼び出します。

// 処理する関数
let process_function = |chunk: Vec<i64>| -> i64 {
    chunk.iter().sum::<i64>() * 100 / 10 + 1
};

// 処理するデータ
let data = (1..=1000_000).collect();

// チャンクサイズ
let chunk_size = 100;

let start = Instant::now();

// 汎用処理を呼び出し
let ret = thread_control::execute(process_function, data, chunk_size);

let duration = start.elapsed();
let execution_time = duration.as_millis();
println!("thread_control execution time: {}ms", execution_time);

並列計算処理は以下のような実装になりました。

pub fn execute<F, T>(func: F, data: Vec<T>, chunk_num: usize) -> Result<(), Box<dyn Error>>
    where
        F: Fn(Vec<T>) -> i64 + Send + Copy + 'static,
        T: Send + Clone + 'static,
{
    let results = Arc::new(Mutex::new(Vec::new()));

    std::thread::scope(|s| {
        for chunk in data.chunks(chunk_num).map(|c| c.to_vec()) {
            let func = func;
            let results_clone = Arc::clone(&results);

            s.spawn(move || {
                let result = func(chunk);
                let mut results_lock = results_clone.lock().unwrap();
                results_lock.push(result);
            });
        }
    });

    let start = Instant::now();

    // スレッドの結果を出力
    let results_lock = results.lock().unwrap();
    for result in &*results_lock {
        println!("Result: {}", result);
    }

    let duration = start.elapsed();
    let execution_time = duration.as_millis();
    println!("output execution time: {}ms", execution_time);

    Ok(())
}

汎用処理の実行

実行時間は以下のようになりました。

output execution time: 45ms
thread_control execution time: 389ms
All Execution time: 409ms

各ログ出力の説明です。

  • output execution は計算処理が全部終わった後に結果をすべてコンソール出力するときの実行時間
  • thread_control execution は汎用処理の実行時間
  • All Execution はデータ用意などもふくめたすべての実行時間

件数を増やしてみると汎用処理の計算時間は以下の結果になりました。

件数 処理時間
100万件 389ms
1000万件 3,165ms
1億件 33,044ms

1億件で30秒くらいです。現在の状態でシステムパフォーマンスを計測してみます。

システムパフォーマンス計測

マシンスペック

  • Amazon Linux t2.2xlarge
  • vCPU:8 memory:16GM

uptime

Rustプログラムを実行直後にuptimeにて過去1分間、5分間、15分間のシステムの平均負荷を計測してみます。

$ uptime
 07:00:51 up 41 min,  3 users,  load average: 5.27, 2.77, 1.26

結果は過去1分間:5.27、5分間:2.77、15分間:1.26となりました。それぞれの数値は実行されたプロセスの平均数です。vCPUが8コアになっているので、妥当な使用率でしょうか。

vmstat

実行中にどれくらいの負荷がかかったのか確認してみます。
1秒間隔でvmstatの結果を出力します。

while :; do vmstat >> vmstat.log; sleep 1; done

重要なのは、us、sy、id の 3 つの欄で、ユーザー時間/システム時間/未使用時間となります。

# 実行前
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----  
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 0  0      0 30459760   2168 2083408    0    0    10   110  995  847  1  1 98  0  0

# 実行中
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 1  0      0 24473052   2168 2083604    0    0    10   109  987  841  1  1 98  0  0

なぜか変化はメモリだけでCPUはほとんど変化がなく、idの数値が高く、使用されていないことになっています。なぜこのような結果になるのか謎です。

mpstat

mpstat -P ALL 1

こちらも同様に重要なのは、%usr、%sys、%idle の 3 つの欄で、ユーザー時間/システム時間/未使用時間となります。

# 実行前
07:17:04     CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
07:17:05     all    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
07:17:05       0    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
07:17:05       1    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
07:17:05       2    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
07:17:05       3    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
07:17:05       4    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
07:17:05       5    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
07:17:05       6    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
07:17:05       7    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00

# 実行中
07:16:43     CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
07:16:44     all    7.58    0.00   36.97    0.00    0.00    0.00    3.86    0.00    0.00   51.60
07:16:44       0    8.42    0.00   40.00    0.00    0.00    0.00    3.16    0.00    0.00   48.42
07:16:44       1    7.87    0.00   30.34    0.00    0.00    0.00    4.49    0.00    0.00   57.30
07:16:44       2    3.12    0.00   42.71    0.00    0.00    0.00    3.12    0.00    0.00   51.04
07:16:44       3   15.15    0.00   29.29    0.00    0.00    0.00    4.04    0.00    0.00   51.52
07:16:44       4    9.30    0.00   30.23    0.00    0.00    0.00    4.65    0.00    0.00   55.81
07:16:44       5    6.38    0.00   38.30    0.00    0.00    0.00    4.26    0.00    0.00   51.06
07:16:44       6    6.67    0.00   37.78    0.00    0.00    0.00    4.44    0.00    0.00   51.11
07:16:44       7    3.88    0.00   45.63    0.00    0.00    0.00    2.91    0.00    0.00   47.57

mpstatでははっきりと変化が確認できました。実行中では最終的に各コアが50%前後利用されています。CPUが50%前後しか使用されないのはなぜでしょうか?試しに汎用処理を同時に実行してみると、CPU使用率が70%を超えました。

22:00:55     CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
22:00:56     all    9.20    0.00   62.27    0.00    0.00    0.00    1.73    0.00    0.00   26.80
22:00:56       0    4.17    0.00   67.71    0.00    0.00    0.00    1.04    0.00    0.00   27.08
22:00:56       1   14.85    0.00   60.40    0.00    0.00    0.00    1.98    0.00    0.00   22.77
22:00:56       2    5.62    0.00   65.17    0.00    0.00    0.00    1.12    0.00    0.00   28.09
22:00:56       3   14.77    0.00   55.68    0.00    0.00    0.00    1.14    0.00    0.00   28.41
22:00:56       4    7.22    0.00   57.73    0.00    0.00    0.00    3.09    0.00    0.00   31.96
22:00:56       5   10.53    0.00   58.95    0.00    0.00    0.00    2.11    0.00    0.00   28.42
22:00:56       6   10.64    0.00   63.83    0.00    0.00    0.00    1.06    0.00    0.00   24.47
22:00:56       7    5.56    0.00   68.89    0.00    0.00    0.00    2.22    0.00    0.00   23.33

単独でCPUを使い切れないのは、スレッド内での以下の処理が軽すぎるのかもしれません。

let process_function = |chunk: Vec<i64>| -> i64 {
    chunk.iter().sum::<i64>() * 100 / 10 + 1
};

処理が重くなるようにループ処理をはさんでみます。

let process_function = |chunk: Vec<i64>| -> i64 {
    let mut count = 0;
    for _ in 0..100_000 {
        count += 1;
    }
    chunk.iter().sum::<i64>() * 100 / 10 + 1
};

結果使用率は単独で80%を超えました。usrプログラムの使用率が8%程度だったのが、70%を超えています。

Average:     CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
Average:     all   70.31    0.00   10.30    0.01    0.00    0.00    0.42    0.00    0.00   18.96
Average:       0   70.71    0.00    9.96    0.00    0.00    0.00    0.40    0.00    0.00   18.93
Average:       1   70.25    0.00   10.21    0.00    0.00    0.00    0.41    0.00    0.00   19.13
Average:       2   70.40    0.00   10.29    0.00    0.00    0.00    0.41    0.00    0.00   18.90
Average:       3   70.04    0.00   10.52    0.00    0.00    0.00    0.43    0.00    0.00   19.02
Average:       4   70.22    0.00   10.56    0.00    0.00    0.00    0.47    0.00    0.00   18.74
Average:       5   70.20    0.00   10.38    0.02    0.00    0.00    0.43    0.00    0.00   18.98
Average:       6   69.94    0.00   10.59    0.02    0.00    0.00    0.43    0.00    0.00   19.02
Average:       7   70.72    0.00    9.91    0.02    0.00    0.00    0.41    0.00    0.00   18.95

計算処理がそれほど重くないため、usr処理に対してsys処理の比率が多く、CPUを50%くらいしか使い切れないというような状況のようです。

ただ、一つの処理でCPUを50%消費してしまうのはよくありません。同時実行制御のセマフォを導入してCPU使用率を下げられないか試してみます。

セマフォの導入

汎用処理へのセマフォの導入と実行

関数の引数に同時実行数を指定できる形にします。

pub fn execute<F, T>(func: F, data: Vec<T>, chunk_num: usize, max_threads: usize) -> Result<(), Box<dyn Error>>
    where
        F: Fn(Vec<T>) -> i32 + Send + Copy + 'static,
        T: Send + Clone + 'static,
{}

セマフォの処理を実装します。

use std::sync::{Condvar, Mutex};

// セマフォ用の型
pub struct Semaphore {
    mutex: Mutex<isize>,
    cond: Condvar,
    max: isize,
}

impl Semaphore {
    pub fn new(max: isize) -> Self {
        Semaphore {
            mutex: Mutex::new(0),
            cond: Condvar::new(),
            max,
        }
    }
    pub fn wait(&self) {
        // カウントが最大値以上なら待機
        let mut cnt = self.mutex.lock().unwrap();
        while *cnt >= self.max {
            cnt = self.cond.wait(cnt).unwrap();
        }
        *cnt += 1;
    }
    pub fn post(&self) {
        // カウントをデクリメント
        let mut cnt = self.mutex.lock().unwrap();
        *cnt -= 1;
        if *cnt <= self.max {
            self.cond.notify_one();
        }
    }
}

impl Clone for Semaphore {
    fn clone(&self) -> Self {
        Semaphore {
            mutex: Mutex::new(*self.mutex.lock().unwrap()),
            cond: Condvar::new(),
            max: self.max,
        }
    }
}

以下のような実装になりました。

pub fn execute<F, T>(func: F, data: Vec<T>, chunk_num: usize) -> Result<(), Box<dyn Error>>
    where
        F: Fn(Vec<T>) -> i64 + Send + Copy + 'static,
        T: Send + Clone + 'static,
{
    let results = Arc::new(Mutex::new(Vec::new()));
    let semaphore = Arc::new(Semaphore::new(max_threads as isize));

    std::thread::scope(|s| {
        for chunk in data.chunks(chunk_num).map(|c| c.to_vec()) {
            let func = func;
            let results_clone = Arc::clone(&results);
	    let semaphore_clone = Arc::clone(&semaphore);

            s.spawn(move || {
                semaphore_clone.wait();
                let result = func(chunk);
                semaphore_clone.post();

                let mut results_lock = results_clone.lock().unwrap();
                results_lock.push(result);
            });
        }
    });

    let start = Instant::now();

    // スレッドの結果を出力
    let results_lock = results.lock().unwrap();
    for result in &*results_lock {
        println!("Result: {}", result);
    }

    let duration = start.elapsed();
    let execution_time = duration.as_millis();
    println!("output execution time: {}ms", execution_time);

    Ok(())
}

同時実行数4で汎用処理を実行します。するとセマフォ実装前後で優位な差は見受けられませんでした。

おそらく処理が軽すぎて、同時実行数の制御が意味がない状態になっていると思われます。今回も処理が重くなるようにループ処理をはさみ、スリープも入れてみます。

let process_function = |chunk: Vec<i64>| -> i64 {
    let mut count = 0;
    for _ in 0..20_000 {
        count += 1;
    }
    thread::sleep(Duration::from_nanos(500));
    chunk.iter().sum::<i64>() * 100 / 10 + 1
};

結果はセマフォ実装後の方がCPU使用率が10%弱押さえられ、時間は26秒ほど遅くなっています。今回の結果で並列で行っている処理が軽すぎる場合はセマフォによる同時実行制御を導入しても効果が薄いことが分かりました。

# セマフォ実装前
output execution time: 6974ms
thread_control execution time: 80694ms
All Execution time: 82930ms

23:20:21     CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
23:20:22     all   45.87    0.00   22.60    0.00    0.00    0.14    1.22    0.00    0.00   30.18
23:20:22       0   40.91    0.00   23.86    0.00    0.00    0.00    1.14    0.00    0.00   34.09
23:20:22       1   43.43    0.00   28.28    0.00    0.00    0.00    1.01    0.00    0.00   27.27
23:20:22       2   43.48    0.00   25.00    0.00    0.00    0.00    1.09    0.00    0.00   30.43
23:20:22       3   47.78    0.00   21.11    0.00    0.00    0.00    2.22    0.00    0.00   28.89
23:20:22       4   44.58    0.00   21.69    0.00    0.00    0.00    1.20    0.00    0.00   32.53
23:20:22       5   53.92    0.00   15.69    0.00    0.00    0.98    0.98    0.00    0.00   28.43
23:20:22       6   41.49    0.00   27.66    0.00    0.00    0.00    1.06    0.00    0.00   29.79
23:20:22       7   50.55    0.00   17.58    0.00    0.00    0.00    1.10    0.00    0.00   30.77

# セマフォ実装後
output execution time: 7878ms
thread_control execution time: 106624ms
All Execution time: 108902ms

23:21:17     CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
23:21:18     all   31.71    0.00   21.54    0.00    0.00    0.00    1.90    0.00    0.00   44.85
23:21:18       0   31.52    0.00   20.65    0.00    0.00    0.00    1.09    0.00    0.00   46.74
23:21:18       1   36.08    0.00   21.65    0.00    0.00    0.00    2.06    0.00    0.00   40.21
23:21:18       2   34.04    0.00   19.15    0.00    0.00    0.00    2.13    0.00    0.00   44.68
23:21:18       3   25.00    0.00   26.19    0.00    0.00    0.00    2.38    0.00    0.00   46.43
23:21:18       4   26.14    0.00   23.86    0.00    0.00    0.00    2.27    0.00    0.00   47.73
23:21:18       5   30.68    0.00   19.32    0.00    0.00    0.00    1.14    0.00    0.00   48.86
23:21:18       6   38.30    0.00   13.83    0.00    0.00    0.00    2.13    0.00    0.00   45.74
23:21:18       7   30.69    0.00   27.72    0.00    0.00    0.00    1.98    0.00    0.00   39.60

重くした処理を以下のように元に戻し、セマフォ処理も削除して再度別方向でシステムパフォーマンスを計測してみます。

let process_function = |chunk: Vec<i64>| -> i64 {
    // let mut count = 0;
    // for _ in 0..20_000 {
    //     count += 1;
    // }
    // thread::sleep(Duration::from_nanos(500));
    chunk.iter().sum::<i64>() * 100 / 10 + 1
};

システムパフォーマンス計測

perf

perf は、Linuxカーネルのパフォーマンスカウンター機能を利用する強力なパフォーマンス分析ツールです。

まずはperf statで確認してみます。
-eオプションでの指定イベントは以下になります。

  • CPUサイクル数: -e cycles はCPUサイクルの使用を測定します。
  • キャッシュミス: -e cache-misses はキャッシュミスの数を示します。
  • コンテキストスイッチ: -e context-switches はコンテキストスイッチの数を示します。これは特にスレッドの切り替えに関連します。
  • CPUマイグレーション: -e cpu-migrations はスレッドが異なるCPUコア間で移動する頻度を示します。
  • 命令数: -e instructions は実行された命令の総数を示します。
perf stat -e cycles,instructions,cache-references,cache-misses,context-switches,cpu-migrations cargo run thread_control

...

output execution time: 5766ms
thread_control execution time: 40753ms
All Execution time: 42776ms

 Performance counter stats for 'cargo run thread_control':

   <not supported>      cycles:u                                                    
   <not supported>      instructions:u                                              
   <not supported>      cache-references:u                                          
   <not supported>      cache-misses:u                                              
                 0      context-switches:u                                                 
                 0      cpu-migrations:u                                                   

      42.863621300 seconds time elapsed

      23.784286000 seconds user
     122.040848000 seconds sys

not supportedと出力されました。Amazon Linuxが対応していないのか、または、仮想環境だから対応していないのか。context-switchesが0というのも信頼できそうにありません。

今度はperf reportしてみます。

perf record -g cargo run thread_control
perf report

...

Samples: 26K of event 'cpu-clock:uhpppH', Event count (approx.): 6745500000
  Children      Self  Command        Shared Object         Symbol
+   17.14%    17.14%  rust-learning  libc.so.6             [.] pthread_create@GLIBC_2.2.5                                                                  ◆
+   10.74%    10.74%  rust-learning  rust-learning         [.] <core::ops::range::RangeInclusive<T> as core::iter::range::RangeInclusiveIteratorImpl>::spec▒
+    7.11%     7.11%  rust-learning  libc.so.6             [.] clone3                                                                                      ▒
+    6.09%     6.09%  rust-learning  libc.so.6             [.] _int_malloc                                                                                 ▒
+    5.77%     5.77%  rust-learning  libc.so.6             [.] __GI___libc_write                                                                           ▒
+    4.60%     4.60%  rust-learning  rust-learning         [.] alloc::vec::Vec<T,A>::extend_trusted::{{closure}}                                           ▒
+    3.94%     3.94%  rust-learning  rust-learning         [.] core::iter::traits::iterator::Iterator::for_each::call::{{closure}}                         ▒
+    3.84%     3.84%  rust-learning  libc.so.6             [.] malloc                                                                                      ▒
+    3.59%     3.59%  rust-learning  libc.so.6             [.] __memmove_avx_unaligned_erms                                                                ▒
+    3.59%     3.59%  rust-learning  rust-learning         [.] core::ops::try_trait::NeverShortCircuit<T>::wrap_mut_2::{{closure}}                         ▒
+    2.47%     2.47%  rust-learning  rust-learning         [.] std::thread::Builder::spawn_unchecked_                                                      ▒
+    1.94%     0.00%  rust-learning  [unknown]             [.] 0x0000000000000001                                                                          ▒
+    1.90%     1.90%  rust-learning  libc.so.6             [.] __GI___pthread_mutex_unlock_usercnt                                                         ▒
+    1.78%     1.78%  rust-learning  rust-learning         [.] rust_learning::module::thread_control::execute::{{closure}}1.64%     1.64%  rust-learning  libc.so.6             [.] pthread_mutex_lock@@GLIBC_2.2.5                                                             ▒
     1.44%     1.44%  rust-learning  ld-linux-x86-64.so.2  [.] _dl_allocate_tls_init                                                                       ▒
+    1.13%     0.00%  rust-learning  [unknown]             [.] 0x0000000000002f71                                                                          ▒
+    1.11%     1.11%  rust-learning  rust-learning         [.] <i64 as core::iter::range::Step>::forward_unchecked                                         ▒
+    1.09%     1.09%  rust-learning  rust-learning         [.] alloc::alloc::Global::alloc_impl                                                            ▒
+    0.97%     0.97%  rust-learning  rust-learning         [.] std::thread::scoped::<impl std::thread::Builder>::spawn_scoped                              ▒
+    0.89%     0.00%  rust-learning  [unknown]             [.] 0x000055f31b5c8700                                                                          ▒
+    0.89%     0.00%  rust-learning  [unknown]             [.] 0x000055f31b5ca130                                                                          ▒
+

こちらは優位な情報が得られました。CPU使用上位からの情報は以下になります。

  • pthread_create@GLIBC_2.2.5 (17.14%): 新しいスレッドの作成に関連する処理。
  • <core::ops::range::RangeInclusive ..>(10.74%): Rustの標準ライブラリに属するイテレーター関連の関数またはメソッド
  • clone3 (7.11%): スレッドやプロセスの複製に関連するシステムコール。
    を使用して新しいスレッドを作成していることを示しています。
  • _int_malloc, malloc (合計約9.93%): 動的メモリ割り当てに関連。
  • __GI___libc_write (5.77%): データの書き込みに関連するライブラリ関数。

上記の出力によりスレッドの作成や複製などにコストが割かれている為、スレッドプールを使用すると性能が向上する可能性があります。スレッドプールを導入してみます。

スレッドプールの導入

Rustにはrayonやthreadpoolなど、スレッドプールをサポートするクレートがありますが、今回は独自実装をしてみました。

汎用処理へのスレッドプールの導入と実行

スレッドプールを実装します。

use std::thread;
use std::error::Error;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

type Job = Box<dyn FnOnce() -> Result<(), Box<dyn Error>> + Send + 'static>;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Arc<Mutex<VecDeque<Option<Job>>>>,
    ready: Arc<AtomicBool>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        let sender = Arc::new(Mutex::new(VecDeque::new()));
        let ready = Arc::new(AtomicBool::new(false));
        let mut workers = Vec::with_capacity(size);

        for _ in 0..size {
            workers.push(Worker::new(Arc::clone(&sender), Arc::clone(&ready)));
        }

        ThreadPool { workers, sender, ready }
    }

    pub fn execute<F>(&self, job: F)
        where
            F: FnOnce() -> Result<(), Box<dyn Error>> + Send + 'static,
    {
        let job = Box::new(job) as Box<dyn FnOnce() -> Result<(), Box<dyn Error>> + Send>;
        let mut queue = self.sender.lock().unwrap();
        queue.push_back(Some(job));
    }

    pub fn shutdown(&mut self) {
        self.ready.store(true, Ordering::SeqCst);

        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(receiver: Arc<Mutex<VecDeque<Option<Job>>>>, ready: Arc<AtomicBool>) -> Worker {
        let thread = thread::spawn(move || {
            while !ready.load(Ordering::SeqCst) || !receiver.lock().unwrap().is_empty() {
                let job_option = receiver.lock().unwrap().pop_front();

                match job_option {
                    Some(Some(job)) => {
                        if let Err(e) = job() {
                            println!("Error in job: {:?}", e);
                        }
                    }
                    Some(None) => {
                        if ready.load(Ordering::SeqCst) {
                            break;
                        }
                    }
                    None => {
                        // キューが空で、まだ準備完了ではない場合、少し待ってから再試行
                        thread::sleep(Duration::from_millis(10));
                    }
                }
            }
        });

        Worker { thread: Some(thread) }
    }
}

スレッドプールを汎用処理へ実装します。

pub fn execute<F, T>(func: F, data: Vec<T>, chunk_num: usize, max_threads: usize) -> Result<(), Box<dyn Error>>
    where
        F: Fn(Vec<T>) -> i64 + Send + Copy + 'static,
        T: Send + Clone + 'static,
{
    let results = Arc::new(Mutex::new(Vec::new()));
    let mut pool = ThreadPool::new(max_threads);

    for chunk in data.chunks(chunk_num).map(|c| c.to_vec()) {
        let func = func;
        let results_clone = Arc::clone(&results);

        pool.execute(move || {
            let result = func(chunk);
            let mut results_lock = results_clone.lock().unwrap();
            results_lock.push(result);
            Ok(())
        });
    }

    pool.shutdown();

    // スレッドの結果を出力
    let results_lock = results.lock().unwrap();
    for result in &*results_lock {
        println!("Result: {}", result);
    }

    Ok(())
}

スレッドプール実装後では7倍近くの速度改善がされており、CPU使用率も40%前後と負荷も改善しています。汎用の計算処理が早く負荷もそれほどかからない分、スレッドの生成が一番負荷をかけていたようでした。

# スレッドプール実装前
Average:     CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
Average:     all    4.85    0.00   44.29    0.00    0.00    0.00    2.44    0.00    0.00   48.42
Average:       0    4.94    0.00   47.69    0.00    0.00    0.00    2.16    0.00    0.00   45.22
Average:       1    4.12    0.00   43.90    0.00    0.00    0.00    2.44    0.00    0.00   49.54
Average:       2    5.10    0.00   44.98    0.00    0.00    0.00    2.47    0.00    0.00   47.45
Average:       3    3.87    0.00   42.26    0.00    0.00    0.00    2.63    0.00    0.00   51.24
Average:       4    5.09    0.00   44.44    0.00    0.00    0.00    2.47    0.00    0.00   47.99
Average:       5    5.06    0.00   43.67    0.00    0.00    0.00    2.37    0.00    0.00   48.89
Average:       6    5.30    0.00   43.77    0.00    0.00    0.00    2.49    0.00    0.00   48.44
Average:       7    5.35    0.00   43.58    0.00    0.00    0.00    2.45    0.00    0.00   48.62

thread_control execution time: 58766ms
All Execution time: 61037ms

# スレッドプール実装後
03:27:45     CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
03:27:46     all   26.59    0.00   11.24    0.00    0.00    0.26    0.40    0.00    0.00   61.51
03:27:46       0   23.16    0.00    5.26    0.00    0.00    0.00    1.05    0.00    0.00   70.53
03:27:46       1   43.01    0.00   18.28    0.00    0.00    0.00    0.00    0.00    0.00   38.71
03:27:46       2   26.04    0.00    6.25    0.00    0.00    0.00    1.04    0.00    0.00   66.67
03:27:46       3   25.81    0.00   15.05    0.00    0.00    0.00    1.08    0.00    0.00   58.06
03:27:46       4   31.18    0.00   10.75    0.00    0.00    0.00    0.00    0.00    0.00   58.06
03:27:46       5   20.41    0.00   14.29    0.00    0.00    2.04    0.00    0.00    0.00   63.27
03:27:46       6   24.21    0.00    9.47    0.00    0.00    0.00    0.00    0.00    0.00   66.32
03:27:46       7   19.35    0.00   10.75    0.00    0.00    0.00    0.00    0.00    0.00   69.89

thread_control execution time: 8712ms
All Execution time: 10951ms

スレッドプールの同時実行ワーカー数を変化させて、実行時間を比べてみました。負荷はどれも65%くらいで実行時間も大きな差はありません。

ワーカー数 実行時間
2 9000ms
4 8789ms
6 8903ms
8 9016ms
12 9035m

mpstat -P ALL 1で各CPUコアの使用率を観測していると、結構偏りがあります。こちらの偏りを解消すればもっと性能改善されるかもしれません。

ここでまたシステムパフォーマンスの計測をしてみます。

システムパフォーマンス計測

perf

スレッドプールの実装により、スレッド作成のコストはほぼなくなり、コストとなっているのはiterの繰り返しコストであることがわかります。

perf record -g cargo run thread_control
perf report

...

# スレッドプール実装前
Samples: 26K of event 'cpu-clock:uhpppH', Event count (approx.): 6745500000
  Children      Self  Command        Shared Object         Symbol
+   17.14%    17.14%  rust-learning  libc.so.6             [.] pthread_create@GLIBC_2.2.5                                                                  ◆
+   10.74%    10.74%  rust-learning  rust-learning         [.] <core::ops::range::RangeInclusive<T> as core::iter::range::RangeInclusiveIteratorImpl>::spec▒
+    7.11%     7.11%  rust-learning  libc.so.6             [.] clone3                                                                                      ▒
+    6.09%     6.09%  rust-learning  libc.so.6             [.] _int_malloc                                                                                 ▒
+    5.77%     5.77%  rust-learning  libc.so.6             [.] __GI___libc_write                                                                           ▒
+    4.60%     4.60%  rust-learning  rust-learning         [.] alloc::vec::Vec<T,A>::extend_trusted::{{closure}}                                           ▒
+    3.94%     3.94%  rust-learning  rust-learning         [.] core::iter::traits::iterator::Iterator::for_each::call::{{closure}}                         ▒
+    3.84%     3.84%  rust-learning  libc.so.6             [.] malloc                                                                                      ▒
+    3.59%     3.59%  rust-learning  libc.so.6             [.] __memmove_avx_unaligned_erms                                                                ▒
+    3.59%     3.59%  rust-learning  rust-learning         [.] core::ops::try_trait::NeverShortCircuit<T>::wrap_mut_2::{{closure}}                         ▒
+    2.47%     2.47%  rust-learning  rust-learning         [.] std::thread::Builder::spawn_unchecked_                                                      ▒
+    1.94%     0.00%  rust-learning  [unknown]             [.] 0x0000000000000001                                                                          ▒

# スレッドプール実装後
+   16.44%    16.44%  rust-learning  rust-learning         [.] <core::ops::range::RangeInclusive<T> as core::iter::range::RangeInclusiveIteratorImpl>::spec_
+   10.84%    10.84%  rust-learning  rust-learning         [.] <core::slice::iter::Iter<T> as core::iter::traits::iterator::Iterator>::fold
+    7.47%     7.47%  rust-learning  rust-learning         [.] alloc::vec::Vec<T,A>::extend_trusted::{{closure}}
+    7.14%     0.00%  rust-learning  [unknown]             [.] 0x0000000000000001
+    6.71%     6.71%  rust-learning  rust-learning         [.] core::iter::traits::iterator::Iterator::for_each::call::{{closure}}
+    5.95%     5.95%  rust-learning  rust-learning         [.] core::ops::try_trait::NeverShortCircuit<T>::wrap_mut_2::{{closure}}
+    5.33%     5.33%  rust-learning  libc.so.6             [.] _int_free
+    5.29%     5.29%  rust-learning  rust-learning         [.] <i64 as core::iter::traits::accum::Sum<&i64>>::sum::{{closure}}
+    3.63%     3.63%  rust-learning  libc.so.6             [.] _int_malloc
+    3.47%     3.47%  rust-learning  rust-learning         [.] core::sync::atomic::atomic_compare_exchange
+    3.10%     3.10%  rust-learning  libc.so.6             [.] __lll_lock_wake_private
+    2.62%     2.62%  rust-learning  libc.so.6             [.] __memmove_avx_unaligned_erms
+    2.44%     2.44%  rust-learning  rust-learning         [.] rust_learning::module::thread_control::execute
+    2.20%     2.20%  rust-learning  libc.so.6             [.] __lll_lock_wait_private
+    1.95%     1.95%  rust-learning  rust-learning         [.] <i64 as core::iter::range::Step>::forward_unchecked

スレッドプールでCPUコストを削減して速度改善できました。

チャンネルの導入

以下のようなresultsをロックしてのリストの追加は、並列処理のチャンネルを介しても実現できますので、チェンネルを導入してみます。

let results = Arc::new(Mutex::new(Vec::new()));

        ...
	
        pool.execute(move || {
            let result = func(chunk);
            let mut results_lock = results_clone.lock().unwrap();
            results_lock.push(result);
            Ok(())
        });

汎用処理へのチェンネルの導入と実行

以下のように実装します。

let mut pool = ThreadPool::new(max_threads);
let (sender, receiver) = mpsc::channel();

for chunk in data.chunks(chunk_num).map(|c| c.to_vec()) {
    let func = func;
    let sender = sender.clone();
    pool.execute(move || {
        let result = func(chunk);
        sender.send(result).unwrap(); // 結果を送信
        Ok(())
    });
}

pool.shutdown();

drop(sender); // すべての送信者をドロップ
let mut results = Vec::new();

// 結果を受信し、格納
for received in receiver {
    results.push(received);
}

実行結果は、チェンネル導入前が8059mで、導入後が8587msと導入した方がわずかに遅くなっています。よって今回のケースでは、チャンネルは使わない方がよさそうです。

最後に

今回最終的に10億件の集計、計算処理で10秒前後(データ生成などの処理は含まない)の実行時間という結果になりました。

パフォーマンス計測は詳解 システム・パフォーマンスの書籍などを参考に行いました。その詳解 システム・パフォーマンスに興味深い記述がありました。

パフォーマンスは、「多く知れば知るほど、知らないことが増える」分野である。

Rustの並列処理についても言語知識だけがあるだけで良いわけではありません。OS、アセンブリ、CPUなどの知識も必要になってきます。

Rustの並列処理に手を出し、パファーマンスエンジニアリングを学んでいくと、どんどん大沼にハマっていく感覚があります。ただ私は今まで主に、Webエンジニアとしてキャリアを積んできたので、知らないことが知れるという楽しさもあります。良ければみなさんもこの大沼に共にハマりませんか?

Discussion