跳到主要内容

C++ 多线程编程

C++11 引入了标准线程库,使得多线程编程成为 C++ 标准的一部分。本章将详细介绍线程创建、同步机制和并发工具。

线程基础

创建线程

#include <thread>
#include <iostream>

void hello() {
std::cout << "Hello from thread!" << std::endl;
}

void greet(const std::string& name) {
std::cout << "Hello, " << name << "!" << std::endl;
}

int main() {
// 创建线程:传入函数
std::thread t1(hello);

// 创建线程:传入参数
std::thread t2(greet, "World");

// 等待线程结束
t1.join();
t2.join();

std::cout << "Main thread done!" << std::endl;

return 0;
}

线程函数

线程函数可以是普通函数、lambda 或成员函数:

#include <thread>
#include <iostream>

// 普通函数
void func1() { }

// Lambda
auto func2 = []() { };

// 仿函数(函数对象)
class Task {
public:
void operator()() { }
};
Task task;

// 成员函数
class Data {
public:
void process() { }
};

int main() {
std::thread t1(func1);
std::thread t2(func2);
std::thread t3(task);

Data d;
std::thread t4(&Data::process, &d); // 需要传入对象指针

t1.join();
t2.join();
t3.join();
t4.join();

return 0;
}

线程管理

#include <thread>
#include <iostream>

int main() {
// 获取线程 ID
std::cout << "Main thread ID: "
<< std::this_thread::get_id() << std::endl;

std::thread t([]() {
std::cout << "New thread ID: "
<< std::this_thread::get_id() << std::endl;
});

// 检查线程是否可 joinable
if (t.joinable()) {
std::cout << "Thread is joinable" << std::endl;
}

// join:等待线程结束
t.join();

// detach:分离线程(后台运行)
// std::thread t2([](){ ... });
// t2.detach(); // 注意:detach 后无法 join

// 分离后,线程在后台运行,由运行时库管理
// 适用于不需要等待完成的长时间运行任务

return 0;
}

互斥锁

std::mutex

#include <mutex>
#include <iostream>

int main() {
std::mutex mtx;

// lock() 和 unlock()
mtx.lock();
// 临界区代码
mtx.unlock();

// 推荐:使用 lock_guard 自动管理
{
std::lock_guard<std::mutex> lock(mtx);
// 临界区代码
} // 自动 unlock

return 0;
}

std::lock_guard

#include <mutex>
#include <iostream>

class Counter {
private:
int count = 0;
mutable std::mutex mtx; // mutable 允许在 const 函数中修改

public:
void increment() {
std::lock_guard<std::mutex> lock(mtx);
count++;
}

int get() const {
std::lock_guard<std::mutex> lock(mtx);
return count;
}
};

int main() {
Counter c;
c.increment();
c.increment();
std::cout << c.get() << std::endl; // 2

return 0;
}

std::unique_lock

unique_locklock_guard 更灵活:

#include <mutex>
#include <thread>

int main() {
std::mutex mtx;

std::unique_lock<std::mutex> lock1(mtx, std::defer_lock);
// defer_lock:不获取锁

// 可以在需要时手动加锁
lock1.lock();
// 临界区
lock1.unlock();

// 可以多次加锁/解锁
lock1.lock();
lock1.unlock();

// 析构函数自动释放
// 如果锁仍被持有,会自动 unlock

return 0;
}

死锁避免

#include <mutex>
#include <thread>
#include <iostream>

// 死锁示例
void deadLockExample() {
std::mutex m1, m2;

// 线程1:先锁 m1,再锁 m2
std::thread t1([&]() {
std::lock_guard<std::mutex> lock1(m1);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lock2(m2);
std::cout << "Thread 1" << std::endl;
});

// 线程2:先锁 m2,再锁 m1(与线程1相反)
std::thread t2([&]() {
std::lock_guard<std::mutex> lock1(m2);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lock2(m1);
std::cout << "Thread 2" << std::endl;
});

t1.join();
t2.join();
}

// 解决方案1:使用 std::lock 一次性获取多个锁
void solveWithLock() {
std::mutex m1, m2;

std::thread t1([&]() {
std::lock(m1, m2); // 一次性锁住两个
// 注意:std::lock 不支持 lock_guard,需要用 scoped_lock
std::scoped_lock lock(m1, m2); // C++17
std::cout << "Thread 1" << std::endl;
});

std::thread t2([&]() {
std::lock(m1, m2);
std::scoped_lock lock(m1, m2);
std::cout << "Thread 2" << std::endl;
});

t1.join();
t2.join();
}

int main() {
solveWithLock();
return 0;
}

std::scoped_lock(C++17)

#include <mutex>
#include <thread>

int main() {
std::mutex m1, m2, m3;

// 同时锁住多个互斥量,避免死锁
std::scoped_lock lock(m1, m2, m3);

// 临界区代码

return 0;
}

条件变量

std::condition_variable

#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <iostream>

std::queue<int> q;
std::mutex mtx;
std::condition_variable cv;

// 生产者
void producer() {
for (int i = 0; i < 5; i++) {
std::unique_lock<std::mutex> lock(mtx);
q.push(i);
std::cout << "Produced: " << i << std::endl;
cv.notify_one(); // 通知等待的消费者
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

// 消费者
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);

// 等待条件:队列不为空
cv.wait(lock, []() { return !q.empty(); });

int val = q.front();
q.pop();
std::cout << "Consumed: " << val << std::endl;

if (val == 4) break;
}
}

int main() {
std::thread prod(producer);
std::thread cons(consumer);

prod.join();
cons.join();

return 0;
}

wait 的变体

// 带超时的 wait
std::condition_variable cv;
bool ready = false;

void waitWithTimeout() {
std::unique_lock<std::mutex> lock(mtx);

// 等待最多 1 秒
if (cv.wait_for(lock, std::chrono::seconds(1), [](){ return ready; })) {
std::cout << "条件满足" << std::endl;
} else {
std::cout << "超时" << std::endl;
}
}

// 等待直到特定时间
cv.wait_until(lock, std::chrono::system_clock::now() + std::chrono::seconds(10));

原子操作

std::atomic

#include <atomic>
#include <thread>
#include <iostream>

// 原子整数
std::atomic<int> counter(0);

// 原子加载和存储
void increment() {
for (int i = 0; i < 1000; i++) {
counter.fetch_add(1); // 原子加法,返回旧值
// 或 counter++; // 简写形式
}
}

int main() {
std::thread t1(increment);
std::thread t2(increment);

t1.join();
t2.join();

std::cout << "Final value: " << counter.load() << std::endl;

return 0;
}

原子操作类型

#include <atomic>

int main() {
// 常用的原子类型
std::atomic<int> ai;
std::atomic<long> al;
std::atomic<bool> ab;

// 常用操作
ai.store(10); // 存储
int val = ai.load(); // 加载

ai.fetch_add(5); // 原子加法
ai.fetch_sub(3); // 原子减法
ai.fetch_and(0xFF); // 原子与
ai.fetch_or(0xFF); // 原子或

// 交换
int old = ai.exchange(100); // 原子交换,返回旧值

// 比较交换 (CAS)
int expected = 100;
int desired = 200;
bool success = ai.compare_exchange_strong(expected, desired);
// 如果 ai == expected,则 ai = desired,返回 true
// 否则 expected = ai,返回 false

return 0;
}

内存顺序

#include <atomic>

std::atomic<int> a(0);
int b = 0;

void thread1() {
a.store(1, std::memory_order_relaxed);
b = 2; // 可能被重排序
}

void thread2() {
// 在较宽松的内存顺序下,可能看到不同的顺序
if (b.load(std::memory_order_relaxed) == 2) {
std::cout << a.load(std::memory_order_relaxed) << std::endl;
}
}

内存顺序选项:

顺序说明
memory_order_relaxed最宽松,只保证原子性
memory_order_acquire读取操作,后续操作不能重排到前面
memory_order_release写入操作,之前操作不能重排到后面
memory_order_seq_cst顺序一致(默认,最安全)

异步操作

std::future 和 std::promise

#include <future>
#include <thread>
#include <iostream>

int calculate(int x) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return x * x;
}

int main() {
// 创建 promise 和 future
std::promise<int> prom;
std::future<int> fut = prom.get_future();

// 在线程中设置值
std::thread t([](std::promise<int>& p) {
p.set_value(42);
}, std::ref(prom));

// 在主线程中获取值
std::cout << "Result: " << fut.get() << std::endl;

t.join();

return 0;
}

std::async

#include <future>
#include <iostream>

int heavyCompute(int n) {
// 模拟耗时计算
return n * n;
}

int main() {
// 启动异步任务
std::future<int> fut1 = std::async(std::launch::async, heavyCompute, 10);
std::future<int> fut2 = std::async(std::launch::async, heavyCompute, 20);

// 获取结果(会阻塞等待)
std::cout << fut1.get() << std::endl; // 100
std::cout << fut2.get() << std::endl; // 400

return 0;
}

std::packaged_task

#include <future>
#include <functional>
#include <thread>
#include <iostream>

int main() {
// 包装任务
std::packaged_task<int(int, int)> task(
[](int a, int b) { return a + b; }
);

std::future<int> result = task.get_future();

// 在线程中执行
std::thread t(std::move(task), 10, 20);

// 获取结果
std::cout << result.get() << std::endl; // 30

t.join();

return 0;
}

线程池

简单线程池实现

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>

class ThreadPool {
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop = false;

public:
ThreadPool(size_t threads) {
for (size_t i = 0; i < threads; i++) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}

template<typename F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queueMutex);
if (stop) throw std::runtime_error("Pool stopped");
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}

~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (auto& worker : workers) {
worker.join();
}
}
};

int main() {
ThreadPool pool(4);

for (int i = 0; i < 8; i++) {
pool.enqueue([i]() {
std::cout << "Task " << i << std::endl;
});
}

return 0;
}

线程局部存储

thread_local

#include <thread>
#include <iostream>

thread_local int threadLocalValue = 0; // 每个线程独立的变量

void worker(int id) {
threadLocalValue = id;
std::cout << "Thread " << id << ": value = " << threadLocalValue << std::endl;
threadLocalValue = id * 10;
std::cout << "Thread " << id << ": value = " << threadLocalValue << std::endl;
}

int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);

t1.join();
t2.join();

return 0;
}

并发编程最佳实践

1. 避免全局状态

// 不好:全局变量需要加锁
std::mutex globalMutex;
int globalValue = 0;

// 好:使用局部变量或传入参数
void process(std::mutex& mtx, int& localValue) {
std::lock_guard<std::mutex> lock(mtx);
localValue++;
}

2. 最小化锁的持有时间

// 不好:锁持有时间长
std::lock_guard<std::mutex> lock(mtx);
processData();
formatOutput();
sendToNetwork();

// 好:只锁必要部分
{
std::lock_guard<std::mutex> lock(mtx);
data = prepareData();
}
processData(data);
formatOutput();
sendToNetwork();

3. 使用 RAII 管理锁

// 使用 lock_guard 或 scoped_lock
{
std::lock_guard<std::mutex> lock(mtx);
// 临界区
}

4. 避免死锁

// 总是按相同顺序获取锁
// 使用 std::lock 同时获取多个锁

练习

  1. 实现一个生产者-消费者模型
  2. 实现一个简单的线程池
  3. 使用原子操作实现无锁队列
  4. 实现读者-写者问题

小结

本章学习了:

  1. 线程创建:std::thread、线程函数
  2. 互斥锁:mutex、lock_guard、unique_lock、scoped_lock
  3. 条件变量:std::condition_variable
  4. 原子操作:std::atomic、内存顺序
  5. 异步操作:future、promise、async
  6. 线程池:基本实现
  7. 线程局部存储:thread_local

下一步

接下来让我们继续学习 C++ 的其他高级主题,或者查看 C++ 知识速查表 快速回顾。