Javaの並列処理(Parallel Stream)に関するメモ

2022/11/01に公開
2

Parallel Streamを使った並列処理に関するメモです。

Parallel Stream を使った性能改善の例

CPUがネックとなるような処理では、Parallel Streamを使うことで性能が大きく改善する場合があります。

例として、複数のテキストファイルを読み込んで形態素解析を行い、Document Frequency(その単語が出現する文書の数)をカウントするプログラムを書きました。
形態素解析器はkuromojiを利用し、名詞に絞ってカウントしています。

package com.github.onozaty.df;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.atilika.kuromoji.ipadic.Token;
import com.atilika.kuromoji.ipadic.Tokenizer;

public class DocumentFrequencyCounter {

  private final Tokenizer tokenizer = new Tokenizer();

  public List<DocumentFrequency> count(List<Path> textFilePaths) {

    Map<String, Long> dfMap = textFilePaths.stream()
        .map(this::tokenizeNouns)
        .flatMap(Set::stream)
        .collect(Collectors.groupingBy(x -> x, Collectors.counting()));

    return dfMap.entrySet().stream()
        .map(x -> new DocumentFrequency(x.getKey(), x.getValue()))
        .collect(Collectors.toList());
  }

  public List<DocumentFrequency> countParallel(List<Path> textFilePaths) {

    Map<String, Long> dfMap = textFilePaths.stream()
        .parallel()
        .map(this::tokenizeNouns)
        .flatMap(Set::stream)
        .collect(Collectors.groupingBy(x -> x, Collectors.counting()));

    return dfMap.entrySet().stream()
        .map(x -> new DocumentFrequency(x.getKey(), x.getValue()))
        .collect(Collectors.toList());
  }

  private Set<String> tokenizeNouns(Path textFilePath) {

    try {
      String text = new String(Files.readAllBytes(textFilePath), StandardCharsets.UTF_8);
      return tokenizeNouns(text);
    } catch (IOException e) {
      // Streamで扱いやすくするために非チェック例外に
      throw new UncheckedIOException(e);
    }
  }

  private Set<String> tokenizeNouns(String text) {

    return tokenizer.tokenize(text).stream()
        // 名詞だけに絞る
        .filter(token -> token.getPartOfSpeechLevel1().equals("名詞"))
        .map(Token::getSurface)
        .collect(Collectors.toSet());
  }
}

countメソッドがシーケンシャルな処理、countParallelメソッドが並列処理となります。
この2つのメソッドの違いは、Stream#parallel()によって並列化しているかどうかだけとなります。

手元のPC(Intel Core i7-8700T 論理プロセッサ数:12)で、下記のlivedoor ニュースコーパス(7,376ファイル、計24.6MB)を利用してDFをカウントしてみました。

5回実行して平均をとったところ、下記のような時間になりました。

メソッド 時間
count 8.678秒
countParallel 2.225秒

並列処理を行うことで、3倍以上早くなっていることがわかります。

コード全体は、Gradleプロジェクトとして下記に配置しています。

Parallel Stream の並列数

Parallel Stream では、ForkJoinPoolの共通プールが利用されます。
ForkJoinPool.commonPool() で取得できるものになります。

共通プールの並列数はForkJoinPool.getCommonPoolParallelism()で取得できます。
デフォルトだと、論理プロセッサ数-1になります。
Parallel Streamを呼び出しているスレッドも利用して並列処理が行われるので、トータルとしては論理プロセッサと同じ数で実行されることになります。

下記のようなコードで、実際に並列数がどのようになるか確認します。
Recorderは、処理状況を記録するために用意したクラスです。

@Test
public void parallelStream_default() throws IOException {

  Random random = new Random();
  Recorder recorder = new Recorder();

  System.out.println("ForkJoinPool.getCommonPoolParallelism :" + ForkJoinPool.getCommonPoolParallelism());

  IntStream.range(0, 30).parallel()
      .forEach(i -> {

        System.out.println(Thread.currentThread().getName());

        int recordingId = recorder.start();

        try {
          // 3秒内でランダムな時間待ち合わせ
          Thread.sleep(random.nextInt(3000));
        } catch (InterruptedException e) {
          e.printStackTrace();
        }

        recorder.end(recordingId);
      });

  System.out.println();

  System.out.println("By Redcording");
  recorder.printByRecordingId(System.out);

  System.out.println();

  System.out.println("Active Count Graph");
  recorder.printByActiveCount(System.out);
}

論理プロセッサ数が12個の環境で実行すると、下記のような出力になります。

ForkJoinPool.getCommonPoolParallelism :11
main
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-8
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-9
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-10
ForkJoinPool.commonPool-worker-11
main
ForkJoinPool.commonPool-worker-11
ForkJoinPool.commonPool-worker-8
ForkJoinPool.commonPool-worker-9
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-10
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-2
main
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-9
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-11
ForkJoinPool.commonPool-worker-8
ForkJoinPool.commonPool-worker-9
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-7

By Redcording
 1 : +++++++++++++                                               
 2 :  ++++++++++++++++++++++++++                                 
 3 :   +++++++++++++++++++++++++++++++++++                       
 4 :    ++++++++++++++++++++++++++++++++++++++++++               
 5 :     +++++++++++++++++++++++++++++                           
 6 :      ++++++++++++                                           
 7 :       +++++++++++++++++++                                   
 8 :        ++++++++++++                                         
 9 :         +++++++++++++                                       
10 :          ++++++++++++++++++++++                             
11 :           +++++++++++++                                     
12 :            ++++                                             
13 :              ++++++++++++++++                               
14 :                ++++++++++++++++++++++++                     
15 :                  ++++++++++++++++++++++++                   
16 :                    ++++++++++++++++                         
17 :                      ++++++++++++++++++++++++++             
18 :                        ++++++++++++++++++++++++++++         
19 :                          ++++++++++++++++++++++++++++       
20 :                            ++++++++++++++++++++++++++++++   
21 :                              +++++++++++++++++++++          
22 :                                ++++++++++++++++++           
23 :                                  +++++++++++++++++++++++++++
24 :                                    ++++++++                 
25 :                                      ++++++++++++++++++     
26 :                                        +++++++++++++        
27 :                                          +++++++++++++++++  
28 :                                            +++++++++++      
29 :                                              +++++++++++    
30 :                                                ++++++++++++ 

Active Count Graph
12 :            ++++++++++++++++++++++++++++++++++++++           
11 :           ++++++++++++++++++++++++++++++++++++++++          
10 :          ++++++++++++++++++++++++++++++++++++++++++         
 9 :         ++++++++++++++++++++++++++++++++++++++++++++        
 8 :        ++++++++++++++++++++++++++++++++++++++++++++++       
 7 :       ++++++++++++++++++++++++++++++++++++++++++++++++      
 6 :      ++++++++++++++++++++++++++++++++++++++++++++++++++     
 5 :     ++++++++++++++++++++++++++++++++++++++++++++++++++++    
 4 :    ++++++++++++++++++++++++++++++++++++++++++++++++++++++   
 3 :   ++++++++++++++++++++++++++++++++++++++++++++++++++++++++  
 2 :  ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
 1 : ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

ForkJoinPool.commonPool は11スレッドで、それにプラスして呼び出し元のmainスレッドも利用されて計12スレッドで実行されていることがわかります。

参考

Parallel Stream の並列数の変更方法

ForkJoinPoolの共通プールの並列数は、システムプロパティとして指定することで変更できます。

-Djava.util.concurrent.ForkJoinPool.common.parallelism=4

ForkJoinPoolの共通プールは、他でも利用される(プールを明示的に指定しないForkJoinTask#invokeとか)ので、もしParallel Streamだけ変えたい場合には別の方法を取る必要があります。

別途ForkJoinPoolを生成してsubmitで実行することで、そのForkJoinPoolで実行されるようになります。

@Test
public void parallelStream_custom() throws IOException, InterruptedException, ExecutionException {

  Random random = new Random();
  Recorder recorder = new Recorder();

  System.out.println("ForkJoinPool.getCommonPoolParallelism :" + ForkJoinPool.commonPool().getParallelism());

  ForkJoinPool customThreadPool = new ForkJoinPool(3);

  customThreadPool.submit(
      () -> {
        IntStream.range(0, 30).parallel()
            .forEach(i -> {

              System.out.println(Thread.currentThread().getName());

              int recordingId = recorder.start();

              try {
                // 3秒内でランダムな時間待ち合わせ
                Thread.sleep(random.nextInt(3000));
              } catch (InterruptedException e) {
                e.printStackTrace();
              }

              recorder.end(recordingId);
            });
      }).get();

  System.out.println();

  System.out.println("By Redcording");
  recorder.printByRecordingId(System.out);

  System.out.println();

  System.out.println("Active Count Graph");
  recorder.printByActiveCount(System.out);
}

上記コードの実行結果です。
並列数が3に制限されていることがわかります。

ForkJoinPool.getCommonPoolParallelism :11
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-3

By Redcording
 1 : ++++                                                        
 2 :  +++++                                                      
 3 :   ++++++++++                                                
 4 :     ++++                                                    
 5 :       ++++                                                  
 6 :         ++++++                                              
 7 :           ++++++++                                          
 8 :             ++++                                            
 9 :               ++++++                                        
10 :                 ++++++++                                    
11 :                   ++++                                      
12 :                     ++++++++                                
13 :                       ++++++++++++++++                      
14 :                         ++                                  
15 :                           ++++++++++                        
16 :                             ++                              
17 :                               ++                            
18 :                                 ++                          
19 :                                   ++++++++++                
20 :                                     ++++                    
21 :                                       ++++                  
22 :                                         ++++++++++          
23 :                                           ++++              
24 :                                             ++++            
25 :                                               ++++++++      
26 :                                                 ++++++++++  
27 :                                                   ++        
28 :                                                     ++++    
29 :                                                       +++++ 
30 :                                                         ++++

Active Count Graph
 3 :   ++++++++++++++++++++++++++++++++++++++++++++++++++++++++  
 2 :  ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
 1 : ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

なお、この方法だと、実行毎にForkJoinPoolを生成してしまうので、複数同時に呼ばれるようなことがある場合には、結果的に共通プールを使うより多く実行されてしまうということになりかねません。
共通プールの方はstaticなものなので、アプリケーション全体で並列数に制限を与えることができます。

参考

(誤解)Parallel StreamはWebアプリケーションに向いていない

同時にParallel Streamを使った処理を行うと、その分どんどんスレッドを利用してしまうので、Webアプリケーションで使うのは慎重になった方が良いような話を以前見かけたことがあります。
これは誤解で、少なくともデフォルトのForkJoinPoolの共通プールを使っている分には、そこでスレッド数に制限がかかる(デフォルト論理プロセッサ数と同じ並列数)ので、いくつ同時に処理が行われようが、スレッドを際限なく生成してしまうということはありません。
もしもデフォルトの論理プロセッサ数と同じ数だとWebアプリケーションのリクエストをさばくのに影響あるということならば、共通プールでの並列数を少なくすると良いと思います。

Parallel Streamなどの並列処理で気を付けるところ

Parallel Streamに限らず、並列処理で気を付けるところを書き連ねておきます。

ちなみに、自分が今まで書いたコードだと、Parallel Streamを使うより、ExecutorServiceを使って並列処理を実装することが多いです。
(うまくStreamに当てはまるようなものは割と少ない感じがする)

DBコネクションをそれぞれの処理で取得する

並列処理にした場合、DBコネクションとして同じものを使いまわせなくなります。(同時にクエリを発行できないので)
そのため、並列処理それぞれでDBコネクションを取得することになりますが、SELECT以外にINSERTやUPDATEがあると、トランザクションをどうすべきか気を付ける必要が出てきます。

  • それぞれの処理でコミットして問題ないならば、そのままコミットする
  • 1つのトランザクションとして扱わなければならないならば、並列処理でINSERTやUPDATEは行わず、並列処理の後にまとめて1つのトランザクションとして行う

生成にコストがかかるオブジェクトはプーリングして使いまわす

スレッドセーフではないオブジェクトを並列処理で利用する場合、各並列処理にて生成して利用する必要がありますが、そのオブジェクトの生成に時間がかかるような場合には、プーリングするようにしないと遅くなってしまいます。

事前に並列数分のオブジェクトを生成しておいて、ConcurrentLinkedQueueなどに出し入れして使いまわすような簡易的なものでも有効かもしれません。

// 事前に同時に利用される数分オブジェクトを作っておく
int size = ForkJoinPool.getCommonPoolParallelism() + 1;
ConcurrentLinkedQueue<HeavyObject> objectQueue = new ConcurrentLinkedQueue<>();
for (int i = 0; i < size; i++) {
  objectQueue.add(new HeavyObject());
}

IntStream.range(0, 200).parallel()
    .forEach(i -> {

      HeavyObject heavyObject = objectQueue.poll();
      try {
        heavyObject.execute(i);
      } finally {
        objectQueue.add(heavyObject);
      }
    });

この方法だとオブジェクトの生成がシリアルなので、オブジェクトプーリングの仕組み利用するとより時間短縮になります。

たとえば Apache Commons Pool を利用すると、ちょっとしたコードでオブジェクトプーリングが実装できます。

対象のオブジェクトを提供するFactoryを用意します。

package com.github.onozaty.parallel.pool;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

import com.github.onozaty.parallel.HeavyObject;

public class HeavyObjectFactory extends BasePooledObjectFactory<HeavyObject> {

  @Override
  public HeavyObject create() throws Exception {
    return new HeavyObject();
  }

  @Override
  public PooledObject<HeavyObject> wrap(HeavyObject obj) {
    return new DefaultPooledObject<HeavyObject>(obj);
  }
}

これを使ってオブジェクトプールを生成して利用します。
こうすると、必要になったタイミングでオブジェクトが生成されることとなり、オブジェクトの生成処理自体も並列処理で行えて、より時間短縮となります。

int size = ForkJoinPool.getCommonPoolParallelism() + 1;

try (GenericObjectPool<HeavyObject> objectPool = new GenericObjectPool<>(new HeavyObjectFactory())) {

  objectPool.setMaxTotal(size);

  IntStream.range(0, 200).parallel()
      .forEach(i -> {

        HeavyObject heavyObject = null;
        try {
          heavyObject = objectPool.borrowObject();
          heavyObject.execute(i);
        } catch (Exception e) {
          throw new RuntimeException(e);
        } finally {
          objectPool.returnObject(heavyObject);
        }
      });
}

(追記)2022-11-03

ConcurrentLinkedQueueのところだけ少しツッコミを
ThreadLocalのほうが、書く行数も少なく、またForkJoinPoolを使いきれない分の余計なオブジェクトも作成されないので、より好ましいかと思います

@reta さんからコメントで教えていただいたので、ThreadLocalの例を記載します。ありがとうございました。

// スレッド毎に別のオブジェクトを管理
ThreadLocal<HeavyObject> threadLocal = new ThreadLocal<>();

IntStream.range(0, 200).parallel()
    .forEach(i -> {

      HeavyObject heavyObject = threadLocal.get();
      if (heavyObject == null) {
        heavyObject = new HeavyObject();
        threadLocal.set(heavyObject);
      }

      heavyObject.execute(i);
    });

後で書きたい

(並列処理に絡んで最初はここで書こうと思ったのですが、、)別の記事で書く予定のことをメモしておきます。
(別記事で書いたらリンク張ります)

  • ExecuterServiceを使って雑に並列処理を行う
  • 待ちスレッド数の上限を避けるため、ExecutorCompletionServiceをを使って同時実行数の上限を超えたらブロッキングさせる
  • Spring Bootで新規トランザクションや非同期処理を簡単に開始する

Discussion

れたれた

ConcurrentLinkedQueueのところだけ少しツッコミを
ThreadLocalのほうが、書く行数も少なく、またForkJoinPoolを使いきれない分の余計なオブジェクトも作成されないので、より好ましいかと思います

onozatyonozaty

ありがとうございます!
確かにThreadLocalの方が簡単かつ不要なオブジェクト生成されないのでよいですね。
ThreadLocalの例も追記させていただきました。