🌸

Spring WebFluxとSpring Data R2DBCでリアクティブプログラミング

2023/02/21に公開

はじめに

Spring WebFluxは、非同期で動作するリアクティブなWebアプリケーションを構築するためのSpring Frameworkです。WebFluxは、Spring 5以降で導入され、Reactive Streams APIに基づいて構築されています。

WebFluxを利用することで非同期的なデータアクセスを可能にしますが、一般的なJavaアプリケーションで利用されるJDBC(Java Database Connectivity)では、同期的な処理しかサポートされていないため、リアクティブなWebアプリケーション構築する上で直接データベースにアクセスすることは避けられていました。

一方で、Spring Data R2DBCは、Reactive Streams APIに基づいて構築されており、非同期的なデータアクセスを実現することができます。さらにSpring Data R2DBCは、リレーショナルデータベースに対するサポートを提供しています。これにより、従来のJDBCによるデータベースアクセスと同じように扱うことができます。
これらの機能を利用することで、非同期処理により、アプリケーションのレスポンス性が向上し、よりスケーラブルなアプリケーションを構築することができます。

そこで今回は、Spring WebFluxとSpring Data R2DBCを利用したノンブロッキングなデータアクセスを実装していきます。

前準備

実行環境

java -version
openjdk version "11.0.15" 2022-04-19 LTS
OpenJDK Runtime Environment Corretto-11.0.15.9.1 (build 11.0.15+9-LTS)
OpenJDK 64-Bit Server VM Corretto-11.0.15.9.1 (build 11.0.15+9-LTS, mixed mode)

プロジェクト作成

Spring Initializrで以下を選択してください。

build.gradle
dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
	implementation 'org.springframework.boot:spring-boot-starter-webflux'
	compileOnly 'org.projectlombok:lombok'
	runtimeOnly 'io.r2dbc:r2dbc-postgresql'
	runtimeOnly 'org.postgresql:postgresql'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'io.projectreactor:reactor-test'
}

DBの起動

今回は簡易的にdocker-composeで立ち上げます。

docker-compose.yml
version: '3'

services:
  postgres:
    image: postgres:latest
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: demo
      TZ: "Asia/Tokyo"
    ports:
      - "5432:5432"

実装

作成するアプリ

従業員データを操作する簡単なWeb APIを実装します。

メソッド URL 説明
GET /employee/{id} idで指定した従業員データを取得する
GET /employee 従業員の一覧データを取得する
POST /employee 従業員データを登録する
PUT /employee 従業員データを更新する
DELETE /employee/{id} idで指定した従業員データを削除する

APIのルーティング

Spring WebFluxではSpring MVCのアノテーション付きコントローラーの書き方もできますが、今回は関数型のように書くことができるRouterFunctionを利用します。
各HTTPリクエストに対して後述するHandlerのメソッドを割り当てています。

DemoApplication.java  
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
	SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public RouterFunction route(EmployeeHandler handler) {
        return RouterFunctions.route()
                .GET("/employee/{id}", handler::get)
		.GET("/employee", handler::list)
		.POST("/employee", handler::create)
		.PUT("/employee", handler::update)
		.DELETE("/employee/{id}", handler::delete)
		.build();
    }
}

リポジトリの実装

DBの設定

application.properties
spring.r2dbc.url=r2dbc:postgresql://localhost:5432/demo
spring.r2dbc.username=postgres
spring.r2dbc.password=password

DB初期化

DBの初期化はdockerで行なっても良いですが、せっかくなのでSpring Data R2DBCでサポートされている機能を利用して行います。

DemoApplication.java
@SpringBootApplication  
public class DemoApplication {  

    public static void main(String[] args) {
	SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public RouterFunction route(EmployeeHandler handler) {
        return RouterFunctions.route()
                .GET("/employee/{id}", handler::get)
		.GET("/employee", handler::list)
		.POST("/employee", handler::create)
		.PUT("/employee", handler::update)
		.DELETE("/employee/{id}", handler::delete)
		.build();
    }

+   @Bean
+   public ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {
+	var initializer = new ConnectionFactoryInitializer();
+	initializer.setConnectionFactory(connectionFactory);
+	var populator = new CompositeDatabasePopulator();
+	populator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("./db-schema.sql")));
+	initializer.setDatabasePopulator(populator);
+	return initializer;
+   }
}
db-schema.sql
drop table if exists employee;

create table employee
(
    id   serial,
    name varchar(20),
    primary key (id)
);

Entity

Employee.java
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class Employee {
    @Id
    private Long id;
    private String name;
}

Repositoryインタフェース

Spring Data R2DBCではRDBにクエリを発行する方法がいくつかあります。そのうちの1つのRepositoryインタフェースを継承する方法で実装します。
ReactiveCrudRepositoryを利用することで永続ストアのデータアクセスレイヤーを実装するために必要な定型コードの量を大幅に削減することができます。
管理するドメインクラスとドメインクラスのIDを指定することで実現できます。
今回はEntityとして作成したEmployeeとPKのLongの型でReactiveCrudRepositoryを継承したインタフェースを作成します。

EmployeeRepository.java
interface EmployeeRepository extends ReactiveCrudRepository<Employee, Long> {
}

ハンドラーの実装

最後にルーティングで渡したHandlerで各APIの処理を実装します。
基本的には先程実装したrepositoryインタフェースで用意されているメソッドを利用してDBへアクセスし、結果をServerResponseのbody部分に詰めて返却しています。

EmployeeHandler.java
@AllArgsConstructor
@Component
public class EmployeeHandler {

    EmployeeRepository employeeRepository;

    Mono<ServerResponse> get(ServerRequest serverRequest) {
        var id = Long.parseLong(serverRequest.pathVariable("id"));
        return employeeRepository.findById(id)
                .flatMap(employee -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(employee))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    Mono<ServerResponse> create(ServerRequest serverRequest) {
        return handleBody(serverRequest, CreateEmployeeRequest.class, request -> {
            var employee = Employee.builder()
                    .name(request.name)
                    .build();
            return employeeRepository.save(employee);
        });
    }

    Mono<ServerResponse> list(ServerRequest serverRequest) {
        return employeeRepository.findAll().collectList()
                .flatMap(response -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(response))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    Mono<ServerResponse> update(ServerRequest serverRequest) {
        return handleBody(serverRequest, UpdateEmployeeRequest.class, request -> {
            var employee = Employee.builder()
                    .id(request.id)
                    .name(request.name)
                    .build();
            return employeeRepository.save(employee);
        }).flatMap(response -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(response));
    }

    Mono<ServerResponse> delete(ServerRequest serverRequest) {
        var id = Long.parseLong(serverRequest.pathVariable("id"));
        return employeeRepository.deleteById(id)
                .flatMap(response -> ServerResponse.ok().build())
                .doOnError(Mono::error);
    }

    public static <T, R extends Mono<?>> Mono<ServerResponse> handleBody(ServerRequest serverRequest, Class<T> clazz, Function<T, R> f) {
        return serverRequest.bodyToMono(clazz)
                .flatMap(f)
                .flatMap(response -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(response))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @Getter
    static class CreateEmployeeRequest {
        private String name;
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @Getter
    static class UpdateEmployeeRequest {
        private Long id;
        private String name;
    }
}

おわりに

Spring WebFluxとSpring Data R2DBCを利用したリアクティブプログラミングについて簡単なサンプルを用いて解説しました。
Spring WebFluxは情報が少なく、取っつきづらいかと思います。 しかしながら、スケーラブルなアプリケーションの構築やよりパフォーマンスが求められるシステムにおいて重要な技術だと考えています。
普段JavaやSpring Frameworkを利用している開発者の方で興味がある方はreactorのドキュメントを一読してみると良いかも知れません。

Discussion