C++ 多线程编程
C++11 引入了标准线程库,使得多线程编程成为 C++ 标准的一部分。本章将详细介绍线程创建、同步机制和并发工具。
线程基础
创建线程
#include <thread>
#include <iostream>
void hello() {
std::cout << "Hello from thread!" << std::endl;
}
void greet(const std::string& name) {
std::cout << "Hello, " << name << "!" << std::endl;
}
int main() {
// 创建线程:传入函数
std::thread t1(hello);
// 创建线程:传入参数
std::thread t2(greet, "World");
// 等待线程结束
t1.join();
t2.join();
std::cout << "Main thread done!" << std::endl;
return 0;
}
线程函数
线程函数可以是普通函数、lambda 或成员函数:
#include <thread>
#include <iostream>
// 普通函数
void func1() { }
// Lambda
auto func2 = []() { };
// 仿函数(函数对象)
class Task {
public:
void operator()() { }
};
Task task;
// 成员函数
class Data {
public:
void process() { }
};
int main() {
std::thread t1(func1);
std::thread t2(func2);
std::thread t3(task);
Data d;
std::thread t4(&Data::process, &d); // 需要传入对象指针
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}
线程管理
#include <thread>
#include <iostream>
int main() {
// 获取线程 ID
std::cout << "Main thread ID: "
<< std::this_thread::get_id() << std::endl;
std::thread t([]() {
std::cout << "New thread ID: "
<< std::this_thread::get_id() << std::endl;
});
// 检查线程是否可 joinable
if (t.joinable()) {
std::cout << "Thread is joinable" << std::endl;
}
// join:等待线程结束
t.join();
// detach:分离线程(后台运行)
// std::thread t2([](){ ... });
// t2.detach(); // 注意:detach 后无法 join
// 分离后,线程在后台运行,由运行时库管理
// 适用于不需要等待完成的长时间运行任务
return 0;
}
互斥锁
std::mutex
#include <mutex>
#include <iostream>
int main() {
std::mutex mtx;
// lock() 和 unlock()
mtx.lock();
// 临界区代码
mtx.unlock();
// 推荐:使用 lock_guard 自动管理
{
std::lock_guard<std::mutex> lock(mtx);
// 临界区代码
} // 自动 unlock
return 0;
}
std::lock_guard
#include <mutex>
#include <iostream>
class Counter {
private:
int count = 0;
mutable std::mutex mtx; // mutable 允许在 const 函数中修改
public:
void increment() {
std::lock_guard<std::mutex> lock(mtx);
count++;
}
int get() const {
std::lock_guard<std::mutex> lock(mtx);
return count;
}
};
int main() {
Counter c;
c.increment();
c.increment();
std::cout << c.get() << std::endl; // 2
return 0;
}
std::unique_lock
unique_lock 比 lock_guard 更灵活:
#include <mutex>
#include <thread>
int main() {
std::mutex mtx;
std::unique_lock<std::mutex> lock1(mtx, std::defer_lock);
// defer_lock:不获取锁
// 可以在需要时手动加锁
lock1.lock();
// 临界区
lock1.unlock();
// 可以多次加锁/解锁
lock1.lock();
lock1.unlock();
// 析构函数自动释放
// 如果锁仍被持有,会自动 unlock
return 0;
}
死锁避免
#include <mutex>
#include <thread>
#include <iostream>
// 死锁示例
void deadLockExample() {
std::mutex m1, m2;
// 线程1:先锁 m1,再锁 m2
std::thread t1([&]() {
std::lock_guard<std::mutex> lock1(m1);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lock2(m2);
std::cout << "Thread 1" << std::endl;
});
// 线程2:先锁 m2,再锁 m1(与线程1相反)
std::thread t2([&]() {
std::lock_guard<std::mutex> lock1(m2);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lock2(m1);
std::cout << "Thread 2" << std::endl;
});
t1.join();
t2.join();
}
// 解决方案1:使用 std::lock 一次性获取多个锁
void solveWithLock() {
std::mutex m1, m2;
std::thread t1([&]() {
std::lock(m1, m2); // 一次性锁住两个
// 注意:std::lock 不支持 lock_guard,需要用 scoped_lock
std::scoped_lock lock(m1, m2); // C++17
std::cout << "Thread 1" << std::endl;
});
std::thread t2([&]() {
std::lock(m1, m2);
std::scoped_lock lock(m1, m2);
std::cout << "Thread 2" << std::endl;
});
t1.join();
t2.join();
}
int main() {
solveWithLock();
return 0;
}
std::scoped_lock(C++17)
#include <mutex>
#include <thread>
int main() {
std::mutex m1, m2, m3;
// 同时锁住多个互斥量,避免死锁
std::scoped_lock lock(m1, m2, m3);
// 临界区代码
return 0;
}
条件变量
std::condition_variable
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <iostream>
std::queue<int> q;
std::mutex mtx;
std::condition_variable cv;
// 生产者
void producer() {
for (int i = 0; i < 5; i++) {
std::unique_lock<std::mutex> lock(mtx);
q.push(i);
std::cout << "Produced: " << i << std::endl;
cv.notify_one(); // 通知等待的消费者
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// 消费者
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 等待条件:队列不为空
cv.wait(lock, []() { return !q.empty(); });
int val = q.front();
q.pop();
std::cout << "Consumed: " << val << std::endl;
if (val == 4) break;
}
}
int main() {
std::thread prod(producer);
std::thread cons(consumer);
prod.join();
cons.join();
return 0;
}
wait 的变体
// 带超时的 wait
std::condition_variable cv;
bool ready = false;
void waitWithTimeout() {
std::unique_lock<std::mutex> lock(mtx);
// 等待最多 1 秒
if (cv.wait_for(lock, std::chrono::seconds(1), [](){ return ready; })) {
std::cout << "条件满足" << std::endl;
} else {
std::cout << "超时" << std::endl;
}
}
// 等待直到特定时间
cv.wait_until(lock, std::chrono::system_clock::now() + std::chrono::seconds(10));
原子操作
std::atomic
#include <atomic>
#include <thread>
#include <iostream>
// 原子整数
std::atomic<int> counter(0);
// 原子加载和存储
void increment() {
for (int i = 0; i < 1000; i++) {
counter.fetch_add(1); // 原子加法,返回旧值
// 或 counter++; // 简写形式
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final value: " << counter.load() << std::endl;
return 0;
}
原子操作类型
#include <atomic>
int main() {
// 常用的原子类型
std::atomic<int> ai;
std::atomic<long> al;
std::atomic<bool> ab;
// 常用操作
ai.store(10); // 存储
int val = ai.load(); // 加载
ai.fetch_add(5); // 原子加法
ai.fetch_sub(3); // 原子减法
ai.fetch_and(0xFF); // 原子与
ai.fetch_or(0xFF); // 原子或
// 交换
int old = ai.exchange(100); // 原子交换,返回旧值
// 比较交换 (CAS)
int expected = 100;
int desired = 200;
bool success = ai.compare_exchange_strong(expected, desired);
// 如果 ai == expected,则 ai = desired,返回 true
// 否则 expected = ai,返回 false
return 0;
}
内存顺序
#include <atomic>
std::atomic<int> a(0);
int b = 0;
void thread1() {
a.store(1, std::memory_order_relaxed);
b = 2; // 可能被重排序
}
void thread2() {
// 在较宽松的内存顺序下,可能看到不同的顺序
if (b.load(std::memory_order_relaxed) == 2) {
std::cout << a.load(std::memory_order_relaxed) << std::endl;
}
}
内存顺序选项:
| 顺序 | 说明 |
|---|---|
memory_order_relaxed | 最宽松,只保证原子性 |
memory_order_acquire | 读取操作,后续操作不能重排到前面 |
memory_order_release | 写入操作,之前操作不能重排到后面 |
memory_order_seq_cst | 顺序一致(默认,最安全) |
异步操作
std::future 和 std::promise
#include <future>
#include <thread>
#include <iostream>
int calculate(int x) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return x * x;
}
int main() {
// 创建 promise 和 future
std::promise<int> prom;
std::future<int> fut = prom.get_future();
// 在线程中设置值
std::thread t([](std::promise<int>& p) {
p.set_value(42);
}, std::ref(prom));
// 在主线程中获取值
std::cout << "Result: " << fut.get() << std::endl;
t.join();
return 0;
}
std::async
#include <future>
#include <iostream>
int heavyCompute(int n) {
// 模拟耗时计算
return n * n;
}
int main() {
// 启动异步任务
std::future<int> fut1 = std::async(std::launch::async, heavyCompute, 10);
std::future<int> fut2 = std::async(std::launch::async, heavyCompute, 20);
// 获取结果(会阻塞等待)
std::cout << fut1.get() << std::endl; // 100
std::cout << fut2.get() << std::endl; // 400
return 0;
}
std::packaged_task
#include <future>
#include <functional>
#include <thread>
#include <iostream>
int main() {
// 包装任务
std::packaged_task<int(int, int)> task(
[](int a, int b) { return a + b; }
);
std::future<int> result = task.get_future();
// 在线程中执行
std::thread t(std::move(task), 10, 20);
// 获取结果
std::cout << result.get() << std::endl; // 30
t.join();
return 0;
}
线程池
简单线程池实现
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>
class ThreadPool {
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop = false;
public:
ThreadPool(size_t threads) {
for (size_t i = 0; i < threads; i++) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<typename F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queueMutex);
if (stop) throw std::runtime_error("Pool stopped");
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (auto& worker : workers) {
worker.join();
}
}
};
int main() {
ThreadPool pool(4);
for (int i = 0; i < 8; i++) {
pool.enqueue([i]() {
std::cout << "Task " << i << std::endl;
});
}
return 0;
}
线程局部存储
thread_local
#include <thread>
#include <iostream>
thread_local int threadLocalValue = 0; // 每个线程独立的变量
void worker(int id) {
threadLocalValue = id;
std::cout << "Thread " << id << ": value = " << threadLocalValue << std::endl;
threadLocalValue = id * 10;
std::cout << "Thread " << id << ": value = " << threadLocalValue << std::endl;
}
int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);
t1.join();
t2.join();
return 0;
}
并发编程最佳实践
1. 避免全局状态
// 不好:全局变量需要加锁
std::mutex globalMutex;
int globalValue = 0;
// 好:使用局部变量或传入参数
void process(std::mutex& mtx, int& localValue) {
std::lock_guard<std::mutex> lock(mtx);
localValue++;
}
2. 最小化锁的持有时间
// 不好:锁持有时间长
std::lock_guard<std::mutex> lock(mtx);
processData();
formatOutput();
sendToNetwork();
// 好:只锁必要部分
{
std::lock_guard<std::mutex> lock(mtx);
data = prepareData();
}
processData(data);
formatOutput();
sendToNetwork();
3. 使用 RAII 管理锁
// 使用 lock_guard 或 scoped_lock
{
std::lock_guard<std::mutex> lock(mtx);
// 临界区
}
4. 避免死锁
// 总是按相同顺序获取锁
// 使用 std::lock 同时获取多个锁
练习
- 实现一个生产者-消费者模型
- 实现一个简单的线程池
- 使用原子操作实现无锁队列
- 实现读者-写者问题
小结
本章学习了:
- 线程创建:std::thread、线程函数
- 互斥锁:mutex、lock_guard、unique_lock、scoped_lock
- 条件变量:std::condition_variable
- 原子操作:std::atomic、内存顺序
- 异步操作:future、promise、async
- 线程池:基本实现
- 线程局部存储:thread_local
下一步
接下来让我们继续学习 C++ 的其他高级主题,或者查看 C++ 知识速查表 快速回顾。