正在加载,请稍候…

Spring Boot 3 + WebFlux:使用 Project Reactor 构建响应式 API

使用 Spring WebFlux 构建非阻塞响应式 REST API,涵盖 Mono/Flux、响应式仓库、背压、错误处理、测试和 R2DBC。

Spring Boot 3 + WebFlux:使用 Project Reactor 构建响应式 API

为什么选择响应式编程?

传统的 Spring MVC 采用“每个请求一个线程”的模型,线程在等待 I/O 期间会一直被阻塞。而使用 WebFlux,一个小型线程池通过非阻塞 I/O 即可处理数千个并发连接。

Spring Boot 3 + WebFlux:使用 Project Reactor 构建响应式 API 示意图

项目设置

<!-- pom.xml -->
<!-- Reactive web stack (WebFlux + Reactor Netty) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Spring Data R2DBC: reactive repositories over a non-blocking SQL driver -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!--
  PostgreSQL R2DBC driver. Spring Boot 3 manages the version for the
  org.postgresql coordinates; the old io.r2dbc groupId is no longer managed,
  so it would not resolve without an explicit <version>.
-->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <scope>runtime</scope>
</dependency>

Mono 和 Flux 基础

import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Mono: emits 0 or 1 element
Mono<String> mono = Mono.just("Hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("Error"));

// Flux: emits 0 to N elements
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Flux<String> fromList = Flux.fromIterable(List.of("a", "b", "c"));
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));

// Operators: keep the even numbers, multiply each by 10, take at most 3.
// For the source 1..5 this prints 20 and 40.
flux
  .filter(n -> n % 2 == 0)
  .map(n -> n * 10)
  .take(3)
  .subscribe(System.out::println);

响应式控制器

/**
 * Reactive REST endpoints for users, mounted under {@code /api/users}.
 *
 * <p>Every handler returns the {@code Mono}/{@code Flux} produced by
 * {@link UserService}; nothing is subscribed to or blocked on here.
 */
@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    /**
     * Constructor injection. Without this constructor the {@code final} field is
     * never assigned and the class does not compile.
     *
     * @param userService service that all handlers delegate to
     */
    public UserController(UserService userService) {
        this.userService = userService;
    }

    /**
     * Lists users one page at a time.
     *
     * @param page zero-based page index (defaults to 0)
     * @param size page size (defaults to 20)
     * @return the users of the requested page
     */
    @GetMapping
    public Flux<UserDto> getAllUsers(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        return userService.findAll(page, size);
    }

    /**
     * Looks up a single user.
     *
     * @param id user id taken from the path
     * @return 200 with the user, or 404 when the service completes without a value
     */
    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserDto>> getUserById(@PathVariable String id) {
        return userService.findById(id)
                .map(ResponseEntity::ok)
                // Fallback for an empty Mono; errors signalled by the service pass through untouched.
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    /**
     * Creates a user and answers with 201 Created.
     *
     * @param request validated request body, consumed reactively
     * @return the created user
     */
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserDto> createUser(@RequestBody @Valid Mono<CreateUserRequest> request) {
        return request.flatMap(userService::create);
    }

    /**
     * Streams users as Server-Sent Events, spacing consecutive elements 100 ms apart.
     *
     * @return the stream of users
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<UserDto> streamUsers() {
        return userService.streamAll()
                .delayElements(Duration.ofMillis(100));
    }
}

Spring Boot 3 + WebFlux:使用 Project Reactor 构建响应式 API 示意图

R2DBC 响应式仓库

/**
 * Reactive repository for {@code User} rows, keyed by {@code Long} id.
 * Query methods without {@code @Query} are derived from their names by Spring Data.
 */
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    /** Finds users whose email contains the given fragment. */
    Flux<User> findByEmailContaining(String email);

    /** Finds active users that have the given role. */
    @Query("SELECT * FROM users WHERE role = :role AND active = true")
    Flux<User> findActiveByRole(String role);

    /** Counts users by their {@code active} flag. */
    Mono<Long> countByActive(boolean active);

    /**
     * Tells whether a user with exactly this email exists.
     * Declared here because {@code UserService.create} calls it to reject duplicates.
     */
    Mono<Boolean> existsByEmail(String email);
}

// Entity mapped to the "users" table.
@Table("users")
public class User {
    // Primary key of the row.
    @Id
    private Long id;
    private String name;
    private String email;
    // Referenced by the role filter in UserRepository.findActiveByRole.
    private String role;
    // Referenced by the active filters in UserRepository.
    private boolean active;
    // getters/setters omitted for brevity
}

服务层与错误处理

/**
 * User use cases built on top of {@link UserRepository}.
 * All methods are lazy: nothing runs until the returned publisher is subscribed.
 */
@Service
public class UserService {

    private final UserRepository userRepository;
    private final UserMapper mapper;

    /**
     * Constructor injection. Without this constructor the {@code final} fields are
     * never assigned and the class does not compile.
     */
    public UserService(UserRepository userRepository, UserMapper mapper) {
        this.userRepository = userRepository;
        this.mapper = mapper;
    }

    /**
     * Finds a user by its textual id.
     *
     * @param id user id; must parse as a {@code long}
     * @return the user, or an error signal carrying {@code UserNotFoundException}
     *         when the id is not numeric or no row matches
     */
    public Mono<UserDto> findById(String id) {
        // Parse inside the pipeline so a malformed id becomes an error signal
        // instead of an exception thrown while the chain is being assembled.
        return Mono.fromCallable(() -> Long.parseLong(id))
                // A non-numeric id cannot match any row, so report it as "not found".
                .onErrorMap(NumberFormatException.class,
                        ex -> new UserNotFoundException("User not found: " + id))
                .flatMap(parsedId -> userRepository.findById(parsedId))
                .map(mapper::toDto)
                // Supplier form: the exception is only created when the result is actually empty.
                .switchIfEmpty(Mono.error(
                        () -> new UserNotFoundException("User not found: " + id)));
    }

    /**
     * Creates a user unless the email is already taken.
     *
     * @param request data for the new user
     * @return the saved user, or an error signal carrying {@code DuplicateEmailException}
     */
    public Mono<UserDto> create(CreateUserRequest request) {
        // NOTE(review): check-then-save is not atomic; presumably a unique constraint
        // on the email column backs this up - confirm against the schema.
        return userRepository.existsByEmail(request.email())
                .flatMap(exists -> {
                    if (exists) {
                        return Mono.error(new DuplicateEmailException(request.email()));
                    }
                    User user = mapper.toEntity(request);
                    return userRepository.save(user);
                })
                .map(mapper::toDto);
    }

    /**
     * Returns one page of users.
     *
     * @param page zero-based page index
     * @param size number of users per page
     * @return the users of the requested page
     */
    public Flux<UserDto> findAll(int page, int size) {
        // Paging is applied on the emitted stream: the skipped rows are still read.
        // The cast keeps page * size from overflowing int.
        return userRepository.findAll()
                .skip((long) page * size)
                .take(size)
                .map(mapper::toDto);
    }
}

全局错误处理器

@Component
public class GlobalErrorHandler extends DefaultErrorWebExceptionHandler {

    @Override
    protected Map<String, Object> getErrorAttributes(
            ServerRequest request, ErrorAttributeOptions options) {
        Map<String, Object> attrs = super.getErrorAttributes(request, options);
        Throwable error = getError(request);
        
        if (error instanceof UserNotFoundException) {
            attrs.put("status", 404);
            attrs.put("message", error.getMessage());
        } else if (error instanceof DuplicateEmailException) {
            attrs.put("status", 409);
            attrs.put("message", error.getMessage());
        }
        
        return attrs;
    }
}

Spring Boot 3 + WebFlux:使用 Project Reactor 构建响应式 API 示意图

使用 limitRate 实现背压

Flux.range(1, 1000)
    .onBackpressureBuffer(100)     // 缓冲最多 100 个元素
    .limitRate(10)                  // 每次请求 10 个
    .delayElements(Duration.ofMillis(50))
    .subscribe(
        item -> processItem(item),
        err -> log.error("Error", err),
        () -> log.info("Complete")
    );

WebClient(响应式 HTTP 客户端)

// Shared client: base URL, JSON content type and HTTP basic auth on every request.
WebClient client = WebClient.builder()
    .baseUrl("https://api.example.com")
    .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
    .filter(ExchangeFilterFunctions.basicAuthentication("user", "pass"))
    .build();

// GET /users/42 and decode the body into a User.
Mono<User> user = client.get()
    .uri("/users/{id}", 42)
    .retrieve()
    // Spring Framework 6: onStatus takes a Predicate<HttpStatusCode>,
    // so the method reference must come from HttpStatusCode, not HttpStatus.
    .onStatus(HttpStatusCode::is4xxClientError, response ->
        Mono.error(new ClientException(response.statusCode())))
    .bodyToMono(User.class)
    // Up to 3 retries, 1 s apart. A 4xx answer will not change on retry, so it is excluded.
    .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1))
        .filter(ex -> !(ex instanceof ClientException)))
    // Upper bound for the whole exchange, retries included.
    .timeout(Duration.ofSeconds(5));

测试响应式代码

@WebFluxTest(UserController.class)
class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserService userService;

    @Test
    void shouldGetUserById() {
        UserDto mockUser = new UserDto(1L, "John", "john@example.com");
        when(userService.findById("1")).thenReturn(Mono.just(mockUser));

        webTestClient.get()
            .uri("/api/users/1")
            .exchange()
            .expectStatus().isOk()
            .expectBody(UserDto.class)
            .isEqualTo(mockUser);
    }

    @Test
    void shouldReturn404WhenNotFound() {
        when(userService.findById("999"))
            .thenReturn(Mono.error(new UserNotFoundException("999")));

        webTestClient.get()
            .uri("/api/users/999")
            .exchange()
            .expectStatus().isNotFound();
    }
}

何时使用 WebFlux 与 MVC

| 场景 | 选择 |
| --- | --- |
| 高并发、I/O 密集型 | WebFlux |
| 使用阻塞数据库驱动的 CRUD | MVC |
| 流式数据 | WebFlux |
| 团队熟悉响应式编程 | WebFlux |
| 简单 REST、快速开发 | MVC |