MicronautでReactorを利用して非同期処理をハンドリングする
Webアプリケーションの構築において、データレイヤや別サービスとの通信が出てくると、非同期処理をうまく扱うことが必須になってくる。
今回は表題の構成でハンドリングする場合どのようになるか、また自分の理解が大変だったところを書いていく。
本記事ではReactive Programmingに深く立ち入ることはしない。
以下の要素でデータが処理されていく、ぐらいの理解で進めていく。
- データを発行するpublisher(Mono, Fluxなど)
- データを変換するoperator(map, flatMap, filterなど)
- 実際には変換されたpublisherを返すのでpublisherともみなされる
- データを消費するsubscriber
- Micronautにおいては基本的にControllerにMonoを返したら裏側でsubscribeするので、意識しなくてよい
Micronautとは
A modern, JVM-based, full-stack framework for building modular, easily testable microservice and serverless applications
とあるように、モダンなアーキテクチャを指向して作られたJavaベースのWebアプリケーションフレームワーク。
リフレクションを減らし、コンパイル時にDIの解決を行うことで軽量かつ高速に動作するのが特徴。
Reactorとは
Reactor is a fourth-generation reactive library, based on the Reactive Streams
specification, for building non-blocking applications on the JVM
Java向けのリアクティブプログラミングの実装。他にはRxJavaがよく使われている。
Micronautは以前はRxJavaを推奨していたが、Spring WebFluxがReactorを使っていることと、RxJava2とRxJava3の間に互換性が無いことから移行のタイミングでReactorの方を推奨するようになったらしい。
リクエスト/レスポンスのReactor統合
イベントループ、非同期I/Oを中心に設計されたNetty上に構築されているMicronautは、Controllerで受け取ったリクエストをReactiveなレスポンス(Publisher型)として処理することが出来る。
Publisherで代表的なものにはMono(単一)とFlux(複数)がある。
WebAPIサーバとして使う場合、基本的にレスポンスは単一となる(SSEとかプッシュ通知とかでFluxを使うケースがあるらしい)ので以下ではMono前提で書いていく。
実行スレッドの指定:subscribeOn()とpublishOn()
Reactorにおいての実行スレッドを決定するための仕組みであるSchedulerを渡し、
subscribeOn()でストリーム全体のスレッドを指定し、publishOn()でそれ以降のストリームのスレッドを指定できる。
Schedulerはデフォルトで用意されているparallel(CPUコア数、同期処理向け)やboundedElastic(CPUコア数以上に動的に増える、IOなど非同期処理向け)などに加え、Javaのスレッド管理の仕組みであるExecutorServiceから柔軟に作成することが出来る。
これによって、別システムとのIO用のSchedulerを用意して通信時はそっちにする、みたいな切り替えが出来る。
パターン別の非同期処理例
基本パターン:非同期処理を1回挟む
(リクエストの検証とかを簡略化して書いています)
一番シンプルなケースで、例えばuserIdをリクエストで受けとり、user情報を検索するクエリを発行し、レスポンスとして返却するような場合。
この時クエリ部分を
Mono<User> findUserAsync(String id);
のようにMonoを返すメソッドにする。
実行スレッドの切り替えを行い、IOが発生する場所は非同期処理用スレッドにする。
findUserAsync(String id) {
return Mono.fromCallable(() -> getUserFromDB(id)) // DBにクエリ発行
.subscribeOn(Scheduler.boundedElastic()) // 全体のスレッドを指定(DB取得部分がこれになる)
.publishOn(Scheduler.parallel()); // これ以降、つまりデータを取得して加工する際のスレッドを指定
}
そしてControllerで
Mono<GetUserResponse> getUser(GetUserRequest request) {
return findUserAsync(request.userId)
.map(user -> userLogic.createGetUserResponse(user)); // レスポンスへの加工
}
のようにレスポンスを返すのがシンプルなパターン。
同じリクエストを複数回呼び出す
例えばAWS DynamoDBに書き込みを行いたいとき、batchWriteで一度に書き込めるレコード数が制限されていたりする。
複数の非同期リクエストを同じところに投げたい場合、よく使うのがFlux.fromIterable()で、リストなどからFluxを作成できる。
Mono<Integer> scoreBatchWriteAsync(List<UserScore> userScores) {
List<DynamoRequest> dynamoRequestList = createDynamoRequest(userScores)
return Flux.fromIterable(dynamoRequestsList)
.flatMap(request -> awsLogic.batchWriteRequest(request))
.collectList() // Mono<List<requestの結果>>
.map(resultList -> resultList.stream().reduce(Integer::sum));
}
書き込みたい内容を示すuserScoresのリストを基にFluxを生成して、それぞれからリクエストのMonoを生成している。
Fluxで結果が返ってくるので、それをMonoとしてまとめる集約処理を書く必要がある(今回はリクエストの件数をIntegerとしてまとめてるだけ)
別々のリクエストをまとめる
複数のDBクエリ結果などが後続の処理に必要なとき、Mono.zipで複数のMonoから結果を得ることが出来る。
それぞれのリクエストは並行で行われ、全ての結果が返ってくるまで待機される。
結果はそれぞれT1,T2...として取得出来る。
Mono<UserDetailInfo> getUserDetailInfo(String id) {
return Mono.zip(
getUser(id),
getUserSubscription(id),
getUserAddress(id)
).flatMap(result -> getUserDetailInfo(result.getT1(), result.getT2(), result.getT3()));
}
Discussion