Javaのバーチャル・スレッドと並行処理

2023/12/10に公開

この記事はJava Advent Calendar 2023 - Qiitaの10日目の記事です。

JavaのJDK 21でバーチャル・スレッドなるものが実装されたということで少し調べてみました。参考までに、昨年のアドベントでは、Rust Tokioの非同期処理を書いています。

https://zenn.dev/tfutada/articles/5e87d6e7131e8e

背景: Javaのスレッド・モデル

背景として、Javaのスレッド・モデルは、OSスレッド(1:1のJavaVMスレッド)になります。Rustのスレッド同様に、OSレベルでのスケジューリングになります。今回説明するバーチャル・スレッドと区別するために、プラットフォーム・スレッドと呼ばれています。

本ブログでは分かりやすいようにプラットフォーム・スレッドをOSスレッドと表記します。

その後、JDK 8のCompletableFutureの導入で、Node.jsのasyncのような非同期プログラミングがサポートされました。しかし、コールバック・ベースの記述、例外処理の複雑さ、スレッド・プーリングのオーバーヘッドなど課題がありました。参考: CompletableFuture vs Virtual Thread in Java 21

そしてJDK 21にて、これまでプロジェクトloomとして開発されていたバーチャル・スレッドが正式サポートされたのです。パチパチ。

バーチャル・スレッドとは?

乱暴に言えば、Goのgoroutineのような、超軽量なスレッドになります。M:Nのグリーンスレッドモデルになります。何百万というスレッドを生成することが可能です。I/OなどのタイミングでOSスレッドを譲ります。Javaでは unmount/mount と言うそうです。OSスレッドのマルチスレッド・プログラミングと同じように同期的な記述ができるためプログラマーの負担が減ります。(学習、保守等)

従来はjava.nioを使用する必要がありましたが、java.io, java.net, Thread.sleepの呼び出しがノンブロッキング対応になっています。

InputStream in = ... 
System.out.println("before");
int next = in.read(); // ソース・コード的には同期呼び出しだが、OSスレッドを譲る。
System.out.println("after");

表現を変えれば、バーチャル・スレッドはブロックされるが、OSスレッドはブロックされません。Goプログラミングと同じ感じになります。

もちろん、フィボナッチ計算のようなCPUバウンド処理、Steam APIでのインメモリ処理には向いていません。CPUコア数以上の性能を出すことは無理です。あくまで、I/Oバウンドな状況でCPU・メモリー リソースを有効活用するためのテクニックになります。

OSスレッドをプールしておき、バーチャルスレッドを割り当てます。キャリア・スレッド(carrier)と言います。バーチャルスレッドを運ぶイメージです。JVMスレッド、OSスレッド、カーネルスレッドなどややこしいですが、こちらのビデオ Async Showdown: Java Virtual Threads vs. Kotlin Coroutines が分かりやすいです。

バーチャルスレッドがサスペンド状態から復帰したときに、以前とは異なるOSスレッドにスケジューリングされる可能性はあります。

The platform thread to which the scheduler assigns a virtual thread is called the virtual thread's carrier. A virtual thread can be scheduled on different carriers over the course of its lifetime
JEP425

Javaの簡単な歴史とおさらい

1991年にJavaはサン・マイクロシステムズ(Sun Microsystems)のJames Goslingらによって開発がスタートしました。1996年にJava1.0をリリースしています。

実はJavaの初期はシングル・コア対応でグリーンスレッド、つまりJVMでのマルチスレッド・スケジューリングでした。Java 1.3において現在のマルチコア、OSスレッドへの1:1マッピングへ変更しました。参考

その後、2010年にSunはオラクルに買収され、2018年のJDK 11から有償ライセンスに移行します。

そしてオラクルは2021年4月に、GoogleのAndroidでのJava APIの利用のライセンスに関する10年にわたる裁判に敗訴します。

同年の9月にOracle JDK 17以降は無償NFTCと方針を再度変えます。ライセンスの詳細は英語原文で確認してください。

オラクルでは、定期的にJavaのバージョン・アップをする方針に変更しており、その代わり特定のバージョン(JDK8,11,17,21,25)のみをサポート対象(LTS)とする方針にしています。現在(2023/12)の最新版は21になります。

また、AWSでは無償版のOpenJDKから派生したAmazon Correttoが利用されていることなどもオラクルのライセンスの方針に影響を与えたと言われています。Androidのアプリ開発がKotlinに移行していることもあるかと思われます。

Javaといえば、しばしばGC(stop of the world)の遅さが問題となりますが、JDK 15で導入されたZGCによって10ms以下のポーズに改善されています。

クラウド・ネイティブ

クラウド・ネイティブという用語を最近よく聞きます。簡単に言えば、Kubernetes/Docker、マネージド、サーバレス環境などのオートスケーリング環境では、コールドスタートの短縮、メモリー・フートプリンント(メモリ使用量)の削減をする必要があります。GoやRustのネイティブ・コンパイラーに比較して、JavaやPythonの場合はランタイムが大きくなるため、オートスケーリングとの相性が悪いです。

GraalVMはAOT(Ahead-Of-Time)コンパイルで、Dockerイメージの作成のタイミングでJavaバイトコードをアーキテクチャ・ネイティブのコードにコンパイルしてしまうものです。

バーチャル・スレッドと組み合わせることで、Rust ActixGo Ginのようにマイクロ・サービスの実装言語の一つとして選択肢になるでしょう。

The startup time of the application was reduced from approximately 30 seconds down to about 3 ms, and more importantly the memory usage was also significantly reduced from 6.6 GB down to 1 GB, with the same throughput and CPU utilization.
Migrating 10MinuteMail from Java to GraalVM Native

使用方法

さて、前置きが長くなりましたが本題に入ります。

バーチャル・スレッドの生成はExecutors.newVirtualThreadPerTaskExecutor()を使用し、以下のように記述します。実行したいタスクをラムダ関数でsubmit()に渡します。

バーチャル・スレッド
var executor = Executors.newVirtualThreadPerTaskExecutor()
executor.submit(() -> {
// ここにバーチャル・スレッドで処理するタスクを実装します。
// ネットワーク呼び出しやDBアクセスなどのI/Oの処理を入れます。
// インメモリ、CPUでのみの処理の場合はJVMスレッドを直接使うべきです。
})

JDKの設定

バーチャル・スレッドを使用しますので、sourceCompatibilityに21を指定します。私はAzul Zulu Java 21(無償版)を使用しました。 また、例外処理用にlombokをインストールします。

build.gradle
java {
	sourceCompatibility = '21' // バーチャル・スレッドのサポート
}

dependencies {
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.projectlombok:lombok'
}

ソースコードはこちらに置きます。Spring Bootに対応しています。./gradlew bootRunで実行します。

gradleのJDKの切り替えはgradle.propertiesで行います。

gradle.properties
org.gradle.java.home=/Library/Java/JavaVirtualMachines/zulu-21.jdk/Contents/Home

インストールされているJDKの確認方法

JDK一覧
/usr/libexec/java_home -V

ヘルパー関数

仕込み作業として、ヘルパー関数を作成しておきます。指定時間スリープする関数です。Thread.sleep()はJDK21で非同期対応しており、スレッドをブロックしないことを覚えておいてください。

helpers
public class Vt1 {
    private static final Logger log = LoggerFactory.getLogger(Vt1.class);

    @SneakyThrows
    private static void sleep(Duration duration) {
        Thread.sleep(duration);  // unmount(スレッドを譲る)
        log.info("!!! carrier thread is  {} ", Thread.currentThread());
    }

コードの作成

論理CPUコア数を取得し、+1したスレッド数をループで生成するコードです。各スレッドは先ほど作成したスリープ関数(5秒)を実行します。

私のmac M2のコア数は8ですので、9つのスレッドが生成されます。このプログラムの全体の処理時間は5秒になります。

    @SneakyThrows
    private static void myTask(Duration duration) {
        log.info("『  {} : {} ", context.get(), Thread.currentThread());
        sleep(duration); // 先ほどのスリープ関数を呼び出す。
        log.info("』 {} ", Thread.currentThread());
    }
    
    @SneakyThrows
    static void runMyTasks() {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Integer>> futures = new ArrayList<>();
	    // 論理コア数の取得
            var numCore = Runtime.getRuntime().availableProcessors();
            // コア数+1のスレッドを生成
            IntStream.range(0, numCore + 1).forEach(i -> {
                // この時点でスレッドは走る
                Future<Integer> future = executor.submit(() -> {
                    context.set("task-" + i);
                    myTask(Duration.ofSeconds(5));
                    log.info("done task #{}", i);
                    return i;
                });
                futures.add(future);
            });

            log.info("submitted all tasks");

            // スレッドの完了を待つ
            for (var future : futures) {
                log.info("result: {}", String.valueOf(future.get()));
            }
        }
    }

    // メイン
    public static void main(String[] args) {
        runMyTasks();
    }

実行します

実行してみましょう。

5番のOSスレッドが複数のバーチャル・スレッド(4と8)に割り当てられたのが分かります。CPUコア数以上のスレッドを生成しましたが、スリープの実行のタイミングでCPUを別のスレッドに譲ることがポイントです。

10万スレッドでも正常終了(実行時間は5秒)します。バーチャル・スレッドなのでメモリを消費しません。

10万スレッド
//            var numCore = Runtime.getRuntime().availableProcessors();
            var numCore = 100_000;

synchronized - 同期処理

さて、次は良くない例を説明します。

synchronizedを使用すると、バーチャル・スレッドがキャリアスレッド(OSスレッド)にpinされた状態になるため、sleep()の呼び出しでunmount(yield)されません。別の言い方をすれば、sleep()の前後で同一のOSスレッドにスケジューリングされます。

    // synchronizedで同期処理にしてみる
    synchronized private static void sleep(Duration duration) {
        Thread.sleep(duration);  // unmountされない
        log.info("!!! carrier thread is  {} ", Thread.currentThread());
    }

sleep()の部分が排他制御(クリティカル・セクション)になり逐次処理されてしまうため、全体の実行時間がスレッド数 x スリープ時間になります。

排他ロックの使用

別の方法として、バーチャル・スレッドから排他ロックを使用することもできます。synchronized同様に逐次処理になりますが、myTaskの内部でロック待ちになるため、ループ自体は瞬時に処理されます。クリティカル・セクションが狭くなるため、ReentrantLockの使用が望ましいでしょう。

    private static final Lock lock = new ReentrantLock();

    @SneakyThrows
    private static void myTask(Duration duration) {
        if (lock.tryLock(1, TimeUnit.HOURS)) {
            try {
                log.info("『  {} ", Thread.currentThread());
                sleep(duration);  // unmount
                log.info("』 {} ", Thread.currentThread());
            } finally {
                lock.unlock();
            }
        }
    }

先ほどとは違い、OSスレッドが解放されないため、OSスレッドが枯渇し最後のタスク(task-8)はすぐにはスタートしません。task-0が終了し、OSスレッドに空き(worker-1)が出た後でようやくスタートします。

コンテキスト

スレッド・ローカルなコンテキスト(変数)を作ることができます。

    private static final ThreadLocal<String> context = new ThreadLocal<>();

例えば、Webサーバでリクエスト・スコープのグローバル変数が欲しいときがあります。Goプログラミングでは、こんな感じに変数を関数呼び出しでバケツリレーすることがよくあります。通常、contextにはHTTPリクエスト・ヘッダー、ログイン・ユーザーのクレデンシャル、ロガーなどを保持させます。

Go言語
handler(context ctx) {
    service(context, ...)
    entity(context, ...)

Javaでは、contextにスレッド・スコープの変数が作成されます。Kotlinのコンテキストにも似ています。context.set()で値をセットし、context.get()で値を取り出します。context変数はグローバル変数として定義されているため関数の引数で渡す必要はありません。

Javaコンテキスト
	Future<Integer> future = executor.submit(() -> {
	    context.set("task-" + i); // セット
	    myTask(Duration.ofSeconds(5));

    // 子関数内にて、context経由に取り出す。
    private static void myTask(Duration duration) { // contextを渡さなくて良い。
        log.info("『  {} : {} ", context.get(), Thread.currentThread());

もちろん、バーチャルスレッドではスレッド数が多くなりますので、コンテキストのサイズが大きくなると、スレッド数分を掛けたメモリを消費しますのでご注意ください。

CPUバウンド処理

バーチャル・スレッドはCPUバウンドな処理には向いていません。試しに、フィボナッチ数列の計算を各スレッドで実行してみましょう。

    @SneakyThrows
    private static void sleep(Duration duration) {
        // フィボナッチ数列の計算をする
        fibonacciRecursive(47);
        log.info("!!! carrier thread is  {} ", Thread.currentThread());
    }
    // フィボナッチ数列の計算
    public static long fibonacciRecursive(int n) {
        if (n <= 1) {
            return n;
        }
        return fibonacciRecursive(n - 1) + fibonacciRecursive(n - 2);
    }

私のmacbook M2の8コアがフル回転しているのが分かります。このような状況はCPUがボトルネックになっているため、バーチャル・スレッドを増やしても、各スレッドがCPUをつかんで離さない状況になりますので無意味になります。旧来のOSスレッド(VMスレッド)でやるべきです。

HTTPクライアント

httpClient()も非同期に対応しています。OSスレッド(VMスレッド)は解放されます。

httpClient
    private static void httpClient() {
        var client = HttpClient.newHttpClient();
        var request = HttpRequest.newBuilder()
                .uri(URI.create("https://pokeapi.co/api/v2/pokemon/ditto"))
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        log.info("Status Code: {}", response.statusCode());
    }

Discussion