正在加载,请稍候…

Java 21 虚拟线程:Project Loom 实现高并发 API

利用 Java 21 虚拟线程处理数百万并发连接——线程每请求模型、结构化并发、作用域值以及从响应式代码迁移。

Java 21 虚拟线程:Project Loom 实现高并发 API

Project Loom 改变一切

Java 21 的虚拟线程让你编写像异步代码一样可扩展的阻塞代码。无需响应式编程。

Java 21 虚拟线程:Project Loom 实现高并发 API 插图

虚拟线程 vs 平台线程

// 平台线程(操作系统线程)——昂贵
Thread platformThread = new Thread(() -> {
    // 每个线程使用约 1MB 栈
    doWork();
});

// 虚拟线程——廉价(几百字节)
Thread virtualThread = Thread.ofVirtual().start(() -> {
    // 仅在运行时挂载到平台线程
    doWork();
});

// 创建数百万个虚拟线程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    IntStream.range(0, 1_000_000)
        .forEach(i -> executor.submit(() -> {
            Thread.sleep(Duration.ofSeconds(1)); // 睡眠时卸载!
            return i;
        }));
}

Spring Boot 3.2+ 使用虚拟线程

# application.yml
spring:
  threads:
    virtual:
      enabled: true  # 就这样!
// 验证虚拟线程是否激活
@RestController
public class ThreadController {
    
    @GetMapping("/thread-info")
    public Map<String, Object> threadInfo() {
        Thread current = Thread.currentThread();
        return Map.of(
            "name", current.getName(),
            "isVirtual", current.isVirtual(),
            "threadId", current.threadId()
        );
    }
}

Java 21 虚拟线程:Project Loom 实现高并发 API 插图

结构化并发(预览)

import java.util.concurrent.StructuredTaskScope;

public record UserProfile(User user, List<Order> orders, List<Review> reviews) {}

public UserProfile fetchUserProfile(long userId) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        StructuredTaskScope.Subtask<User> userTask =
            scope.fork(() -> userRepository.findById(userId));
        StructuredTaskScope.Subtask<List<Order>> ordersTask =
            scope.fork(() -> orderRepository.findByUserId(userId));
        StructuredTaskScope.Subtask<List<Review>> reviewsTask =
            scope.fork(() -> reviewRepository.findByUserId(userId));
        
        scope.join().throwIfFailed(); // 等待所有,快速失败
        
        return new UserProfile(
            userTask.get(),
            ordersTask.get(),
            reviewsTask.get()
        );
    }
}

作用域值(替代 ThreadLocal)

import java.lang.ScopedValue;

// 定义作用域值
private static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();
private static final ScopedValue<User> CURRENT_USER = ScopedValue.newInstance();

// 在过滤器/中间件中设置
public void handleRequest(HttpRequest request) {
    String requestId = request.getHeader("X-Request-ID");
    User user = authenticate(request);
    
    ScopedValue.where(REQUEST_ID, requestId)
               .where(CURRENT_USER, user)
               .run(() -> processRequest(request));
}

// 在调用栈的任何位置访问
public void someDeepMethod() {
    String requestId = REQUEST_ID.get();
    User user = CURRENT_USER.get();
    // 无需将这些作为参数传递!
}

Java 21 虚拟线程:Project Loom 实现高并发 API 插图

虚拟线程与 JDBC

// 虚拟线程在处理阻塞 JDBC 时表现出色
// 连接池大小:核心数 × 10(而非核心数 × 2)
@Configuration
public class DataSourceConfig {
    
    @Bean
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost/db");
        config.setMaximumPoolSize(
            Runtime.getRuntime().availableProcessors() * 10
        );
        config.setMinimumIdle(10);
        return new HikariDataSource(config);
    }
}

虚拟线程与 HTTP 客户端

// Java 11+ HttpClient 已经支持异步
// 使用虚拟线程,你可以大规模使用同步 API

HttpClient client = HttpClient.newBuilder()
    .version(HttpClient.Version.HTTP_2)
    .connectTimeout(Duration.ofSeconds(5))
    .build();

// 这会阻塞,但虚拟线程下可扩展
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    List<CompletableFuture<String>> futures = urls.stream()
        .map(url -> CompletableFuture.supplyAsync(() -> {
            try {
                HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .GET()
                    .build();
                HttpResponse<String> response = client.send(
                    request, 
                    HttpResponse.BodyHandlers.ofString()
                );
                return response.body();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, executor))
        .toList();
    
    List<String> results = futures.stream()
        .map(CompletableFuture::join)
        .toList();
}

钉住陷阱

虚拟线程在以下情况下会被“钉住”到平台线程:

  1. synchronized 块内
  2. 使用 native 方法
// 不好:synchronized 块钉住虚拟线程
public synchronized void doWork() {
    Thread.sleep(1000); // 睡眠时钉住!
}

// 好:改用 ReentrantLock
private final ReentrantLock lock = new ReentrantLock();

public void doWork() {
    lock.lock();
    try {
        Thread.sleep(1000); // 虚拟线程卸载!
    } finally {
        lock.unlock();
    }
}

基准测试数据

在现代服务器上使用虚拟线程:

  • 10,000 个并发请求 使用 Spring Boot
  • 响应时间:与响应式 WebFlux 相似
  • 代码复杂度:比响应式简单 90%

虚拟线程使 Java 与 Go 的 goroutine 和 Kotlin 的协程相竞争。