
为什么选择响应式编程?
传统的 Spring MVC 为每个请求分配一个线程,该线程在等待 I/O 期间会一直被阻塞。而使用 WebFlux,一个小型线程池通过非阻塞 I/O 即可处理数千个并发连接。

项目设置
<!-- pom.xml -->
<!-- Spring WebFlux starter: the reactive web stack used by the controllers below -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Spring Data R2DBC starter: reactive repositories over a relational database -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- R2DBC driver for PostgreSQL; no version is given here, so it is presumably managed by the parent/BOM -->
<!-- NOTE(review): newer driver releases are published under groupId org.postgresql; confirm which coordinates the Spring Boot version in use manages -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
</dependency>
Mono 和 Flux 基础
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Mono: a publisher of 0 or 1 element
Mono<String> mono = Mono.just("Hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("Error"));

// Flux: a publisher of 0 to N elements
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Flux<String> fromList = Flux.fromIterable(List.of("a", "b", "c"));
// Ticks once per second; never completes on its own
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));

// Operators: the pipeline only runs once subscribe() is called
flux
    .filter(n -> n % 2 == 0)          // keeps 2 and 4
    .map(n -> n * 10)                 // 20 and 40
    .take(3)                          // upper bound of three; only two elements reach this point
    .subscribe(System.out::println);
响应式控制器
/**
 * Reactive REST endpoints for users, mapped under {@code /api/users}.
 */
@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    /**
     * Constructor injection; the final {@code userService} field must be assigned here.
     *
     * @param userService service that all endpoints delegate to
     */
    public UserController(UserService userService) {
        this.userService = userService;
    }

    /**
     * Lists users one page at a time.
     *
     * @param page zero-based page index (defaults to 0)
     * @param size page size (defaults to 20)
     * @return the users of the requested page
     */
    @GetMapping
    public Flux<UserDto> getAllUsers(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        return userService.findAll(page, size);
    }

    /**
     * Looks up a single user.
     *
     * @param id user id taken from the path
     * @return 200 with the user, or 404 when the service completes empty or
     *         signals {@code UserNotFoundException}
     */
    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserDto>> getUserById(@PathVariable String id) {
        return userService.findById(id)
                .map(ResponseEntity::ok)
                // The service reports a missing user as an error rather than an empty Mono;
                // turn that into "empty" so both cases end in the same 404 below.
                .onErrorResume(UserNotFoundException.class, notFound -> Mono.empty())
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    /**
     * Creates a user from the validated request body and answers with 201 Created.
     *
     * @param request the request body, resolved reactively
     * @return the created user
     */
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserDto> createUser(@RequestBody @Valid Mono<CreateUserRequest> request) {
        return request.flatMap(userService::create);
    }

    /**
     * Streams users as server-sent events, spacing elements 100 ms apart.
     *
     * @return the delayed stream of users
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<UserDto> streamUsers() {
        return userService.streamAll()
                .delayElements(Duration.ofMillis(100));
    }
}
R2DBC 响应式仓库
// 仓库接口
/**
 * Reactive repository for {@link User} entities keyed by {@code Long} id.
 */
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    /** Finds users whose email contains the given fragment (derived query). */
    Flux<User> findByEmailContaining(String email);

    /** Finds active users with the given role using an explicit SQL query. */
    @Query("SELECT * FROM users WHERE role = :role AND active = true")
    Flux<User> findActiveByRole(String role);

    /** Counts users by their {@code active} flag (derived query). */
    Mono<Long> countByActive(boolean active);

    /**
     * Tells whether a user with exactly this email exists (derived query).
     * Declared because {@code UserService.create} calls it for its duplicate check.
     */
    Mono<Boolean> existsByEmail(String email);
}
// 实体
/**
 * Persistent user entity, mapped to the {@code users} table.
 */
@Table("users")
public class User {
/** Primary key (marked with {@code @Id}). */
@Id
private Long id;
private String name;
private String email;
/** Role name; the repository's {@code findActiveByRole} query filters on this column. */
private String role;
/** Active flag; the repository's {@code countByActive} and {@code findActiveByRole} use it. */
private boolean active;
// getters/setters
}
服务层与错误处理
@Service
public class UserService {
private final UserRepository userRepository;
private final UserMapper mapper;
public Mono<UserDto> findById(String id) {
return userRepository.findById(Long.parseLong(id))
.map(mapper::toDto)
.switchIfEmpty(Mono.error(
new UserNotFoundException("User not found: " + id)
));
}
public Mono<UserDto> create(CreateUserRequest request) {
return userRepository.existsByEmail(request.email())
.flatMap(exists -> {
if (exists) {
return Mono.error(new DuplicateEmailException(request.email()));
}
User user = mapper.toEntity(request);
return userRepository.save(user);
})
.map(mapper::toDto);
}
public Flux<UserDto> findAll(int page, int size) {
return userRepository.findAll()
.skip((long) page * size)
.take(size)
.map(mapper::toDto);
}
}
全局错误处理器
@Component
public class GlobalErrorHandler extends DefaultErrorWebExceptionHandler {
@Override
protected Map<String, Object> getErrorAttributes(
ServerRequest request, ErrorAttributeOptions options) {
Map<String, Object> attrs = super.getErrorAttributes(request, options);
Throwable error = getError(request);
if (error instanceof UserNotFoundException) {
attrs.put("status", 404);
attrs.put("message", error.getMessage());
} else if (error instanceof DuplicateEmailException) {
attrs.put("status", 409);
attrs.put("message", error.getMessage());
}
return attrs;
}
}
使用 limitRate 实现背压
// Flux.range only emits what downstream has requested, so limitRate alone bounds the
// number of in-flight elements. A bounded onBackpressureBuffer(100) in front of it would
// request everything at once and fail with an overflow error once 100 elements are parked;
// that operator is meant for sources that ignore demand.
Flux.range(1, 1000)
    .limitRate(10)                          // request from upstream in batches of 10
    .delayElements(Duration.ofMillis(50))   // slow consumer: one element every 50 ms
    .subscribe(
        item -> processItem(item),
        err -> log.error("Error", err),
        () -> log.info("Complete")
    );
WebClient(响应式 HTTP 客户端)
// Shared client: base URL, JSON content type and HTTP basic authentication on every request.
// NOTE(review): "user"/"pass" are placeholder credentials; load real ones from configuration.
WebClient client = WebClient.builder()
    .baseUrl("https://api.example.com")
    .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
    .filter(ExchangeFilterFunctions.basicAuthentication("user", "pass"))
    .build();

// GET /users/42, mapping 4xx responses to ClientException.
Mono<User> user = client.get()
    .uri("/users/{id}", 42)
    .retrieve()
    // Lambda instead of HttpStatus::is4xxClientError so the predicate does not name the status type
    .onStatus(status -> status.is4xxClientError(), response ->
        Mono.error(new ClientException(response.statusCode())))
    .bodyToMono(User.class)
    // Up to 3 retries, 1 s apart; a 4xx will not succeed on retry, so fail fast on it
    .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1))
        .filter(failure -> !(failure instanceof ClientException)))
    // Applied after retryWhen, so the 5 s budget covers all attempts together
    .timeout(Duration.ofSeconds(5));
测试响应式代码
@WebFluxTest(UserController.class)
class UserControllerTest {
@Autowired
private WebTestClient webTestClient;
@MockBean
private UserService userService;
@Test
void shouldGetUserById() {
UserDto mockUser = new UserDto(1L, "John", "john@example.com");
when(userService.findById("1")).thenReturn(Mono.just(mockUser));
webTestClient.get()
.uri("/api/users/1")
.exchange()
.expectStatus().isOk()
.expectBody(UserDto.class)
.isEqualTo(mockUser);
}
@Test
void shouldReturn404WhenNotFound() {
when(userService.findById("999"))
.thenReturn(Mono.error(new UserNotFoundException("999")));
webTestClient.get()
.uri("/api/users/999")
.exchange()
.expectStatus().isNotFound();
}
}
何时使用 WebFlux 与 MVC
| 场景 | 选择 |
|---|---|
| 高并发、I/O 密集型 | WebFlux |
| 使用阻塞数据库驱动的 CRUD | MVC |
| 流式数据 | WebFlux |
| 团队熟悉响应式编程 | WebFlux |
| 简单 REST、快速开发 | MVC |