Spring Cloud Stream, Reactorを使ってノンブロッキングなストリーム処理を実装する
はじめに
Spring Cloud StreamとReactor(3.4)を使って、Message Queueを介したノンブロッキングなストリーム処理を実装する方法の紹介です。
最新のReactorに追従した例があまり見当たらず、以前アノテーションベースで実装していた場合と大きく変わっておりわかりにくかったのでまとめてみました。
なお、例ではGoogle Cloud Pub/Subを利用していますが、Spring Cloud Streamのbinderを変更することでRabbitMQ, Apache Kafka, Amazon Kinesisなどにも変更可能です。公式のBinder Implementationsを参考にしてください。
(各binderがノンブロッキングに対応しているかどうかについては注意してください。)
確認バージョン
dependency | version |
---|---|
Spring Boot | 2.5.5 |
Spring Cloud Stream | 3.1.4 |
Spring Cloud GCP | 1.2.8.RELEASE |
Reactor | 3.4.10 |
全体像・仕様
supplierとconsumerというアプリケーションがあり、それぞれ単独で動作します。
supplierはHTTPサーバーとなっており、POSTリクエストでユーザー登録を受け付けます。そのリクエストをもとに、MQにメッセージを送信します。
そしてconsumerがMQよりメッセージを受け取り、標準出力に流すようになっています。
リポジトリ
起動方法、リクエストサンプルについてはREADMEをご確認ください。
解説
送信側(Supplier)の実装
まず、2つBean登録を行います。
@Configuration
public class SupplierConfig {
@Bean
public Sinks.Many<User> sink() {
return Sinks.many().unicast().onBackpressureBuffer();
}
@Bean("producer")
public Supplier<Flux<User>> producer(Sinks.Many<User> sink) {
return sink::asFlux;
}
}
Sinks
はReactorによって提供される、スレッドセーフにReactive Streamsのシグナルを手動で送ることのできるAPIです。
Sinks.many().unicast().onBackpressureBuffer()
と呼び出すことで、内部バッファを持ちバックプレッシャー[1]を実現した、単一のSubscriberにシグナルを送るSinksになります。
従来はEventEmitter
がこの役目でしたが、現在はDeprecated、Reactor 3.5で削除される予定です。
Sinksに関する詳細はReactor公式Referenceの4.7. Processors and Sinksをご確認ください。
producer()
にて、java.util.function.Supplier
を実装しています。これは Spring Cloud Function に準じた関数定義となっており、Spring Cloud Stream 1.2以降ではこれをMQのInput/Outputにバインドする方法が推奨されています。
後の設定でBean名が必要になるため@Bean("producer")
と明示的に宣言しておりますが、デフォルトで関数名が用いられるので@Bean
でも構いません。
次に、MQを通じてメッセージ送信を行う箇所の実装です。この例ではSpring WebFluxのFunctional Endpointsを使って実装しています。
@Slf4j
@Component
public class UserHandler {
private final Sinks.Many<User> sink;
public UserHandler(Sinks.Many<User> sink) {
this.sink = sink;
}
public Mono<ServerResponse> send(ServerRequest request) {
return request.bodyToMono(UserRequest.class)
.doOnNext(usr -> log.info("receive {}", usr))
.map(UserRequest::toNewUser)
.doOnNext(usr -> sink.emitNext(usr, EmitFailureHandler.FAIL_FAST))
.doOnNext(usr -> log.info("send {}", usr))
.flatMap(usr -> ServerResponse.ok().build());
}
}
sink.emitNext(usr, EmitFailureHandler.FAIL_FAST)
でSinksに対して送りたいデータを送信しています。第2引数にEmitFailureHandler
の実装を渡すことで、失敗時のリトライ仕様を決めることができます。
事前定義されたFAIL_FAST
を用いると、リトライは行わずすぐに失敗として扱う仕様となります。
最後に、application.ymlにてMQとのバインド設定を行います。
spring:
cloud:
function:
definition: producer
stream:
bindings:
producer-out-0:
destination: user
spring.cloud.function.definition
で、バインドさせるSpring Cloud Functionに準じたBean名を指定します。@Bean("producer")
と登録しておいたので上記のようになります。
なお、java.util.Supplier
などのBean登録が1件のみであれば省略可能ですが、明示的に設定することが推奨されています。
また、複数の関数を設定する場合はproducer1;producer2
と;
で区切る必要があります。
spring.cloud.stream.bindings
にバインド名を設定する必要がありますが、その命名規則がそのまま送信/受信、関数紐付けの設定となります。
- 送信側:
{送信関数のBean登録名}-out-{連番}
- 受信側:
{受信関数のBean登録名}-in-{連番}
という命名規則で設定します。連番については基本0
で良いですが、もし複数の箇所に送りたい・受け取りたい場合は続けて番号をいれることができます。
受信側(Consumer)の実装
まず、MQを受け取ったあとの処理を実装しておきます。
@Slf4j
@Component
public class UserReceiver {
public Mono<Void> receiveUser(User user) {
log.info("receive {}", user);
return Mono.empty();
}
}
次にBean登録です。送信側同様Spring Cloud Functionsに沿った形としています。
@Configuration
public class ConsumerConfig {
@Bean("consumer")
public Consumer<Flux<User>> consumer(UserReceiver receiver) {
return stream -> stream
.map(receiver::receiveUser)
.subscribe();
}
}
ここではjava.util.function.Consumer
を実装していますが、java.util.function.Function
でもOKです。その場合は返り値の型がFunction<Flux<User>, Mono<Void>>
となります。
最後にapplication.ymlの設定です。こちらは送信側と同様になります。
spring:
cloud:
function:
definition: consumer
stream:
bindings:
consumer-in-0:
destination: user
まとめ
Spring Cloud Functionベースの実装により疎結合になる一方で、従来のアノテーションベースになれてる身からすると戸惑う部分が多いかなと思いました。
何かしらの参考になれば幸いです。
参考
- Spring Cloud Stream Reference Documentation (公式)
- Reactive Spring Cloud Stream in practice | by Zalán Tóth | Medium
- Azure Service Bus 用の Spring Cloud Azure Stream Binder を使用する方法 | Microsoft Docs
-
受信側の飽和を防ぐために、送信側で抑止を行う仕組み。 参考: バックプレッシャー(back pressure)とは - IT用語辞典 e-Words ↩︎
Discussion