3.3 並行処理ユーティリティのその他の機能(CyclicBarrier、CountDownLatch等)~Java Advanced編
はじめに
自己紹介
皆さん、こんにちは、斉藤賢哉と申します。私はこれまで、25年以上に渡って企業システムの開発に携わってきました。特にアーキテクトとして、ミッションクリティカルなシステムの技術設計や、Javaフレームワーク開発などの豊富な経験を有しています。
様々なセミナーでの登壇や雑誌への技術記事寄稿の実績があり、また以下のような書籍も執筆しています。
いずれもJava EE(Jakarta EE)を中心にした企業システム開発のための書籍です。中でも 「アプリケーションアーキテクチャ設計パターン」は、(Javaに限定されない)比較的普遍的なテーマを扱っており、内容的にはまだまだ陳腐化していないため、興味のある方は是非手に取っていただけると幸いです(中級者向け)。
Udemy講座のご紹介
この記事の内容は、私が講師を務めるUdemy講座『Java Advanced編』の一部の範囲をカバーしたものです。『Java Advanced編』はこちらのリンクから購入できます(セールス対象外のためいつも同じ価格)。また定価の約30%OFFで購入可能なクーポンをZenn内で定期的に発行していますので、興味のある方は、ぜひ私の他の記事をチェックしてみてください。
この講座は、以下のような皆様にお薦めします。
- Javaの基本的なスキルを習得済みで、さらなるレベルアップを目指している方
- 将来的なキャリアとして、希少性の高い上級エンジニアやアーキテクトを志向している方
- フリーランスエンジニアとして付加価値の更なる向上を図っている方
- 「Oracle認定Javaプログラマ」の資格取得を目指している方
この記事を含むシリーズ全体像
この記事はJava SEの一部の機能・仕様を取り上げたものですが、一連のシリーズになっており、シリーズ全体でJava SEを網羅しています。また認定資格である「Oracle認定Javaプログラマ」(Silver、Gold)の範囲もカバーしています。シリーズの全体像および「Oracle認定Javaプログラマ」の範囲との対応関係については、以下を参照ください。
3.3 並行処理ユーティリティのその他の機能
チャプターの概要
このチャプターでは、並行処理ユーティリティのExecutorフレームワーク以外の機能ついて学びます。
3.3.1 同期化支援のためのCyclicBarrierとCountDownLatch
CyclicBarrierによる同期化支援
並行処理ユーティリティには、同期化支援と呼ばれる機能があります。これは複数のスレッドが実行されているとき、スレッド間で歩調を合わせて、処理を待ち合せたり、再開したりする機能を表します。
同期化支援機能は、CyclicBarrierやCountDownLatchによって提供されますが、ここではまずCyclicBarrierについて説明します。
CyclicBarrierを使うと、同じタスクを処理するための複数のスレッドがあった場合、それらが特定のポイント(バリアーポイント)に到達するまで、処理を待機させることできます。
CyclicBarrierクラスのコンストラクタは以下の2つです。
(1)CyclicBarrier(int)
(2)CyclicBarrier(int, Runnable)
(1)、(2)ともに第一引数には、待ち合わせ対象のスレッド数を指定します。(2)の第二引数には、バリアーポイント到達後に起動するタスク(これをバリアーアクションと呼ぶ)を指定します。
このようにして生成したCyclicBarrierオブジェクトを、スレッド間で共有して使う点が特徴です。
主なAPIはawait()メソッドで、スレッドの処理がバリアーポイントに到達したらこのメソッドを呼び出し、他のスレッドも到達するまで待機させます。
待ち合わせ対象に指定されたスレッド数(コンストラクタの第1引数)がバリアーポイントに到達したら、処理が再開されます。(2)のコンストラクタの場合は、再開後にバリアーアクションが起動されます。
【図3-3-1】CyclicBarrierによる同期化支援
CyclicBarrierの具体例
それでは、CyclicBarrierによる同期化支援を具体例で説明します。
まずスレッドを表すWorkerクラスのコードを示します。
public class Worker extends Thread {
private String name;
private CyclicBarrier barrier; //【1】
public Worker(String name, CyclicBarrier barrier) {
this.name = name;
this.barrier = barrier;
}
@Override
public void run() {
try {
// 何らかの処理を行う
System.out.println("[ " + name + " ] do something...");
doSomething(2, 8); // 2~8秒(ランダム)何かをする
// 【2】バリアーポイントに到達したため待機する
System.out.println("[ " + name + " ] awaiting...");
barrier.await();
} catch (InterruptedException | BrokenBarrierException ex) {
throw new RuntimeException(ex);
}
}
}
フィールドとしてCyclicBarrierを定義し【1】、コンストラクタで初期化しています。このフィールドに設定されるCyclicBarrierオブジェクトは、スレッド間で共有されます。
run()メソッドでは何らかの処理を行い、バリアーポイントまで到達したらCyclicBarrierのawait()メソッドを呼び出して、他のスレッドの到達を待機します【2】。他のスレッドもバリアーポイントに到達すると処理が再開され、run()メソッドは終了します。
次にメインスレッドのコードを示します。
CyclicBarrier barrier = new CyclicBarrier(3, new MyBarrierAction()); //【1】
System.out.println("[ Main ] starting all threads...");
//【2】3つのWorkerスレッドを生成し、起動する
new Worker("t1", barrier).start();
new Worker("t2", barrier).start();
new Worker("t3", barrier).start();
System.out.println("[ Main ] finish");
まずCyclicBarrierオブジェクトを生成します【1】。ここでは待ち合わせ対象スレッド数を3として、待ち合わせ後に起動するタスクとしてMyBarrierActionクラス(コードは割愛)を指定しています。
次にWorkerクラスのインスタンスを3つ生成しています【2】が、同一のCyclicBarrierオブジェクトをコンストラクタに指定することで、スレッド間で共有しています。
生成した3つのスレッドは、そのまますぐに起動をかけています。
このコードを実行すると、3つのスレッドが起動され、それぞれの処理がバリアーポイントに到達したら、バリアーアクションが起動されます。
なおこの例では待ち合わせ対象スレッド数を3とし、同数である3つのスレッド数を起動していますが、例えば5つのスレッドを起動することも可能です。その場合は待ち合わせの対象は5つのスレッドすべてではなく、5つ中3つのスレッドがバリアーポイントに到達することを待ち合せます。
CountDownLatchによる同期化支援
CountDownLatchを利用すると、同じタスクを処理するための複数スレッドがあった場合、スレッド間で歩調を合わせて、処理を待機したり再開したりすることができます。
CountDownLatchクラスのオブジェクトは、カウントを指定して生成します。
生成したCountDownLatchオブジェクトを、スレッド間で共有して使う点が特徴です。
主なAPIは2つで、まずawait()メソッドを呼び出すと、カウントがゼロになるまで処理を待機します。
またcountDown()メソッドを呼び出すと、カウントが1つ減算されますが、ゼロになるとawait()メソッドで待機中のスレッドが再開されます。
CountDownLatchの具体例
それでは、CountDownLatchによる同期化支援を具体例で説明します。
まずスレッドを表すWorkerクラスのコードを示します。
public class Worker extends Thread {
private String name;
private CountDownLatch startSignal; //【1】
private CountDownLatch doneSignal; //【2】
public Worker(String name, CountDownLatch startSignal, CountDownLatch doneSignal) {
this.name = name;
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
//【3】処理を待機する
System.out.println("[ " + name + " ] awaiting...");
startSignal.await();
//【4】2~8秒間(ランダム)、何らかの処理を行う
System.out.println("[ " + name + " ] do something...");
doSomething(2, 8);
// 処理終了を通知する
System.out.println("[ " + name + " ] finish");
doneSignal.countDown(); //【5】
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}
フィールドとして2つのCountDownLatchを定義し【1、2】、それらをコンストラクタで初期化しています。2つのCountDownLatchのうち、startSignalは処理の開始を制御するためのもので、doneSignalは処理の終了を制御するためのものです。2つのCountDownLatchはスレッド間で共有されます。
run()メソッドでは、まずstartSignalのawait()メソッド呼び出しにより、処理を待機します【3】。
他のスレッドからstartSignalのcountDown()メソッドが呼ばれ、カウントがゼロになると、処理が再開されます。
その後は、ランダムで2から8秒の時間を要する処理が行われるものとします【4】。
最後にdoneSignalのcountDown()メソッドを呼び出します【5】が、ここでカウントがゼロになると、別スレッドにおいてawait()メソッドで待機中の処理が再開されます。
次にメインスレッドのコードを示します。
CountDownLatch startSignal = new CountDownLatch(1); //【1】処理開始用ラッチ
CountDownLatch doneSignal = new CountDownLatch(3); //【2】処理終了用ラッチ
//【3】3つのWorkerスレッドを生成し、起動する
System.out.println("[ Main ] starting all threads...");
new Worker("t1", startSignal, doneSignal).start();
new Worker("t2", startSignal, doneSignal).start();
new Worker("t3", startSignal, doneSignal).start();
//【4】1秒間、何らかの処理を行う
System.out.println("[ Main ] do something...");
doSomething(1);
//【5】Workerスレッドの処理を進める
System.out.println("[ Main ] proceeding all threads...");
startSignal.countDown();
//【6】4秒間、何らかの処理を行う
System.out.println("[ Main ] do something...");
doSomething(4);
//【7】Workerスレッドの処理終了を待つ
System.out.println("[ Main ] waiting for all threads to finish...");
doneSignal.await();
System.out.println("[ Main ] finish");
まずは2つのCountDownLatchを生成します。
1つ目は処理の開始を制御するためのstartSignalで、カウントは1にしています【1】。
もう1つは処理の終了を制御するためのdoneSignalで、カウントは3にしています【2】。
そしてそれら2つのCountDownLatchを引数にして、3つのスレッド"t1"、"t2"、"t3"を生成し、起動します【3】。
その後、1秒間ほど何らかの処理を行います【4】。
次にstartSignalのcountDown()メソッドを呼び出します【5】が、これでカウントが1→0になるため、各Workerで待機中の処理が再開されます。
その後再び4秒間ほど何らかの処理を行います【6】。
次にdoneSignalのawait()メソッドを呼び出します【7】が、ここでいったん処理は待機します。
そして各Workerの処理が終わり、doneSignalのカウントが3→0になると、処理が再開され、メインスレッドは終了します。
このようなメインスレッドを実行すると、コンソールには以下のように表示されます。
なおここでは"t1"は7秒、"t2"は3秒、"t3"は6秒、処理に時間を要したものとします。
[ Main ] starting all threads...
[ t1 ] awaiting...
[ t2 ] awaiting...
[ t3 ] awaiting...
[ Main ] do something...
[ Main ] proceeding all threads...
[ Main ] do something...
[ t1 ] do something...
[ t2 ] do something...
[ t3 ] do something...
[ t2 ] finish
[ Main ] waiting for all threads to finish...
[ t3 ] finish
[ t1 ] finish
[ Main ] finish
このようなメインスレッドとWorkerスレッドの処理シーケンスを図に表すと、以下のようになります。
【図3-3-2】CountDownLatch(メインスレッドとWorkerスレッドの処理シーケンス)
3.3.2 マルチスレッド環境におけるコレクション
マルチスレッド環境におけるコレクション
コレクションフレームワークによって提供される主要なクラスは、基本的にスレッドセーフではありません。マルチスレッド環境でコレクション(リスト、セット、マップ)を使用する場合は、java.util.Collectionsクラスのunmodifiable〇〇メソッド(〇〇はList、SetまたはMap)によってイミュータブルなコレクションを生成したり、または同じくCollectionsクラスのsynchronized〇〇メソッドによって同期化されたコレクションを生成したりする必要がありました。
並行処理ユーティリティの中には、最初からスレッドセーフに設計されたコレクションがあるため、主要なものを紹介します。
【表3-3-1】スレッドセーフに設計されたコレクション
データ構造 | インタフェース | クラス(java.util.concurrentパッケージ) | 説明 |
---|---|---|---|
リスト | java.util.List | CopyOnWriteArrayList | スレッドセーフなリストの実装。コピーオンライト方式により、読み込みは複数スレッドからできるが、書き込みは同期化される(1つのスレッドからしかできない)。ConcurrentModificationExceptionは発生しない。 |
セット | java.util.Set | CopyOnWriteArraySet | スレッドセーフなセットの実装。CopyOnWriteArrayListクラスと同じ性質を持つ。 |
マップ | java.util.concurrent.ConcurrentMap | ConcurrentHashMap | スレッドセーフなマップの実装。 |
キュー、スタック | java.util.concurrent.BlockingDeque | ArrayBlockingQueue | スレッドセーフな両端キュー(デック)の実装。 |
コピーオンライト方式によるリストとセット
コピーオンライト方式とはその名のとおり、書き込み操作(追加、削除など)が発生した時に要素のコピーを行い、コピーの最中に読み込み操作(取得など)があった場合は古いコピーを見せる方式です。コピーオンライト方式では、書き込み時に毎回コピーを生成するためその分オーバーヘッドがかかります。ただし、あらゆる操作を同期化する方式よりも、「読み込み同士の競合」や「読み込みと書き込みの競合」に対する並列度を高めることが可能です。
コピーオンライト方式によるコレクションの実装クラスには、以下のような種類があります。
- リスト … java.util.concurrent.CopyOnWriteArrayList<E>
- セット … java.util.concurrent.CopyOnWriteArraySet<E>
例えばCopyOnWriteArrayListクラスを利用する場合は、ArrayListクラスと同じように、以下のようにしてオブジェクトを生成します。
List<String> list = new CopyOnWriteArrayList<>();
マルチスレッド環境では、リストやセットに対する読み込み操作の比率が高い場合、実装クラスとしてこれらを選択すると良いでしょう。
ブロッキング可能なデック
BlockingDequeインタフェースは、java.util.Dequeインタフェースの子インタフェースです。このインタフェースは、スレッドセーフな両端キュー(デック)を表しますが、ブロッキングされるという特徴があります。つまり要素を取り出すときにデックに要素がなければ、追加されるまで待機します。
BlockingDequeインタフェースのブロッキングに使用するAPIを、以下に示します。
API | 説明 |
---|---|
E takeFirst() | 先頭から要素を取り出すが、要素がない場合は追加されるまで待機する。要素を取り出すと同時に削除する。 |
E takeLast() | 末尾から要素を取り出す。その他はtakeFirst()メソッドに同じ。 |
BlockingDequeをスレッド間で共有し、これらのAPIによってブロッキングさせることにより、スレッド間でデータの受け渡しが可能になります。
アトミックを保証するクラス
アトミックとは、チャプター3.1でも取り上げたとおり、割り込みが発生しえない状態を意味します。
プリミティブ型変数がスレッド間で共有されている環境において、変数の書き換えで割り込みが発生しないようにするためには、synchronizedキーワードによって同期化する必要があります。ただしjava.util.concurrent.atomicパッケージで提供されるクラスを使うことにより、synchronizedを使わずともアトミックを保証することができます。
java.util.concurrent.atomicパッケージの主要なクラスを、以下の表に示します。
クラス | 説明 |
---|---|
AtomicBoolean | アトミックなboolean型を表す。 |
AtomicInteger | アトミックなint型を表す。 |
AtomicIntegerArray | アトミックなint型配列を表す。 |
AtomicLong | アトミックなlong型を表す。 |
AtomicLongArray | アトミックなlong型配列を表す。 |
これらのクラスのAPIは、アトミックな操作を可能にします。例えばAtomicIntegerクラスのaddAndGet()メソッドは、指定された値をアトミックに追加し、古い値を取得するためのものです。
APIに関する詳細な説明は「APIリファレンス」を参照ください。
このチャプターで学んだこと
このチャプターでは、以下のことを学びました。
- CyclicBarrierによる同期化支援(スレッド間の待ち合わせや再開)の方法について。
- CountDownLatchによる同期化支援(スレッド間の待ち合わせや再開)の方法について。
- マルチスレッド環境での利用が前提の様々なコレクションについて。
Discussion