并发编程
Rust 的并发模型基于"无畏并发"(Fearless Concurrency)理念。编译器在编译时检查并发问题,避免数据竞争和其他常见并发错误。
线程
创建线程
use std::thread;
use std::time::Duration;
fn main() {
// 创建线程
thread::spawn(|| {
for i in 1..10 {
println!("子线程: {}", i);
thread::sleep(Duration::from_millis(1));
}
});
// 主线程
for i in 1..5 {
println!("主线程: {}", i);
thread::sleep(Duration::from_millis(1));
}
// 注意:主线程结束时会终止整个程序
// 子线程可能还没执行完
}
join 等待线程
use std::thread;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("子线程: {}", i);
}
});
// 等待子线程结束
handle.join().unwrap();
println!("子线程已结束");
}
move 闭包
use std::thread;
fn main() {
let v = vec![1, 2, 3, 4, 5];
// 使用 move 将所有权转移到线程
let handle = thread::spawn(move || {
println!("向量: {:?}", v);
});
// v 已被移动,不能在主线程使用
// println!("{:?}", v); // 错误!
handle.join().unwrap();
}
线程返回值
use std::thread;
fn main() {
let handle = thread::spawn(|| {
let result = 1 + 2;
result
});
let result = handle.join().unwrap();
println!("结果: {}", result);
}
消息传递
Channel 基本用法
use std::sync::mpsc; // multiple producer, single consumer
use std::thread;
fn main() {
// 创建通道
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("你好");
tx.send(val).unwrap();
});
// 接收消息(阻塞)
let received = rx.recv().unwrap();
println!("收到: {}", received);
}
发送多个消息
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("你好"),
String::from("世界"),
String::from("Rust"),
String::from("并发"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
// 将 rx 当作迭代器
for received in rx {
println!("收到: {}", received);
}
}
多个生产者
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// 克隆发送端
let tx1 = tx.clone();
let tx2 = tx.clone();
// 原始 tx 被移动,所以先克隆
drop(tx); // 释放原始 tx
thread::spawn(move || {
let vals = vec![
String::from("线程1: 你好"),
String::from("线程1: 世界"),
];
for val in vals {
tx1.send(val).unwrap();
}
});
thread::spawn(move || {
let vals = vec![
String::from("线程2: Rust"),
String::from("线程2: 并发"),
];
for val in vals {
tx2.send(val).unwrap();
}
});
for received in rx {
println!("{}", received);
}
}
try_recv 非阻塞接收
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
tx.send(String::from("延迟消息")).unwrap();
});
// 非阻塞检查
loop {
match rx.try_recv() {
Ok(msg) => {
println!("收到: {}", msg);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("暂无消息,继续工作...");
thread::sleep(Duration::from_millis(100));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("发送端已关闭");
break;
}
}
}
}
共享状态
Mutex 互斥锁
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
// 获取锁
let mut num = m.lock().unwrap();
*num = 6;
// 离开作用域自动释放锁
}
println!("m = {:?}", m);
}
多线程共享 Mutex
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc 用于多线程共享所有权
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("结果: {}", *counter.lock().unwrap());
}
Mutex 和结构体
use std::sync::{Arc, Mutex};
use std::thread;
#[derive(Debug)]
struct BankAccount {
balance: f64,
}
impl BankAccount {
fn new(initial: f64) -> Self {
BankAccount { balance: initial }
}
fn deposit(&mut self, amount: f64) {
self.balance += amount;
}
fn withdraw(&mut self, amount: f64) -> bool {
if self.balance >= amount {
self.balance -= amount;
true
} else {
false
}
}
}
fn main() {
let account = Arc::new(Mutex::new(BankAccount::new(100.0)));
let mut handles = vec![];
for i in 0..5 {
let account = Arc::clone(&account);
let handle = thread::spawn(move || {
let mut acc = account.lock().unwrap();
acc.deposit(10.0);
println!("线程 {} 存入后余额: {}", i, acc.balance);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终余额: {}", account.lock().unwrap().balance);
}
RwLock 读写锁
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];
// 多个读者
for i in 0..3 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let r = data.read().unwrap();
println!("读者 {}: {:?}", i, *r);
});
handles.push(handle);
}
// 一个写者
{
let data = Arc::clone(&data);
let mut w = data.write().unwrap();
w.push(4);
println!("写者添加: {:?}", *w);
}
for handle in handles {
handle.join().unwrap();
}
}
Send 和 Sync Trait
Send Trait
Send trait 表示类型可以在线程间安全转移所有权:
use std::thread;
fn main() {
let v = vec![1, 2, 3];
// Vec<i32> 实现了 Send
thread::spawn(move || {
println!("{:?}", v);
});
// Rc<T> 没有实现 Send,不能在线程间传递
// use std::rc::Rc;
// let rc = Rc::new(5);
// thread::spawn(move || { // 编译错误!
// println!("{}", rc);
// });
}
Sync Trait
Sync trait 表示类型可以在线程间安全共享引用:
use std::sync::Arc;
use std::thread;
fn main() {
// Mutex<T> 实现了 Sync
let data = Arc::new(std::sync::Mutex::new(0));
// 可以安全地共享引用
let data_clone = Arc::clone(&data);
thread::spawn(move || {
let mut lock = data_clone.lock().unwrap();
*lock += 1;
});
}
原子类型
Atomic 类型
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let counter = Arc::new(AtomicI32::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 原子增加
counter.fetch_add(1, Ordering::SeqCst);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("结果: {}", counter.load(Ordering::SeqCst));
}
Ordering 内存顺序
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
let ready = AtomicBool::new(false);
let data = Arc::new(vec![1, 2, 3]);
let data_clone = Arc::clone(&data);
let ready_ref = &ready; // AtomicBool 实现了 Sync
thread::scope(|s| {
s.spawn(|| {
// 修改数据
let mut d = (*data_clone).clone();
d.push(4);
println!("数据已准备好");
// 释放:确保之前的写操作可见
ready.store(true, Ordering::Release);
});
s.spawn(|| {
// 获取:确保看到之前的数据
while !ready_ref.load(Ordering::Acquire) {
std::hint::spin_loop();
}
println!("读取数据: {:?}", *data);
});
});
}
Scoped Threads
使用 scope
use std::thread;
fn main() {
let mut v = vec![1, 2, 3, 4, 5, 6, 7, 8];
// 使用 scope 允许引用外部数据
thread::scope(|s| {
s.spawn(|| {
// 可以引用 v,因为 scope 保证 v 在所有线程结束前有效
v[0] *= 2;
});
s.spawn(|| {
v[1] *= 2;
});
});
println!("结果: {:?}", v);
}
并行处理
use std::thread;
fn main() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut results = vec![0; data.len()];
thread::scope(|s| {
// 将数据分成多个块并行处理
let chunk_size = data.len() / 4;
for i in 0..4 {
let start = i * chunk_size;
let end = if i == 3 { data.len() } else { (i + 1) * chunk_size };
s.spawn(move || {
for j in start..end {
results[j] = data[j] * data[j]; // 平方
}
});
}
});
println!("结果: {:?}", results);
}
异步编程基础
async/await
// 需要异步运行时(如 tokio)
// 这是一个概念示例
async fn say_hello() {
println!("你好");
}
async fn say_world() {
println!("世界");
}
// async fn main() {
// say_hello().await;
// say_world().await;
// }
Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
// 自定义 Future
struct Countdown {
count: u32,
}
impl Future for Countdown {
type Output = String;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready(String::from("倒计时结束!"))
} else {
self.count -= 1;
println!("倒计时: {}", self.count);
Poll::Pending
}
}
}
并发模式
生产者-消费者模式
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// 生产者
let producer = thread::spawn(move || {
for i in 0..10 {
tx.send(i).unwrap();
println!("生产: {}", i);
thread::sleep(Duration::from_millis(50));
}
});
// 消费者
let consumer = thread::spawn(move || {
for msg in rx {
println!("消费: {}", msg);
thread::sleep(Duration::from_millis(100));
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
线程池模式
use std::sync::{Arc, Mutex};
use std::thread;
struct ThreadPool {
workers: Vec<thread::JoinHandle<()>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
fn new(size: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Job>>>) -> ThreadPool {
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
let receiver = Arc::clone(&receiver);
let worker = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv();
match job {
Ok(job) => job(),
Err(_) => break,
}
});
workers.push(worker);
}
ThreadPool { workers }
}
}
fn main() {
let (tx, rx) = std::sync::mpsc::channel();
let rx = Arc::new(Mutex::new(rx));
let _pool = ThreadPool::new(4, rx);
for i in 0..10 {
let tx = tx.clone();
tx.send(Box::new(move || {
println!("任务 {} 正在执行", i);
})).unwrap();
}
// 等待任务完成
thread::sleep(Duration::from_millis(100));
}
小结
本章我们学习了:
- 线程:创建、join、move 闭包
- 消息传递:channel、多生产者、try_recv
- 共享状态:Mutex、RwLock、Arc
- Send 和 Sync:线程安全 trait
- 原子类型:AtomicI32、Ordering
- Scoped Threads:安全引用外部数据
- 并发模式:生产者-消费者、线程池
练习
- 创建 5 个线程,每个线程打印自己的线程 ID
- 使用 channel 实现一个简单的任务队列
- 使用 Mutex 和 Arc 实现一个线程安全的计数器
- 使用 scope 实现并行数据处理