スレッドセーフを考えたことがないあなたへ
はじめに
スレッドセーフ?いつ使うのそれ?という方に向けて書きました!
不明点や誤りがあれば、ぜひぜひご指摘頂けると助かります。
ちなみに、本記事は、Java Concurrency in Practice を主なソースとして執筆しました。
(この後に登場する例のうちいくつかは本書から引用しています)
スレッドセーフとは
まず、どのような場面でスレッドセーフを意識する必要が生じるのか、見ていきましょう。
分かり切ったことですが、シングルスレッドであれば、スレッドセーフを意識する必要はありません。
マルチスレッドで処理が行われる場合のみ、スレッドセーフを意識する必要があります。
そのうえで、スレッドセーフなコードとは、
- ステートレスである
- 共有される状態、ミュータブル(可変)な状態へのアクセスを適切に管理できている
- 複数のスレッドからどのようなタイミングでアクセスされても正しい挙動を行うことができる
を満たすようなコードとなります。
つまり、複数のスレッドからアクセスを受けるオブジェクトが状態を保持していない場合や、保持していてもそれがイミュータブル(不変)であれば、スレッドセーフとなります。
よって、スレッドセーフを意識すべき状況とは、複数スレッドで共有されるオブジェクトが存在し、それがミュータブルである場合となります。
スレッドセーフでないと何が困るのか
Race Condition (競合状態)
Race Condition により、予期しない状態が引き起こされる可能性があります。
それは、バグやデータ不整合に繋がりますし、発見されれば早急に解決すべき問題となりえます。
複数の処理が同時に行われた際に競合状態によって予期しない状態が引き起こされる問題
(参考:レースコンディション(Race Condition)とは?)
例えば、以下のコードを見てみます。
@NotThreadSafe
public class LazyInitRace {
private ExpensiveObject instance = null;
public ExpensiveObject getInstance() {
if (instance == null) {
instance = new ExpensiveObject();
}
return instance;
}
}
上記の例では、instance
変数は 1 度だけ生成され、その後の呼び出しでは同じものが使われると想定されています。
しかし、例えば、2 つのスレッドが同時に getInstance
メソッドを呼び出した場合、それぞれが同時に instance == null
と判断する可能性があります。
その場合、それぞれのスレッドの呼出に基づいて複数の ExpensiveObject
インスタンスの生成が開始され、getInstance
メソッドから返却されるインスタンスはそれぞれ異なるものとなります。
これは LazyInitRace
クラスが想定している使われ方とは異なることになります。
スレッドセーフの実現方式例
Immutability(不変性)
複数スレッドからアクセスされる値を不変にしてしまえば、それはスレッドセーフとなります。
複数スレッドからアクセスされることにより、データ不整合の発生を心配する必要はありません。
public class AccountId {
// final 修飾子により、値の再代入不可(イミュータブル)
private final String id;
public AccountId(String id) {
this.id = id;
}
// 返却する id 変数はイミュータブルなので、
// 複数スレッドから同時に呼出されても不整合な結果は返さない
public String id() {
return id;
}
}
Thread Confinement (閉じ込め)
各スレッドにスコープを限定して変数を利用します。
例えば、Java では ThreadLocal を利用することで実現できます。
ThreadLocal の仕組み
- ThreadLocal を利用することで、各スレッドは自分専用のローカルコピーを持ちます
- 他スレッドが同じ ThreadLocal インスタンスへアクセスしても、異なる値が保存/参照されます
- データ共有を防ぐことができるため、スレッドセーフな実装となります
public class ThreadLocalExample {
private static final ThreadLocal<Integer> threadLocalValue = ThreadLocal.withInitial(() -> 0);
public int getValue() {
return threadLocalValue.get();
}
public void setValue(int value) {
threadLocalValue.set(value);
}
public static void main(String[] args) {
final ThreadLocalExample example = new ThreadLocalExample();
// スレッドA
final Thread threadA = new Thread(() -> {
example.setValue(10);
System.out.println("Thread A Value: " + example.getValue());
});
// スレッドB
final Thread threadB = new Thread(() -> {
example.setValue(20);
System.out.println("Thread B Value: " + example.getValue());
});
// ThreadLocal を使っていることにより、スレッド間で値が共有されないため、データ競合(Race Condition)が発生しない
threadA.start();
threadB.start();
}
}
Delegation(スレッドセーフ性の移譲)
スレッドセーフ性の移譲という表現だけでは、概念を理解しきれないので、例を用いて説明します。
例えば、以下のクラスがあります。
変数x
y
は final
修飾子が付けられているため、イミュータブルになります。
つまり、Point
クラスはスレッドセーフになります。
@ThreadSafe
public class Point {
public final int x, y;
public Point(int x, int y) {
this.x = x;
this.y = y;
}
}
次に、先ほどのPoint
クラスを利用した例を見てみましょう。
以下で示しているDelegatingVehicleTracker
クラスはスレッドセーフになります。
実はDelegatingVehicleTracker
クラスのスレッドセーフ性はPoint
クラスに依存しています。
Point
クラスがスレッドセーフでなければ、DelegatingVehicleTracker
クラスはスレッドセーフになりません。
@ThreadSafe
public class DelegatingVehicleTracker {
private final ConcurrentMap<String, Point> locations;
private final Map<String, Point> unmodifiableMap;
public DelegatingVehicleTracker(Map<String, Point> points) {
locations = new ConcurrentHashMap<>(points);
unmodifiableMap = Collections.unmodifiableMap(locations);
}
public Map<String, Point> getLocations() {
return unmodifiableMap;
}
public Point getLocation(String id) {
return locations.get(id);
}
public void setLocations(String id, int x, int y) {
if (locations.replace(id, new Point(x, y)) == null) {
throw new IllegalArgumentException("invalid vehicle name: " + id);
}
}
}
先ほどPoint
クラスを示しましたが、同じ機能を提供しミュータブルなクラスを見てみましょう。
以下にMutablePoint
クラスを示します。これはスレッドセーフではありません。
なぜなら複数スレッドが同時にMutablePoint
のインスタンスへアクセスした場合、
ミュータブルである変数x
y
が更新されることになりますが、
悪いタイミングでそれが重なるとx
y
は意図しない値となることがあるからです。
@NotThreadSafe
public class MutablePoint {
public int x, y;
public MutablePoint() { x = 0; y = 0; }
public MutablePoint(MutablePoint p) {
this.x = p.x;
this.y = p.y;
}
}
先ほど述べたように、DelegatingVehicleTracker
クラスはスレッドセーフになりますが、Point
クラスの代わりにMutablePoint
クラスを使うと、スレッドセーフではなくなります。
なぜならgetLocations
メソッドで取得したMap
を経由して、Point
インスタンスを取得し操作できるからです。
Point
クラスがスレッドセーフでないと、複数スレッドから同時アクセスされた場合に不整合が生じる可能性がありDelegatingVehicleTracker
クラスもスレッドセーフではなくなってしまいます。
これはつまりDelegatingVehicleTracker
クラスのスレッドセーフ性がPoint
クラスに移譲されているということになります。
この概念を応用すると、例えば自作クラスをスレッドセーフにしたければ、標準ライブラリなどが提供しているスレッドセーフなクラスを利用することで、簡単にスレッドセーフな実装とすることが可能となります。
Producer Consumer Pattern
Producer Consumer Pattern とは、キューのような FIFO のデータ構造を利用して、処理対象を渡す Producer 側と、処理対象を取得して適切な処理を行う Consumer 側とに分けて実装する方式です。
Synchronizer
スレッドの制御フローを、状態に基づいて調整することができるオブジェクトを Synchronizer という。
Synchronizer として機能するものとして、例えば、以下のようなものがある。
- Blocking Queue
- Semaphores (信号装置、シグナル)
- Barriers
- Latches(ドア・門などの掛け金)
以下に、Producer Consumer Pattern を利用した実装例を示します。
// Producer としての役割を果たす
public class FileCrawler implements Runnable {
private final BlockingQueue<File> fileQueue;
private final FileFilter fileFilter;
private final File root;
...
public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void crawl(File root) throws InterruptedException {
List<File> entries = root.listFiles(fileFilter);
if (entries.isEmpty()) {
return;
}
for (File entry : entries) {
if (entry.isDirectory()) {
crawl(entry);
} else if (!alreadyIndexed(entry)) {
fileQueue.put(entry);
}
}
}
}
// Consumer としての役割を果たす
public class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
indexFile(queue.take());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// Producer Consumer の利用
public static void startIndexing(List<File> roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND); // BOUND はキューの容量
FileFilter filter = new FileFilter() {
public boolean accept(File file) { return true; }
};
for (File root : roots) {
new Thread(new FileCrawler(queue, filter, root)).start();
}
for (int i = 0; i < N_CONSUMERS; i++) {
new Thread(new Indexer(queue)).start();
}
}
上記の例の着目点は、以下になります。
-
FileCrawler
(Producer) およびIndexer
(Consumer) ともに、queue
をコンストラクタで受け取っている。 Producer と Consumer は直接呼出を行う関係ではない。 - 上記の例がスレッドセーフであるかは、
queue
のスレッドセーフ性に依存している。
このパターンでは、片方が I/O-bound で、もう片方が CPU-bound である場合にスループットが向上する等のメリットもあります。
〇〇-bound
"bound" は「制約されている」「限界に達している」という意味で、システムのどの部分がボトルネックになっているかを表す。
I/O-bound は、入出力処理(I/O)がボトルネックになっている状態。
スレッドセーフで気を付けること
不変条件(Invariant)
まず、不変条件(invariant) とは、ある処理が行われる前後で常に保持されるべき状態または条件のことを言います。
例えば、複数のメンバ変数がお互いの関係性に制約条件を持っている場合、各々をスレッドセーフにするだけでは不十分です。
その複数のメンバ変数を併せてロックし、不整合な関係性にならず制約条件を守ることを保証しなければいけません。
こちらも例を交えて説明します。
以下に示すNumberRange
クラスは 2 つのメンバ変数lower
upper
を持っています。
public class NumberRange {
// INVARIANT: lower <= upper
private final AtomicInteger lower = new AtomicInteger(0);
private final AtomicInteger upper = new AtomicInteger(0);
public void setLower(int i) {
// Warning -- unsafe check-then-act
if (i > upper.get()) {
throw new IllegalArgumentException("can't set lower to " + i + " > upper");
}
lower.set(i);
}
public void setUpper(int i) {
// Warning -- unsafe check-then-act
if (i < lower.get()) {
throw new IllegalArgumentException("can't set upper to " + i + " < lower");
}
upper.set(i);
}
public boolean isInRange(int i) {
return i >= lower.get() && i <= upper.get();
}
}
lower
とupper
は、それぞれAtomicInteger
型として定義されていて、複数スレッドから同時にアクセスされても、アクセスされた順序に基づいて値を更新していくことが可能です。
しかし、不変条件である lower
< upper
が満たされなくなる可能性があります。
例えば、NumberRange
が (0,10) であるとして、
スレッド A:setLower(5)
を呼出
スレッド B:setUpper(4)
を呼出
を同時に行ったとします。
タイミングが悪いと両方の呼出で if 文によるチェックに引っかからず、それぞれが更新されます。その結果、NumberRange
が (5,4) となってしまい、不変条件に反することになります。
このように複数の変数が組み合わさって不変条件が構成されている場合は注意が必要です。
それぞれの変数を標準ライブラリが提供しているようなスレッドセーフな型で定義しても不変条件が満たされません。
今回の場合、上記のような不変条件を守るためには、setUpper
メソッド、setLower
メソッドで共通ロックを用いることが解決策の 1 つになります。
ドキュメントを読む・記載する
ライブラリや他開発者が実装したモジュールを扱う場合は、必ずドキュメントを確認し、スレッドセーフな実装を行う上での注意点が記載されていないか見ましょう。
また、自分がモジュールを実装する際にも、注意点を必ずドキュメントに記載するようにしましょう。他の開発者にとっても大きな助けになりますし、将来の自分にとっても備忘録として残しておくことは価値があります。
おわりに
今回は、スレッドセーフの概念や実現方式の例などを説明しました。
実現方式や気を付けるべき点については、他にも様々ありますが、今回は基礎的な内容に留めています。
実際にスレッドセーフな実装を行う場合、主にライブラリの利用を検討することになると思いますが、処理が CPU-bound または I/O-bound によって利用するライブラリが変わってきたりします。
また、過度にスレッドセーフを意識してロックを取得する範囲を広く取ると、次はパフォーマンス問題が発生することがあります。
上記の内容については、また機会があれば記事を書きたいと思います。
それでは!
Discussion