跳到主要内容

并发编程

Rust 的并发模型基于"无畏并发"(Fearless Concurrency)理念。编译器在编译时检查并发问题,避免数据竞争和其他常见并发错误。

线程

创建线程

use std::thread;
use std::time::Duration;

fn main() {
// 创建线程
thread::spawn(|| {
for i in 1..10 {
println!("子线程: {}", i);
thread::sleep(Duration::from_millis(1));
}
});

// 主线程
for i in 1..5 {
println!("主线程: {}", i);
thread::sleep(Duration::from_millis(1));
}

// 注意:主线程结束时会终止整个程序
// 子线程可能还没执行完
}

join 等待线程

use std::thread;

fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("子线程: {}", i);
}
});

// 等待子线程结束
handle.join().unwrap();

println!("子线程已结束");
}

move 闭包

use std::thread;

fn main() {
let v = vec![1, 2, 3, 4, 5];

// 使用 move 将所有权转移到线程
let handle = thread::spawn(move || {
println!("向量: {:?}", v);
});

// v 已被移动,不能在主线程使用
// println!("{:?}", v); // 错误!

handle.join().unwrap();
}

线程返回值

use std::thread;

fn main() {
let handle = thread::spawn(|| {
let result = 1 + 2;
result
});

let result = handle.join().unwrap();
println!("结果: {}", result);
}

消息传递

Channel 基本用法

use std::sync::mpsc;  // multiple producer, single consumer
use std::thread;

fn main() {
// 创建通道
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let val = String::from("你好");
tx.send(val).unwrap();
});

// 接收消息(阻塞)
let received = rx.recv().unwrap();
println!("收到: {}", received);
}

发送多个消息

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("你好"),
String::from("世界"),
String::from("Rust"),
String::from("并发"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(100));
}
});

// 将 rx 当作迭代器
for received in rx {
println!("收到: {}", received);
}
}

多个生产者

use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

// 克隆发送端
let tx1 = tx.clone();
let tx2 = tx.clone();

// 原始 tx 被移动,所以先克隆
drop(tx); // 释放原始 tx

thread::spawn(move || {
let vals = vec![
String::from("线程1: 你好"),
String::from("线程1: 世界"),
];

for val in vals {
tx1.send(val).unwrap();
}
});

thread::spawn(move || {
let vals = vec![
String::from("线程2: Rust"),
String::from("线程2: 并发"),
];

for val in vals {
tx2.send(val).unwrap();
}
});

for received in rx {
println!("{}", received);
}
}

try_recv 非阻塞接收

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
tx.send(String::from("延迟消息")).unwrap();
});

// 非阻塞检查
loop {
match rx.try_recv() {
Ok(msg) => {
println!("收到: {}", msg);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("暂无消息,继续工作...");
thread::sleep(Duration::from_millis(100));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("发送端已关闭");
break;
}
}
}
}

共享状态

Mutex 互斥锁

use std::sync::Mutex;

fn main() {
let m = Mutex::new(5);

{
// 获取锁
let mut num = m.lock().unwrap();
*num = 6;
// 离开作用域自动释放锁
}

println!("m = {:?}", m);
}

多线程共享 Mutex

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
// Arc 用于多线程共享所有权
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("结果: {}", *counter.lock().unwrap());
}

Mutex 和结构体

use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug)]
struct BankAccount {
balance: f64,
}

impl BankAccount {
fn new(initial: f64) -> Self {
BankAccount { balance: initial }
}

fn deposit(&mut self, amount: f64) {
self.balance += amount;
}

fn withdraw(&mut self, amount: f64) -> bool {
if self.balance >= amount {
self.balance -= amount;
true
} else {
false
}
}
}

fn main() {
let account = Arc::new(Mutex::new(BankAccount::new(100.0)));
let mut handles = vec![];

for i in 0..5 {
let account = Arc::clone(&account);
let handle = thread::spawn(move || {
let mut acc = account.lock().unwrap();
acc.deposit(10.0);
println!("线程 {} 存入后余额: {}", i, acc.balance);
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("最终余额: {}", account.lock().unwrap().balance);
}

RwLock 读写锁

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];

// 多个读者
for i in 0..3 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let r = data.read().unwrap();
println!("读者 {}: {:?}", i, *r);
});
handles.push(handle);
}

// 一个写者
{
let data = Arc::clone(&data);
let mut w = data.write().unwrap();
w.push(4);
println!("写者添加: {:?}", *w);
}

for handle in handles {
handle.join().unwrap();
}
}

Send 和 Sync Trait

Send Trait

Send trait 表示类型可以在线程间安全转移所有权:

use std::thread;

fn main() {
let v = vec![1, 2, 3];

// Vec<i32> 实现了 Send
thread::spawn(move || {
println!("{:?}", v);
});

// Rc<T> 没有实现 Send,不能在线程间传递
// use std::rc::Rc;
// let rc = Rc::new(5);
// thread::spawn(move || { // 编译错误!
// println!("{}", rc);
// });
}

Sync Trait

Sync trait 表示类型可以在线程间安全共享引用:

use std::sync::Arc;
use std::thread;

fn main() {
// Mutex<T> 实现了 Sync
let data = Arc::new(std::sync::Mutex::new(0));

// 可以安全地共享引用
let data_clone = Arc::clone(&data);
thread::spawn(move || {
let mut lock = data_clone.lock().unwrap();
*lock += 1;
});
}

原子类型

Atomic 类型

use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
let counter = Arc::new(AtomicI32::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 原子增加
counter.fetch_add(1, Ordering::SeqCst);
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("结果: {}", counter.load(Ordering::SeqCst));
}

Ordering 内存顺序

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
let ready = AtomicBool::new(false);
let data = Arc::new(vec![1, 2, 3]);

let data_clone = Arc::clone(&data);
let ready_ref = &ready; // AtomicBool 实现了 Sync

thread::scope(|s| {
s.spawn(|| {
// 修改数据
let mut d = (*data_clone).clone();
d.push(4);
println!("数据已准备好");

// 释放:确保之前的写操作可见
ready.store(true, Ordering::Release);
});

s.spawn(|| {
// 获取:确保看到之前的数据
while !ready_ref.load(Ordering::Acquire) {
std::hint::spin_loop();
}
println!("读取数据: {:?}", *data);
});
});
}

Scoped Threads

使用 scope

use std::thread;

fn main() {
let mut v = vec![1, 2, 3, 4, 5, 6, 7, 8];

// 使用 scope 允许引用外部数据
thread::scope(|s| {
s.spawn(|| {
// 可以引用 v,因为 scope 保证 v 在所有线程结束前有效
v[0] *= 2;
});

s.spawn(|| {
v[1] *= 2;
});
});

println!("结果: {:?}", v);
}

并行处理

use std::thread;

fn main() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut results = vec![0; data.len()];

thread::scope(|s| {
// 将数据分成多个块并行处理
let chunk_size = data.len() / 4;

for i in 0..4 {
let start = i * chunk_size;
let end = if i == 3 { data.len() } else { (i + 1) * chunk_size };

s.spawn(move || {
for j in start..end {
results[j] = data[j] * data[j]; // 平方
}
});
}
});

println!("结果: {:?}", results);
}

异步编程基础

异步编程是 Rust 并发编程的重要组成部分。与基于操作系统线程的并发不同,异步编程允许在单个线程内处理大量并发任务,具有更低的内存开销和更快的上下文切换速度。

为什么需要异步编程?

在传统的线程模型中,每个线程都需要操作系统分配栈空间(通常为 1-2MB),当需要处理成千上万的并发连接时,内存消耗会非常巨大。而异步任务的内存开销通常只有几 KB,这使得异步编程特别适合以下场景:

  • 高并发服务器:处理大量网络连接
  • I/O 密集型应用:大量等待操作(如网络请求、文件读写)
  • 资源受限环境:嵌入式设备、微控制器

异步 vs 多线程

特性多线程异步
内存开销较大(每线程 MB 级)较小(每任务 KB 级)
上下文切换操作系统管理,开销较大运行时管理,开销极小
适用场景CPU 密集型任务I/O 密集型任务
编程复杂度相对简单需要理解 Future 和运行时

Future Trait

Future 是 Rust 异步编程的核心抽象。它表示一个可能还没有完成的异步计算。

Rust 2024 Edition Prelude 变更

从 Rust 2024 Edition 开始,FutureIntoFuture traits 已被添加到标准库的 prelude 中。这意味着:

  1. 不需要显式导入:在 Rust 2024 中,这些 traits 会自动可用,无需 use std::future::Future
  2. 可能的歧义问题:如果你定义了与 FutureIntoFuture 中同名的方法,可能会导致编译错误

如果你从较早版本迁移到 Rust 2024,可能会遇到 trait 方法调用歧义问题。解决方法是使用完全限定语法:

// 如果存在方法歧义,使用完全限定语法
<_ as MyTrait>::poll(&my_value);

// 或者显式指定 trait
MyTrait::poll(&my_value);

运行 cargo fix --edition 可以自动修复大多数此类问题。

// 标准库中 Future trait 的定义
// Rust 2024+ 不需要显式导入,自动在 prelude 中
use std::pin::Pin;
use std::task::{Context, Poll};

trait Future {
type Output; // Future 完成时产生的值类型

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

// Poll 枚举表示轮询结果
enum Poll<T> {
Ready(T), // 计算完成,包含结果
Pending, // 计算未完成,需要稍后再试
}

Future 的工作原理

Future 采用"轮询"(polling)模型。当你调用 .await 时,运行时会不断地调用 poll 方法:

  1. 如果 poll 返回 Poll::Ready(value),说明计算完成,value 就是被 .await 返回的值
  2. 如果 poll 返回 Poll::Pending,说明计算还未完成。运行时会注册一个唤醒器(Waker),当计算可能完成时,唤醒器会通知运行时再次轮询
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;

// 自定义 Future:延迟指定时间后返回
struct Delay {
duration: Duration,
start: Option<Instant>,
}

impl Delay {
fn new(duration: Duration) -> Self {
Delay {
duration,
start: None,
}
}
}

impl Future for Delay {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 第一次 poll 时记录开始时间
if self.start.is_none() {
self.start = Some(Instant::now());

// 使用线程模拟异步等待
// 在实际应用中,应该使用运行时提供的定时器
let waker = cx.waker().clone();
let duration = self.duration;
thread::spawn(move || {
thread::sleep(duration);
waker.wake(); // 通知运行时可以再次 poll
});

return Poll::Pending;
}

// 检查是否已经过了足够的时间
if self.start.unwrap().elapsed() >= self.duration {
Poll::Ready(())
} else {
Poll::Pending
}
}
}

// 注意:在实际应用中,不要在 Future 中使用 thread::sleep
// 这会阻塞整个线程,违背异步编程的初衷
// 应该使用运行时提供的异步定时器

async/await 语法

asyncawait 是 Rust 提供的异步编程语法糖,让异步代码看起来像同步代码一样清晰。

async fn 定义异步函数

// async fn 定义的函数返回一个 Future
async fn say_hello() {
println!("你好");
}

// 等价于:
fn say_hello() -> impl Future<Output = ()> {
async {
println!("你好");
}
}

// async 函数可以有返回值
async fn add(a: i32, b: i32) -> i32 {
a + b
}
// 返回类型:impl Future<Output = i32>

.await 等待 Future 完成

// .await 会暂停当前函数的执行,直到 Future 完成
async fn example() {
let result = add(1, 2).await; // 等待 add 完成
println!("结果: {}", result);
}

// 错误:不能在非 async 函数中使用 .await
// fn not_async() {
// say_hello().await; // 编译错误!
// }

async 块

除了 async fn,还可以使用 async 块创建匿名的 Future:

async fn example() {
// async 块创建一个 Future
let future = async {
println!("执行异步操作");
42
};

// 等待 Future 完成
let result = future.await;
println!("结果: {}", result);
}

异步运行时:Tokio

Rust 标准库只提供了 Future trait,但没有提供执行 Future 的运行时。需要使用第三方运行时,最流行的是 Tokio。

添加 Tokio 依赖

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }

基本 Tokio 程序

use tokio::time::{sleep, Duration};

// 使用 #[tokio::main] 宏让 main 函数变成异步
#[tokio::main]
async fn main() {
println!("开始");

// 异步等待 1 秒(不会阻塞线程)
sleep(Duration::from_secs(1)).await;

println!("1 秒后");

// 调用异步函数
let result = compute().await;
println!("计算结果: {}", result);
}

async fn compute() -> i32 {
// 模拟耗时计算
sleep(Duration::from_millis(100)).await;
42
}

#[tokio::main] 宏展开后

fn main() {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async {
// 原来的 async main 函数体
println!("开始");
sleep(Duration::from_secs(1)).await;
println!("1 秒后");
let result = compute().await;
println!("计算结果: {}", result);
})
}

创建异步任务:tokio::spawn

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
println!("主任务开始");

// spawn 创建一个新的异步任务,立即返回 JoinHandle
let handle = tokio::spawn(async {
println!("子任务开始");
sleep(Duration::from_secs(2)).await;
println!("子任务结束");
"子任务结果"
});

// 主任务继续执行,不等待子任务
println!("主任务继续工作...");
sleep(Duration::from_secs(1)).await;
println!("主任务等待子任务...");

// 等待子任务完成并获取结果
let result = handle.await.unwrap();
println!("收到: {}", result);
}

异步 I/O 操作

use tokio::fs;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> io::Result<()> {
// 异步读取文件
let content = fs::read_to_string("hello.txt").await?;
println!("文件内容: {}", content);

// 异步写入文件
fs::write("output.txt", "Hello, Async!").await?;

// 更精细的文件操作
let mut file = fs::File::create("data.txt").await?;
file.write_all(b"异步写入数据").await?;

Ok(())
}

异步网络编程

use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> io::Result<()> {
// 绑定 TCP 监听器
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("服务器启动,监听 127.0.0.1:8080");

loop {
// 异步接受连接
let (mut socket, addr) = listener.accept().await?;
println!("新连接: {}", addr);

// 为每个连接创建一个任务
tokio::spawn(async move {
let mut buf = [0; 1024];

// 异步读取数据
match socket.read(&mut buf).await {
Ok(n) if n > 0 => {
// 回显数据
socket.write_all(&buf[..n]).await.unwrap();
}
_ => {}
}
});
}
}

并发执行多个 Future

join!:等待所有 Future 完成

use tokio::time::{sleep, Duration};

async fn task1() -> i32 {
sleep(Duration::from_millis(100)).await;
println!("任务 1 完成");
1
}

async fn task2() -> i32 {
sleep(Duration::from_millis(200)).await;
println!("任务 2 完成");
2
}

#[tokio::main]
async fn main() {
// join! 并发执行多个 Future,等待全部完成
let (r1, r2) = tokio::join!(task1(), task2());
println!("结果: {}, {}", r1, r2);
}

select!:等待任一 Future 完成

use tokio::time::{sleep, Duration, timeout};

async fn slow_task() -> i32 {
sleep(Duration::from_secs(5)).await;
42
}

#[tokio::main]
async fn main() {
// select! 等待第一个完成的 Future
tokio::select! {
result = slow_task() => {
println!("任务完成: {}", result);
}
_ = sleep(Duration::from_secs(1)) => {
println!("超时了!");
}
}
}

使用 timeout 设置超时

use tokio::time::{timeout, Duration};

#[tokio::main]
async fn main() {
let result = timeout(
Duration::from_secs(2),
async {
// 模拟耗时操作
sleep(Duration::from_secs(5)).await;
"完成"
}
).await;

match result {
Ok(value) => println!("成功: {}", value),
Err(_) => println!("操作超时"),
}
}

异步通道

Tokio 提供了异步版本的通道,适合在异步任务间通信:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
// 创建异步通道(容量为 32)
let (tx, mut rx) = mpsc::channel(32);

// 生产者任务
let producer = tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.unwrap();
println!("发送: {}", i);
}
});

// 消费者
while let Some(msg) = rx.recv().await {
println!("接收: {}", msg);
}

producer.await.unwrap();
}

异步互斥锁

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = tokio::spawn(async move {
let mut num = counter.lock().await;
*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.await.unwrap();
}

println!("结果: {}", *counter.lock().await);
}

异步编程注意事项

避免在异步代码中使用阻塞操作

// 错误:在异步代码中使用阻塞操作
async fn bad_example() {
std::thread::sleep(Duration::from_secs(1)); // 阻塞整个线程!
}

// 正确:使用异步版本
async fn good_example() {
tokio::time::sleep(Duration::from_secs(1)).await; // 不阻塞线程
}

使用 tokio::task::spawn_blocking 处理阻塞操作

#[tokio::main]
async fn main() {
// 对于无法避免的阻塞操作,使用 spawn_blocking
let result = tokio::task::spawn_blocking(|| {
// 这里可以安全地执行阻塞操作
std::thread::sleep(Duration::from_secs(1));
"阻塞操作完成"
}).await.unwrap();

println!("{}", result);
}

异步 trait

在 Rust 1.75 之前,直接在 trait 中使用 async fn 需要使用 async_trait 宏:

// Rust 1.75+ 原生支持 async fn in trait
trait AsyncProcessor {
async fn process(&self, data: &str) -> i32;
}

struct MyProcessor;

impl AsyncProcessor for MyProcessor {
async fn process(&self, data: &str) -> i32 {
// 异步处理
tokio::time::sleep(Duration::from_millis(100)).await;
data.len() as i32
}
}

// 对于旧版本 Rust,使用 async_trait 宏
// #[async_trait]
// trait AsyncProcessor {
// async fn process(&self, data: &str) -> i32;
// }

异步闭包(Rust 2024 新特性)

Rust 2024 Edition 引入了异步闭包(async closures),这是异步编程领域的重要增强。异步闭包允许你定义返回 Future 的闭包,并且能够正确地从环境中捕获值。

为什么需要异步闭包?

在 Rust 2024 之前,如果你想要一个返回 Future 的闭包,通常需要使用 || async {} 的形式,即一个普通闭包返回一个 async 块。但这种方式有一个关键限制:内部 async 块返回的 Future 无法从闭包的捕获中借用值。

// Rust 2024 之前的方式
let mut vec = vec![1, 2, 3];
let closure = || {
// 问题:这个 async 块无法从闭包捕获中借用 vec
async {
vec.push(4); // 编译错误!
}
};

异步闭包解决了这个问题,它允许 Future 正确地从闭包捕获中借用数据。

基本语法

// 异步闭包的基本语法
let async_closure = async |x: i32| {
// 异步操作
tokio::time::sleep(Duration::from_millis(100)).await;
x * 2
};

// 调用异步闭包(返回一个 Future)
let result = async_closure(5).await;
println!("结果: {}", result); // 结果: 10

捕获环境的异步闭包

#[tokio::main]
async fn main() {
let mut data = vec![1, 2, 3];

// 异步闭包可以可变地借用捕获的数据
let mut push_value = async |value: i32| {
// Future 可以借用 data
data.push(value);
println!("添加了: {}", value);
};

// 调用异步闭包
push_value(4).await;
push_value(5).await;

println!("最终数据: {:?}", data); // [1, 2, 3, 4, 5]
}

与普通闭包返回 async 块的区别

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
let mut counter = 0;

// 方式一:普通闭包返回 async 块
// 问题:async 块不能从闭包捕获中借用 counter
let regular_closure = || {
// counter += 1; // 这会导致借用检查错误
async {
sleep(Duration::from_millis(10)).await;
42
}
};

// 方式二:异步闭包(Rust 2024+)
// 优势:可以从闭包捕获中借用数据
let async_closure = async || {
counter += 1; // 这在异步闭包中是允许的!
sleep(Duration::from_millis(10)).await;
counter
};

let result1 = async_closure.await;
let result2 = async_closure.await;

println!("结果: {}, {}", result1, result2); // 结果: 1, 2
}

AsyncFn Trait 系列

FnFnMutFnOnce trait 对应,异步闭包有三个对应的 trait:

Trait说明对应关系
AsyncFn不可变借用捕获的值对应 Fn
AsyncFnMut可变借用捕获的值对应 FnMut
AsyncFnOnce消耗捕获的值对应 FnOnce

这三个 trait 已在 Rust 2024 的 prelude 中,无需显式导入:

// Rust 2024+: 使用 AsyncFn trait
async fn process_with_async_fn<F>(callback: F) -> i32
where
F: AsyncFn(i32) -> i32,
{
callback(10).await
}

#[tokio::main]
async fn main() {
let multiplier = 2;
let async_closure = async |x: i32| x * multiplier;

let result = process_with_async_fn(async_closure).await;
println!("结果: {}", result); // 结果: 20
}

高阶函数中的应用

异步闭包特别适合用于高阶函数场景,可以编写更简洁的异步代码:

use tokio::time::{sleep, Duration};

// 高阶函数:接受异步闭包并重复执行
async fn repeat_async<F>(times: usize, mut f: F) -> Vec<i32>
where
F: AsyncFnMut() -> i32,
{
let mut results = Vec::new();
for _ in 0..times {
results.push(f().await);
}
results
}

#[tokio::main]
async fn main() {
let mut counter = 0;

// 传入异步闭包
let results = repeat_async(5, async || {
counter += 1;
sleep(Duration::from_millis(10)).await;
counter * 2
}).await;

println!("结果: {:?}", results); // 结果: [2, 4, 6, 8, 10]
}

异步闭包与 move

与普通闭包类似,可以使用 move 关键字将所有权转移到异步闭包中:

#[tokio::main]
async fn main() {
let data = vec![1, 2, 3, 4, 5];

// 使用 move 将 data 的所有权转移到闭包中
let process = async move || {
let sum: i32 = data.iter().sum();
sleep(Duration::from_millis(10)).await;
sum
};

// data 已被移动,不能再使用
// println!("{:?}", data); // 编译错误

let result = process.await;
println!("总和: {}", result); // 总和: 15
}
Rust 版本要求

异步闭包是 Rust 2024 Edition 引入的特性,需要使用 Rust 1.85+ 并在 Cargo.toml 中设置 edition = "2024"

并发模式

生产者-消费者模式

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

// 生产者
let producer = thread::spawn(move || {
for i in 0..10 {
tx.send(i).unwrap();
println!("生产: {}", i);
thread::sleep(Duration::from_millis(50));
}
});

// 消费者
let consumer = thread::spawn(move || {
for msg in rx {
println!("消费: {}", msg);
thread::sleep(Duration::from_millis(100));
}
});

producer.join().unwrap();
consumer.join().unwrap();
}

线程池模式

use std::sync::{Arc, Mutex};
use std::thread;

struct ThreadPool {
workers: Vec<thread::JoinHandle<()>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
fn new(size: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Job>>>) -> ThreadPool {
let mut workers = Vec::with_capacity(size);

for _ in 0..size {
let receiver = Arc::clone(&receiver);
let worker = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv();
match job {
Ok(job) => job(),
Err(_) => break,
}
});
workers.push(worker);
}

ThreadPool { workers }
}
}

fn main() {
let (tx, rx) = std::sync::mpsc::channel();
let rx = Arc::new(Mutex::new(rx));

let _pool = ThreadPool::new(4, rx);

for i in 0..10 {
let tx = tx.clone();
tx.send(Box::new(move || {
println!("任务 {} 正在执行", i);
})).unwrap();
}

// 等待任务完成
thread::sleep(Duration::from_millis(100));
}

小结

本章我们学习了:

  1. 线程:创建、join、move 闭包
  2. 消息传递:channel、多生产者、try_recv
  3. 共享状态:Mutex、RwLock、Arc
  4. Send 和 Sync:线程安全 trait
  5. 原子类型:AtomicI32、Ordering
  6. Scoped Threads:安全引用外部数据
  7. 异步编程基础
    • Future trait 和轮询模型
    • async/await 语法
    • Tokio 运行时的使用
    • 异步任务创建和管理
    • 并发执行多个 Future(join!、select!)
    • 异步 I/O 和网络编程
    • 异步通道和互斥锁
  8. 并发模式:生产者-消费者、线程池

练习

  1. 创建 5 个线程,每个线程打印自己的线程 ID
  2. 使用 channel 实现一个简单的任务队列
  3. 使用 Mutex 和 Arc 实现一个线程安全的计数器
  4. 使用 scope 实现并行数据处理
  5. 使用 Tokio 编写一个异步 HTTP 客户端,并发请求多个 URL
  6. 实现一个异步的 Echo 服务器,使用 select! 处理连接超时
  7. 使用 tokio::sync::mpsc 实现一个异步的生产者-消费者系统

参考资料