⏱️

Rustでfor in内処理を簡単に並列処理

2022/10/29に公開8

RustでZip内画像リサイズのクレートを作っていたところ
思ったより簡単に並列処理を作れたので、記事にしようと思います。
成果物はこちら
https://github.com/orenodinner/zipimgzip

最初に作ったシングルスレッドの関数が下記

シングルスレッド
pub struct MemoryImages {
    pub input_memory_images: Vec<DynamicImage>,
    pub out_names: Vec<PathBuf>,
    pub print_mode: PrintMode,
}

impl MemoryImages {
    /// MemoryImage is resized to the specified size.
    /// Resizes a MemoryImage to the specified size; the aspect ratio is maintained by conv_mode.
    pub fn convert_size(
        &self,
        as_width: u32,
        as_height: u32,
        conv_mode: ConvMode,
    ) -> Result<MemoryImages, io::Error> {
        let mut conv_images: Vec<DynamicImage> = Vec::new();
        let print;

        let mut conv_width = as_width.clone();
        let mut conv_height = as_height.clone();
        match self.print_mode {
            PrintMode::Print => {
                print = true;
            }
            PrintMode::Unprint => {
                print = false;
            }
        }
        if print {
            println!("");
        }

        let mut im_i = 0;
        for im in &self.input_memory_images {
            let debug_s_time = std::time::Instant::now();
            match conv_mode {
                ConvMode::Height => {
                    let w_p: f32 = as_height as f32 / im.height() as f32;
                    conv_width = ((im.width() as f32) * &w_p) as u32;
                }
                ConvMode::Width => {
                    let h_p: f32 = as_width as f32 / im.width() as f32;
                    conv_height = (im.height() as f32 * &h_p) as u32;
                }
                ConvMode::Both => {
                    let w_p: f32 = as_height as f32 / im.height() as f32;
                    conv_width = ((im.width() as f32) * &w_p) as u32;
                    if conv_width > as_width {
                        let h_p: f32 = as_width as f32 / im.width() as f32;
                        conv_height = (im.height() as f32 * &h_p) as u32;
                    }
                }
            }
            let conv_im = im.resize(conv_width, conv_height, FilterType::CatmullRom);
            conv_images.push(conv_im);
            let debug_e_time = std::time::Instant::now();

            if print {
                print!(
                    "\rimage {}/{} conv to [{},{}] :{:?}",
                    im_i,
                    (&self.input_memory_images.len() - 1),
                    conv_width,
                    conv_height,
                    debug_e_time.duration_since(debug_s_time)
                );
            }
            stdout().flush()?;
            im_i += 1;
        }
        if print {
            println!("")
        }
        return Ok(MemoryImages {
            input_memory_images: conv_images,
            out_names: self.out_names.clone(),
            print_mode: self.print_mode.clone(),
        });
    }

for in内の処理を並列処理に書き直したいと思います。
for im in &self.input_memory_images{}の画像変換処理を下記の関数へ書き出します。

マルチスレッド用関数
fn do_convert_image_multithread(
  im: DynamicImage,
  as_width: u32,
  as_height: u32,
  out_path: PathBuf,
  conv_num: i32,
  print_mode: PrintMode,
) -> (DynamicImage, PathBuf) {
  let mut conv_width = as_width.clone();
  let mut conv_height = as_height.clone();
  let print;
  match print_mode {
      PrintMode::Print => {
          print = true;
      }
      PrintMode::Unprint => {
          print = false;
      }
  }

  match conv_num {
      1 => {
          let w_p: f32 = as_height as f32 / im.height() as f32;
          conv_width = ((im.width() as f32) * &w_p) as u32;
      }
      2 => {
          let h_p: f32 = as_width as f32 / im.width() as f32;
          conv_height = (im.height() as f32 * &h_p) as u32;
      }
      3 => {
          let w_p: f32 = as_height as f32 / im.height() as f32;
          conv_width = ((im.width() as f32) * &w_p) as u32;
          if conv_width > as_width {
              let h_p: f32 = as_width as f32 / im.width() as f32;
              conv_height = (im.height() as f32 * &h_p) as u32;
          }
      }
      _ => {
          let w_p: f32 = as_height as f32 / im.height() as f32;
          conv_width = ((im.width() as f32) * &w_p) as u32;
      }
  }

  let conv_im = im.resize(conv_width, conv_height, FilterType::CatmullRom);

  if print {
      print!("\rimage conv{:?}", out_path);
  }

  return (conv_im, out_path);
}

微妙な違いはありますが、殆ど同じ内容になっていると思います。
そしてfor im in &self.input_memory_images{}内でスレッド作成と値を渡します。

マルチスレッド
pub fn convert_size_multithread(
      &self,
      as_width: u32,
      as_height: u32,
      conv_mode: ConvMode,
  ) -> Result<MemoryImages, io::Error> {
      let mut conv_images: Vec<DynamicImage> = Vec::new();
      let mut conv_outpath = vec![];
      let print;
      let mut handles = vec![];

      match self.print_mode {
          PrintMode::Print => {
              print = true;
          }
          PrintMode::Unprint => {
              print = false;
          }
      }
      if print {
          println!("");
      }

      let mut im_i = 0;

      let conv_num;
      match conv_mode {
          ConvMode::Height => {
              conv_num = 1;
          }
          ConvMode::Width => {
              conv_num = 2;
          }
          ConvMode::Both => {
              conv_num = 3;
          }
      }

      for im_o in &self.input_memory_images {
          let im = im_o.clone();
          let out_path = self.out_names[im_i].clone();
          let print_mode = self.print_mode.clone();

          let handle = thread::spawn(move || {
              do_convert_image_multithread(
                  im, as_width, as_height, out_path, conv_num, print_mode,
              )
          });
          handles.push(handle);
          im_i += 1;
      }

      for h in handles {
          let (im_conv, _outpath) = h.join().unwrap();
          conv_images.push(im_conv);
          conv_outpath.push(_outpath);
      }

      if print {
          println!("")
      }
      return Ok(MemoryImages {
          input_memory_images: conv_images,
          out_names: conv_outpath,
          print_mode: self.print_mode.clone(),
      });
  }

並列処理に特に関係するのが下記箇所

スレッド生成
   for im_o in &self.input_memory_images {
            let im = im_o.clone();
            let out_path = self.out_names[im_i].clone();
            let print_mode = self.print_mode.clone();

            let handle = thread::spawn(move || {
                do_convert_image_multithread(
                    im, as_width, as_height, out_path, conv_num, print_mode,
                )
            });
            handles.push(handle);
            im_i += 1;
        }

        for h in handles {
            let (im_conv, _outpath) = h.join().unwrap();
            conv_images.push(im_conv);
            conv_outpath.push(_outpath);
        }

thread::spawn()でスレッド生成と値を渡すのですが、並列処理でメモリ競合を避けるために
self以下の変数をclone()して渡しています。
(なぜself以下は'clone()'しなければいけないのかは勉強中です。誰か教えてください・・・)
各スレッドはhandleに格納され'join()'にて完了まで待機、結果をunwrap()で展開しています。

これだけの変更だけでfor in内の処理を並列化できました。
Rustの素晴らしい点はコンパイラがメモリエラーが出る可能性を細かくチェックするので、外部クレートをマルチスレッド処理に気軽に組み込める点です。
並列処理の初心者でもメモリの状態がコードで分かりやすく見える点もGOOD。

というわけで、for in処理を並列処理にしてみました。
並列処理するならRustと言えるくらいに好感触だったので、もっとRustで色々作ってみようかなと思いました。

https://crates.io/crates/zipimgzip
良かったらZip内画像リサイズクレート"zipimgzip"を使ってみてね。

Discussion

白山風露白山風露

なぜself以下は'clone()'しなければいけないのか

https://doc.rust-lang.org/std/thread/fn.spawn.html
std::thread::spawn のwhere句の部分を見れば分かりますが、引数には 'static ライフタイム制約が付いています。一方で self のライフタイムは省略されているので暗黙のライフタイムですが、明示的に表記するとすれば fn convert_size_multithread<'a>(&'a self, ...) のような形になります。この 'a'static 制約を満たしていないので、selfの中身を参照するクロージャも同じライフタイムになり、 spawn の引数に渡すことはできなくなります。実際のスレッドの寿命は、関数内で join を行っている以上 'a より短いのですが、 thread::spawn を使う限りは 'static 制約を満たす必要があります。

https://doc.rust-lang.org/std/thread/fn.scope.html
'static よりも短いライフタイムでスレッドを動かしたい場合、 std::thread::scope を使うことができます。 scope(|s| { /* ... */ }) の中で s.spawn() で起動したスレッドの寿命はscope の呼び出しを超えることがありません。そのため、scope の呼び出しの外側の環境への参照はスレッドの寿命より長いことが保証されます。

oreno_ dinneroreno_ dinner

ありがとうございます。'staticの制約の為だったんですね。
scopeの方が自然にみえるので、scopeを使って書き直してみようと思います。
(scopeの存在を知りませんでした。。。重ねてありがとうございます)

oreno_ dinneroreno_ dinner

scopeではhandleをVecに溜めて一気にjoinさせる方法が取れなさそうだったのでscopeへの書き換えは中止にしました。

oreno_ dinneroreno_ dinner

ありがとうございます。
スコープの範囲を広げれば良かったのですね。やってみます。

oreno_ dinneroreno_ dinner

下記コードで上手くいきました。ありがとうございます。

   thread::scope(|s| {
            let mut handles = vec![];

            for im in &self.input_memory_images {
                let out_path = &self.out_names[im_i];
                let print_mode = &self.print_mode;

                let handle = s.spawn(move || {
                    do_convert_image_multithread(
                        im, as_width, as_height, out_path, conv_num, print_mode,
                    )
                });
                handles.push(handle);
                im_i += 1;
            }

            for h in handles {
                let (im_conv, _outpath) = h.join().unwrap();
                conv_images.push(im_conv);
                conv_outpath.push(_outpath);
            }

            if print {
                println!("")
            }
        });
        return Ok(MemoryImages {
            input_memory_images: conv_images,
            out_names: conv_outpath,
            print_mode: self.print_mode.clone(),
        });
    }