跳到主要内容

Kotlin 协程

协程是 Kotlin 强大的异步编程特性,让异步代码像同步代码一样简洁。理解协程的工作原理对于编写高效的 Kotlin 异步代码至关重要。

协程基础概念

什么是协程?

协程是一种轻量级的线程,它可以在不阻塞线程的情况下挂起和恢复执行。与操作系统管理的线程不同,协程由 Kotlin 运行时管理,创建和切换的开销极小。

协程的核心特点

  • 轻量级:可以轻松创建数十万个协程,而不会耗尽系统资源
  • 挂起而非阻塞:协程挂起时释放线程,让其他协程可以使用
  • 结构化并发:协程形成父子层级关系,生命周期自动管理

协程 vs 线程

理解协程和线程的区别是掌握协程的关键:

特性协程线程
创建成本极低(几 KB)高(几 MB 栈空间)
数量限制可达百万级受限于系统资源
阻塞行为挂起时释放线程阻塞整个线程
上下文切换用户态,成本低内核态,成本高
管理方式协程运行时操作系统

实际对比示例

// 协程版本:5 万个协程,每个等待 5 秒
fun main() = runBlocking {
repeat(50_000) {
launch {
delay(5000)
print(".")
}
}
}
// 内存占用:约 500 MB

// 线程版本:5 万个线程
fun main() {
repeat(50_000) {
Thread {
Thread.sleep(5000)
print(".")
}.start()
}
}
// 内存占用:可能超过 100 GB,甚至会 OOM

挂起函数

suspend 关键字

挂起函数是协程的基本构建块,使用 suspend 关键字标记:

// 声明挂起函数
suspend fun fetchData(): String {
// 模拟网络请求
delay(1000) // 挂起 1 秒,不阻塞线程
return "Data"
}

// 只能从协程或其他挂起函数调用
suspend fun main() {
val data = fetchData() // 挂起点
println(data)
}

挂起函数的本质

挂起函数在编译时会被转换为状态机,每个挂起点对应一个状态。当协程挂起时,它会保存当前状态,释放线程;恢复时,从保存的状态继续执行。

挂起点与续体

suspend fun example() {
println("开始")
val a = computeA() // 挂起点 1
println("A: $a")
val b = computeB() // 挂起点 2
println("B: $b")
val result = a + b // 挂起点 3
println("结果: $result")
}

编译器将上述代码转换为类似状态机的结构:

状态 0: 初始状态 → 调用 computeA() → 挂起
状态 1: computeA 返回 → 调用 computeB() → 挂起
状态 2: computeB 返回 → 计算结果 → 完成

协程构建器

launch:启动协程

launch 用于启动一个不返回结果的协程:

import kotlinx.coroutines.*

fun main() = runBlocking {
// 启动协程,返回 Job
val job = launch {
repeat(5) { i ->
println("协程执行: $i")
delay(100)
}
}

println("协程已启动")
job.join() // 等待协程完成
println("协程完成")
}

// 输出:
// 协程已启动
// 协程执行: 0
// 协程执行: 1
// 协程执行: 2
// 协程执行: 3
// 协程执行: 4
// 协程完成

async:返回结果

async 启动协程并返回 Deferred,可以获取结果:

import kotlinx.coroutines.*

fun main() = runBlocking {
// 并发执行两个任务
val deferred1 = async { fetchUser(1) }
val deferred2 = async { fetchUser(2) }

// 等待并获取结果
val user1 = deferred1.await()
val user2 = deferred2.await()

println("User1: $user1, User2: $user2")
}

suspend fun fetchUser(id: Int): String {
delay(500) // 模拟网络请求
return "User$id"
}

并发执行多个任务

import kotlinx.coroutines.*

fun main() = runBlocking {
// 方式一:分别 await
val deferreds = List(10) { i ->
async { fetchData(i) }
}
deferreds.forEach { it.await() }

// 方式二:使用 awaitAll
val results = awaitAll(
async { fetchData(1) },
async { fetchData(2) },
async { fetchData(3) }
)
println(results) // [Data1, Data2, Data3]
}

suspend fun fetchData(id: Int): String {
delay(100)
return "Data$id"
}

runBlocking:阻塞当前线程

runBlocking 会阻塞当前线程直到协程完成:

import kotlinx.coroutines.*

fun main() {
println("开始")
runBlocking {
delay(1000)
println("协程内部")
}
println("结束")
}

// 输出:
// 开始
// (1秒后)
// 协程内部
// 结束
注意

runBlocking 主要用于测试和桥接阻塞与非阻塞代码,在生产代码中应避免使用。

调度器

调度器决定协程在哪个线程执行。

四种主要调度器

调度器用途特点
Dispatchers.DefaultCPU 密集型任务共享线程池,线程数等于 CPU 核心数
Dispatchers.IOIO 操作按需扩展的线程池,适合网络/文件操作
Dispatchers.MainUI 线程Android/JavaFX/Swing 主线程
Dispatchers.Unconfined不限制在调用者线程开始,挂起后在恢复线程继续

使用调度器

import kotlinx.coroutines.*

fun main() = runBlocking {
// 指定调度器
launch(Dispatchers.Default) {
println("Default: ${Thread.currentThread().name}")
}

launch(Dispatchers.IO) {
println("IO: ${Thread.currentThread().name}")
}

// 在 Android 中
// launch(Dispatchers.Main) { ... }
}

withContext 切换上下文

withContext 用于在协程中切换执行上下文:

suspend fun loadData(): User {
// 在 IO 调度器执行网络请求
return withContext(Dispatchers.IO) {
api.fetchUser() // 网络请求
}
}

suspend fun processData(data: Data): Result {
// 在 Default 调度器执行 CPU 密集计算
return withContext(Dispatchers.Default) {
heavyComputation(data)
}
}

// Android 中更新 UI
suspend fun loadAndShow() {
val data = withContext(Dispatchers.IO) { loadData() }
// 自动切回 Main 调度器
textView.text = data.name
}

协程作用域

结构化并发

Kotlin 协程遵循结构化并发原则:协程形成父子层级关系,父协程等待所有子协程完成。

CoroutineScope

import kotlinx.coroutines.*

class MyRepository {
// 定义作用域
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

fun fetchData() {
scope.launch {
// 协程在这个作用域中运行
val data = api.fetch()
processData(data)
}
}

// 清理资源
fun onDestroy() {
scope.cancel() // 取消作用域内所有协程
}
}

常用作用域

// 1. coroutineScope:等待所有子协程完成
suspend fun loadData() = coroutineScope {
val user = async { fetchUser() }
val posts = async { fetchPosts() }
Result(user.await(), posts.await())
}

// 2. supervisorScope:子协程失败不影响其他子协程
suspend fun loadAll() = supervisorScope {
launch {
// 如果这里失败,不会影响其他协程
loadUser()
}
launch { loadPosts() }
}

// 3. Android viewModelScope
class MyViewModel : ViewModel() {
fun loadData() {
viewModelScope.launch {
// 自动跟随 ViewModel 生命周期取消
val data = repository.fetch()
_state.value = data
}
}
}

// 4. Android lifecycleScope
class MyFragment : Fragment() {
fun loadData() {
viewLifecycleOwner.lifecycleScope.launch {
// 自动跟随 Fragment 生命周期取消
val data = repository.fetch()
updateUI(data)
}
}
}

协程取消

基本取消

import kotlinx.coroutines.*

fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("执行: $i")
delay(100)
}
}

delay(500)
job.cancel() // 取消协程
job.join() // 等待取消完成
// 或使用: job.cancelAndJoin()

println("协程已取消")
}

取消是协作式的

协程取消需要协程代码配合。挂起函数会自动检查取消状态:

// ❌ 无法取消:计算代码不检查取消状态
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
var nextPrintTime = System.currentTimeMillis()
var i = 0
while (i < 5) {
if (System.currentTimeMillis() >= nextPrintTime) {
println("执行: ${i++}")
nextPrintTime += 500
}
}
}
delay(1000)
job.cancelAndJoin()
println("完成") // 协程继续运行直到完成
}

使计算代码可取消

import kotlinx.coroutines.*

fun main() = runBlocking {
// 方式一:使用 isActive 检查
val job1 = launch(Dispatchers.Default) {
var i = 0
while (isActive) { // 检查协程是否活跃
println("执行: ${i++}")
}
}

// 方式二:使用 ensureActive()
val job2 = launch(Dispatchers.Default) {
var i = 0
while (i < 1000000) {
ensureActive() // 检查并抛出 CancellationException
i++
}
}

// 方式三:使用 yield()
val job3 = launch(Dispatchers.Default) {
var i = 0
while (i < 1000000) {
yield() // 挂起并检查取消,让出执行权
i++
}
}

delay(100)
job1.cancelAndJoin()
}

释放资源

使用 try-finally 确保资源释放:

import kotlinx.coroutines.*

fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("执行: $i")
delay(100)
}
} finally {
// 取消时仍会执行
println("清理资源")
closeConnection()
}
}

delay(500)
job.cancelAndJoin()
println("完成")
}

在 finally 中使用挂起函数

import kotlinx.coroutines.*

fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("执行: $i")
delay(100)
}
} finally {
// 使用 withContext(NonCancellable) 在已取消的协程中挂起
withContext(NonCancellable) {
delay(1000)
println("延迟清理完成")
}
}
}

delay(500)
job.cancelAndJoin()
}

超时处理

withTimeout

import kotlinx.coroutines.*

fun main() = runBlocking {
try {
withTimeout(1000) {
repeat(10) { i ->
println("执行: $i")
delay(300)
}
}
} catch (e: TimeoutCancellationException) {
println("超时: ${e.message}")
}
}

withTimeoutOrNull

import kotlinx.coroutines.*

fun main() = runBlocking {
val result = withTimeoutOrNull(1000) {
repeat(10) { i ->
println("执行: $i")
delay(300)
}
"完成"
}

println("结果: $result") // null(超时)
}

异常处理

try-catch

import kotlinx.coroutines.*

fun main() = runBlocking {
try {
launch {
throw RuntimeException("错误")
}.join()
} catch (e: Exception) {
println("捕获异常: ${e.message}")
}
}

CoroutineExceptionHandler

import kotlinx.coroutines.*

fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("捕获异常: $exception")
}

launch(handler) {
throw RuntimeException("错误")
}

delay(100)
}

SupervisorJob

SupervisorJob 让子协程的失败不影响其他子协程:

import kotlinx.coroutines.*

fun main() = runBlocking {
val supervisor = SupervisorJob()

with(CoroutineScope(coroutineContext + supervisor)) {
launch {
delay(100)
throw RuntimeException("协程 1 失败")
}

launch {
delay(200)
println("协程 2 完成") // 仍会执行
}
}
}

async 的异常处理

import kotlinx.coroutines.*

fun main() = runBlocking {
val deferred = async {
throw RuntimeException("错误")
}

// async 的异常在 await 时抛出
try {
deferred.await()
} catch (e: Exception) {
println("捕获: ${e.message}")
}
}

Channel(通道)

Channel 用于协程间通信,类似于 BlockingQueue。

基本使用

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
val channel = Channel<Int>()

// 发送端
launch {
for (i in 1..5) {
channel.send(i)
println("发送: $i")
delay(100)
}
channel.close()
}

// 接收端
launch {
for (i in channel) {
println("接收: $i")
}
}.join()
}

Channel 类型

类型容量行为
Rendezvous0发送和接收必须同时进行
BufferedN满时发送挂起
Unlimited无限发送永不挂起
Conflated1只保留最新值
// 创建不同类型的 Channel
val rendezvous = Channel<Int>() // 默认
val buffered = Channel<Int>(10) // 容量 10
val unlimited = Channel<Int>(Channel.UNLIMITED)
val conflated = Channel<Int>(Channel.CONFLATED)

Produce 和 Consume

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// 生产者模式
fun CoroutineScope.produceNumbers() = produce {
for (i in 1..10) {
send(i)
}
}

// 消费者模式
fun CoroutineScope.consumeNumbers() = consumeEach {
println("消费: $it")
}

fun main() = runBlocking {
val channel = produceNumbers()
channel.consumeNumbers()
}

Flow(流)

Flow 是 Kotlin 的响应式流,用于处理异步数据序列。

创建 Flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// 方式一:flow 构建器
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(100)
emit(i) // 发射值
}
}

// 方式二:flowOf
fun flowOfExample() = flowOf(1, 2, 3)

// 方式三:asFlow
fun asFlowExample() = (1..5).asFlow()

收集 Flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
simpleFlow()
.collect { value ->
println("收到: $value")
}
}

Flow 是冷的

Flow 只有在被收集时才会执行:

fun simpleFlow(): Flow<Int> = flow {
println("Flow 开始")
emit(1)
emit(2)
}

fun main() = runBlocking {
println("调用 simpleFlow")
val flow = simpleFlow() // 此时不执行

println("开始收集")
flow.collect { println(it) } // 此时执行

println("再次收集")
flow.collect { println(it) } // 再次执行
}

中间操作符

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// map:转换
flowOf(1, 2, 3)
.map { it * 2 }
.collect { println(it) } // 2, 4, 6

// filter:过滤
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 }
.collect { println(it) } // 2, 4

// take:取前 N 个
flowOf(1, 2, 3, 4, 5)
.take(3)
.collect { println(it) } // 1, 2, 3

// transform:自定义转换
flowOf(1, 2, 3)
.transform {
emit("处理前: $it")
emit("处理后: ${it * 2}")
}
.collect { println(it) }
}

终端操作符

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
val flow = flowOf(1, 2, 3, 4, 5)

// collect:收集每个值
flow.collect { println(it) }

// toList:转换为列表
val list = flow.toList()
println(list) // [1, 2, 3, 4, 5]

// first:取第一个值
val first = flow.first()
println(first) // 1

// single:确保只有一个值
val single = flowOf(1).single()
println(single) // 1

// reduce:归约
val sum = flow.reduce { acc, value -> acc + value }
println(sum) // 15

// fold:带初始值的归约
val product = flow.fold(1) { acc, value -> acc * value }
println(product) // 120
}

上下文切换

使用 flowOn 切换 Flow 的执行上下文:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
flow {
for (i in 1..5) {
println("发射: ${Thread.currentThread().name}")
emit(i)
}
}
.flowOn(Dispatchers.Default) // 在 Default 调度器执行
.collect {
println("收集: ${Thread.currentThread().name}, 值: $it")
}
}

缓冲与合并

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// buffer:并发发射和收集
flow {
for (i in 1..5) {
delay(100) // 发射慢
emit(i)
}
}
.buffer() // 缓冲区
.collect {
delay(300) // 收集慢
println(it)
}

// conflate:只处理最新值
flow {
for (i in 1..10) {
delay(100)
emit(i)
}
}
.conflate()
.collect {
delay(300)
println(it) // 可能跳过中间值
}

// collectLatest:取消旧值处理
flow {
for (i in 1..10) {
delay(100)
emit(i)
}
}
.collectLatest {
delay(300)
println(it) // 只处理最新值
}
}

组合多个 Flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
val nums = (1..3).asFlow()
val strs = flowOf("A", "B", "C")

// zip:配对组合
nums.zip(strs) { a, b -> "$a$b" }
.collect { println(it) } // 1A, 2B, 3C

// combine:任一变化时重新计算
val flow1 = (1..3).asFlow().onEach { delay(300) }
val flow2 = flowOf("A", "B", "C").onEach { delay(400) }

flow1.combine(flow2) { a, b -> "$a$b" }
.collect { println(it) }
// 1A, 2A, 2B, 3B, 3C
}

异常处理

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// 方式一:try-catch
try {
flow {
emit(1)
throw RuntimeException("错误")
}.collect { println(it) }
} catch (e: Exception) {
println("捕获: ${e.message}")
}

// 方式二:catch 操作符
flow {
emit(1)
throw RuntimeException("错误")
}
.catch { e ->
println("捕获: ${e.message}")
emit(-1) // 可以发射备用值
}
.collect { println(it) }
}

完成回调

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// 方式一:try-finally
try {
flowOf(1, 2, 3).collect { println(it) }
} finally {
println("完成")
}

// 方式二:onCompletion
flowOf(1, 2, 3)
.onCompletion { cause ->
if (cause == null) {
println("正常完成")
} else {
println("异常完成: ${cause.message}")
}
}
.collect { println(it) }
}

并发安全

Mutex

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

fun main() = runBlocking {
val mutex = Mutex()
var count = 0

repeat(1000) {
launch {
mutex.withLock {
count++
}
}
}

delay(1000)
println("Count: $count") // 1000
}

原子操作

import kotlinx.coroutines.*
import java.util.concurrent.atomic.*

fun main() = runBlocking {
val counter = AtomicInteger(0)

repeat(1000) {
launch {
counter.incrementAndGet()
}
}

delay(1000)
println("Counter: ${counter.get()}") // 1000
}

最佳实践

1. 使用结构化并发

// ✅ 好:使用结构化并发
suspend fun loadData() = coroutineScope {
val user = async { fetchUser() }
val posts = async { fetchPosts() }
Result(user.await(), posts.await())
}

// ❌ 差:使用 GlobalScope
fun loadData() {
GlobalScope.launch {
// 不会被父协程取消
}
}

2. 正确处理生命周期

// Android 示例
class MyViewModel : ViewModel() {
fun loadData() {
viewModelScope.launch {
// 自动随 ViewModel 取消
}
}
}

3. 选择合适的调度器

// CPU 密集型
withContext(Dispatchers.Default) {
// 计算
}

// IO 操作
withContext(Dispatchers.IO) {
// 网络/文件操作
}

// UI 更新(Android)
withContext(Dispatchers.Main) {
// 更新 UI
}

4. 避免阻塞协程

// ❌ 差:阻塞协程
launch {
Thread.sleep(1000) // 阻塞线程
}

// ✅ 好:使用挂起函数
launch {
delay(1000) // 挂起,不阻塞
}

小结

本章我们学习了:

  1. 协程基础:概念、优势、与线程的对比
  2. 挂起函数:suspend 关键字、挂起点、状态机
  3. 协程构建器:launch、async、runBlocking
  4. 调度器:Default、IO、Main、Unconfined
  5. 结构化并发:作用域、父子关系、生命周期管理
  6. 协程取消:协作式取消、资源释放、超时处理
  7. 异常处理:try-catch、CoroutineExceptionHandler、SupervisorJob
  8. Channel:协程间通信
  9. Flow:响应式流、操作符、异常处理
  10. 并发安全:Mutex、原子操作

练习

  1. 使用 launch 启动多个协程并发执行,统计总执行时间
  2. 使用 async/await 实现并行获取多个网络请求结果
  3. 实现可取消的协程,支持超时和手动取消
  4. 创建一个 Flow,实现数据转换、过滤和异常处理
  5. 使用 Mutex 实现一个并发安全的计数器

参考资料