跳到主要内容

并发编程

Rust 的并发模型基于"无畏并发"(Fearless Concurrency)理念。编译器在编译时检查并发问题,避免数据竞争和其他常见并发错误。

线程

创建线程

use std::thread;
use std::time::Duration;

fn main() {
// 创建线程
thread::spawn(|| {
for i in 1..10 {
println!("子线程: {}", i);
thread::sleep(Duration::from_millis(1));
}
});

// 主线程
for i in 1..5 {
println!("主线程: {}", i);
thread::sleep(Duration::from_millis(1));
}

// 注意:主线程结束时会终止整个程序
// 子线程可能还没执行完
}

join 等待线程

use std::thread;

fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("子线程: {}", i);
}
});

// 等待子线程结束
handle.join().unwrap();

println!("子线程已结束");
}

move 闭包

use std::thread;

fn main() {
let v = vec![1, 2, 3, 4, 5];

// 使用 move 将所有权转移到线程
let handle = thread::spawn(move || {
println!("向量: {:?}", v);
});

// v 已被移动,不能在主线程使用
// println!("{:?}", v); // 错误!

handle.join().unwrap();
}

线程返回值

use std::thread;

fn main() {
let handle = thread::spawn(|| {
let result = 1 + 2;
result
});

let result = handle.join().unwrap();
println!("结果: {}", result);
}

消息传递

Channel 基本用法

use std::sync::mpsc;  // multiple producer, single consumer
use std::thread;

fn main() {
// 创建通道
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let val = String::from("你好");
tx.send(val).unwrap();
});

// 接收消息(阻塞)
let received = rx.recv().unwrap();
println!("收到: {}", received);
}

发送多个消息

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("你好"),
String::from("世界"),
String::from("Rust"),
String::from("并发"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(100));
}
});

// 将 rx 当作迭代器
for received in rx {
println!("收到: {}", received);
}
}

多个生产者

use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

// 克隆发送端
let tx1 = tx.clone();
let tx2 = tx.clone();

// 原始 tx 被移动,所以先克隆
drop(tx); // 释放原始 tx

thread::spawn(move || {
let vals = vec![
String::from("线程1: 你好"),
String::from("线程1: 世界"),
];

for val in vals {
tx1.send(val).unwrap();
}
});

thread::spawn(move || {
let vals = vec![
String::from("线程2: Rust"),
String::from("线程2: 并发"),
];

for val in vals {
tx2.send(val).unwrap();
}
});

for received in rx {
println!("{}", received);
}
}

try_recv 非阻塞接收

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
tx.send(String::from("延迟消息")).unwrap();
});

// 非阻塞检查
loop {
match rx.try_recv() {
Ok(msg) => {
println!("收到: {}", msg);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("暂无消息,继续工作...");
thread::sleep(Duration::from_millis(100));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("发送端已关闭");
break;
}
}
}
}

共享状态

Mutex 互斥锁

use std::sync::Mutex;

fn main() {
let m = Mutex::new(5);

{
// 获取锁
let mut num = m.lock().unwrap();
*num = 6;
// 离开作用域自动释放锁
}

println!("m = {:?}", m);
}

多线程共享 Mutex

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
// Arc 用于多线程共享所有权
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("结果: {}", *counter.lock().unwrap());
}

Mutex 和结构体

use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug)]
struct BankAccount {
balance: f64,
}

impl BankAccount {
fn new(initial: f64) -> Self {
BankAccount { balance: initial }
}

fn deposit(&mut self, amount: f64) {
self.balance += amount;
}

fn withdraw(&mut self, amount: f64) -> bool {
if self.balance >= amount {
self.balance -= amount;
true
} else {
false
}
}
}

fn main() {
let account = Arc::new(Mutex::new(BankAccount::new(100.0)));
let mut handles = vec![];

for i in 0..5 {
let account = Arc::clone(&account);
let handle = thread::spawn(move || {
let mut acc = account.lock().unwrap();
acc.deposit(10.0);
println!("线程 {} 存入后余额: {}", i, acc.balance);
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("最终余额: {}", account.lock().unwrap().balance);
}

RwLock 读写锁

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];

// 多个读者
for i in 0..3 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let r = data.read().unwrap();
println!("读者 {}: {:?}", i, *r);
});
handles.push(handle);
}

// 一个写者
{
let data = Arc::clone(&data);
let mut w = data.write().unwrap();
w.push(4);
println!("写者添加: {:?}", *w);
}

for handle in handles {
handle.join().unwrap();
}
}

Send 和 Sync Trait

Send Trait

Send trait 表示类型可以在线程间安全转移所有权:

use std::thread;

fn main() {
let v = vec![1, 2, 3];

// Vec<i32> 实现了 Send
thread::spawn(move || {
println!("{:?}", v);
});

// Rc<T> 没有实现 Send,不能在线程间传递
// use std::rc::Rc;
// let rc = Rc::new(5);
// thread::spawn(move || { // 编译错误!
// println!("{}", rc);
// });
}

Sync Trait

Sync trait 表示类型可以在线程间安全共享引用:

use std::sync::Arc;
use std::thread;

fn main() {
// Mutex<T> 实现了 Sync
let data = Arc::new(std::sync::Mutex::new(0));

// 可以安全地共享引用
let data_clone = Arc::clone(&data);
thread::spawn(move || {
let mut lock = data_clone.lock().unwrap();
*lock += 1;
});
}

原子类型

Atomic 类型

use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
let counter = Arc::new(AtomicI32::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 原子增加
counter.fetch_add(1, Ordering::SeqCst);
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("结果: {}", counter.load(Ordering::SeqCst));
}

Ordering 内存顺序

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
let ready = AtomicBool::new(false);
let data = Arc::new(vec![1, 2, 3]);

let data_clone = Arc::clone(&data);
let ready_ref = &ready; // AtomicBool 实现了 Sync

thread::scope(|s| {
s.spawn(|| {
// 修改数据
let mut d = (*data_clone).clone();
d.push(4);
println!("数据已准备好");

// 释放:确保之前的写操作可见
ready.store(true, Ordering::Release);
});

s.spawn(|| {
// 获取:确保看到之前的数据
while !ready_ref.load(Ordering::Acquire) {
std::hint::spin_loop();
}
println!("读取数据: {:?}", *data);
});
});
}

Scoped Threads

使用 scope

use std::thread;

fn main() {
let mut v = vec![1, 2, 3, 4, 5, 6, 7, 8];

// 使用 scope 允许引用外部数据
thread::scope(|s| {
s.spawn(|| {
// 可以引用 v,因为 scope 保证 v 在所有线程结束前有效
v[0] *= 2;
});

s.spawn(|| {
v[1] *= 2;
});
});

println!("结果: {:?}", v);
}

并行处理

use std::thread;

fn main() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut results = vec![0; data.len()];

thread::scope(|s| {
// 将数据分成多个块并行处理
let chunk_size = data.len() / 4;

for i in 0..4 {
let start = i * chunk_size;
let end = if i == 3 { data.len() } else { (i + 1) * chunk_size };

s.spawn(move || {
for j in start..end {
results[j] = data[j] * data[j]; // 平方
}
});
}
});

println!("结果: {:?}", results);
}

异步编程基础

async/await

// 需要异步运行时(如 tokio)
// 这是一个概念示例

async fn say_hello() {
println!("你好");
}

async fn say_world() {
println!("世界");
}

// async fn main() {
// say_hello().await;
// say_world().await;
// }

Future

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// 自定义 Future
struct Countdown {
count: u32,
}

impl Future for Countdown {
type Output = String;

fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready(String::from("倒计时结束!"))
} else {
self.count -= 1;
println!("倒计时: {}", self.count);
Poll::Pending
}
}
}

并发模式

生产者-消费者模式

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

// 生产者
let producer = thread::spawn(move || {
for i in 0..10 {
tx.send(i).unwrap();
println!("生产: {}", i);
thread::sleep(Duration::from_millis(50));
}
});

// 消费者
let consumer = thread::spawn(move || {
for msg in rx {
println!("消费: {}", msg);
thread::sleep(Duration::from_millis(100));
}
});

producer.join().unwrap();
consumer.join().unwrap();
}

线程池模式

use std::sync::{Arc, Mutex};
use std::thread;

struct ThreadPool {
workers: Vec<thread::JoinHandle<()>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
fn new(size: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Job>>>) -> ThreadPool {
let mut workers = Vec::with_capacity(size);

for _ in 0..size {
let receiver = Arc::clone(&receiver);
let worker = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv();
match job {
Ok(job) => job(),
Err(_) => break,
}
});
workers.push(worker);
}

ThreadPool { workers }
}
}

fn main() {
let (tx, rx) = std::sync::mpsc::channel();
let rx = Arc::new(Mutex::new(rx));

let _pool = ThreadPool::new(4, rx);

for i in 0..10 {
let tx = tx.clone();
tx.send(Box::new(move || {
println!("任务 {} 正在执行", i);
})).unwrap();
}

// 等待任务完成
thread::sleep(Duration::from_millis(100));
}

小结

本章我们学习了:

  1. 线程:创建、join、move 闭包
  2. 消息传递:channel、多生产者、try_recv
  3. 共享状态:Mutex、RwLock、Arc
  4. Send 和 Sync:线程安全 trait
  5. 原子类型:AtomicI32、Ordering
  6. Scoped Threads:安全引用外部数据
  7. 并发模式:生产者-消费者、线程池

练习

  1. 创建 5 个线程,每个线程打印自己的线程 ID
  2. 使用 channel 实现一个简单的任务队列
  3. 使用 Mutex 和 Arc 实现一个线程安全的计数器
  4. 使用 scope 实现并行数据处理