🧵

resilience4j-bulkheadで同時実行数を制御する

に公開

環境

  • JDK 25
  • resilience4j-bulkhead 2.3.0

前提知識

https://zenn.dev/masatoshi_tada/articles/b44f4417e70232

resilience4j-bulkheadは何ができる?

resilience4j-bulkheadにより、マルチスレッド処理における同時実行数の制御ができます。つまりJava標準APIのSemaphoreのようなことができます。

resilience4j-bulkheadを使うことにより、Semaphoreと比較して次のようなことが可能になります。

  • ラムダ式で制御範囲を指定することで、Semaphore#release()の呼び忘れなどを防ぐことができる
  • 処理の開始・終了時にイベント処理を実行できる

依存性の追加

pom.xml
        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-bulkhead</artifactId>
            <version>2.3.0</version>
        </dependency>

バルクヘッドの利用

BulkheadMain.java
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BulkheadMain {
    private static final Logger logger = LoggerFactory.getLogger(BulkheadMain.class);

    static void main() {
        // バルクヘッドの設定を作成
        BulkheadConfig config = BulkheadConfig.custom()
                // 同時実行数
                .maxConcurrentCalls(3)
                // 各スレッドの最大待ち時間
                // 待ち時間を超えたスレッドは実行されない
                .maxWaitDuration(Duration.ofSeconds(10))
                .build();
        // バルクヘッドのインスタンスを管理するレジストリを作成
        BulkheadRegistry registry = BulkheadRegistry.of(config);
        // レジストリのイベントを登録
        registry.getEventPublisher()
                .onEntryAdded(event -> {
                    logger.info("バルクヘッドが追加されました: 日時={}", event.getCreationTime());
                })
                .onEntryRemoved(event -> {
                    logger.info("バルクヘッドが削除されました: 日時={}", event.getCreationTime());
                })
                .onEntryReplaced(event -> {
                    logger.info("バルクヘッドが置き換えられました: 日時={}", event.getCreationTime());
                });
        // 任意の名前を指定してバルクヘッドを生成
        Bulkhead bulkhead = registry.bulkhead("counter");
        // バルクヘッドのイベントを登録
        bulkhead.getEventPublisher()
                .onCallFinished(event -> {
                    logger.info("バルクヘッドの処理が終了しました: {}", event);
                })
                .onCallPermitted(event -> {
                    logger.info("バルクヘッドの処理が許可されました: {}", event);
                })
                .onCallRejected(event -> {
                    logger.info("バルクヘッドの処理が拒否されました: {}", event);
                });
        
        try (ExecutorService executorService = Executors.newFixedThreadPool(10)) {
            for (int i = 1; i <= 10; i++) {
                // コンストラクタでバルクヘッドを渡す
                executorService.submit(new BulkheadCounter(i, bulkhead));
            }
        }
    }
}

class BulkheadCounter implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(BulkheadCounter.class);

    private final int index;

    private final Bulkhead bulkhead;

    BulkheadCounter(int index, Bulkhead bulkhead) {
        this.index = index;
        this.bulkhead = bulkhead;
    }

    @Override
    public void run() {
        // バルクヘッドで処理を実行
        bulkhead.executeRunnable(() -> {
            // このラムダ式内の処理が同時実行制御対象になる
            try {
                logger.info("開始: {}", index);
                Thread.sleep(1_000);
                logger.info("終了: {}", index);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}
実行結果
xx:57:34.483 [main] INFO com.example.BulkheadMain -- バルクヘッドが追加されました: 日時=2026-xx-xxTxx:57:34.483277+09:00[Asia/Tokyo]
xx:57:34.488 [pool-1-thread-2] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:34.488014+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:34.488 [pool-1-thread-4] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:34.488033+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:34.488 [pool-1-thread-2] INFO com.example.BulkheadCounter -- 開始: 2
xx:57:34.488 [pool-1-thread-4] INFO com.example.BulkheadCounter -- 開始: 4
xx:57:34.488 [pool-1-thread-6] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:34.488144+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:34.488 [pool-1-thread-6] INFO com.example.BulkheadCounter -- 開始: 6
xx:57:35.493 [pool-1-thread-2] INFO com.example.BulkheadCounter -- 終了: 2
xx:57:35.493 [pool-1-thread-6] INFO com.example.BulkheadCounter -- 終了: 6
xx:57:35.493 [pool-1-thread-4] INFO com.example.BulkheadCounter -- 終了: 4
xx:57:35.494 [pool-1-thread-1] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:35.494599+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:35.494 [pool-1-thread-1] INFO com.example.BulkheadCounter -- 開始: 1
xx:57:35.494 [pool-1-thread-5] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:35.494948+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:35.494 [pool-1-thread-7] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:35.494445+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:35.495 [pool-1-thread-5] INFO com.example.BulkheadCounter -- 開始: 5
xx:57:35.495 [pool-1-thread-7] INFO com.example.BulkheadCounter -- 開始: 7
xx:57:35.496 [pool-1-thread-6] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:35.496332+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:35.496 [pool-1-thread-4] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:35.496320+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:35.496 [pool-1-thread-2] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:35.496316+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:36.497 [pool-1-thread-5] INFO com.example.BulkheadCounter -- 終了: 5
xx:57:36.498 [pool-1-thread-5] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:36.497910+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:36.498 [pool-1-thread-8] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:36.498016+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:36.498 [pool-1-thread-8] INFO com.example.BulkheadCounter -- 開始: 8
xx:57:36.498 [pool-1-thread-1] INFO com.example.BulkheadCounter -- 終了: 1
xx:57:36.498 [pool-1-thread-7] INFO com.example.BulkheadCounter -- 終了: 7
xx:57:36.499 [pool-1-thread-1] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:36.498999+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:36.499 [pool-1-thread-9] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:36.499118+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:36.499 [pool-1-thread-9] INFO com.example.BulkheadCounter -- 開始: 9
xx:57:36.499 [pool-1-thread-3] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:36.499030+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:36.499 [pool-1-thread-7] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:36.499037+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:36.499 [pool-1-thread-3] INFO com.example.BulkheadCounter -- 開始: 3
xx:57:37.503 [pool-1-thread-8] INFO com.example.BulkheadCounter -- 終了: 8
xx:57:37.503 [pool-1-thread-9] INFO com.example.BulkheadCounter -- 終了: 9
xx:57:37.503 [pool-1-thread-9] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:37.503952+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:37.504 [pool-1-thread-10] INFO com.example.BulkheadMain -- バルクヘッドの処理が許可されました: 2026-xx-xxTxx:57:37.504164+09:00[Asia/Tokyo]: Bulkhead 'counter' permitted a call.
xx:57:37.503 [pool-1-thread-8] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:37.503923+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:37.504 [pool-1-thread-3] INFO com.example.BulkheadCounter -- 終了: 3
xx:57:37.504 [pool-1-thread-10] INFO com.example.BulkheadCounter -- 開始: 10
xx:57:37.504 [pool-1-thread-3] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:37.504836+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.
xx:57:38.509 [pool-1-thread-10] INFO com.example.BulkheadCounter -- 終了: 10
xx:57:38.510 [pool-1-thread-10] INFO com.example.BulkheadMain -- バルクヘッドの処理が終了しました: 2026-xx-xxTxx:57:38.510573+09:00[Asia/Tokyo]: Bulkhead 'counter' has finished a call.

参考資料

Discussion