🚥

Rustで並列処理数(スレッド数)を一定にする方法

2022/11/04に公開

https://zenn.dev/oreno_dinner/articles/f9c5ee5a25b4e3
上記の記事の続き

画像リサイズクレートを作ったので、これを使用したフォルダ内一括処理プログラム書きました。
フォルダ内のファイル一括処理も並列処理にしたかったので、何も考えずフォルダ内ファイル数分をthread::spawn()したらCPU/メモリ共に使用率100%になってしまいました。

というわけで、現在実行中のスレッド数を監視してthread::spawn()のタイミングを調整するプログラムに変更したので下記に書きます。

use std::clone;
use std::env;
use std::fmt::Error;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use std::vec;
use walkdir::{DirEntry, WalkDir};

use zipimgzip::*;

fn main() -> Result<(), io::Error> {
    let test_pixels: [u32; 2] = [750, 1334];
    let test_quality: u8 = 90;
    let mut THREAD_NUM = -12;
  
    let args: Vec<_> = env::args().collect();

    if args.len() < 2 {
        println!("Usage: {} <filename>", args[0]);
        return Err(io::Error::new(io::ErrorKind::NotFound, "Folder NotFound"));
    }

    let fname = std::path::Path::new(&*args[1]);
    let oname = std::path::Path::new(&*args[2]);
    let walkdir = WalkDir::new(fname);
   
    let mut _i = 0;
    
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for entry in walkdir {
        let entry = entry?;
        _i += 1;

        while THREAD_NUM > *counter.lock().unwrap() {
            thread::sleep(Duration::from_millis(1));
        }
        let counter = Arc::clone(&counter);
        let a = oname.clone().to_path_buf();
        THREAD_NUM += 1;
        let handle = thread::spawn(move || {
            let res;

            match do_multithread(entry, a, test_pixels, test_quality) {
                Ok(r) => {
                    res = r;
                }
                Err(e) => {
                    res = String::from("err");
                }
            }
            let mut nums = counter.lock().unwrap();
            *nums += 1;
            return res;
        });

        handles.push(handle);
    }
    println!("");
    for h in handles {
        let ans = h.join();
        match ans {
            Ok(r) => {}
            Err(e) => {
                println!("ansErr{:?}", e);
            }
        }
    }
    println!("Result: {}", *counter.lock().unwrap());
    println!("FIN");
    Ok(())
}

fn do_multithread(
    entry: DirEntry,
    oname: PathBuf,
    test_pixels: [u32; 2],
    test_quality: u8,
) -> Result<String, io::Error> {
    let a = oname.clone();
    let return_path;
    match entry.path().extension() {
        None => return_path = String::from("None"),
        Some(r) => match r {
            r if r == "zip" => {
                let outpath = a
                    .join(entry.path().file_name().unwrap())
                    .to_str()
                    .unwrap()
                    .to_string();
                let _ = unzip_to_memory(
                    entry.path().to_str().unwrap().to_string(),
                    PrintMode::Unprint,
                )?
                .convert_size_multithread(test_pixels[0], test_pixels[1], ConvMode::Height)?
                .create_zip_multithread(
                    outpath.clone(),
                    SaveFormat::Ref,
                    test_quality,
                )?;
                return_path = outpath;
                println!("conv:{:?}", return_path)
            }
            _ => return_path = String::from("None"),
        },
    }
    return Ok(return_path);
}

ポイント下記コードです。handle(型JoinHandle)では.join()はできても、現在の状態を確認できないのでスレッド内の関数の終わりに変数'counter'を+=1することで whileでスレッド完了数を監視することができるようになります。
THREAD_NUMを-12と初期化してるのでfor entry in walkdir内のループ数との差は12となります。スレッド完了数とループ数の差が12以上になるとwhileループにハマるようになっています。

 let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for entry in walkdir {
        let entry = entry?;
        _i += 1;

        while THREAD_NUM > *counter.lock().unwrap() {
            thread::sleep(Duration::from_millis(1));
        }
        let counter = Arc::clone(&counter);
        let a = oname.clone().to_path_buf();
        THREAD_NUM += 1;
        let handle = thread::spawn(move || {
            let res;

            match do_multithread(entry, a, test_pixels, test_quality) {
                Ok(r) => {
                    res = r;
                }
                Err(e) => {
                    res = String::from("err");
                }
            }
            let mut nums = counter.lock().unwrap();
            *nums += 1;
            return res;
        });
        handles.push(handle);
    }

やはりというか、並列処理は奥が深い。汎用的な並列処理プログラムを書くのは結構大変そう。
上記プログラムは試行錯誤して同時12スレッドに調整したのだけれど、PCスペック変わると数値変わるだろうし、自動調整するにしてもメモリ・CPU負荷が上がったらもう時すでにお寿司って感じもするし。
うーむ、今度はメモリ・CPU監視してスレッド数自動調整にチャレンジてみるか検討してみよう。

Discussion