
Kotlin 协程与 Flow
Kotlin 协程为 Android 和后端开发提供了结构化并发。结合 Flow 响应式流,它们提供了强大的异步编程模型。
挂起函数
// 挂起函数可以暂停而不阻塞线程
suspend fun fetchUser(id: String): User {
return withContext(Dispatchers.IO) {
api.getUser(id) // 网络调用
}
}
// 从协程中调用
viewModelScope.launch {
val user = fetchUser("123") // 非阻塞挂起
updateUI(user)
}

协程作用域与构建器
// launch:即发即忘
val job = scope.launch {
delay(1000)
println("1 秒后完成")
}
// async:返回 Deferred(类似 Future/Promise)
val deferredUser = scope.async {
fetchUser("123")
}
val user = deferredUser.await()
// 使用 async 并发执行
suspend fun loadDashboard(): Dashboard {
coroutineScope {
val user = async { fetchUser(userId) }
val orders = async { fetchOrders(userId) }
val recommendations = async { fetchRecommendations(userId) }
// 并发等待所有结果
Dashboard(
user = user.await(),
orders = orders.await(),
recommendations = recommendations.await()
)
}
}
调度器
// IO 调度器用于网络/磁盘操作
withContext(Dispatchers.IO) {
database.query("SELECT * FROM users")
}
// Default 调度器用于 CPU 密集型工作
withContext(Dispatchers.Default) {
processLargeDataset(data)
}
// Main 调度器用于 UI 操作(Android)
withContext(Dispatchers.Main) {
binding.textView.text = result
}
// 使用线程池的自定义调度器
val customDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
结构化并发
// 父作用域取消所有子协程
class UserRepository(private val scope: CoroutineScope) {
fun loadUserData(userId: String) = scope.launch {
try {
val user = fetchUser(userId)
val orders = fetchOrders(userId)
processUserData(user, orders)
} catch (e: CancellationException) {
// 清理资源
throw e // 始终重新抛出 CancellationException
} catch (e: Exception) {
handleError(e)
}
}
}
// SupervisorScope:子协程失败不会取消兄弟协程
supervisorScope {
val result1 = async { riskyOperation1() }
val result2 = async { riskyOperation2() }
// 即使 result1 失败,result2 仍然运行
val r1 = try { result1.await() } catch (e: Exception) { null }
val r2 = try { result2.await() } catch (e: Exception) { null }
}

Flow 响应式流
// 冷流:在收集时执行
fun getNumbers(): Flow<Int> = flow {
for (i in 1..10) {
delay(100)
emit(i)
}
}
// 收集流
viewModelScope.launch {
getNumbers()
.filter { it % 2 == 0 }
.map { it * it }
.collect { value ->
println("收到: $value")
}
}
// 来自数据库的 Flow(Room)
@Dao
interface UserDao {
@Query("SELECT * FROM users WHERE id = :id")
fun observeUser(id: String): Flow<User> // 每次数据库变更时发射
}
// 在 ViewModel 中收集
viewModelScope.launch {
userDao.observeUser(userId)
.catch { e -> emit(User.empty()) } // 处理错误
.collect { user ->
_uiState.value = UiState.Success(user)
}
}
StateFlow 与 SharedFlow
// StateFlow:持有当前状态,向新收集者重放
class UserViewModel : ViewModel() {
private val _uiState = MutableStateFlow<UiState>(UiState.Loading)
val uiState: StateFlow<UiState> = _uiState.asStateFlow()
init {
viewModelScope.launch {
try {
val user = fetchUser(userId)
_uiState.value = UiState.Success(user)
} catch (e: Exception) {
_uiState.value = UiState.Error(e.message)
}
}
}
}
// 在 Activity 中收集
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { state ->
when (state) {
is UiState.Loading -> showLoading()
is UiState.Success -> showUser(state.user)
is UiState.Error -> showError(state.message)
}
}
}
}
// SharedFlow:用于事件(默认不重放)
class EventBus {
private val _events = MutableSharedFlow<AppEvent>()
val events = _events.asSharedFlow()
suspend fun publish(event: AppEvent) {
_events.emit(event)
}
}
通道
// Channel:用于协程间通信
val channel = Channel<Int>(capacity = Channel.BUFFERED)
// 生产者
launch {
for (i in 1..5) {
channel.send(i)
}
channel.close()
}
// 消费者
launch {
for (value in channel) {
println("收到: $value")
}
}
// 生产者协程构建器
fun produceNumbers() = produce<Int> {
for (i in 1..5) {
send(i)
}
}

错误处理
// CoroutineExceptionHandler 用于未捕获的异常
val handler = CoroutineExceptionHandler { _, exception ->
println("捕获异常: ${exception.message}")
// 记录到崩溃报告
}
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main + handler)
scope.launch {
throw RuntimeException("出错了")
// 由 handler 捕获,作用域继续
}
// Flow 错误处理
userFlow
.retry(3) { cause -> cause is NetworkException }
.catch { e -> emit(defaultValue) }
.onEach { value -> processValue(value) }
.launchIn(viewModelScope)
Ktor:后端的协程
// 使用协程的 Ktor 服务器
fun Application.configureRouting() {
routing {
get("/users/{id}") {
val id = call.parameters["id"]!!
// 路由处理程序中的挂起函数
val user = userRepository.findById(id)
?: return@get call.respond(HttpStatusCode.NotFound)
call.respond(user)
}
// 使用 Flow 的 WebSocket
webSocket("/stream") {
userRepository.observeAll()
.collect { user ->
send(Json.encodeToString(user))
}
}
}
}
// 并行数据库查询
suspend fun getUserDashboard(userId: String): Dashboard = coroutineScope {
val user = async { userRepository.findById(userId) }
val orders = async { orderRepository.findByUserId(userId) }
val stats = async { analyticsRepository.getUserStats(userId) }
Dashboard(user.await()!!, orders.await(), stats.await())
}
测试协程
@Test
fun testFetchUser() = runTest {
// runTest 替代测试中的 runBlocking
val mockApi = mockk<UserApi>()
coEvery { mockApi.getUser("123") } returns User(id = "123", name = "Alice")
val repository = UserRepository(mockApi)
val user = repository.getUser("123")
assertEquals("Alice", user.name)
}
@Test
fun testFlow() = runTest {
val flow = flowOf(1, 2, 3)
val results = flow.toList()
assertEquals(listOf(1, 2, 3), results)
}
总结
Kotlin 协程与 Flow 提供了:
- 挂起函数:非阻塞异步代码,读起来像同步代码
- 结构化并发:父协程取消子协程,防止泄漏
- 调度器:将工作路由到适当的线程池
- Flow:具有丰富操作符的响应式流
- StateFlow/SharedFlow:用于 UI 状态和事件的热流
- 通道:协程间的安全通信
这种组合使得异步 Android 和后端代码可维护且可测试。