🧵
resilience4j-bulkheadで同時実行数を制御する
環境
- JDK 25
- resilience4j-bulkhead 2.3.0
前提知識
resilience4j-bulkheadは何ができる?
resilience4j-bulkheadにより、マルチスレッド処理における同時実行数の制御ができます。つまりJava標準APIのSemaphoreのようなことができます。
resilience4j-bulkheadを使うことにより、Semaphoreと比較して次のようなことが可能になります。
- ラムダ式で制御範囲を指定することで、
Semaphore#release()の呼び忘れなどを防ぐことができる - 処理の開始・終了時にイベント処理を実行できる
依存性の追加
pom.xml
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
<version>2.3.0</version>
</dependency>
バルクヘッドの利用
BulkheadMain.java
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BulkheadMain {
private static final Logger logger = LoggerFactory.getLogger(BulkheadMain.class);
static void main() {
// バルクヘッドの設定を作成
BulkheadConfig config = BulkheadConfig.custom()
// 同時実行数
.maxConcurrentCalls(3)
// 各スレッドの最大待ち時間
// 待ち時間を超えたスレッドは実行されない
.maxWaitDuration(Duration.ofSeconds(10))
.build();
// バルクヘッドのインスタンスを管理するレジストリを作成
BulkheadRegistry registry = BulkheadRegistry.of(config);
// レジストリのイベントを登録
registry.getEventPublisher()
.onEntryAdded(event -> {
logger.info("バルクヘッドが追加されました: 日時={}", event.getCreationTime());
})
.onEntryRemoved(event -> {
logger.info("バルクヘッドが削除されました: 日時={}", event.getCreationTime());
})
.onEntryReplaced(event -> {
logger.info("バルクヘッドが置き換えられました: 日時={}", event.getCreationTime());
});
// 任意の名前を指定してバルクヘッドを生成
Bulkhead bulkhead = registry.bulkhead("counter");
// バルクヘッドのイベントを登録
bulkhead.getEventPublisher()
.onCallFinished(event -> {
logger.info("バルクヘッドの処理が終了しました: {}", event);
})
.onCallPermitted(event -> {
logger.info("バルクヘッドの処理が許可されました: {}", event);
})
.onCallRejected(event -> {
logger.info("バルクヘッドの処理が拒否されました: {}", event);
});
try (ExecutorService executorService = Executors.newFixedThreadPool(10)) {
for (int i = 1; i <= 10; i++) {
// コンストラクタでバルクヘッドを渡す
executorService.submit(new BulkheadCounter(i, bulkhead));
}
}
}
}
class BulkheadCounter implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(BulkheadCounter.class);
private final int index;
private final Bulkhead bulkhead;
BulkheadCounter(int index, Bulkhead bulkhead) {
this.index = index;
this.bulkhead = bulkhead;
}
@Override
public void run() {
// バルクヘッドで処理を実行
bulkhead.executeRunnable(() -> {
// このラムダ式内の処理が同時実行制御対象になる
try {
logger.info("開始: {}", index);
Thread.sleep(1_000);
logger.info("終了: {}", index);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
実行結果
xx:57:34.483 [main] INFO com.example.BulkheadMain -- バルクヘッドが追加されました: 日時=2026-xx-xxTxx:57:34.483277+09:00[Asia/Tokyo]
xx:57:34.488 [pool-1-thread-2] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:34.488014+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:34.488 [pool-1-thread-4] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:34.488033+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:34.488 [pool-1-thread-2] INFO com.example.BulkheadCounter -- 開始: 2
xx:57:34.488 [pool-1-thread-4] INFO com.example.BulkheadCounter -- 開始: 4
xx:57:34.488 [pool-1-thread-6] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:34.488144+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:34.488 [pool-1-thread-6] INFO com.example.BulkheadCounter -- 開始: 6
xx:57:35.493 [pool-1-thread-2] INFO com.example.BulkheadCounter -- 終了: 2
xx:57:35.493 [pool-1-thread-6] INFO com.example.BulkheadCounter -- 終了: 6
xx:57:35.493 [pool-1-thread-4] INFO com.example.BulkheadCounter -- 終了: 4
xx:57:35.494 [pool-1-thread-1] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:35.494599+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:35.494 [pool-1-thread-1] INFO com.example.BulkheadCounter -- 開始: 1
xx:57:35.494 [pool-1-thread-5] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:35.494948+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:35.494 [pool-1-thread-7] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:35.494445+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:35.495 [pool-1-thread-5] INFO com.example.BulkheadCounter -- 開始: 5
xx:57:35.495 [pool-1-thread-7] INFO com.example.BulkheadCounter -- 開始: 7
xx:57:35.496 [pool-1-thread-6] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:35.496332+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:35.496 [pool-1-thread-4] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:35.496320+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:35.496 [pool-1-thread-2] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:35.496316+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:36.497 [pool-1-thread-5] INFO com.example.BulkheadCounter -- 終了: 5
xx:57:36.498 [pool-1-thread-5] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:36.497910+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:36.498 [pool-1-thread-8] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:36.498016+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:36.498 [pool-1-thread-8] INFO com.example.BulkheadCounter -- 開始: 8
xx:57:36.498 [pool-1-thread-1] INFO com.example.BulkheadCounter -- 終了: 1
xx:57:36.498 [pool-1-thread-7] INFO com.example.BulkheadCounter -- 終了: 7
xx:57:36.499 [pool-1-thread-1] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:36.498999+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:36.499 [pool-1-thread-9] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:36.499118+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:36.499 [pool-1-thread-9] INFO com.example.BulkheadCounter -- 開始: 9
xx:57:36.499 [pool-1-thread-3] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:36.499030+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:36.499 [pool-1-thread-7] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:36.499037+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:36.499 [pool-1-thread-3] INFO com.example.BulkheadCounter -- 開始: 3
xx:57:37.503 [pool-1-thread-8] INFO com.example.BulkheadCounter -- 終了: 8
xx:57:37.503 [pool-1-thread-9] INFO com.example.BulkheadCounter -- 終了: 9
xx:57:37.503 [pool-1-thread-9] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:37.503952+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:37.504 [pool-1-thread-10] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:37.504164+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:37.503 [pool-1-thread-8] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:37.503923+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:37.504 [pool-1-thread-3] INFO com.example.BulkheadCounter -- 終了: 3
xx:57:37.504 [pool-1-thread-10] INFO com.example.BulkheadCounter -- 開始: 10
xx:57:37.504 [pool-1-thread-3] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:37.504836+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:38.509 [pool-1-thread-10] INFO com.example.BulkheadCounter -- 終了: 10
xx:57:38.510 [pool-1-thread-10] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:38.510573+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
Discussion