并发编程
Rust 的并发模型基于"无畏并发"(Fearless Concurrency)理念。编译器在编译时检查并发问题,避免数据竞争和其他常见并发错误。
线程
创建线程
use std::thread;
use std::time::Duration;
fn main() {
// 创建线程
thread::spawn(|| {
for i in 1..10 {
println!("子线程: {}", i);
thread::sleep(Duration::from_millis(1));
}
});
// 主线程
for i in 1..5 {
println!("主线程: {}", i);
thread::sleep(Duration::from_millis(1));
}
// 注意:主线程结束时会终止整个程序
// 子线程可能还没执行完
}
join 等待线程
use std::thread;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("子线程: {}", i);
}
});
// 等待子线程结束
handle.join().unwrap();
println!("子线程已结束");
}
move 闭包
use std::thread;
fn main() {
let v = vec![1, 2, 3, 4, 5];
// 使用 move 将所有权转移到线程
let handle = thread::spawn(move || {
println!("向量: {:?}", v);
});
// v 已被移动,不能在主线程使用
// println!("{:?}", v); // 错误!
handle.join().unwrap();
}
线程返回值
use std::thread;
fn main() {
let handle = thread::spawn(|| {
let result = 1 + 2;
result
});
let result = handle.join().unwrap();
println!("结果: {}", result);
}
消息传递
Channel 基本用法
use std::sync::mpsc; // multiple producer, single consumer
use std::thread;
fn main() {
// 创建通道
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("你好");
tx.send(val).unwrap();
});
// 接收消息(阻塞)
let received = rx.recv().unwrap();
println!("收到: {}", received);
}
发送多个消息
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("你好"),
String::from("世界"),
String::from("Rust"),
String::from("并发"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
// 将 rx 当作迭代器
for received in rx {
println!("收到: {}", received);
}
}
多个生产者
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// 克隆发送端
let tx1 = tx.clone();
let tx2 = tx.clone();
// 原始 tx 被移动,所以先克隆
drop(tx); // 释放原始 tx
thread::spawn(move || {
let vals = vec![
String::from("线程1: 你好"),
String::from("线程1: 世界"),
];
for val in vals {
tx1.send(val).unwrap();
}
});
thread::spawn(move || {
let vals = vec![
String::from("线程2: Rust"),
String::from("线程2: 并发"),
];
for val in vals {
tx2.send(val).unwrap();
}
});
for received in rx {
println!("{}", received);
}
}
try_recv 非阻塞接收
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
tx.send(String::from("延迟消息")).unwrap();
});
// 非阻塞检查
loop {
match rx.try_recv() {
Ok(msg) => {
println!("收到: {}", msg);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("暂无消息,继续工作...");
thread::sleep(Duration::from_millis(100));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("发送端已关闭");
break;
}
}
}
}
共享状态
Mutex 互斥锁
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
// 获取锁
let mut num = m.lock().unwrap();
*num = 6;
// 离开作用域自动释放锁
}
println!("m = {:?}", m);
}
多线程共享 Mutex
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc 用于多线程共享所有权
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("结果: {}", *counter.lock().unwrap());
}
Mutex 和结构体
use std::sync::{Arc, Mutex};
use std::thread;
#[derive(Debug)]
struct BankAccount {
balance: f64,
}
impl BankAccount {
fn new(initial: f64) -> Self {
BankAccount { balance: initial }
}
fn deposit(&mut self, amount: f64) {
self.balance += amount;
}
fn withdraw(&mut self, amount: f64) -> bool {
if self.balance >= amount {
self.balance -= amount;
true
} else {
false
}
}
}
fn main() {
let account = Arc::new(Mutex::new(BankAccount::new(100.0)));
let mut handles = vec![];
for i in 0..5 {
let account = Arc::clone(&account);
let handle = thread::spawn(move || {
let mut acc = account.lock().unwrap();
acc.deposit(10.0);
println!("线程 {} 存入后余额: {}", i, acc.balance);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终余额: {}", account.lock().unwrap().balance);
}
RwLock 读写锁
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];
// 多个读者
for i in 0..3 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let r = data.read().unwrap();
println!("读者 {}: {:?}", i, *r);
});
handles.push(handle);
}
// 一个写者
{
let data = Arc::clone(&data);
let mut w = data.write().unwrap();
w.push(4);
println!("写者添加: {:?}", *w);
}
for handle in handles {
handle.join().unwrap();
}
}
Send 和 Sync Trait
Send Trait
Send trait 表示类型可以在线程间安全转移所有权:
use std::thread;
fn main() {
let v = vec![1, 2, 3];
// Vec<i32> 实现了 Send
thread::spawn(move || {
println!("{:?}", v);
});
// Rc<T> 没有实现 Send,不能在线程间传递
// use std::rc::Rc;
// let rc = Rc::new(5);
// thread::spawn(move || { // 编译错误!
// println!("{}", rc);
// });
}
Sync Trait
Sync trait 表示类型可以在线程间安全共享引用:
use std::sync::Arc;
use std::thread;
fn main() {
// Mutex<T> 实现了 Sync
let data = Arc::new(std::sync::Mutex::new(0));
// 可以安全地共享引用
let data_clone = Arc::clone(&data);
thread::spawn(move || {
let mut lock = data_clone.lock().unwrap();
*lock += 1;
});
}
原子类型
Atomic 类型
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let counter = Arc::new(AtomicI32::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 原子增加
counter.fetch_add(1, Ordering::SeqCst);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("结果: {}", counter.load(Ordering::SeqCst));
}
Ordering 内存顺序
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
let ready = AtomicBool::new(false);
let data = Arc::new(vec![1, 2, 3]);
let data_clone = Arc::clone(&data);
let ready_ref = &ready; // AtomicBool 实现了 Sync
thread::scope(|s| {
s.spawn(|| {
// 修改数据
let mut d = (*data_clone).clone();
d.push(4);
println!("数据已准备好");
// 释放:确保之前的写操作可见
ready.store(true, Ordering::Release);
});
s.spawn(|| {
// 获取:确保看到之前的数据
while !ready_ref.load(Ordering::Acquire) {
std::hint::spin_loop();
}
println!("读取数据: {:?}", *data);
});
});
}
Scoped Threads
使用 scope
use std::thread;
fn main() {
let mut v = vec![1, 2, 3, 4, 5, 6, 7, 8];
// 使用 scope 允许引用外部数据
thread::scope(|s| {
s.spawn(|| {
// 可以引用 v,因为 scope 保证 v 在所有线程结束前有效
v[0] *= 2;
});
s.spawn(|| {
v[1] *= 2;
});
});
println!("结果: {:?}", v);
}
并行处理
use std::thread;
fn main() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut results = vec![0; data.len()];
thread::scope(|s| {
// 将数据分成多个块并行处理
let chunk_size = data.len() / 4;
for i in 0..4 {
let start = i * chunk_size;
let end = if i == 3 { data.len() } else { (i + 1) * chunk_size };
s.spawn(move || {
for j in start..end {
results[j] = data[j] * data[j]; // 平方
}
});
}
});
println!("结果: {:?}", results);
}
异步编程基础
异步编程是 Rust 并发编程的重要组成部分。与基于操作系统线程的并发不同,异步编程允许在单个线程内处理大量并发任务,具有更低的内存开销和更快的上下文切换速度。
为什么需要异步编程?
在传统的线程模型中,每个线程都需要操作系统分配栈空间(通常为 1-2MB),当需要处理成千上万的并发连接时,内存消耗会非常巨大。而异步任务的内存开销通常只有几 KB,这使得异步编程特别适合以下场景:
- 高并发服务器:处理大量网络连接
- I/O 密集型应用:大量等待操作(如网络请求、文件读写)
- 资源受限环境:嵌入式设备、微控制器
异步 vs 多线程:
| 特性 | 多线程 | 异步 |
|---|---|---|
| 内存开销 | 较大(每线程 MB 级) | 较小(每任务 KB 级) |
| 上下文切换 | 操作系统管理,开销较大 | 运行时管理,开销极小 |
| 适用场景 | CPU 密集型任务 | I/O 密集型任务 |
| 编程复杂度 | 相对简单 | 需要理解 Future 和运行时 |
Future Trait
Future 是 Rust 异步编程的核心抽象。它表示一个可能还没有完成的异步计算。
从 Rust 2024 Edition 开始,Future 和 IntoFuture traits 已被添加到标准库的 prelude 中。这意味着:
- 不需要显式导入:在 Rust 2024 中,这些 traits 会自动可用,无需
use std::future::Future - 可能的歧义问题:如果你定义了与
Future或IntoFuture中同名的方法,可能会导致编译错误
如果你从较早版本迁移到 Rust 2024,可能会遇到 trait 方法调用歧义问题。解决方法是使用完全限定语法:
// 如果存在方法歧义,使用完全限定语法
<_ as MyTrait>::poll(&my_value);
// 或者显式指定 trait
MyTrait::poll(&my_value);
运行 cargo fix --edition 可以自动修复大多数此类问题。
// 标准库中 Future trait 的定义
// Rust 2024+ 不需要显式导入,自动在 prelude 中
use std::pin::Pin;
use std::task::{Context, Poll};
trait Future {
type Output; // Future 完成时产生的值类型
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
// Poll 枚举表示轮询结果
enum Poll<T> {
Ready(T), // 计算完成,包含结果
Pending, // 计算未完成,需要稍后再试
}
Future 的工作原理:
Future 采用"轮询"(polling)模型。当你调用 .await 时,运行时会不断地调用 poll 方法:
- 如果
poll返回Poll::Ready(value),说明计算完成,value就是被.await返回的值 - 如果
poll返回Poll::Pending,说明计算还未完成。运行时会注册一个唤醒器(Waker),当计算可能完成时,唤醒器会通知运行时再次轮询
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;
// 自定义 Future:延迟指定时间后返回
struct Delay {
duration: Duration,
start: Option<Instant>,
}
impl Delay {
fn new(duration: Duration) -> Self {
Delay {
duration,
start: None,
}
}
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 第一次 poll 时记录开始时间
if self.start.is_none() {
self.start = Some(Instant::now());
// 使用线程模拟异步等待
// 在实际应用中,应该使用运行时提供的定时器
let waker = cx.waker().clone();
let duration = self.duration;
thread::spawn(move || {
thread::sleep(duration);
waker.wake(); // 通知运行时可以再次 poll
});
return Poll::Pending;
}
// 检查是否已经过了足够的时间
if self.start.unwrap().elapsed() >= self.duration {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
// 注意:在实际应用中,不要在 Future 中使用 thread::sleep
// 这会阻塞整个线程,违背异步编程的初衷
// 应该使用运行时提供的异步定时器
async/await 语法
async 和 await 是 Rust 提供的异步编程语法糖,让异步代码看起来像同步代码一样清晰。
async fn 定义异步函数
// async fn 定义的函数返回一个 Future
async fn say_hello() {
println!("你好");
}
// 等价于:
fn say_hello() -> impl Future<Output = ()> {
async {
println!("你好");
}
}
// async 函数可以有返回值
async fn add(a: i32, b: i32) -> i32 {
a + b
}
// 返回类型:impl Future<Output = i32>
.await 等待 Future 完成
// .await 会暂停当前函数的执行,直到 Future 完成
async fn example() {
let result = add(1, 2).await; // 等待 add 完成
println!("结果: {}", result);
}
// 错误:不能在非 async 函数中使用 .await
// fn not_async() {
// say_hello().await; // 编译错误!
// }
async 块
除了 async fn,还可以使用 async 块创建匿名的 Future:
async fn example() {
// async 块创建一个 Future
let future = async {
println!("执行异步操作");
42
};
// 等待 Future 完成
let result = future.await;
println!("结果: {}", result);
}
异步运行时:Tokio
Rust 标准库只提供了 Future trait,但没有提供执行 Future 的运行时。需要使用第三方运行时,最流行的是 Tokio。
添加 Tokio 依赖
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
基本 Tokio 程序
use tokio::time::{sleep, Duration};
// 使用 #[tokio::main] 宏让 main 函数变成异步
#[tokio::main]
async fn main() {
println!("开始");
// 异步等待 1 秒(不会阻塞线程)
sleep(Duration::from_secs(1)).await;
println!("1 秒后");
// 调用异步函数
let result = compute().await;
println!("计算结果: {}", result);
}
async fn compute() -> i32 {
// 模拟耗时计算
sleep(Duration::from_millis(100)).await;
42
}
#[tokio::main] 宏展开后:
fn main() {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async {
// 原来的 async main 函数体
println!("开始");
sleep(Duration::from_secs(1)).await;
println!("1 秒后");
let result = compute().await;
println!("计算结果: {}", result);
})
}
创建异步任务:tokio::spawn
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
println!("主任务开始");
// spawn 创建一个新的异步任务,立即返回 JoinHandle
let handle = tokio::spawn(async {
println!("子任务开始");
sleep(Duration::from_secs(2)).await;
println!("子任务结束");
"子任务结果"
});
// 主任务继续执行,不等待子任务
println!("主任务继续工作...");
sleep(Duration::from_secs(1)).await;
println!("主任务等待子任务...");
// 等待子任务完成并获取结果
let result = handle.await.unwrap();
println!("收到: {}", result);
}
异步 I/O 操作
use tokio::fs;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
// 异步读取文件
let content = fs::read_to_string("hello.txt").await?;
println!("文件内容: {}", content);
// 异步写入文件
fs::write("output.txt", "Hello, Async!").await?;
// 更精细的文件操作
let mut file = fs::File::create("data.txt").await?;
file.write_all(b"异步写入数据").await?;
Ok(())
}
异步网络编程
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
// 绑定 TCP 监听器
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("服务器启动,监听 127.0.0.1:8080");
loop {
// 异步接受连接
let (mut socket, addr) = listener.accept().await?;
println!("新连接: {}", addr);
// 为每个连接创建一个任务
tokio::spawn(async move {
let mut buf = [0; 1024];
// 异步读取数据
match socket.read(&mut buf).await {
Ok(n) if n > 0 => {
// 回显数据
socket.write_all(&buf[..n]).await.unwrap();
}
_ => {}
}
});
}
}
并发执行多个 Future
join!:等待所有 Future 完成
use tokio::time::{sleep, Duration};
async fn task1() -> i32 {
sleep(Duration::from_millis(100)).await;
println!("任务 1 完成");
1
}
async fn task2() -> i32 {
sleep(Duration::from_millis(200)).await;
println!("任务 2 完成");
2
}
#[tokio::main]
async fn main() {
// join! 并发执行多个 Future,等待全部完成
let (r1, r2) = tokio::join!(task1(), task2());
println!("结果: {}, {}", r1, r2);
}
select!:等待任一 Future 完成
use tokio::time::{sleep, Duration, timeout};
async fn slow_task() -> i32 {
sleep(Duration::from_secs(5)).await;
42
}
#[tokio::main]
async fn main() {
// select! 等待第一个完成的 Future
tokio::select! {
result = slow_task() => {
println!("任务完成: {}", result);
}
_ = sleep(Duration::from_secs(1)) => {
println!("超时了!");
}
}
}
使用 timeout 设置超时
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() {
let result = timeout(
Duration::from_secs(2),
async {
// 模拟耗时操作
sleep(Duration::from_secs(5)).await;
"完成"
}
).await;
match result {
Ok(value) => println!("成功: {}", value),
Err(_) => println!("操作超时"),
}
}
异步通道
Tokio 提供了异步版本的通道,适合在异步任务间通信:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// 创建异步通道(容量为 32)
let (tx, mut rx) = mpsc::channel(32);
// 生产者任务
let producer = tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.unwrap();
println!("发送: {}", i);
}
});
// 消费者
while let Some(msg) = rx.recv().await {
println!("接收: {}", msg);
}
producer.await.unwrap();
}
异步互斥锁
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = tokio::spawn(async move {
let mut num = counter.lock().await;
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("结果: {}", *counter.lock().await);
}
异步编程注意事项
避免在异步代码中使用阻塞操作
// 错误:在异步代码中使用阻塞操作
async fn bad_example() {
std::thread::sleep(Duration::from_secs(1)); // 阻塞整个线程!
}
// 正确:使用异步版本
async fn good_example() {
tokio::time::sleep(Duration::from_secs(1)).await; // 不阻塞线程
}
使用 tokio::task::spawn_blocking 处理阻塞操作
#[tokio::main]
async fn main() {
// 对于无法避免的阻塞操作,使用 spawn_blocking
let result = tokio::task::spawn_blocking(|| {
// 这里可以安全地执行阻塞操作
std::thread::sleep(Duration::from_secs(1));
"阻塞操作完成"
}).await.unwrap();
println!("{}", result);
}
异步 trait
在 Rust 1.75 之前,直接在 trait 中使用 async fn 需要使用 async_trait 宏:
// Rust 1.75+ 原生支持 async fn in trait
trait AsyncProcessor {
async fn process(&self, data: &str) -> i32;
}
struct MyProcessor;
impl AsyncProcessor for MyProcessor {
async fn process(&self, data: &str) -> i32 {
// 异步处理
tokio::time::sleep(Duration::from_millis(100)).await;
data.len() as i32
}
}
// 对于旧版本 Rust,使用 async_trait 宏
// #[async_trait]
// trait AsyncProcessor {
// async fn process(&self, data: &str) -> i32;
// }
异步闭包(Rust 2024 新特性)
Rust 2024 Edition 引入了异步闭包(async closures),这是异步编程领域的重要增强。异步闭包允许你定义返回 Future 的闭包,并且能够正确地从环境中捕获值。
为什么需要异步闭包?
在 Rust 2024 之前,如果你想要一个返回 Future 的闭包,通常需要使用 || async {} 的形式,即一个普通闭包返回一个 async 块。但这种方式有一个关键限制:内部 async 块返回的 Future 无法从闭包的捕获中借用值。
// Rust 2024 之前的方式
let mut vec = vec![1, 2, 3];
let closure = || {
// 问题:这个 async 块无法从闭包捕获中借用 vec
async {
vec.push(4); // 编译错误!
}
};
异步闭包解决了这个问题,它允许 Future 正确地从闭包捕获中借用数据。
基本语法
// 异步闭包的基本语法
let async_closure = async |x: i32| {
// 异步操作
tokio::time::sleep(Duration::from_millis(100)).await;
x * 2
};
// 调用异步闭包(返回一个 Future)
let result = async_closure(5).await;
println!("结果: {}", result); // 结果: 10
捕获环境的异步闭包
#[tokio::main]
async fn main() {
let mut data = vec![1, 2, 3];
// 异步闭包可以可变地借用捕获的数据
let mut push_value = async |value: i32| {
// Future 可以借用 data
data.push(value);
println!("添加了: {}", value);
};
// 调用异步闭包
push_value(4).await;
push_value(5).await;
println!("最终数据: {:?}", data); // [1, 2, 3, 4, 5]
}
与普通闭包返回 async 块的区别
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let mut counter = 0;
// 方式一:普通闭包返回 async 块
// 问题:async 块不能从闭包捕获中借用 counter
let regular_closure = || {
// counter += 1; // 这会导致借用检查错误
async {
sleep(Duration::from_millis(10)).await;
42
}
};
// 方式二:异步闭包(Rust 2024+)
// 优势:可以从闭包捕获中借用数据
let async_closure = async || {
counter += 1; // 这在异步闭包中是允许的!
sleep(Duration::from_millis(10)).await;
counter
};
let result1 = async_closure.await;
let result2 = async_closure.await;
println!("结果: {}, {}", result1, result2); // 结果: 1, 2
}
AsyncFn Trait 系列
与 Fn、FnMut、FnOnce trait 对应,异步闭包有三个对应的 trait:
| Trait | 说明 | 对应关系 |
|---|---|---|
AsyncFn | 不可变借用捕获的值 | 对应 Fn |
AsyncFnMut | 可变借用捕获的值 | 对应 FnMut |
AsyncFnOnce | 消耗捕获的值 | 对应 FnOnce |
这三个 trait 已在 Rust 2024 的 prelude 中,无需显式导入:
// Rust 2024+: 使用 AsyncFn trait
async fn process_with_async_fn<F>(callback: F) -> i32
where
F: AsyncFn(i32) -> i32,
{
callback(10).await
}
#[tokio::main]
async fn main() {
let multiplier = 2;
let async_closure = async |x: i32| x * multiplier;
let result = process_with_async_fn(async_closure).await;
println!("结果: {}", result); // 结果: 20
}
高阶函数中的应用
异步闭包特别适合用于高阶函数场景,可以编写更简洁的异步代码:
use tokio::time::{sleep, Duration};
// 高阶函数:接受异步闭包并重复执行
async fn repeat_async<F>(times: usize, mut f: F) -> Vec<i32>
where
F: AsyncFnMut() -> i32,
{
let mut results = Vec::new();
for _ in 0..times {
results.push(f().await);
}
results
}
#[tokio::main]
async fn main() {
let mut counter = 0;
// 传入异步闭包
let results = repeat_async(5, async || {
counter += 1;
sleep(Duration::from_millis(10)).await;
counter * 2
}).await;
println!("结果: {:?}", results); // 结果: [2, 4, 6, 8, 10]
}
异步闭包与 move
与普通闭包类似,可以使用 move 关键字将所有权转移到异步闭包中:
#[tokio::main]
async fn main() {
let data = vec![1, 2, 3, 4, 5];
// 使用 move 将 data 的所有权转移到闭包中
let process = async move || {
let sum: i32 = data.iter().sum();
sleep(Duration::from_millis(10)).await;
sum
};
// data 已被移动,不能再使用
// println!("{:?}", data); // 编译错误
let result = process.await;
println!("总和: {}", result); // 总和: 15
}
异步闭包是 Rust 2024 Edition 引入的特性,需要使用 Rust 1.85+ 并在 Cargo.toml 中设置 edition = "2024"。
并发模式
生产者-消费者模式
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// 生产者
let producer = thread::spawn(move || {
for i in 0..10 {
tx.send(i).unwrap();
println!("生产: {}", i);
thread::sleep(Duration::from_millis(50));
}
});
// 消费者
let consumer = thread::spawn(move || {
for msg in rx {
println!("消费: {}", msg);
thread::sleep(Duration::from_millis(100));
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
线程池模式
use std::sync::{Arc, Mutex};
use std::thread;
struct ThreadPool {
workers: Vec<thread::JoinHandle<()>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
fn new(size: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Job>>>) -> ThreadPool {
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
let receiver = Arc::clone(&receiver);
let worker = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv();
match job {
Ok(job) => job(),
Err(_) => break,
}
});
workers.push(worker);
}
ThreadPool { workers }
}
}
fn main() {
let (tx, rx) = std::sync::mpsc::channel();
let rx = Arc::new(Mutex::new(rx));
let _pool = ThreadPool::new(4, rx);
for i in 0..10 {
let tx = tx.clone();
tx.send(Box::new(move || {
println!("任务 {} 正在执行", i);
})).unwrap();
}
// 等待任务完成
thread::sleep(Duration::from_millis(100));
}
小结
本章我们学习了:
- 线程:创建、join、move 闭包
- 消息传递:channel、多生产者、try_recv
- 共享状态:Mutex、RwLock、Arc
- Send 和 Sync:线程安全 trait
- 原子类型:AtomicI32、Ordering
- Scoped Threads:安全引用外部数据
- 异步编程基础:
- Future trait 和轮询模型
- async/await 语法
- Tokio 运行时的使用
- 异步任务创建和管理
- 并发执行多个 Future(join!、select!)
- 异步 I/O 和网络编程
- 异步通道和互斥锁
- 并发模式:生产者-消费者、线程池
练习
- 创建 5 个线程,每个线程打印自己的线程 ID
- 使用 channel 实现一个简单的任务队列
- 使用 Mutex 和 Arc 实现一个线程安全的计数器
- 使用 scope 实现并行数据处理
- 使用 Tokio 编写一个异步 HTTP 客户端,并发请求多个 URL
- 实现一个异步的 Echo 服务器,使用 select! 处理连接超时
- 使用 tokio::sync::mpsc 实现一个异步的生产者-消费者系统