Kotlin 协程
协程是 Kotlin 强大的异步编程特性,让异步代码像同步代码一样简洁。
协程基础
什么是协程?
协程是一种"轻量级线程",允许在单个线程中挂起和恢复执行:
import kotlinx.coroutines.*
fun main() = runBlocking {
println("开始")
delay(1000) // 挂起 1 秒(非阻塞)
println("结束")
}
// 输出:
// 开始
// (1秒后)
// 结束
协程 vs 线程
| 特性 | 协程 | 线程 |
|---|---|---|
| 创建成本 | 极低 | 高 |
| 数量 | 可达百万 | 受限 |
| 阻塞 | 协程挂起 | 线程阻塞 |
| 管理 | 手动控制 | 操作系统 |
第一个协程
依赖配置
// build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.7.3")
}
基本结构
import kotlinx.coroutines.*
fun main() = runBlocking {
// 协程构建器:launch
launch {
delay(1000)
println("Hello from coroutine!")
}
println("Hello from main!")
delay(2000) // 等待协程完成
}
// 输出:
// Hello from main!
// (1秒后)
// Hello from coroutine!
协程构建器
launch
启动协程,不返回结果:
import kotlinx.coroutines.*
fun main() = runBlocking {
// 启动协程
val job = launch {
repeat(5) { i ->
println("协程执行: $i")
delay(100)
}
}
// 等待协程完成
job.join()
println("协程完成")
}
async
启动协程,返回 Deferred(类似 Future):
import kotlinx.coroutines.*
fun main() = runBlocking {
// async 返回结果
val deferred1 = async { fetchUser(1) }
val deferred2 = async { fetchUser(2) }
// 等待并获取结果
val user1 = deferred1.await()
val user2 = deferred2.await()
println("User1: $user1, User2: $user2")
}
suspend fun fetchUser(id: Int): String {
delay(500) // 模拟网络请求
return "User$id"
}
awaitAll
等待多个 async:
fun main() = runBlocking {
// 并发执行
val results = awaitAll(
async { fetchData(1) },
async { fetchData(2) },
async { fetchData(3) }
)
println(results) // [Data1, Data2, Data3]
}
协程上下文
CoroutineContext
协程运行在上下文中,包含调度器、协程名等信息:
import kotlinx.coroutines.*
fun main() = runBlocking {
// 指定调度器
launch(Dispatchers.Default) {
// CPU 密集型任务
println("运行在线程: ${Thread.currentThread().name}")
}
launch(Dispatchers.IO) {
// IO 操作
println("运行在线程: ${Thread.currentThread().name}")
}
launch(Dispatchers.Main) {
// UI 线程(Android)
}
}
常用 Dispatchers
| 调度器 | 用途 |
|---|---|
| Default | CPU 密集型任务 |
| IO | IO 操作、网络请求 |
| Main | UI 线程(Android) |
| Unconfined | 不限制线程 |
协程名和 ID
fun main() = runBlocking {
launch(CoroutineName("我的协程")) {
println("协程名: ${coroutineContext[CoroutineName]}")
}
}
协程作用域
CoroutineScope
管理协程生命周期:
import kotlinx.coroutines.*
class MyRepository {
// 定义作用域
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
fun fetchData() {
scope.launch {
// 协程在这个作用域中运行
}
}
// 清理
fun onDestroy() {
scope.cancel()
}
}
viewModelScope (Android)
class MyViewModel : ViewModel() {
// ViewModel 自带的作用域
fun loadData() {
viewModelScope.launch {
// 在 IO 调度器
}
}
}
挂起函数
suspend 关键字
import kotlinx.coroutines.*
// 挂起函数 - 可以在协程中挂起
suspend fun fetchUser(): String {
delay(1000) // 挂起(非阻塞)
return "User"
}
fun main() = runBlocking {
println("开始")
val user = fetchUser() // 等待结果
println("用户: $user")
}
withContext
切换协程上下文:
suspend fun fetchData(): String = withContext(Dispatchers.IO) {
// 在 IO 线程执行
"Data"
}
suspend fun processData(): String = withContext(Dispatchers.Default) {
// 在 CPU 线程执行
"Processed"
}
协程取消
取消协程
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("执行: $i")
delay(100)
}
}
delay(500)
println("取消协程")
job.cancel()
job.join()
println("协程已取消")
}
isActive 检查
fun main() = runBlocking {
val job = launch {
var i = 0
while (isActive) { // 检查协程是否活跃
println("执行: $i")
delay(100)
i++
}
}
delay(500)
job.cancelAndJoin()
println("协程已取消")
}
协程取消传播
suspend fun parentCoroutine() {
coroutineScope {
launch {
delay(1000)
println("子协程1完成")
}
launch {
delay(500)
println("子协程2完成")
}
}
// 父协程等待所有子协程完成
}
异常处理
try-catch
fun main() = runBlocking {
try {
launch {
throw RuntimeException("错误")
}.join()
} catch (e: Exception) {
println("捕获异常: ${e.message}")
}
}
CoroutineExceptionHandler
val handler = CoroutineExceptionHandler { _, exception ->
println("捕获异常: $exception")
}
fun main() = runBlocking {
launch(handler) {
throw RuntimeException("错误")
}
delay(100)
}
异常传播
fun main() = runBlocking {
// async 的异常不会被自动传播
val deferred = async {
throw RuntimeException("错误")
}
delay(100)
// 需要调用 await 才会抛出
try {
deferred.await()
} catch (e: Exception) {
println("捕获: ${e.message}")
}
}
通道 (Channel)
Channel 基本用法
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
// 发送
launch {
for (i in 1..5) {
channel.send(i)
delay(100)
}
channel.close()
}
// 接收
launch {
for (i in channel) {
println("收到: $i")
}
}.join()
}
Channel 类型
| 类型 | 行为 |
|---|---|
| Rendezvous | 0 容量,需要同时有发送和接收 |
| Buffered | 有缓冲区,满时发送挂起 |
| Unlimited | 无限缓冲 |
| Conflated | 只保留最新值 |
Flow
什么是 Flow?
Flow 是 Kotlin 的响应式流,类似于 RxJava:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// 创建 Flow
val flow = flow {
for (i in 1..5) {
emit(i) // 发送值
delay(100)
}
}
// 收集
flow.collect { value ->
println("收到: $value")
}
}
Flow 操作符
fun main() = runBlocking {
// map
flowOf(1, 2, 3)
.map { it * 2 }
.collect { println(it) } // 2, 4, 6
// filter
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 }
.collect { println(it) } // 2, 4
// take
flowOf(1, 2, 3, 4, 5)
.take(3)
.collect { println(it) } // 1, 2, 3
// onEach / onStart / onCompletion
flowOf(1, 2, 3)
.onStart { println("开始") }
.onEach { println("处理: $it") }
.onCompletion { println("完成") }
.collect()
}
冷流 vs 热流
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// 冷流 - 只有收集时才执行
val coldFlow = flow {
println("发射")
emit(1)
}
println("创建完成")
coldFlow.collect { println("收集1: $it") }
coldFlow.collect { println("收集2: $it") }
// 输出: 创建完成, 发射, 收集1: 1, 发射, 收集2: 1
// 热流 - 创建时就开始执行
val hotFlow = MutableSharedFlow<Int>()
hotFlow.emit(1) // 需要在协程中
// ...
}
并发安全
Mutex
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
fun main() = runBlocking {
val mutex = Mutex()
var count = 0
repeat(1000) {
launch {
mutex.withLock {
count++
}
}
}
delay(1000)
println("Count: $count") // 1000
}
线程安全数据结构
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// AtomicInteger
val counter = AtomicInteger(0)
repeat(1000) {
launch {
counter.incrementAndGet()
}
}
delay(1000)
println("Counter: ${counter.get()}") // 1000
}
小结
本章我们学习了:
- 协程基础:概念、优势
- 协程构建器:launch, async
- 调度器:Dispatchers
- 挂起函数:suspend, withContext
- 协程取消:cancel, isActive
- 异常处理:try-catch, CoroutineExceptionHandler
- 通道:Channel
- Flow:响应式流
- 并发安全:Mutex, 线程安全类
练习
- 使用 launch 启动多个协程并发执行
- 使用 async/await 获取多个异步结果
- 实现协程取消功能
- 创建和使用 Flow
- 使用 Mutex 实现并发安全计数