🎤

MicronautでReactorを利用して非同期処理をハンドリングする

2024/10/06に公開

Webアプリケーションの構築において、データレイヤや別サービスとの通信が出てくると、非同期処理をうまく扱うことが必須になってくる。
今回は表題の構成でハンドリングする場合どのようになるか、また自分の理解が大変だったところを書いていく。

本記事ではReactive Programmingに深く立ち入ることはしない。
以下の要素でデータが処理されていく、ぐらいの理解で進めていく。

  • データを発行するpublisher(Mono, Fluxなど)
  • データを変換するoperator(map, flatMap, filterなど)
    • 実際には変換されたpublisherを返すのでpublisherともみなされる
  • データを消費するsubscriber
    • Micronautにおいては基本的にControllerにMonoを返したら裏側でsubscribeするので、意識しなくてよい

Micronautとは

https://micronaut.io/

A modern, JVM-based, full-stack framework for building modular, easily testable microservice and serverless applications

とあるように、モダンなアーキテクチャを指向して作られたJavaベースのWebアプリケーションフレームワーク。
リフレクションを減らし、コンパイル時にDIの解決を行うことで軽量かつ高速に動作するのが特徴。

Reactorとは

https://projectreactor.io/

Reactor is a fourth-generation reactive library, based on the Reactive Streams
specification, for building non-blocking applications on the JVM

Java向けのリアクティブプログラミングの実装。他にはRxJavaがよく使われている。
Micronautは以前はRxJavaを推奨していたが、Spring WebFluxがReactorを使っていることと、RxJava2とRxJava3の間に互換性が無いことから移行のタイミングでReactorの方を推奨するようになったらしい。

リクエスト/レスポンスのReactor統合

https://docs.micronaut.io/4.6.5/guide/#reactiveServer
イベントループ、非同期I/Oを中心に設計されたNetty上に構築されているMicronautは、Controllerで受け取ったリクエストをReactiveなレスポンス(Publisher型)として処理することが出来る。

Publisherで代表的なものにはMono(単一)とFlux(複数)がある。
WebAPIサーバとして使う場合、基本的にレスポンスは単一となる(SSEとかプッシュ通知とかでFluxを使うケースがあるらしい)ので以下ではMono前提で書いていく。

実行スレッドの指定:subscribeOn()とpublishOn()

Reactorにおいての実行スレッドを決定するための仕組みであるSchedulerを渡し、
subscribeOn()でストリーム全体のスレッドを指定し、publishOn()でそれ以降のストリームのスレッドを指定できる。
Schedulerはデフォルトで用意されているparallel(CPUコア数、同期処理向け)やboundedElastic(CPUコア数以上に動的に増える、IOなど非同期処理向け)などに加え、Javaのスレッド管理の仕組みであるExecutorServiceから柔軟に作成することが出来る。

これによって、別システムとのIO用のSchedulerを用意して通信時はそっちにする、みたいな切り替えが出来る。

パターン別の非同期処理例

基本パターン:非同期処理を1回挟む

(リクエストの検証とかを簡略化して書いています)
一番シンプルなケースで、例えばuserIdをリクエストで受けとり、user情報を検索するクエリを発行し、レスポンスとして返却するような場合。
この時クエリ部分を

Mono<User> findUserAsync(String id);

のようにMonoを返すメソッドにする。

実行スレッドの切り替えを行い、IOが発生する場所は非同期処理用スレッドにする。

findUserAsync(String id) {
    return Mono.fromCallable(() -> getUserFromDB(id))   // DBにクエリ発行
        .subscribeOn(Scheduler.boundedElastic())        // 全体のスレッドを指定(DB取得部分がこれになる)
        .publishOn(Scheduler.parallel());               // これ以降、つまりデータを取得して加工する際のスレッドを指定
}

そしてControllerで

Mono<GetUserResponse> getUser(GetUserRequest request) {
    return findUserAsync(request.userId)
        .map(user -> userLogic.createGetUserResponse(user)); // レスポンスへの加工
}

のようにレスポンスを返すのがシンプルなパターン。

同じリクエストを複数回呼び出す

例えばAWS DynamoDBに書き込みを行いたいとき、batchWriteで一度に書き込めるレコード数が制限されていたりする。
複数の非同期リクエストを同じところに投げたい場合、よく使うのがFlux.fromIterable()で、リストなどからFluxを作成できる。

Mono<Integer> scoreBatchWriteAsync(List<UserScore> userScores) {
    List<DynamoRequest> dynamoRequestList = createDynamoRequest(userScores)
    return Flux.fromIterable(dynamoRequestsList)
        .flatMap(request -> awsLogic.batchWriteRequest(request))
        .collectList() // Mono<List<requestの結果>>
        .map(resultList -> resultList.stream().reduce(Integer::sum));  
}

書き込みたい内容を示すuserScoresのリストを基にFluxを生成して、それぞれからリクエストのMonoを生成している。
Fluxで結果が返ってくるので、それをMonoとしてまとめる集約処理を書く必要がある(今回はリクエストの件数をIntegerとしてまとめてるだけ)

別々のリクエストをまとめる

複数のDBクエリ結果などが後続の処理に必要なとき、Mono.zipで複数のMonoから結果を得ることが出来る。
それぞれのリクエストは並行で行われ、全ての結果が返ってくるまで待機される。
結果はそれぞれT1,T2...として取得出来る。

Mono<UserDetailInfo> getUserDetailInfo(String id) {
    return Mono.zip(
        getUser(id),
        getUserSubscription(id),
        getUserAddress(id)
    ).flatMap(result -> getUserDetailInfo(result.getT1(), result.getT2(), result.getT3()));
}

Discussion