🎢

Spring Cloud Stream, Reactorを使ってノンブロッキングなストリーム処理を実装する

2021/10/13に公開

はじめに

Spring Cloud StreamとReactor(3.4)を使って、Message Queueを介したノンブロッキングなストリーム処理を実装する方法の紹介です。
最新のReactorに追従した例があまり見当たらず、以前アノテーションベースで実装していた場合と大きく変わっておりわかりにくかったのでまとめてみました。

なお、例ではGoogle Cloud Pub/Subを利用していますが、Spring Cloud Streamのbinderを変更することでRabbitMQ, Apache Kafka, Amazon Kinesisなどにも変更可能です。公式のBinder Implementationsを参考にしてください。
(各binderがノンブロッキングに対応しているかどうかについては注意してください。)

確認バージョン

dependency version
Spring Boot 2.5.5
Spring Cloud Stream 3.1.4
Spring Cloud GCP 1.2.8.RELEASE
Reactor 3.4.10

全体像・仕様

overview

supplierとconsumerというアプリケーションがあり、それぞれ単独で動作します。
supplierはHTTPサーバーとなっており、POSTリクエストでユーザー登録を受け付けます。そのリクエストをもとに、MQにメッセージを送信します。
そしてconsumerがMQよりメッセージを受け取り、標準出力に流すようになっています。

リポジトリ

https://github.com/abekoh/spring-cloud-stream-with-reactive

起動方法、リクエストサンプルについてはREADMEをご確認ください。

解説

送信側(Supplier)の実装

まず、2つBean登録を行います。

SupplierConfig.java
@Configuration
public class SupplierConfig {
	@Bean
	public Sinks.Many<User> sink() {
		return Sinks.many().unicast().onBackpressureBuffer();
	}

	@Bean("producer")
	public Supplier<Flux<User>> producer(Sinks.Many<User> sink) {
		return sink::asFlux;
	}
}

SinksはReactorによって提供される、スレッドセーフにReactive Streamsのシグナルを手動で送ることのできるAPIです。
Sinks.many().unicast().onBackpressureBuffer()と呼び出すことで、内部バッファを持ちバックプレッシャー[1]を実現した、単一のSubscriberにシグナルを送るSinksになります。
従来はEventEmitterがこの役目でしたが、現在はDeprecated、Reactor 3.5で削除される予定です。
Sinksに関する詳細はReactor公式Referenceの4.7. Processors and Sinksをご確認ください。

producer()にて、java.util.function.Supplierを実装しています。これは Spring Cloud Function に準じた関数定義となっており、Spring Cloud Stream 1.2以降ではこれをMQのInput/Outputにバインドする方法が推奨されています。
後の設定でBean名が必要になるため@Bean("producer")と明示的に宣言しておりますが、デフォルトで関数名が用いられるので@Beanでも構いません。

次に、MQを通じてメッセージ送信を行う箇所の実装です。この例ではSpring WebFluxのFunctional Endpointsを使って実装しています。

UserHandler.java
@Slf4j
@Component
public class UserHandler {

	private final Sinks.Many<User> sink;

	public UserHandler(Sinks.Many<User> sink) {
		this.sink = sink;
	}

	public Mono<ServerResponse> send(ServerRequest request) {
		return request.bodyToMono(UserRequest.class)
				.doOnNext(usr -> log.info("receive {}", usr))
				.map(UserRequest::toNewUser)
				.doOnNext(usr -> sink.emitNext(usr, EmitFailureHandler.FAIL_FAST))
				.doOnNext(usr -> log.info("send {}", usr))
				.flatMap(usr -> ServerResponse.ok().build());

	}
}

sink.emitNext(usr, EmitFailureHandler.FAIL_FAST)でSinksに対して送りたいデータを送信しています。第2引数にEmitFailureHandlerの実装を渡すことで、失敗時のリトライ仕様を決めることができます。
事前定義されたFAIL_FASTを用いると、リトライは行わずすぐに失敗として扱う仕様となります。

最後に、application.ymlにてMQとのバインド設定を行います。

application.yml
spring:
  cloud:
    function:
      definition: producer
    stream:
      bindings:
        producer-out-0:
          destination: user

spring.cloud.function.definitionで、バインドさせるSpring Cloud Functionに準じたBean名を指定します。@Bean("producer")と登録しておいたので上記のようになります。
なお、java.util.SupplierなどのBean登録が1件のみであれば省略可能ですが、明示的に設定することが推奨されています
また、複数の関数を設定する場合はproducer1;producer2;で区切る必要があります

spring.cloud.stream.bindingsにバインド名を設定する必要がありますが、その命名規則がそのまま送信/受信、関数紐付けの設定となります。

  • 送信側: {送信関数のBean登録名}-out-{連番}
  • 受信側: {受信関数のBean登録名}-in-{連番}

という命名規則で設定します。連番については基本0で良いですが、もし複数の箇所に送りたい・受け取りたい場合は続けて番号をいれることができます。

受信側(Consumer)の実装

まず、MQを受け取ったあとの処理を実装しておきます。

UserReceiver.java
@Slf4j
@Component
public class UserReceiver {
	public Mono<Void> receiveUser(User user) {
		log.info("receive {}", user);
		return Mono.empty();
	}
}

次にBean登録です。送信側同様Spring Cloud Functionsに沿った形としています。

ConsumerConfig.java
@Configuration
public class ConsumerConfig {
	@Bean("consumer")
	public Consumer<Flux<User>> consumer(UserReceiver receiver) {
		return stream -> stream
				.map(receiver::receiveUser)
				.subscribe();
	}
}

ここではjava.util.function.Consumerを実装していますが、java.util.function.FunctionでもOKです。その場合は返り値の型がFunction<Flux<User>, Mono<Void>>となります。

最後にapplication.ymlの設定です。こちらは送信側と同様になります。

application.yml
spring:
  cloud:
    function:
      definition: consumer
    stream:
      bindings:
        consumer-in-0:
          destination: user

まとめ

Spring Cloud Functionベースの実装により疎結合になる一方で、従来のアノテーションベースになれてる身からすると戸惑う部分が多いかなと思いました。

何かしらの参考になれば幸いです。

参考

脚注
  1. 受信側の飽和を防ぐために、送信側で抑止を行う仕組み。 参考: バックプレッシャー(back pressure)とは - IT用語辞典 e-Words ↩︎

Discussion