Javaのバーチャル・スレッドと並行処理
この記事はJava Advent Calendar 2023 - Qiitaの10日目の記事です。
JavaのJDK 21でバーチャル・スレッドなるものが実装されたということで少し調べてみました。参考までに、昨年のアドベントでは、Rust Tokio
の非同期処理を書いています。
背景: Javaのスレッド・モデル
背景として、Javaのスレッド・モデルは、OSスレッド(1:1のJavaVMスレッド)になります。Rustのスレッド同様に、OSレベルでのスケジューリングになります。今回説明するバーチャル・スレッドと区別するために、プラットフォーム・スレッドと呼ばれています。
本ブログでは分かりやすいようにプラットフォーム・スレッドをOSスレッドと表記します。
その後、JDK 8のCompletableFuture
の導入で、Node.jsのasyncのような非同期プログラミングがサポートされました。しかし、コールバック・ベースの記述、例外処理の複雑さ、スレッド・プーリングのオーバーヘッドなど課題がありました。参考: CompletableFuture vs Virtual Thread in Java 21
そしてJDK 21にて、これまでプロジェクトloomとして開発されていたバーチャル・スレッドが正式サポートされたのです。パチパチ。
バーチャル・スレッドとは?
乱暴に言えば、Goのgoroutine
のような、超軽量なスレッドになります。M:Nのグリーンスレッドモデルになります。何百万というスレッドを生成することが可能です。I/OなどのタイミングでOSスレッドを譲ります。Javaでは unmount/mount
と言うそうです。OSスレッドのマルチスレッド・プログラミングと同じように同期的な記述ができるためプログラマーの負担が減ります。(学習、保守等)
従来はjava.nio
を使用する必要がありましたが、java.io, java.net, Thread.sleep
の呼び出しがノンブロッキング対応になっています。
InputStream in = ...
System.out.println("before");
int next = in.read(); // ソース・コード的には同期呼び出しだが、OSスレッドを譲る。
System.out.println("after");
表現を変えれば、バーチャル・スレッドはブロックされるが、OSスレッドはブロックされません。Goプログラミングと同じ感じになります。
もちろん、フィボナッチ計算のようなCPUバウンド処理、Steam API
でのインメモリ処理には向いていません。CPUコア数以上の性能を出すことは無理です。あくまで、I/Oバウンドな状況でCPU・メモリー リソースを有効活用するためのテクニックになります。
OSスレッドをプールしておき、バーチャルスレッドを割り当てます。キャリア・スレッド(carrier)と言います。バーチャルスレッドを運ぶイメージです。JVMスレッド、OSスレッド、カーネルスレッドなどややこしいですが、こちらのビデオ Async Showdown: Java Virtual Threads vs. Kotlin Coroutines が分かりやすいです。
バーチャルスレッドがサスペンド状態から復帰したときに、以前とは異なるOSスレッドにスケジューリングされる可能性はあります。
The platform thread to which the scheduler assigns a virtual thread is called the virtual thread's carrier. A virtual thread can be scheduled on different carriers over the course of its lifetime
JEP425
Javaの簡単な歴史とおさらい
1991年にJavaはサン・マイクロシステムズ(Sun Microsystems)のJames Gosling
らによって開発がスタートしました。1996年にJava1.0をリリースしています。
実はJavaの初期はシングル・コア対応でグリーンスレッド、つまりJVMでのマルチスレッド・スケジューリングでした。Java 1.3において現在のマルチコア、OSスレッドへの1:1マッピングへ変更しました。参考
その後、2010年にSunはオラクルに買収され、2018年のJDK 11
から有償ライセンスに移行します。
そしてオラクルは2021年4月に、GoogleのAndroidでのJava APIの利用のライセンスに関する10年にわたる裁判に敗訴します。
同年の9月にOracle JDK 17
以降は無償NFTCと方針を再度変えます。ライセンスの詳細は英語原文で確認してください。
オラクルでは、定期的にJavaのバージョン・アップをする方針に変更しており、その代わり特定のバージョン(JDK8,11,17,21,25)のみをサポート対象(LTS)とする方針にしています。現在(2023/12)の最新版は21になります。
また、AWSでは無償版のOpenJDKから派生したAmazon Corretto
が利用されていることなどもオラクルのライセンスの方針に影響を与えたと言われています。Androidのアプリ開発がKotlinに移行していることもあるかと思われます。
Javaといえば、しばしばGC(stop of the world)の遅さが問題となりますが、JDK 15で導入されたZGCによって10ms以下のポーズに改善されています。
クラウド・ネイティブ
クラウド・ネイティブという用語を最近よく聞きます。簡単に言えば、Kubernetes/Docker、マネージド、サーバレス環境などのオートスケーリング環境では、コールドスタートの短縮、メモリー・フートプリンント(メモリ使用量)の削減をする必要があります。GoやRustのネイティブ・コンパイラーに比較して、JavaやPythonの場合はランタイムが大きくなるため、オートスケーリングとの相性が悪いです。
GraalVMはAOT(Ahead-Of-Time)コンパイルで、Dockerイメージの作成のタイミングでJavaバイトコードをアーキテクチャ・ネイティブのコードにコンパイルしてしまうものです。
バーチャル・スレッドと組み合わせることで、Rust Actix
やGo Gin
のようにマイクロ・サービスの実装言語の一つとして選択肢になるでしょう。
The startup time of the application was reduced from approximately 30 seconds down to about 3 ms, and more importantly the memory usage was also significantly reduced from 6.6 GB down to 1 GB, with the same throughput and CPU utilization.
Migrating 10MinuteMail from Java to GraalVM Native
使用方法
さて、前置きが長くなりましたが本題に入ります。
バーチャル・スレッドの生成はExecutors.newVirtualThreadPerTaskExecutor()
を使用し、以下のように記述します。実行したいタスクをラムダ関数でsubmit()に渡します。
var executor = Executors.newVirtualThreadPerTaskExecutor()
executor.submit(() -> {
// ここにバーチャル・スレッドで処理するタスクを実装します。
// ネットワーク呼び出しやDBアクセスなどのI/Oの処理を入れます。
// インメモリ、CPUでのみの処理の場合はJVMスレッドを直接使うべきです。
})
JDKの設定
バーチャル・スレッドを使用しますので、sourceCompatibility
に21を指定します。私はAzul Zulu Java 21(無償版)
を使用しました。 また、例外処理用にlombok
をインストールします。
java {
sourceCompatibility = '21' // バーチャル・スレッドのサポート
}
dependencies {
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
}
ソースコードはこちらに置きます。Spring Bootに対応しています。./gradlew bootRun
で実行します。
gradleのJDKの切り替えはgradle.properties
で行います。
org.gradle.java.home=/Library/Java/JavaVirtualMachines/zulu-21.jdk/Contents/Home
インストールされているJDKの確認方法
/usr/libexec/java_home -V
ヘルパー関数
仕込み作業として、ヘルパー関数を作成しておきます。指定時間スリープする関数です。Thread.sleep()
はJDK21で非同期対応しており、スレッドをブロックしないことを覚えておいてください。
public class Vt1 {
private static final Logger log = LoggerFactory.getLogger(Vt1.class);
@SneakyThrows
private static void sleep(Duration duration) {
Thread.sleep(duration); // unmount(スレッドを譲る)
log.info("!!! carrier thread is {} ", Thread.currentThread());
}
コードの作成
論理CPUコア数を取得し、+1したスレッド数をループで生成するコードです。各スレッドは先ほど作成したスリープ関数(5秒)を実行します。
私のmac M2のコア数は8ですので、9つのスレッドが生成されます。このプログラムの全体の処理時間は5秒になります。
@SneakyThrows
private static void myTask(Duration duration) {
log.info("『 {} : {} ", context.get(), Thread.currentThread());
sleep(duration); // 先ほどのスリープ関数を呼び出す。
log.info("』 {} ", Thread.currentThread());
}
@SneakyThrows
static void runMyTasks() {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<Integer>> futures = new ArrayList<>();
// 論理コア数の取得
var numCore = Runtime.getRuntime().availableProcessors();
// コア数+1のスレッドを生成
IntStream.range(0, numCore + 1).forEach(i -> {
// この時点でスレッドは走る
Future<Integer> future = executor.submit(() -> {
context.set("task-" + i);
myTask(Duration.ofSeconds(5));
log.info("done task #{}", i);
return i;
});
futures.add(future);
});
log.info("submitted all tasks");
// スレッドの完了を待つ
for (var future : futures) {
log.info("result: {}", String.valueOf(future.get()));
}
}
}
// メイン
public static void main(String[] args) {
runMyTasks();
}
実行します
実行してみましょう。
5番のOSスレッドが複数のバーチャル・スレッド(4と8)に割り当てられたのが分かります。CPUコア数以上のスレッドを生成しましたが、スリープの実行のタイミングでCPUを別のスレッドに譲ることがポイントです。
10万スレッドでも正常終了(実行時間は5秒)します。バーチャル・スレッドなのでメモリを消費しません。
// var numCore = Runtime.getRuntime().availableProcessors();
var numCore = 100_000;
synchronized - 同期処理
さて、次は良くない例を説明します。
synchronized
を使用すると、バーチャル・スレッドがキャリアスレッド(OSスレッド)にpinされた状態になるため、sleep()の呼び出しでunmount(yield)されません。別の言い方をすれば、sleep()
の前後で同一のOSスレッドにスケジューリングされます。
// synchronizedで同期処理にしてみる
synchronized private static void sleep(Duration duration) {
Thread.sleep(duration); // unmountされない
log.info("!!! carrier thread is {} ", Thread.currentThread());
}
sleep()
の部分が排他制御(クリティカル・セクション)になり逐次処理されてしまうため、全体の実行時間がスレッド数 x スリープ時間になります。
排他ロックの使用
別の方法として、バーチャル・スレッドから排他ロックを使用することもできます。synchronized
同様に逐次処理になりますが、myTask
の内部でロック待ちになるため、ループ自体は瞬時に処理されます。クリティカル・セクションが狭くなるため、ReentrantLock
の使用が望ましいでしょう。
private static final Lock lock = new ReentrantLock();
@SneakyThrows
private static void myTask(Duration duration) {
if (lock.tryLock(1, TimeUnit.HOURS)) {
try {
log.info("『 {} ", Thread.currentThread());
sleep(duration); // unmount
log.info("』 {} ", Thread.currentThread());
} finally {
lock.unlock();
}
}
}
先ほどとは違い、OSスレッドが解放されないため、OSスレッドが枯渇し最後のタスク(task-8)はすぐにはスタートしません。task-0が終了し、OSスレッドに空き(worker-1)が出た後でようやくスタートします。
コンテキスト
スレッド・ローカルなコンテキスト(変数)を作ることができます。
private static final ThreadLocal<String> context = new ThreadLocal<>();
例えば、Webサーバでリクエスト・スコープのグローバル変数が欲しいときがあります。Goプログラミングでは、こんな感じに変数を関数呼び出しでバケツリレーすることがよくあります。通常、contextにはHTTPリクエスト・ヘッダー、ログイン・ユーザーのクレデンシャル、ロガーなどを保持させます。
handler(context ctx) {
service(context, ...)
entity(context, ...)
Javaでは、context
にスレッド・スコープの変数が作成されます。Kotlinのコンテキストにも似ています。context.set()で値をセットし、context.get()で値を取り出します。context変数はグローバル変数として定義されているため関数の引数で渡す必要はありません。
Future<Integer> future = executor.submit(() -> {
context.set("task-" + i); // セット
myTask(Duration.ofSeconds(5));
// 子関数内にて、context経由に取り出す。
private static void myTask(Duration duration) { // contextを渡さなくて良い。
log.info("『 {} : {} ", context.get(), Thread.currentThread());
もちろん、バーチャルスレッドではスレッド数が多くなりますので、コンテキストのサイズが大きくなると、スレッド数分を掛けたメモリを消費しますのでご注意ください。
CPUバウンド処理
バーチャル・スレッドはCPUバウンドな処理には向いていません。試しに、フィボナッチ数列の計算を各スレッドで実行してみましょう。
@SneakyThrows
private static void sleep(Duration duration) {
// フィボナッチ数列の計算をする
fibonacciRecursive(47);
log.info("!!! carrier thread is {} ", Thread.currentThread());
}
// フィボナッチ数列の計算
public static long fibonacciRecursive(int n) {
if (n <= 1) {
return n;
}
return fibonacciRecursive(n - 1) + fibonacciRecursive(n - 2);
}
私のmacbook M2の8コアがフル回転しているのが分かります。このような状況はCPUがボトルネックになっているため、バーチャル・スレッドを増やしても、各スレッドがCPUをつかんで離さない状況になりますので無意味になります。旧来のOSスレッド(VMスレッド)でやるべきです。
HTTPクライアント
httpClient()
も非同期に対応しています。OSスレッド(VMスレッド)は解放されます。
private static void httpClient() {
var client = HttpClient.newHttpClient();
var request = HttpRequest.newBuilder()
.uri(URI.create("https://pokeapi.co/api/v2/pokemon/ditto"))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
log.info("Status Code: {}", response.statusCode());
}
Discussion