正在加载,请稍候…

Kotlin 协程与 Flow:Android 和后端的异步编程

掌握 Kotlin 协程,包括挂起函数、结构化并发、Flow 响应式流、StateFlow、SharedFlow 和通道,适用于 Android 和 Ktor

Kotlin 协程与 Flow:Android 和后端的异步编程

Kotlin 协程与 Flow

Kotlin 协程为 Android 和后端开发提供了结构化并发。结合 Flow 响应式流,它们提供了强大的异步编程模型。

挂起函数

// 挂起函数可以暂停而不阻塞线程
suspend fun fetchUser(id: String): User {
    return withContext(Dispatchers.IO) {
        api.getUser(id)  // 网络调用
    }
}

// 从协程中调用
viewModelScope.launch {
    val user = fetchUser("123") // 非阻塞挂起
    updateUI(user)
}

Kotlin 协程与 Flow:Android 和后端的异步编程 插图

协程作用域与构建器

// launch:即发即忘
val job = scope.launch {
    delay(1000)
    println("1 秒后完成")
}

// async:返回 Deferred(类似 Future/Promise)
val deferredUser = scope.async {
    fetchUser("123")
}
val user = deferredUser.await()

// 使用 async 并发执行
suspend fun loadDashboard(): Dashboard {
    coroutineScope {
        val user = async { fetchUser(userId) }
        val orders = async { fetchOrders(userId) }
        val recommendations = async { fetchRecommendations(userId) }
        
        // 并发等待所有结果
        Dashboard(
            user = user.await(),
            orders = orders.await(),
            recommendations = recommendations.await()
        )
    }
}

调度器

// IO 调度器用于网络/磁盘操作
withContext(Dispatchers.IO) {
    database.query("SELECT * FROM users")
}

// Default 调度器用于 CPU 密集型工作
withContext(Dispatchers.Default) {
    processLargeDataset(data)
}

// Main 调度器用于 UI 操作(Android)
withContext(Dispatchers.Main) {
    binding.textView.text = result
}

// 使用线程池的自定义调度器
val customDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()

结构化并发

// 父作用域取消所有子协程
class UserRepository(private val scope: CoroutineScope) {
    
    fun loadUserData(userId: String) = scope.launch {
        try {
            val user = fetchUser(userId)
            val orders = fetchOrders(userId)
            processUserData(user, orders)
        } catch (e: CancellationException) {
            // 清理资源
            throw e // 始终重新抛出 CancellationException
        } catch (e: Exception) {
            handleError(e)
        }
    }
}

// SupervisorScope:子协程失败不会取消兄弟协程
supervisorScope {
    val result1 = async { riskyOperation1() }
    val result2 = async { riskyOperation2() }
    
    // 即使 result1 失败,result2 仍然运行
    val r1 = try { result1.await() } catch (e: Exception) { null }
    val r2 = try { result2.await() } catch (e: Exception) { null }
}

Kotlin 协程与 Flow:Android 和后端的异步编程 插图

Flow 响应式流

// 冷流:在收集时执行
fun getNumbers(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

// 收集流
viewModelScope.launch {
    getNumbers()
        .filter { it % 2 == 0 }
        .map { it * it }
        .collect { value ->
            println("收到: $value")
        }
}

// 来自数据库的 Flow(Room)
@Dao
interface UserDao {
    @Query("SELECT * FROM users WHERE id = :id")
    fun observeUser(id: String): Flow<User>  // 每次数据库变更时发射
}

// 在 ViewModel 中收集
viewModelScope.launch {
    userDao.observeUser(userId)
        .catch { e -> emit(User.empty()) } // 处理错误
        .collect { user ->
            _uiState.value = UiState.Success(user)
        }
}

StateFlow 与 SharedFlow

// StateFlow:持有当前状态,向新收集者重放
class UserViewModel : ViewModel() {
    private val _uiState = MutableStateFlow<UiState>(UiState.Loading)
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()
    
    init {
        viewModelScope.launch {
            try {
                val user = fetchUser(userId)
                _uiState.value = UiState.Success(user)
            } catch (e: Exception) {
                _uiState.value = UiState.Error(e.message)
            }
        }
    }
}

// 在 Activity 中收集
lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.uiState.collect { state ->
            when (state) {
                is UiState.Loading -> showLoading()
                is UiState.Success -> showUser(state.user)
                is UiState.Error -> showError(state.message)
            }
        }
    }
}

// SharedFlow:用于事件(默认不重放)
class EventBus {
    private val _events = MutableSharedFlow<AppEvent>()
    val events = _events.asSharedFlow()
    
    suspend fun publish(event: AppEvent) {
        _events.emit(event)
    }
}

通道

// Channel:用于协程间通信
val channel = Channel<Int>(capacity = Channel.BUFFERED)

// 生产者
launch {
    for (i in 1..5) {
        channel.send(i)
    }
    channel.close()
}

// 消费者
launch {
    for (value in channel) {
        println("收到: $value")
    }
}

// 生产者协程构建器
fun produceNumbers() = produce<Int> {
    for (i in 1..5) {
        send(i)
    }
}

Kotlin 协程与 Flow:Android 和后端的异步编程 插图

错误处理

// CoroutineExceptionHandler 用于未捕获的异常
val handler = CoroutineExceptionHandler { _, exception ->
    println("捕获异常: ${exception.message}")
    // 记录到崩溃报告
}

val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main + handler)

scope.launch {
    throw RuntimeException("出错了")
    // 由 handler 捕获,作用域继续
}

// Flow 错误处理
userFlow
    .retry(3) { cause -> cause is NetworkException }
    .catch { e -> emit(defaultValue) }
    .onEach { value -> processValue(value) }
    .launchIn(viewModelScope)

Ktor:后端的协程

// 使用协程的 Ktor 服务器
fun Application.configureRouting() {
    routing {
        get("/users/{id}") {
            val id = call.parameters["id"]!!
            
            // 路由处理程序中的挂起函数
            val user = userRepository.findById(id)
                ?: return@get call.respond(HttpStatusCode.NotFound)
            
            call.respond(user)
        }
        
        // 使用 Flow 的 WebSocket
        webSocket("/stream") {
            userRepository.observeAll()
                .collect { user ->
                    send(Json.encodeToString(user))
                }
        }
    }
}

// 并行数据库查询
suspend fun getUserDashboard(userId: String): Dashboard = coroutineScope {
    val user = async { userRepository.findById(userId) }
    val orders = async { orderRepository.findByUserId(userId) }
    val stats = async { analyticsRepository.getUserStats(userId) }
    
    Dashboard(user.await()!!, orders.await(), stats.await())
}

测试协程

@Test
fun testFetchUser() = runTest {
    // runTest 替代测试中的 runBlocking
    val mockApi = mockk<UserApi>()
    coEvery { mockApi.getUser("123") } returns User(id = "123", name = "Alice")
    
    val repository = UserRepository(mockApi)
    val user = repository.getUser("123")
    
    assertEquals("Alice", user.name)
}

@Test
fun testFlow() = runTest {
    val flow = flowOf(1, 2, 3)
    val results = flow.toList()
    assertEquals(listOf(1, 2, 3), results)
}

总结

Kotlin 协程与 Flow 提供了:

  • 挂起函数:非阻塞异步代码,读起来像同步代码
  • 结构化并发:父协程取消子协程,防止泄漏
  • 调度器:将工作路由到适当的线程池
  • Flow:具有丰富操作符的响应式流
  • StateFlow/SharedFlow:用于 UI 状态和事件的热流
  • 通道:协程间的安全通信

这种组合使得异步 Android 和后端代码可维护且可测试。