跳到主要内容

Kotlin 协程

协程是 Kotlin 强大的异步编程特性,让异步代码像同步代码一样简洁。

协程基础

什么是协程?

协程是一种"轻量级线程",允许在单个线程中挂起和恢复执行:

import kotlinx.coroutines.*

fun main() = runBlocking {
println("开始")
delay(1000) // 挂起 1 秒(非阻塞)
println("结束")
}

// 输出:
// 开始
// (1秒后)
// 结束

协程 vs 线程

特性协程线程
创建成本极低
数量可达百万受限
阻塞协程挂起线程阻塞
管理手动控制操作系统

第一个协程

依赖配置

// build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.7.3")
}

基本结构

import kotlinx.coroutines.*

fun main() = runBlocking {
// 协程构建器:launch
launch {
delay(1000)
println("Hello from coroutine!")
}

println("Hello from main!")
delay(2000) // 等待协程完成
}

// 输出:
// Hello from main!
// (1秒后)
// Hello from coroutine!

协程构建器

launch

启动协程,不返回结果:

import kotlinx.coroutines.*

fun main() = runBlocking {
// 启动协程
val job = launch {
repeat(5) { i ->
println("协程执行: $i")
delay(100)
}
}

// 等待协程完成
job.join()
println("协程完成")
}

async

启动协程,返回 Deferred(类似 Future):

import kotlinx.coroutines.*

fun main() = runBlocking {
// async 返回结果
val deferred1 = async { fetchUser(1) }
val deferred2 = async { fetchUser(2) }

// 等待并获取结果
val user1 = deferred1.await()
val user2 = deferred2.await()

println("User1: $user1, User2: $user2")
}

suspend fun fetchUser(id: Int): String {
delay(500) // 模拟网络请求
return "User$id"
}

awaitAll

等待多个 async:

fun main() = runBlocking {
// 并发执行
val results = awaitAll(
async { fetchData(1) },
async { fetchData(2) },
async { fetchData(3) }
)

println(results) // [Data1, Data2, Data3]
}

协程上下文

CoroutineContext

协程运行在上下文中,包含调度器、协程名等信息:

import kotlinx.coroutines.*

fun main() = runBlocking {
// 指定调度器
launch(Dispatchers.Default) {
// CPU 密集型任务
println("运行在线程: ${Thread.currentThread().name}")
}

launch(Dispatchers.IO) {
// IO 操作
println("运行在线程: ${Thread.currentThread().name}")
}

launch(Dispatchers.Main) {
// UI 线程(Android)
}
}

常用 Dispatchers

调度器用途
DefaultCPU 密集型任务
IOIO 操作、网络请求
MainUI 线程(Android)
Unconfined不限制线程

协程名和 ID

fun main() = runBlocking {
launch(CoroutineName("我的协程")) {
println("协程名: ${coroutineContext[CoroutineName]}")
}
}

协程作用域

CoroutineScope

管理协程生命周期:

import kotlinx.coroutines.*

class MyRepository {
// 定义作用域
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

fun fetchData() {
scope.launch {
// 协程在这个作用域中运行
}
}

// 清理
fun onDestroy() {
scope.cancel()
}
}

viewModelScope (Android)

class MyViewModel : ViewModel() {
// ViewModel 自带的作用域
fun loadData() {
viewModelScope.launch {
// 在 IO 调度器
}
}
}

挂起函数

suspend 关键字

import kotlinx.coroutines.*

// 挂起函数 - 可以在协程中挂起
suspend fun fetchUser(): String {
delay(1000) // 挂起(非阻塞)
return "User"
}

fun main() = runBlocking {
println("开始")
val user = fetchUser() // 等待结果
println("用户: $user")
}

withContext

切换协程上下文:

suspend fun fetchData(): String = withContext(Dispatchers.IO) {
// 在 IO 线程执行
"Data"
}

suspend fun processData(): String = withContext(Dispatchers.Default) {
// 在 CPU 线程执行
"Processed"
}

协程取消

取消协程

import kotlinx.coroutines.*

fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("执行: $i")
delay(100)
}
}

delay(500)
println("取消协程")
job.cancel()
job.join()
println("协程已取消")
}

isActive 检查

fun main() = runBlocking {
val job = launch {
var i = 0
while (isActive) { // 检查协程是否活跃
println("执行: $i")
delay(100)
i++
}
}

delay(500)
job.cancelAndJoin()
println("协程已取消")
}

协程取消传播

suspend fun parentCoroutine() {
coroutineScope {
launch {
delay(1000)
println("子协程1完成")
}
launch {
delay(500)
println("子协程2完成")
}
}
// 父协程等待所有子协程完成
}

异常处理

try-catch

fun main() = runBlocking {
try {
launch {
throw RuntimeException("错误")
}.join()
} catch (e: Exception) {
println("捕获异常: ${e.message}")
}
}

CoroutineExceptionHandler

val handler = CoroutineExceptionHandler { _, exception ->
println("捕获异常: $exception")
}

fun main() = runBlocking {
launch(handler) {
throw RuntimeException("错误")
}
delay(100)
}

异常传播

fun main() = runBlocking {
// async 的异常不会被自动传播
val deferred = async {
throw RuntimeException("错误")
}

delay(100)

// 需要调用 await 才会抛出
try {
deferred.await()
} catch (e: Exception) {
println("捕获: ${e.message}")
}
}

通道 (Channel)

Channel 基本用法

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
val channel = Channel<Int>()

// 发送
launch {
for (i in 1..5) {
channel.send(i)
delay(100)
}
channel.close()
}

// 接收
launch {
for (i in channel) {
println("收到: $i")
}
}.join()
}

Channel 类型

类型行为
Rendezvous0 容量,需要同时有发送和接收
Buffered有缓冲区,满时发送挂起
Unlimited无限缓冲
Conflated只保留最新值

Flow

什么是 Flow?

Flow 是 Kotlin 的响应式流,类似于 RxJava:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// 创建 Flow
val flow = flow {
for (i in 1..5) {
emit(i) // 发送值
delay(100)
}
}

// 收集
flow.collect { value ->
println("收到: $value")
}
}

Flow 操作符

fun main() = runBlocking {
// map
flowOf(1, 2, 3)
.map { it * 2 }
.collect { println(it) } // 2, 4, 6

// filter
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 }
.collect { println(it) } // 2, 4

// take
flowOf(1, 2, 3, 4, 5)
.take(3)
.collect { println(it) } // 1, 2, 3

// onEach / onStart / onCompletion
flowOf(1, 2, 3)
.onStart { println("开始") }
.onEach { println("处理: $it") }
.onCompletion { println("完成") }
.collect()
}

冷流 vs 热流

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// 冷流 - 只有收集时才执行
val coldFlow = flow {
println("发射")
emit(1)
}
println("创建完成")
coldFlow.collect { println("收集1: $it") }
coldFlow.collect { println("收集2: $it") }
// 输出: 创建完成, 发射, 收集1: 1, 发射, 收集2: 1

// 热流 - 创建时就开始执行
val hotFlow = MutableSharedFlow<Int>()
hotFlow.emit(1) // 需要在协程中
// ...
}

并发安全

Mutex

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

fun main() = runBlocking {
val mutex = Mutex()
var count = 0

repeat(1000) {
launch {
mutex.withLock {
count++
}
}
}

delay(1000)
println("Count: $count") // 1000
}

线程安全数据结构

import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// AtomicInteger
val counter = AtomicInteger(0)

repeat(1000) {
launch {
counter.incrementAndGet()
}
}

delay(1000)
println("Counter: ${counter.get()}") // 1000
}

小结

本章我们学习了:

  1. 协程基础:概念、优势
  2. 协程构建器:launch, async
  3. 调度器:Dispatchers
  4. 挂起函数:suspend, withContext
  5. 协程取消:cancel, isActive
  6. 异常处理:try-catch, CoroutineExceptionHandler
  7. 通道:Channel
  8. Flow:响应式流
  9. 并发安全:Mutex, 线程安全类

练习

  1. 使用 launch 启动多个协程并发执行
  2. 使用 async/await 获取多个异步结果
  3. 实现协程取消功能
  4. 创建和使用 Flow
  5. 使用 Mutex 实现并发安全计数