Kotlin 协程
协程是 Kotlin 强大的异步编程特性,让异步代码像同步代码一样简洁。理解协程的工作原理对于编写高效的 Kotlin 异步代码至关重要。
协程基础概念
什么是协程?
协程是一种轻量级的线程,它可以在不阻塞线程的情况下挂起和恢复执行。与操作系统管理的线程不同,协程由 Kotlin 运行时管理,创建和切换的开销极小。
协程的核心特点:
- 轻量级:可以轻松创建数十万个协程,而不会耗尽系统资源
- 挂起而非阻塞:协程挂起时释放线程,让其他协程可以使用
- 结构化并发:协程形成父子层级关系,生命周期自动管理
协程 vs 线程
理解协程和线程的区别是掌握协程的关键:
| 特性 | 协程 | 线程 |
|---|---|---|
| 创建成本 | 极低(几 KB) | 高(几 MB 栈空间) |
| 数量限制 | 可达百万级 | 受限于系统资源 |
| 阻塞行为 | 挂起时释放线程 | 阻塞整个线程 |
| 上下文切换 | 用户态,成本低 | 内核态,成本高 |
| 管理方式 | 协程运行时 | 操作系统 |
实际对比示例:
// 协程版本:5 万个协程,每个等待 5 秒
fun main() = runBlocking {
repeat(50_000) {
launch {
delay(5000)
print(".")
}
}
}
// 内存占用:约 500 MB
// 线程版本:5 万个线程
fun main() {
repeat(50_000) {
Thread {
Thread.sleep(5000)
print(".")
}.start()
}
}
// 内存占用:可能超过 100 GB,甚至会 OOM
挂起函数
suspend 关键字
挂起函数是协程的基本构建块,使用 suspend 关键字标记:
// 声明挂起函数
suspend fun fetchData(): String {
// 模拟网络请求
delay(1000) // 挂起 1 秒,不阻塞线程
return "Data"
}
// 只能从协程或其他挂起函数调用
suspend fun main() {
val data = fetchData() // 挂起点
println(data)
}
挂起函数的本质:
挂起函数在编译时会被转换为状态机,每个挂起点对应一个状态。当协程挂起时,它会保存当前状态,释放线程;恢复时,从保存的状态继续执行。
挂起点与续体
suspend fun example() {
println("开始")
val a = computeA() // 挂起点 1
println("A: $a")
val b = computeB() // 挂起点 2
println("B: $b")
val result = a + b // 挂起点 3
println("结果: $result")
}
编译器将上述代码转换为类似状态机的结构:
状态 0: 初始状态 → 调用 computeA() → 挂起
状态 1: computeA 返回 → 调用 computeB() → 挂起
状态 2: computeB 返回 → 计算结果 → 完成
协程构建器
launch:启动协程
launch 用于启动一个不返回结果的协程:
import kotlinx.coroutines.*
fun main() = runBlocking {
// 启动协程,返回 Job
val job = launch {
repeat(5) { i ->
println("协程执行: $i")
delay(100)
}
}
println("协程已启动")
job.join() // 等待协程完成
println("协程完成")
}
// 输出:
// 协程已启动
// 协程执行: 0
// 协程执行: 1
// 协程执行: 2
// 协程执行: 3
// 协程执行: 4
// 协程完成
async:返回结果
async 启动协程并返回 Deferred,可以获取结果:
import kotlinx.coroutines.*
fun main() = runBlocking {
// 并发执行两个任务
val deferred1 = async { fetchUser(1) }
val deferred2 = async { fetchUser(2) }
// 等待并获取结果
val user1 = deferred1.await()
val user2 = deferred2.await()
println("User1: $user1, User2: $user2")
}
suspend fun fetchUser(id: Int): String {
delay(500) // 模拟网络请求
return "User$id"
}
并发执行多个任务
import kotlinx.coroutines.*
fun main() = runBlocking {
// 方式一:分别 await
val deferreds = List(10) { i ->
async { fetchData(i) }
}
deferreds.forEach { it.await() }
// 方式二:使用 awaitAll
val results = awaitAll(
async { fetchData(1) },
async { fetchData(2) },
async { fetchData(3) }
)
println(results) // [Data1, Data2, Data3]
}
suspend fun fetchData(id: Int): String {
delay(100)
return "Data$id"
}
runBlocking:阻塞当前线程
runBlocking 会阻塞当前线程直到协程完成:
import kotlinx.coroutines.*
fun main() {
println("开始")
runBlocking {
delay(1000)
println("协程内部")
}
println("结束")
}
// 输出:
// 开始
// (1秒后)
// 协程内部
// 结束
注意
runBlocking 主要用于测试和桥接阻塞与非阻塞代码,在生产代码中应避免使用。
调度器
调度器决定协程在哪个线程执行。
四种主要调度器
| 调度器 | 用途 | 特点 |
|---|---|---|
| Dispatchers.Default | CPU 密集型任务 | 共享线程池,线程数等于 CPU 核心数 |
| Dispatchers.IO | IO 操作 | 按需扩展的线程池,适合网络/文件操作 |
| Dispatchers.Main | UI 线程 | Android/JavaFX/Swing 主线程 |
| Dispatchers.Unconfined | 不限制 | 在调用者线程开始,挂起后在恢复线程继续 |
使用调度器
import kotlinx.coroutines.*
fun main() = runBlocking {
// 指定调度器
launch(Dispatchers.Default) {
println("Default: ${Thread.currentThread().name}")
}
launch(Dispatchers.IO) {
println("IO: ${Thread.currentThread().name}")
}
// 在 Android 中
// launch(Dispatchers.Main) { ... }
}
withContext 切换上下文
withContext 用于在协程中切换执行上下文:
suspend fun loadData(): User {
// 在 IO 调度器执行网络请求
return withContext(Dispatchers.IO) {
api.fetchUser() // 网络请求
}
}
suspend fun processData(data: Data): Result {
// 在 Default 调度器执行 CPU 密集计算
return withContext(Dispatchers.Default) {
heavyComputation(data)
}
}
// Android 中更新 UI
suspend fun loadAndShow() {
val data = withContext(Dispatchers.IO) { loadData() }
// 自动切回 Main 调度器
textView.text = data.name
}
协程作用域
结构化并发
Kotlin 协程遵循结构化并发原则:协程形成父子层级关系,父协程等待所有子协程完成。
CoroutineScope
import kotlinx.coroutines.*
class MyRepository {
// 定义作用域
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
fun fetchData() {
scope.launch {
// 协程在这个作用域中运行
val data = api.fetch()
processData(data)
}
}
// 清理资源
fun onDestroy() {
scope.cancel() // 取消作用域内所有协程
}
}
常用作用域
// 1. coroutineScope:等待所有子协程完成
suspend fun loadData() = coroutineScope {
val user = async { fetchUser() }
val posts = async { fetchPosts() }
Result(user.await(), posts.await())
}
// 2. supervisorScope:子协程失败不影响其他子协程
suspend fun loadAll() = supervisorScope {
launch {
// 如果这里失败,不会影响其他协程
loadUser()
}
launch { loadPosts() }
}
// 3. Android viewModelScope
class MyViewModel : ViewModel() {
fun loadData() {
viewModelScope.launch {
// 自动跟随 ViewModel 生命周期取消
val data = repository.fetch()
_state.value = data
}
}
}
// 4. Android lifecycleScope
class MyFragment : Fragment() {
fun loadData() {
viewLifecycleOwner.lifecycleScope.launch {
// 自动跟随 Fragment 生命周期取消
val data = repository.fetch()
updateUI(data)
}
}
}
协程取消
基本取消
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("执行: $i")
delay(100)
}
}
delay(500)
job.cancel() // 取消协程
job.join() // 等待取消完成
// 或使用: job.cancelAndJoin()
println("协程已取消")
}
取消是协作式的
协程取消需要协程代码配合。挂起函数会自动检查取消状态:
// ❌ 无法取消:计算代码不检查取消状态
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
var nextPrintTime = System.currentTimeMillis()
var i = 0
while (i < 5) {
if (System.currentTimeMillis() >= nextPrintTime) {
println("执行: ${i++}")
nextPrintTime += 500
}
}
}
delay(1000)
job.cancelAndJoin()
println("完成") // 协程继续运行直到完成
}
使计算代码可取消
import kotlinx.coroutines.*
fun main() = runBlocking {
// 方式一:使用 isActive 检查
val job1 = launch(Dispatchers.Default) {
var i = 0
while (isActive) { // 检查协程是否活跃
println("执行: ${i++}")
}
}
// 方式二:使用 ensureActive()
val job2 = launch(Dispatchers.Default) {
var i = 0
while (i < 1000000) {
ensureActive() // 检查并抛出 CancellationException
i++
}
}
// 方式三:使用 yield()
val job3 = launch(Dispatchers.Default) {
var i = 0
while (i < 1000000) {
yield() // 挂起并检查取消,让出执行权
i++
}
}
delay(100)
job1.cancelAndJoin()
}
释放资源
使用 try-finally 确保资源释放:
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("执行: $i")
delay(100)
}
} finally {
// 取消时仍会执行
println("清理资源")
closeConnection()
}
}
delay(500)
job.cancelAndJoin()
println("完成")
}
在 finally 中使用挂起函数
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("执行: $i")
delay(100)
}
} finally {
// 使用 withContext(NonCancellable) 在已取消的协程中挂起
withContext(NonCancellable) {
delay(1000)
println("延迟清理完成")
}
}
}
delay(500)
job.cancelAndJoin()
}
超时处理
withTimeout
import kotlinx.coroutines.*
fun main() = runBlocking {
try {
withTimeout(1000) {
repeat(10) { i ->
println("执行: $i")
delay(300)
}
}
} catch (e: TimeoutCancellationException) {
println("超时: ${e.message}")
}
}
withTimeoutOrNull
import kotlinx.coroutines.*
fun main() = runBlocking {
val result = withTimeoutOrNull(1000) {
repeat(10) { i ->
println("执行: $i")
delay(300)
}
"完成"
}
println("结果: $result") // null(超时)
}
异常处理
try-catch
import kotlinx.coroutines.*
fun main() = runBlocking {
try {
launch {
throw RuntimeException("错误")
}.join()
} catch (e: Exception) {
println("捕获异常: ${e.message}")
}
}
CoroutineExceptionHandler
import kotlinx.coroutines.*
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("捕获异常: $exception")
}
launch(handler) {
throw RuntimeException("错误")
}
delay(100)
}
SupervisorJob
SupervisorJob 让子协程的失败不影响其他子协程:
import kotlinx.coroutines.*
fun main() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
launch {
delay(100)
throw RuntimeException("协程 1 失败")
}
launch {
delay(200)
println("协程 2 完成") // 仍会执行
}
}
}
async 的异常处理
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferred = async {
throw RuntimeException("错误")
}
// async 的异常在 await 时抛出
try {
deferred.await()
} catch (e: Exception) {
println("捕获: ${e.message}")
}
}
Channel(通道)
Channel 用于协程间通信,类似于 BlockingQueue。
基本使用
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
// 发送端
launch {
for (i in 1..5) {
channel.send(i)
println("发送: $i")
delay(100)
}
channel.close()
}
// 接收端
launch {
for (i in channel) {
println("接收: $i")
}
}.join()
}
Channel 类型
| 类型 | 容量 | 行为 |
|---|---|---|
| Rendezvous | 0 | 发送和接收必须同时进行 |
| Buffered | N | 满时发送挂起 |
| Unlimited | 无限 | 发送永不挂起 |
| Conflated | 1 | 只保留最新值 |
// 创建不同类型的 Channel
val rendezvous = Channel<Int>() // 默认
val buffered = Channel<Int>(10) // 容量 10
val unlimited = Channel<Int>(Channel.UNLIMITED)
val conflated = Channel<Int>(Channel.CONFLATED)
Produce 和 Consume
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
// 生产者模式
fun CoroutineScope.produceNumbers() = produce {
for (i in 1..10) {
send(i)
}
}
// 消费者模式
fun CoroutineScope.consumeNumbers() = consumeEach {
println("消费: $it")
}
fun main() = runBlocking {
val channel = produceNumbers()
channel.consumeNumbers()
}
Flow(流)
Flow 是 Kotlin 的响应式流,用于处理异步数据序列。
创建 Flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// 方式一:flow 构建器
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(100)
emit(i) // 发射值
}
}
// 方式二:flowOf
fun flowOfExample() = flowOf(1, 2, 3)
// 方式三:asFlow
fun asFlowExample() = (1..5).asFlow()
收集 Flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
simpleFlow()
.collect { value ->
println("收到: $value")
}
}
Flow 是冷的
Flow 只有在被收集时才会执行:
fun simpleFlow(): Flow<Int> = flow {
println("Flow 开始")
emit(1)
emit(2)
}
fun main() = runBlocking {
println("调用 simpleFlow")
val flow = simpleFlow() // 此时不执行
println("开始收集")
flow.collect { println(it) } // 此时执行
println("再次收集")
flow.collect { println(it) } // 再次执行
}
中间操作符
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// map:转换
flowOf(1, 2, 3)
.map { it * 2 }
.collect { println(it) } // 2, 4, 6
// filter:过滤
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 }
.collect { println(it) } // 2, 4
// take:取前 N 个
flowOf(1, 2, 3, 4, 5)
.take(3)
.collect { println(it) } // 1, 2, 3
// transform:自定义转换
flowOf(1, 2, 3)
.transform {
emit("处理前: $it")
emit("处理后: ${it * 2}")
}
.collect { println(it) }
}
终端操作符
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow = flowOf(1, 2, 3, 4, 5)
// collect:收集每个值
flow.collect { println(it) }
// toList:转换为列表
val list = flow.toList()
println(list) // [1, 2, 3, 4, 5]
// first:取第一个值
val first = flow.first()
println(first) // 1
// single:确保只有一个值
val single = flowOf(1).single()
println(single) // 1
// reduce:归约
val sum = flow.reduce { acc, value -> acc + value }
println(sum) // 15
// fold:带初始值的归约
val product = flow.fold(1) { acc, value -> acc * value }
println(product) // 120
}
上下文切换
使用 flowOn 切换 Flow 的执行上下文:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flow {
for (i in 1..5) {
println("发射: ${Thread.currentThread().name}")
emit(i)
}
}
.flowOn(Dispatchers.Default) // 在 Default 调度器执行
.collect {
println("收集: ${Thread.currentThread().name}, 值: $it")
}
}
缓冲与合并
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// buffer:并发发射和收集
flow {
for (i in 1..5) {
delay(100) // 发射慢
emit(i)
}
}
.buffer() // 缓冲区
.collect {
delay(300) // 收集慢
println(it)
}
// conflate:只处理最新值
flow {
for (i in 1..10) {
delay(100)
emit(i)
}
}
.conflate()
.collect {
delay(300)
println(it) // 可能跳过中间值
}
// collectLatest:取消旧值处理
flow {
for (i in 1..10) {
delay(100)
emit(i)
}
}
.collectLatest {
delay(300)
println(it) // 只处理最新值
}
}
组合多个 Flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val nums = (1..3).asFlow()
val strs = flowOf("A", "B", "C")
// zip:配对组合
nums.zip(strs) { a, b -> "$a$b" }
.collect { println(it) } // 1A, 2B, 3C
// combine:任一变化时重新计算
val flow1 = (1..3).asFlow().onEach { delay(300) }
val flow2 = flowOf("A", "B", "C").onEach { delay(400) }
flow1.combine(flow2) { a, b -> "$a$b" }
.collect { println(it) }
// 1A, 2A, 2B, 3B, 3C
}
异常处理
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// 方式一:try-catch
try {
flow {
emit(1)
throw RuntimeException("错误")
}.collect { println(it) }
} catch (e: Exception) {
println("捕获: ${e.message}")
}
// 方式二:catch 操作符
flow {
emit(1)
throw RuntimeException("错误")
}
.catch { e ->
println("捕获: ${e.message}")
emit(-1) // 可以发射备用值
}
.collect { println(it) }
}
完成回调
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// 方式一:try-finally
try {
flowOf(1, 2, 3).collect { println(it) }
} finally {
println("完成")
}
// 方式二:onCompletion
flowOf(1, 2, 3)
.onCompletion { cause ->
if (cause == null) {
println("正常完成")
} else {
println("异常完成: ${cause.message}")
}
}
.collect { println(it) }
}
并发安全
Mutex
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
fun main() = runBlocking {
val mutex = Mutex()
var count = 0
repeat(1000) {
launch {
mutex.withLock {
count++
}
}
}
delay(1000)
println("Count: $count") // 1000
}
原子操作
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
fun main() = runBlocking {
val counter = AtomicInteger(0)
repeat(1000) {
launch {
counter.incrementAndGet()
}
}
delay(1000)
println("Counter: ${counter.get()}") // 1000
}
最佳实践
1. 使用结构化并发
// ✅ 好:使用结构化并发
suspend fun loadData() = coroutineScope {
val user = async { fetchUser() }
val posts = async { fetchPosts() }
Result(user.await(), posts.await())
}
// ❌ 差:使用 GlobalScope
fun loadData() {
GlobalScope.launch {
// 不会被父协程取消
}
}
2. 正确处理生命周期
// Android 示例
class MyViewModel : ViewModel() {
fun loadData() {
viewModelScope.launch {
// 自动随 ViewModel 取消
}
}
}
3. 选择合适的调度器
// CPU 密集型
withContext(Dispatchers.Default) {
// 计算
}
// IO 操作
withContext(Dispatchers.IO) {
// 网络/文件操作
}
// UI 更新(Android)
withContext(Dispatchers.Main) {
// 更新 UI
}
4. 避免阻塞协程
// ❌ 差:阻塞协程
launch {
Thread.sleep(1000) // 阻塞线程
}
// ✅ 好:使用挂起函数
launch {
delay(1000) // 挂起,不阻塞
}
小结
本章我们学习了:
- 协程基础:概念、优势、与线程的对比
- 挂起函数:suspend 关键字、挂起点、状态机
- 协程构建器:launch、async、runBlocking
- 调度器:Default、IO、Main、Unconfined
- 结构化并发:作用域、父子关系、生命周期管理
- 协程取消:协作式取消、资源释放、超时处理
- 异常处理:try-catch、CoroutineExceptionHandler、SupervisorJob
- Channel:协程间通信
- Flow:响应式流、操作符、异常处理
- 并发安全:Mutex、原子操作
练习
- 使用
launch启动多个协程并发执行,统计总执行时间 - 使用
async/await实现并行获取多个网络请求结果 - 实现可取消的协程,支持超时和手动取消
- 创建一个 Flow,实现数据转换、过滤和异常处理
- 使用 Mutex 实现一个并发安全的计数器