⏱️
Rustでfor in内処理を簡単に並列処理
RustでZip内画像リサイズのクレートを作っていたところ
思ったより簡単に並列処理を作れたので、記事にしようと思います。
成果物はこちら
最初に作ったシングルスレッドの関数が下記
シングルスレッド
pub struct MemoryImages {
pub input_memory_images: Vec<DynamicImage>,
pub out_names: Vec<PathBuf>,
pub print_mode: PrintMode,
}
impl MemoryImages {
/// MemoryImage is resized to the specified size.
/// Resizes a MemoryImage to the specified size; the aspect ratio is maintained by conv_mode.
pub fn convert_size(
&self,
as_width: u32,
as_height: u32,
conv_mode: ConvMode,
) -> Result<MemoryImages, io::Error> {
let mut conv_images: Vec<DynamicImage> = Vec::new();
let print;
let mut conv_width = as_width.clone();
let mut conv_height = as_height.clone();
match self.print_mode {
PrintMode::Print => {
print = true;
}
PrintMode::Unprint => {
print = false;
}
}
if print {
println!("");
}
let mut im_i = 0;
for im in &self.input_memory_images {
let debug_s_time = std::time::Instant::now();
match conv_mode {
ConvMode::Height => {
let w_p: f32 = as_height as f32 / im.height() as f32;
conv_width = ((im.width() as f32) * &w_p) as u32;
}
ConvMode::Width => {
let h_p: f32 = as_width as f32 / im.width() as f32;
conv_height = (im.height() as f32 * &h_p) as u32;
}
ConvMode::Both => {
let w_p: f32 = as_height as f32 / im.height() as f32;
conv_width = ((im.width() as f32) * &w_p) as u32;
if conv_width > as_width {
let h_p: f32 = as_width as f32 / im.width() as f32;
conv_height = (im.height() as f32 * &h_p) as u32;
}
}
}
let conv_im = im.resize(conv_width, conv_height, FilterType::CatmullRom);
conv_images.push(conv_im);
let debug_e_time = std::time::Instant::now();
if print {
print!(
"\rimage {}/{} conv to [{},{}] :{:?}",
im_i,
(&self.input_memory_images.len() - 1),
conv_width,
conv_height,
debug_e_time.duration_since(debug_s_time)
);
}
stdout().flush()?;
im_i += 1;
}
if print {
println!("")
}
return Ok(MemoryImages {
input_memory_images: conv_images,
out_names: self.out_names.clone(),
print_mode: self.print_mode.clone(),
});
}
for in
内の処理を並列処理に書き直したいと思います。
for im in &self.input_memory_images{}
の画像変換処理を下記の関数へ書き出します。
マルチスレッド用関数
fn do_convert_image_multithread(
im: DynamicImage,
as_width: u32,
as_height: u32,
out_path: PathBuf,
conv_num: i32,
print_mode: PrintMode,
) -> (DynamicImage, PathBuf) {
let mut conv_width = as_width.clone();
let mut conv_height = as_height.clone();
let print;
match print_mode {
PrintMode::Print => {
print = true;
}
PrintMode::Unprint => {
print = false;
}
}
match conv_num {
1 => {
let w_p: f32 = as_height as f32 / im.height() as f32;
conv_width = ((im.width() as f32) * &w_p) as u32;
}
2 => {
let h_p: f32 = as_width as f32 / im.width() as f32;
conv_height = (im.height() as f32 * &h_p) as u32;
}
3 => {
let w_p: f32 = as_height as f32 / im.height() as f32;
conv_width = ((im.width() as f32) * &w_p) as u32;
if conv_width > as_width {
let h_p: f32 = as_width as f32 / im.width() as f32;
conv_height = (im.height() as f32 * &h_p) as u32;
}
}
_ => {
let w_p: f32 = as_height as f32 / im.height() as f32;
conv_width = ((im.width() as f32) * &w_p) as u32;
}
}
let conv_im = im.resize(conv_width, conv_height, FilterType::CatmullRom);
if print {
print!("\rimage conv{:?}", out_path);
}
return (conv_im, out_path);
}
微妙な違いはありますが、殆ど同じ内容になっていると思います。
そしてfor im in &self.input_memory_images{}
内でスレッド作成と値を渡します。
マルチスレッド
pub fn convert_size_multithread(
&self,
as_width: u32,
as_height: u32,
conv_mode: ConvMode,
) -> Result<MemoryImages, io::Error> {
let mut conv_images: Vec<DynamicImage> = Vec::new();
let mut conv_outpath = vec![];
let print;
let mut handles = vec![];
match self.print_mode {
PrintMode::Print => {
print = true;
}
PrintMode::Unprint => {
print = false;
}
}
if print {
println!("");
}
let mut im_i = 0;
let conv_num;
match conv_mode {
ConvMode::Height => {
conv_num = 1;
}
ConvMode::Width => {
conv_num = 2;
}
ConvMode::Both => {
conv_num = 3;
}
}
for im_o in &self.input_memory_images {
let im = im_o.clone();
let out_path = self.out_names[im_i].clone();
let print_mode = self.print_mode.clone();
let handle = thread::spawn(move || {
do_convert_image_multithread(
im, as_width, as_height, out_path, conv_num, print_mode,
)
});
handles.push(handle);
im_i += 1;
}
for h in handles {
let (im_conv, _outpath) = h.join().unwrap();
conv_images.push(im_conv);
conv_outpath.push(_outpath);
}
if print {
println!("")
}
return Ok(MemoryImages {
input_memory_images: conv_images,
out_names: conv_outpath,
print_mode: self.print_mode.clone(),
});
}
並列処理に特に関係するのが下記箇所
スレッド生成
for im_o in &self.input_memory_images {
let im = im_o.clone();
let out_path = self.out_names[im_i].clone();
let print_mode = self.print_mode.clone();
let handle = thread::spawn(move || {
do_convert_image_multithread(
im, as_width, as_height, out_path, conv_num, print_mode,
)
});
handles.push(handle);
im_i += 1;
}
for h in handles {
let (im_conv, _outpath) = h.join().unwrap();
conv_images.push(im_conv);
conv_outpath.push(_outpath);
}
thread::spawn()
でスレッド生成と値を渡すのですが、並列処理でメモリ競合を避けるために
self
以下の変数をclone()
して渡しています。
(なぜself以下は'clone()'しなければいけないのかは勉強中です。誰か教えてください・・・)
各スレッドはhandleに格納され'join()'にて完了まで待機、結果をunwrap()で展開しています。
これだけの変更だけでfor in
内の処理を並列化できました。
Rustの素晴らしい点はコンパイラがメモリエラーが出る可能性を細かくチェックするので、外部クレートをマルチスレッド処理に気軽に組み込める点です。
並列処理の初心者でもメモリの状態がコードで分かりやすく見える点もGOOD。
というわけで、for in
処理を並列処理にしてみました。
並列処理するならRustと言えるくらいに好感触だったので、もっとRustで色々作ってみようかなと思いました。
良かったらZip内画像リサイズクレート"zipimgzip"を使ってみてね。
Discussion
std::thread::spawn
のwhere句の部分を見れば分かりますが、引数には'static
ライフタイム制約が付いています。一方でself
のライフタイムは省略されているので暗黙のライフタイムですが、明示的に表記するとすればfn convert_size_multithread<'a>(&'a self, ...)
のような形になります。この'a
は'static
制約を満たしていないので、selfの中身を参照するクロージャも同じライフタイムになり、spawn
の引数に渡すことはできなくなります。実際のスレッドの寿命は、関数内でjoin
を行っている以上'a
より短いのですが、thread::spawn
を使う限りは'static
制約を満たす必要があります。'static
よりも短いライフタイムでスレッドを動かしたい場合、std::thread::scope
を使うことができます。scope(|s| { /* ... */ })
の中でs.spawn()
で起動したスレッドの寿命はscope
の呼び出しを超えることがありません。そのため、scope
の呼び出しの外側の環境への参照はスレッドの寿命より長いことが保証されます。ありがとうございます。'staticの制約の為だったんですね。
scopeの方が自然にみえるので、scopeを使って書き直してみようと思います。
(scopeの存在を知りませんでした。。。重ねてありがとうございます)
scopeではhandleをVecに溜めて一気にjoinさせる方法が取れなさそうだったのでscopeへの書き換えは中止にしました。
そんなことはないと思いますが……。
scopeの中で起動したスレッドはscopeの終わりで自動的にjoinされますが、それより前に手動でjoinすることができます。
ありがとうございます。
スコープの範囲を広げれば良かったのですね。やってみます。
下記コードで上手くいきました。ありがとうございます。
rayonを使うと簡単にデータ並列が出来ます。
ありがとうございます!
完結・便利に書けそうですね。
(並列処理がこんなに楽に書けるとは驚きです・・・)