Spring WebFluxとSpring Data R2DBCでリアクティブプログラミング
はじめに
Spring WebFluxは、非同期で動作するリアクティブなWebアプリケーションを構築するためのSpring Frameworkです。WebFluxは、Spring 5以降で導入され、Reactive Streams APIに基づいて構築されています。
WebFluxを利用することで非同期的なデータアクセスを可能にしますが、一般的なJavaアプリケーションで利用されるJDBC(Java Database Connectivity)では、同期的な処理しかサポートされていないため、リアクティブなWebアプリケーション構築する上で直接データベースにアクセスすることは避けられていました。
一方で、Spring Data R2DBCは、Reactive Streams APIに基づいて構築されており、非同期的なデータアクセスを実現することができます。さらにSpring Data R2DBCは、リレーショナルデータベースに対するサポートを提供しています。これにより、従来のJDBCによるデータベースアクセスと同じように扱うことができます。
これらの機能を利用することで、非同期処理により、アプリケーションのレスポンス性が向上し、よりスケーラブルなアプリケーションを構築することができます。
そこで今回は、Spring WebFluxとSpring Data R2DBCを利用したノンブロッキングなデータアクセスを実装していきます。
前準備
実行環境
java -version
openjdk version "11.0.15" 2022-04-19 LTS
OpenJDK Runtime Environment Corretto-11.0.15.9.1 (build 11.0.15+9-LTS)
OpenJDK 64-Bit Server VM Corretto-11.0.15.9.1 (build 11.0.15+9-LTS, mixed mode)
プロジェクト作成
Spring Initializrで以下を選択してください。
- Web->Spring Reactive Web
- SQL->Spring Data R2DBC, PostgreSQL Driver(サポートしているDriver一覧)
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'io.r2dbc:r2dbc-postgresql'
runtimeOnly 'org.postgresql:postgresql'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}
DBの起動
今回は簡易的にdocker-composeで立ち上げます。
version: '3'
services:
postgres:
image: postgres:latest
restart: always
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
POSTGRES_DB: demo
TZ: "Asia/Tokyo"
ports:
- "5432:5432"
実装
作成するアプリ
従業員データを操作する簡単なWeb APIを実装します。
メソッド | URL | 説明 |
---|---|---|
GET | /employee/{id} | idで指定した従業員データを取得する |
GET | /employee | 従業員の一覧データを取得する |
POST | /employee | 従業員データを登録する |
PUT | /employee | 従業員データを更新する |
DELETE | /employee/{id} | idで指定した従業員データを削除する |
APIのルーティング
Spring WebFluxではSpring MVCのアノテーション付きコントローラーの書き方もできますが、今回は関数型のように書くことができるRouterFunctionを利用します。
各HTTPリクエストに対して後述するHandlerのメソッドを割り当てています。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public RouterFunction route(EmployeeHandler handler) {
return RouterFunctions.route()
.GET("/employee/{id}", handler::get)
.GET("/employee", handler::list)
.POST("/employee", handler::create)
.PUT("/employee", handler::update)
.DELETE("/employee/{id}", handler::delete)
.build();
}
}
リポジトリの実装
DBの設定
spring.r2dbc.url=r2dbc:postgresql://localhost:5432/demo
spring.r2dbc.username=postgres
spring.r2dbc.password=password
DB初期化
DBの初期化はdockerで行なっても良いですが、せっかくなのでSpring Data R2DBCでサポートされている機能を利用して行います。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public RouterFunction route(EmployeeHandler handler) {
return RouterFunctions.route()
.GET("/employee/{id}", handler::get)
.GET("/employee", handler::list)
.POST("/employee", handler::create)
.PUT("/employee", handler::update)
.DELETE("/employee/{id}", handler::delete)
.build();
}
+ @Bean
+ public ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {
+ var initializer = new ConnectionFactoryInitializer();
+ initializer.setConnectionFactory(connectionFactory);
+ var populator = new CompositeDatabasePopulator();
+ populator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("./db-schema.sql")));
+ initializer.setDatabasePopulator(populator);
+ return initializer;
+ }
}
drop table if exists employee;
create table employee
(
id serial,
name varchar(20),
primary key (id)
);
Entity
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class Employee {
@Id
private Long id;
private String name;
}
Repositoryインタフェース
Spring Data R2DBCではRDBにクエリを発行する方法がいくつかあります。そのうちの1つのRepositoryインタフェースを継承する方法で実装します。
ReactiveCrudRepositoryを利用することで永続ストアのデータアクセスレイヤーを実装するために必要な定型コードの量を大幅に削減することができます。
管理するドメインクラスとドメインクラスのIDを指定することで実現できます。
今回はEntityとして作成したEmployee
とPKのLong
の型でReactiveCrudRepositoryを継承したインタフェースを作成します。
interface EmployeeRepository extends ReactiveCrudRepository<Employee, Long> {
}
ハンドラーの実装
最後にルーティングで渡したHandlerで各APIの処理を実装します。
基本的には先程実装したrepositoryインタフェースで用意されているメソッドを利用してDBへアクセスし、結果をServerResponseのbody部分に詰めて返却しています。
@AllArgsConstructor
@Component
public class EmployeeHandler {
EmployeeRepository employeeRepository;
Mono<ServerResponse> get(ServerRequest serverRequest) {
var id = Long.parseLong(serverRequest.pathVariable("id"));
return employeeRepository.findById(id)
.flatMap(employee -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(employee))
.switchIfEmpty(ServerResponse.notFound().build());
}
Mono<ServerResponse> create(ServerRequest serverRequest) {
return handleBody(serverRequest, CreateEmployeeRequest.class, request -> {
var employee = Employee.builder()
.name(request.name)
.build();
return employeeRepository.save(employee);
});
}
Mono<ServerResponse> list(ServerRequest serverRequest) {
return employeeRepository.findAll().collectList()
.flatMap(response -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(response))
.switchIfEmpty(ServerResponse.notFound().build());
}
Mono<ServerResponse> update(ServerRequest serverRequest) {
return handleBody(serverRequest, UpdateEmployeeRequest.class, request -> {
var employee = Employee.builder()
.id(request.id)
.name(request.name)
.build();
return employeeRepository.save(employee);
}).flatMap(response -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(response));
}
Mono<ServerResponse> delete(ServerRequest serverRequest) {
var id = Long.parseLong(serverRequest.pathVariable("id"));
return employeeRepository.deleteById(id)
.flatMap(response -> ServerResponse.ok().build())
.doOnError(Mono::error);
}
public static <T, R extends Mono<?>> Mono<ServerResponse> handleBody(ServerRequest serverRequest, Class<T> clazz, Function<T, R> f) {
return serverRequest.bodyToMono(clazz)
.flatMap(f)
.flatMap(response -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(response))
.switchIfEmpty(ServerResponse.notFound().build());
}
@AllArgsConstructor
@NoArgsConstructor
@Getter
static class CreateEmployeeRequest {
private String name;
}
@AllArgsConstructor
@NoArgsConstructor
@Getter
static class UpdateEmployeeRequest {
private Long id;
private String name;
}
}
おわりに
Spring WebFluxとSpring Data R2DBCを利用したリアクティブプログラミングについて簡単なサンプルを用いて解説しました。
Spring WebFluxは情報が少なく、取っつきづらいかと思います。 しかしながら、スケーラブルなアプリケーションの構築やよりパフォーマンスが求められるシステムにおいて重要な技術だと考えています。
普段JavaやSpring Frameworkを利用している開発者の方で興味がある方はreactorのドキュメントを一読してみると良いかも知れません。
Discussion