🌚

SpringBootでRxJavaを使用し並列実行やエラーハンドリング、リトライ処理を実装してみた

12 min read

rxjava and springboot

前提条件

  • RxJava 2.2.20
  • SpringBoot 2.4.1

SpringBootはそこまで関係しないですが、最終的にSpringBootでRxJavaを使いたかったので同時に実装することにしました。
本記事は以下のことがらに焦点を当てています。

  • RxJavaは何ができるの?
  • RxJavaを使ったときに嬉しいことって何?
  • 具体的にはどういう実装するの?

これらを踏まえて、簡単レスポンスを返すWebAPIを実装していきます。

実装するAPI

Githubにあげてます。

https://github.com/iku8/java-reactive-trial

よくあるSNSを想定して、ログイン後のマイページへアクセスしたときに発行されるAPIを実装実装します。

このAPIはユーザ情報を取得し、情報が取得できれば、ユーザに紐づく投稿一覧やフォロワーの数を取得します。

取得するもの(ログインユーザに紐づくものです)

  1. 投稿タイトルの一覧
    • List<String>
  2. フォロワー数
    • Long

APIの実装要件

  1. ユーザ情報の取得が成功したときのみ投稿タイトル一覧・フォロワー数を取得する
  2. 投稿タイトル一覧・フォロワー数は並列に取得する(スレッドを分ける)
  3. 投稿タイトル一覧・フォロワー数を保持するような専用クラスは作成しない
    • PostsAndFollowersCountのようなクラスは作成しない
  4. フォロワー数の取得が失敗した場合でも投稿タイトル一覧が取得できれば、レスポンスは正常に返す
  5. フォロワー数の取得が失敗した場合、リトライを1度だけ実行する

UML

今回の要件のフォロー図を示しておきます。

RxJavaによる直列処理・並列処理とエラーハンドリングとリトライ

用語説明

用語を先に軽く説明しておきます。Spring固有のものは説明しません。

  • Observable
    • RxJavaの中核となるような機能であり、あらゆる値をObservable化することで、並列実行したり連結したり、変換したりすることができる
  • Observable.just()
    • 何らかの値をObservable化する
    • StringでもIntegerでもListでもなんでも
  • Observable.fromArray()
    • 配列をObservable化する
    • justでも良いかもしれないが、配列用のメソッドがあるので今回これを使っている
  • Observable.flatMap()
    • 特定のObservableの処理の結果を使用し後続のObservableの処理に繋げられる
    • StreamAPIのflatMapと同じような感じ
  • Observable.zip()
    • 複数のObservable処理の待ち合わせが行える
    • A,Bの処理があった場合、両方終わるまで待つことができる
    • subscribeOnと組み合わせて並列処理できる
  • subscribeOn()
    • Schedulerを使用することで、どのスレッドを使用してObservable処理をするか選択できる
  • Schedulers.io()
    • どのスレッドを使用するか選択する
    • io特化なのかCPU特化なのか、単に新しいスレッドなのか等
  • Pair
    • 2つの異なる型を一つの値として詰める箱
    • クラスを作って詰めるまでもない時にしようする

gradleの設定

gradleで以下依存関係を追加しておきます。
spring-boot-starter-webfluxを直接使っているわけではありません。httpのリクエストをハンドルするために入れているだけです(今後webfluxについても調査していきます)

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-webflux'
	implementation 'io.reactivex.rxjava2:rxjava:2.2.20'
	implementation 'org.springframework.data:spring-data-commons:2.4.2'
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'io.projectreactor:reactor-test'
}

実装

ユーザ、投稿、フォロワーのクラスを作成

Lombokを使用ながら、適当に3つのクラスを作成しておきます。
本題ではないのでコードだけ。

User.java
@Builder
@Data
public class User {
    private long userId;
    private String name;
    private String email;
}
Post.java
@Builder
@Data
public class Post {
    private long postId;
    private String title;
    private String body;
}
Follower.java
@Builder
@Data
public class Follower {
    private long userId;
}

ユーザ、投稿、フォロワー取得

それぞれの値をObservableにくるんで取得します。これで直列とか並列処理するための準備をします。
インタフェースは適当に作ってください。githubも上げております。

Obserbale.just()でUserクラスをObservableにくるむことができます。

UserServiceImpl.java
@Service
public class UserServiceImpl implements UserService {

    @Override
    public Observable<User> findByUserId(long userId) {
        return Observable.just(User.builder()
            .userId(userId)
            .name("test name")
            .email("test@example.com")
            .build());
    }
}

Observable.fromArray()を使用することで配列をObservableにくるむことができます。くるんであげましょう。
ちょっと無理やりですがmap内でThread.sleep(3000);をすることによって、このメソッドを処理を遅くしています(3秒)。
ObservableEmitterを使って書き直すこともできるぽいですが。

PostServiceImpl.java
@Service
public class PostServiceImpl implements PostService {

    @Override
    public Observable<List<Post>> findAllByUserId(long userId) {
        return Observable.fromArray(
            Arrays.asList(
                Post.builder()
                    .postId(1L)
                    .title("test title1")
                    .body("test body1")
                    .build(),
                Post.builder()
                    .postId(1L)
                    .title("test title1")
                    .body("test body1")
                    .build()
            )
        ).map(e -> {
            Thread.sleep(3000);
            return e;
        });
    }
}

こちらも同じく3秒遅くしてくるみます。

FollowerServiceImpl.java
@Service
public class FollowerServiceImpl implements FollowerService {

    @Override
    public Observable<List<Follower>> getByUserId(long userId) {
        return Observable.fromArray(
            Arrays.asList(
                Follower.builder()
                    .userId(1L)
                    .build(),
                Follower.builder()
                    .userId(2L)
                    .build(),
                Follower.builder()
                    .userId(3L)
                    .build()
            )
        )
        .map(e -> {
            Thread.sleep(3000);
            return e;
        });
    }
}

直列、並列処理を実装

コントローラを作って、ユーザ取得をした後に投稿一覧、フォロワー取得を行います。
各ServiceはObservableなので、メソッドチェーンにより合成したり、並列実行したりすることができます。

RxController.java
@RestController
@RequestMapping("api/rx")
@RequiredArgsConstructor
public class RxController {

    private final UserService userService;
    private final PostService postService;
    private final FollowerService followerService;

    @GetMapping
    public Observable<Pair<List<Post>, Long>> index() throws InterruptedException {
        StopWatch sw = new StopWatch();
        sw.start();
        return userService.findByUserId(1L)
            .flatMap(user -> {
                return Observable.zip(
                    postService.findAllByUserId(user.getUserId()),
                    followerService.getByUserId(user.getUserId())
                    (posts, followers) -> Pair.of(posts, followers.stream().count()));
            })
            .doOnComplete(() -> {
                sw.stop();
                System.out.println(sw.getTotalTimeSeconds());
            });
    }
}

コントローラの実装解説

処理時間計測

別スレッドを作成し、並列実行した場合の時間を計測したいのでStopWatchで測ります。

StopWatch sw = new StopWatch();
sw.start();

すべてのObservable処理が完了したら.doOnCompleteに入るので、ここで処理時間を表示しています。

ユーザ情報取得の結果を後続の処理と直列化

userService.findByUserId()はObservableを返すので、flatMapをメソッドチェーンすることで、ユーザ情報取得結果を後続の処理に渡すことができます。後続の処理は投稿取得とフォロワー取得です。

投稿取得、フォロワー取得を並列に実行し、待ち合わせ

Observable.zip()を使用することで、zip内の処理をそれぞれ実行し、すべての処理が実行し終わったらreturnします。
投稿取得に3秒、フォロワー取得3秒なので合計6秒かかるはずの処理が、並列に実行され3秒で済むはずです。

すべての処理が終わったらreturnされるので、後続のObservable処理に投稿情報、フォロワー情報を使用するなどといったことが出来ます。便利ですね。

Pairを使って無駄なクラスを作成しない

投稿タイトル一覧とフォロワー数を最終的に返したいので、

List<String>Longをメンバ変数に持つ、PostsAndFollowersを作りたくなるかもしれませんが、用途が限定的すぎるので、今回はorg.springframework.data.util.Pairを使用してみます。

(posts, followers) -> Pair.of(posts, followers.stream().count()))

これでList<String>Longを持つPair型となります。PairはgetFirst()getSecond()で設定した値を取得することできます。今回は取り出す必要がないのでそのままで。

実はまだ並列実行出来ていない

アプリケーションを起動させて./gradlew bootRunhttp://localhost:8080/api/rxにアクセスしてみます。

ブラウザで結果は以下のように返却されました。

[{"first":[{"postId":1,"title":"test title1","body":"test body1"},{"postId":1,"title":"test title1","body":"test body1"}],"second":3}]

consoleを見ると
6.069420594

なんと6秒かかっていました。

投稿取得に3秒、フォロワー取得3秒なので合計6秒かかるはずの処理が、並列に実行され3秒で済むはずです

と書きましたが、実はこのままでは並列実行されずに6秒かかってしまうんです。

元のコードのままだとメインのスレッドがそのまま使用され、Observable.zip()を実行したとしてもスレッドは1つなので、逐次実行みたいになってしまうんですね。残念。

別スレッドを作成し並列実行

メインのスレッドとは別に、Observable.zip()内の処理を別スレッドで実行してみます。

RxController.java
return Observable.zip(
    postService.findAllByUserId(user.getUserId()).subscribeOn(Schedulers.io()),
    followerService.getByUserId(user.getUserId()).subscribeOn(Schedulers.io()),
    (posts, followers) -> Pair.of(posts, followers.stream().count()));

.subscribeOn(Schedulers.io())をzip内のObservableの各処理に付けます。これはIOバウンドに特化したスレッドを生成し、処理するという設定のようです。他にもCPUバウンドのスレッドとかを作れるらしいですが詳しくは分かりません。今回はなんとなくioを使ってみました。

ビルドし直して、http://localhost:8080/api/rxに再度アクセスしてみます

3.095990596
ちゃんと記事取得処理とフォロワー取得処理が並列実行されて3秒程度で帰ってきましたね。

エラーハンドリングとリトライ処理

今回の要件では以下の2点もあるので、これらを実装していきます。

  • フォロワー数の取得が失敗した場合でも投稿タイトル一覧が取得できれば、レスポンスは正常に返す
  • フォロワー数の取得が失敗した場合、リトライを1度だけ実行する

先に要件に即したコードを示します。

FollowerServiceImpl.java
@Service
public class FollowerServiceImpl implements FollowerService {

    @Override
    public Observable<List<Follower>> getByUserId(long userId) {
        return Observable.fromArray(
            Arrays.asList(
                Follower.builder()
                    .userId(1L)
                    .build(),
                Follower.builder()
                    .userId(2L)
                    .build(),
                Follower.builder()
                    .userId(3L)
                    .build()
            ))
            .map(e -> {
                // 追加
                long randomNumber = new Random().nextInt(6);

                if(3 != randomNumber) {
                    throw new RuntimeException("fail get followers");
                }

                Thread.sleep(3000);
                return e;
            })
            .retry(1) // 追加
            .onErrorReturn(e -> {  // 追加
                return Collections.emptyList();
            });
    }
}

エラーハンドリング

通常Observable内の処理でExceptionが発生すると、全体の処理が停止してしまいます。
投稿一覧は取得出来たのに、フォロワーが取得出来ないためにこのAPI自体が停止してしまいます。

それはもったいないので、フォロワーが取得出来ない場合はフォロワー数を0件として返してあげるようにします。

今回の要件だとエラーハンドリングをするのはフォロワーのServiceのみで良いです。
何らかの例外が発生した場合onErrorReturnが呼ばれるので、そのときは空のListを返すようにします。

.onErrorReturn(e -> { 
    return Collections.emptyList();
});

次にランダムなタイミングで例外が投げられるようにします。以下は6回に1回ほど正常処理となります。
なので大抵リクエストの度にフォロワー数0となります。

.map(e -> {
    // 追加
    long randomNumber = new Random().nextInt(6);

    // 3以外の数字の場合例外を投げる
    if(3 != randomNumber) {
        throw new RuntimeException("fail get followers");
    }

    Thread.sleep(3000);
    return e;
})

リトライ

エラーが発生したとしても何度かリトライしたい場合があると思います。
今回だとフォロワー取得に1度は失敗してしまったが、もう一度リトライしたいという場合に以下が使えます。

.retry(1)

リトライ処理めちゃくちゃ簡単で驚愕しました。
これで一度例外が履かれたとしても、リトライします。リトライして例外が吐かれたら、素直にエラーハンドリングされ空リストを返すといった感じになります。かっこいい。

2回試行するので3回に1回程度フォロワー数が取れると思います。

すべて場合 - (1回目に3とならない確率 * 2回目に3とならない確率) = 1 - (5/6 * 5/6) = 11/36 ≒ 1/3

RxJava面白かった

RxJava初めて触ってみましたが、あらゆる実装が新鮮で刺激的で、個人プロダクトにも利用できそうなので、積極的に学習続けたいと思います。

今回は以下のような要件をクリアしてみましたが、他にも面白そうなメソッドや仕組みがありそうなので調査を続けていきます。

  1. ユーザ情報の取得が成功したときのみ投稿タイトル一覧・フォロワー数を取得する
  2. 投稿タイトル一覧・フォロワー数は並列に取得する(スレッドを分ける)
  3. 投稿タイトル一覧・フォロワー数を保持するような専用クラスは作成しない
    • PostsAndFollowersCountのようなクラスは作成しない
  4. フォロワー数の取得が失敗した場合でも投稿タイトル一覧が取得できれば、レスポンスは正常に返す
  5. フォロワー数の取得が失敗した場合、リトライを1度だけ実行する

最後に最初の問に答えておきます。

  • RxJavaは何ができるの?
    • 並列処理やデータの加工、エラーハンドリングやリトライ処理
  • RxJavaを使ったときに嬉しいことって何?
    • 複数の処理を同時に並列実行することで、パフォーマンスが向上する
    • エラー発生時のハンドリングやリトライ処理が簡単に書ける
  • 具体的にはどういう実装するの?
    • Observableを使用してStreamAPIのような感じでメソッドチェーンとして処理をつなげていく

間違ってる点や、RxJava等のりアクティブプログラミングについてこんな面白いことも出来るよとかあれば教えて下さい。

RxJava3系も出ているようなので、そちらも今後調査していきます。

RxJava Releases

今回のコード: https://github.com/iku8/java-reactive-trial

Discussion

ログインするとコメントできます