多线程编程
多线程编程允许程序同时执行多个任务,是现代软件开发的重要技能。C11 标准引入了原生的线程支持库,包括 <threads.h> 和 <stdatomic.h>,使 C 语言具备了跨平台的多线程编程能力。
并发编程基础
进程与线程
进程是操作系统分配资源的基本单位,每个进程拥有独立的内存空间。线程是 CPU 调度的基本单位,同一进程内的线程共享内存空间。
理解进程和线程的区别对于并发编程至关重要:
| 特性 | 进程 | 线程 |
|---|---|---|
| 内存空间 | 独立 | 共享 |
| 创建开销 | 大 | 小 |
| 通信方式 | IPC(管道、套接字等) | 共享内存 |
| 切换开销 | 大 | 小 |
| 稳定性 | 一个进程崩溃不影响其他进程 | 一个线程崩溃可能影响整个进程 |
线程共享内存的特性使得线程间通信变得简单,但同时也带来了数据竞争等问题。
并发与并行
并发是指多个任务在时间段内交替执行,看似同时进行。并行是指多个任务真正同时执行,需要多核处理器支持。
单核处理器上只能实现并发,多核处理器上可以实现真正的并行。无论并发还是并行,多线程编程面临的问题是一样的。
数据竞争
当多个线程同时访问同一内存位置,且至少有一个线程在写入时,就会发生数据竞争。数据竞争会导致未定义行为,程序结果不可预测。
// 错误示例:数据竞争
int counter = 0;
void increment(void) {
for (int i = 0; i < 100000; i++) {
counter++; // 非原子操作,存在数据竞争
}
}
counter++ 看起来是一条语句,实际上包含三个操作:读取 counter 的值、加一、写回 counter。多个线程同时执行时,这些操作可能交错,导致计数错误。
同步机制
为避免数据竞争,需要使用同步机制:
- 互斥量(Mutex):保证同一时刻只有一个线程访问共享资源
- 原子操作(Atomic):不可分割的操作,不会被中断
- 条件变量(Condition Variable):线程间的等待/通知机制
C11 线程库 threads.h
C11 引入了 <threads.h> 头文件,提供了跨平台的线程支持。如果你的编译器定义了 __STDC_NO_THREADS__ 宏,则表示不支持此功能。
线程类型与常量
基本类型:
| 类型 | 说明 |
|---|---|
thrd_t | 线程标识符 |
thrd_start_t | 线程函数类型,定义为 int (*)(void*) |
返回值常量:
| 常量 | 说明 |
|---|---|
thrd_success | 操作成功 |
thrd_error | 操作失败 |
thrd_nomem | 内存不足 |
thrd_busy | 资源暂时不可用 |
thrd_timedout | 操作超时 |
创建线程
thrd_create 函数创建一个新线程:
#include <threads.h>
#include <stdio.h>
// 线程函数
int thread_function(void* arg) {
int id = *(int*)arg;
printf("线程 %d 正在运行\n", id);
return 0; // 线程返回值
}
int main(void) {
thrd_t thread;
int id = 1;
// 创建线程
int result = thrd_create(&thread, thread_function, &id);
if (result != thrd_success) {
printf("线程创建失败\n");
return 1;
}
printf("主线程继续执行\n");
// 等待线程结束
int thread_result;
thrd_join(thread, &thread_result);
printf("线程结束,返回值: %d\n", thread_result);
return 0;
}
线程函数必须返回 int 类型,参数为 void*。这种设计允许向线程传递任意类型的数据。
线程管理
等待线程结束:
int result;
thrd_join(thread, &result); // 阻塞直到线程结束
thrd_join 会阻塞当前线程,直到目标线程执行完毕。第二个参数用于接收线程的返回值。
分离线程:
thrd_detach(thread); // 分离线程
分离后的线程在结束时自动释放资源,不需要 thrd_join。分离后不能再 join 该线程。
获取当前线程标识:
thrd_t current = thrd_current();
比较两个线程标识:
if (thrd_equal(t1, t2)) {
printf("是同一个线程\n");
}
线程退出:
int thread_func(void* arg) {
// 执行一些操作
thrd_exit(0); // 退出线程,返回值为 0
// 之后的代码不会执行
return 0;
}
线程休眠与让步
休眠:
#include <threads.h>
#include <time.h>
// 休眠 2 秒
struct timespec duration = {
.tv_sec = 2,
.tv_nsec = 0
};
thrd_sleep(&duration, NULL);
// 休眠 500 毫秒
struct timespec half_second = {
.tv_sec = 0,
.tv_nsec = 500000000
};
thrd_sleep(&half_second, NULL);
thrd_sleep 的第二个参数用于接收剩余时间(如果被信号中断)。
让出 CPU:
thrd_yield(); // 让出时间片给其他线程
thrd_yield 提示调度器当前线程愿意放弃 CPU,让其他线程有机会执行。
多线程示例
#include <threads.h>
#include <stdio.h>
#define THREAD_COUNT 5
int thread_task(void* arg) {
int id = *(int*)arg;
printf("线程 %d 开始工作\n", id);
// 模拟工作
for (int i = 0; i < 3; i++) {
printf("线程 %d: 步骤 %d\n", id, i);
thrd_sleep(&(struct timespec){.tv_sec = 1}, NULL);
}
printf("线程 %d 完成\n", id);
return id;
}
int main(void) {
thrd_t threads[THREAD_COUNT];
int ids[THREAD_COUNT];
// 创建多个线程
for (int i = 0; i < THREAD_COUNT; i++) {
ids[i] = i + 1;
thrd_create(&threads[i], thread_task, &ids[i]);
}
// 等待所有线程结束
for (int i = 0; i < THREAD_COUNT; i++) {
int result;
thrd_join(threads[i], &result);
printf("线程 %d 已结束\n", result);
}
printf("所有线程完成\n");
return 0;
}
互斥量
互斥量(Mutex)用于保护共享资源,确保同一时刻只有一个线程访问。
互斥量类型
| 常量 | 说明 |
|---|---|
mtx_plain | 普通互斥量 |
mtx_recursive | 递归互斥量(同一线程可多次加锁) |
mtx_timed | 支持超时的互斥量 |
这些类型可以通过按位或组合,例如 mtx_plain | mtx_recursive。
创建与销毁
mtx_t mutex;
// 创建普通互斥量
if (mtx_init(&mutex, mtx_plain) != thrd_success) {
printf("互斥量创建失败\n");
return -1;
}
// 使用互斥量...
mtx_destroy(&mutex); // 销毁互斥量
基本使用
#include <threads.h>
#include <stdio.h>
int shared_counter = 0;
mtx_t counter_mutex;
int increment(void* arg) {
for (int i = 0; i < 100000; i++) {
mtx_lock(&counter_mutex); // 加锁
shared_counter++; // 安全地修改共享数据
mtx_unlock(&counter_mutex); // 解锁
}
return 0;
}
int main(void) {
mtx_init(&counter_mutex, mtx_plain);
thrd_t threads[10];
for (int i = 0; i < 10; i++) {
thrd_create(&threads[i], increment, NULL);
}
for (int i = 0; i < 10; i++) {
thrd_join(threads[i], NULL);
}
printf("最终计数: %d\n", shared_counter); // 正确输出 1000000
mtx_destroy(&counter_mutex);
return 0;
}
如果不使用互斥量,由于数据竞争,最终结果可能小于 1000000。
尝试加锁
if (mtx_trylock(&mutex) == thrd_success) {
// 成功获取锁
// 访问共享资源
mtx_unlock(&mutex);
} else {
// 锁被其他线程持有
printf("锁不可用,稍后重试\n");
}
mtx_trylock 不会阻塞,如果锁不可用则立即返回 thrd_busy。
超时加锁
#include <threads.h>
#include <time.h>
mtx_t mutex;
void try_lock_with_timeout(void) {
struct timespec timeout = {
.tv_sec = time(NULL) + 5, // 5 秒后超时
.tv_nsec = 0
};
int result = mtx_timedlock(&mutex, &timeout);
if (result == thrd_success) {
// 成功获取锁
mtx_unlock(&mutex);
} else if (result == thrd_timedout) {
printf("获取锁超时\n");
}
}
超时加锁需要创建时指定 mtx_timed 类型:
mtx_init(&mutex, mtx_timed);
递归互斥量
普通互斥量不允许同一线程多次加锁,否则会死锁。递归互斥量允许同一线程多次加锁:
mtx_t recursive_mutex;
mtx_init(&recursive_mutex, mtx_recursive);
void recursive_function(int depth) {
mtx_lock(&recursive_mutex); // 第一次加锁
if (depth > 0) {
mtx_lock(&recursive_mutex); // 第二次加锁(同一线程)
printf("深度: %d\n", depth);
recursive_function(depth - 1);
mtx_unlock(&recursive_mutex); // 第二次解锁
}
mtx_unlock(&recursive_mutex); // 第一次解锁
}
每次加锁都需要对应的解锁。递归互斥量通常用于递归函数中保护共享资源。
条件变量
条件变量用于线程间的等待/通知机制,允许线程等待某个条件成立。
创建与销毁
cnd_t condition;
cnd_init(&condition);
// 使用条件变量...
cnd_destroy(&condition);
等待与通知
#include <threads.h>
#include <stdio.h>
mtx_t mutex;
cnd_t condition;
int ready = 0; // 共享状态
int producer(void* arg) {
mtx_lock(&mutex);
ready = 1; // 设置条件为真
printf("生产者: 数据已准备\n");
cnd_signal(&condition); // 通知一个等待的线程
mtx_unlock(&mutex);
return 0;
}
int consumer(void* arg) {
mtx_lock(&mutex);
while (!ready) { // 使用 while 而非 if
printf("消费者: 等待数据...\n");
cnd_wait(&condition, &mutex); // 等待通知
}
printf("消费者: 收到数据,开始处理\n");
mtx_unlock(&mutex);
return 0;
}
int main(void) {
mtx_init(&mutex, mtx_plain);
cnd_init(&condition);
thrd_t consumer_thread, producer_thread;
// 先启动消费者
thrd_create(&consumer_thread, consumer, NULL);
// 等待一下确保消费者先运行
thrd_sleep(&(struct timespec){.tv_sec = 1}, NULL);
// 启动生产者
thrd_create(&producer_thread, producer, NULL);
thrd_join(consumer_thread, NULL);
thrd_join(producer_thread, NULL);
mtx_destroy(&mutex);
cnd_destroy(&condition);
return 0;
}
重要说明:
- 条件变量必须与互斥量配合使用
cnd_wait会自动释放互斥量并阻塞,被唤醒时重新获取互斥量- 必须使用
while循环检查条件,而不是if,因为可能存在虚假唤醒
广播通知
cnd_broadcast 唤醒所有等待的线程:
cnd_broadcast(&condition); // 唤醒所有等待线程
这在多个线程需要同时被唤醒时很有用,例如广播事件。
超时等待
struct timespec timeout = {
.tv_sec = time(NULL) + 10, // 10 秒后超时
.tv_nsec = 0
};
mtx_lock(&mutex);
while (!ready) {
int result = cnd_timedwait(&condition, &mutex, &timeout);
if (result == thrd_timedout) {
printf("等待超时\n");
break;
}
}
mtx_unlock(&mutex);
生产者-消费者模式
这是一个经典的并发模式,展示了条件变量的典型用法:
#include <threads.h>
#include <stdio.h>
#include <stdbool.h>
#define BUFFER_SIZE 10
typedef struct {
int buffer[BUFFER_SIZE];
int count;
int head;
int tail;
bool done;
} Queue;
Queue queue = {0};
mtx_t queue_mutex;
cnd_t not_empty; // 缓冲区不为空
cnd_t not_full; // 缓冲区未满
void queue_init(void) {
queue.count = 0;
queue.head = 0;
queue.tail = 0;
queue.done = false;
mtx_init(&queue_mutex, mtx_plain);
cnd_init(¬_empty);
cnd_init(¬_full);
}
void queue_destroy(void) {
mtx_destroy(&queue_mutex);
cnd_destroy(¬_empty);
cnd_destroy(¬_full);
}
bool queue_push(int value) {
mtx_lock(&queue_mutex);
// 等待缓冲区有空间
while (queue.count >= BUFFER_SIZE) {
cnd_wait(¬_full, &queue_mutex);
}
queue.buffer[queue.tail] = value;
queue.tail = (queue.tail + 1) % BUFFER_SIZE;
queue.count++;
cnd_signal(¬_empty); // 通知消费者
mtx_unlock(&queue_mutex);
return true;
}
bool queue_pop(int* value) {
mtx_lock(&queue_mutex);
// 等待缓冲区有数据
while (queue.count == 0 && !queue.done) {
cnd_wait(¬_empty, &queue_mutex);
}
if (queue.count == 0) {
mtx_unlock(&queue_mutex);
return false; // 队列为空且生产已完成
}
*value = queue.buffer[queue.head];
queue.head = (queue.head + 1) % BUFFER_SIZE;
queue.count--;
cnd_signal(¬_full); // 通知生产者
mtx_unlock(&queue_mutex);
return true;
}
int producer_thread(void* arg) {
for (int i = 0; i < 20; i++) {
queue_push(i);
printf("生产: %d\n", i);
}
// 标记生产完成
mtx_lock(&queue_mutex);
queue.done = true;
cnd_broadcast(¬_empty); // 唤醒所有消费者
mtx_unlock(&queue_mutex);
return 0;
}
int consumer_thread(void* arg) {
int value;
while (queue_pop(&value)) {
printf("消费: %d\n", value);
}
return 0;
}
int main(void) {
queue_init();
thrd_t producer, consumer1, consumer2;
thrd_create(&producer, producer_thread, NULL);
thrd_create(&consumer1, consumer_thread, NULL);
thrd_create(&consumer2, consumer_thread, NULL);
thrd_join(producer, NULL);
thrd_join(consumer1, NULL);
thrd_join(consumer2, NULL);
queue_destroy();
return 0;
}
线程特定存储
线程特定存储(Thread-Specific Storage,TSS)允许每个线程拥有独立的数据副本。
#include <threads.h>
#include <stdio.h>
#include <stdlib.h>
tss_t tss_key;
// TSS 析构函数
void tss_destructor(void* data) {
printf("清理线程数据: %s\n", (char*)data);
free(data);
}
int thread_func(void* arg) {
int id = *(int*)arg;
// 为当前线程分配特定数据
char* data = malloc(50);
snprintf(data, 50, "线程 %d 的私有数据", id);
tss_set(tss_key, data);
// 获取当前线程的数据
char* my_data = tss_get(tss_key);
printf("获取数据: %s\n", my_data);
return 0;
}
int main(void) {
// 创建 TSS,设置析构函数
tss_create(&tss_key, tss_destructor);
thrd_t threads[3];
int ids[3] = {1, 2, 3};
for (int i = 0; i < 3; i++) {
thrd_create(&threads[i], thread_func, &ids[i]);
}
for (int i = 0; i < 3; i++) {
thrd_join(threads[i], NULL);
}
tss_delete(tss_key);
return 0;
}
每个线程可以存储自己的数据,互不干扰。线程结束时,析构函数会被调用以清理数据。
一次性初始化
call_once 确保某个函数只执行一次:
#include <threads.h>
#include <stdio.h>
once_flag init_flag = ONCE_FLAG_INIT;
void initialize_once(void) {
printf("初始化只执行一次\n");
}
int thread_func(void* arg) {
call_once(&init_flag, initialize_once);
printf("线程 %d 运行\n", *(int*)arg);
return 0;
}
int main(void) {
thrd_t threads[5];
int ids[5] = {1, 2, 3, 4, 5};
for (int i = 0; i < 5; i++) {
thrd_create(&threads[i], thread_func, &ids[i]);
}
for (int i = 0; i < 5; i++) {
thrd_join(threads[i], NULL);
}
return 0;
}
这在实现延迟初始化或单例模式时非常有用。
原子操作库 stdatomic.h
<stdatomic.h> 提供了原子类型和原子操作,是编写无锁并发程序的基础设施。如果编译器定义了 __STDC_NO_ATOMICS__ 宏,则表示不支持此功能。
原子类型
使用 _Atomic 关键字声明原子类型:
#include <stdatomic.h>
_Atomic int atomic_counter; // 原子整数
_Atomic int* atomic_ptr; // 指向原子整数的指针
int* _Atomic atomic_ptr_to_int; // 原子指针(指向普通整数)
标准库提供了便捷的类型别名:
| 类型别名 | 说明 |
|---|---|
atomic_bool | 原子布尔 |
atomic_char | 原子字符 |
atomic_int | 原子整数 |
atomic_uint | 原子无符号整数 |
atomic_long | 原子长整数 |
atomic_llong | 原子 long long |
atomic_size_t | 原子 size_t |
atomic_intptr_t | 原子 intptr_t |
atomic_int counter = 0;
atomic_bool flag = false;
atomic_size_t size = 0;
原子标志 atomic_flag
atomic_flag 是最基础的原子类型,保证是无锁的,可用于实现自旋锁:
#include <stdatomic.h>
atomic_flag spinlock = ATOMIC_FLAG_INIT;
void lock(atomic_flag* flag) {
while (atomic_flag_test_and_set(flag)) {
// 自旋等待,直到获取锁
thrd_yield(); // 可选:让出 CPU
}
}
void unlock(atomic_flag* flag) {
atomic_flag_clear(flag);
}
// 使用示例
void critical_section(void) {
lock(&spinlock);
// 临界区代码
printf("在临界区内\n");
unlock(&spinlock);
}
自旋锁是一种忙等待锁,适用于锁持有时间很短的场景。
原子操作函数
基本读写:
atomic_int value = 0;
// 原子存储
atomic_store(&value, 42);
// 原子加载
int loaded = atomic_load(&value);
printf("加载值: %d\n", loaded); // 42
// 原子交换(返回旧值)
int old = atomic_exchange(&value, 100);
printf("旧值: %d, 新值: %d\n", old, atomic_load(&value));
比较交换(CAS):
比较交换是许多无锁算法的核心操作:
atomic_int value = 10;
// 比较交换
int expected = 10;
int desired = 20;
bool success = atomic_compare_exchange_strong(&value, &expected, desired);
// 如果 value == expected,则 value = desired,返回 true
// 否则 expected = value,返回 false
if (success) {
printf("交换成功,新值: %d\n", value);
}
弱版本 CAS:
bool success = atomic_compare_exchange_weak(&value, &expected, desired);
弱版本可能产生虚假失败,但在循环中使用时性能更好:
void atomic_increment(atomic_int* counter) {
int old_value = atomic_load(counter);
while (!atomic_compare_exchange_weak(counter, &old_value, old_value + 1)) {
// 循环直到成功,old_value 会被更新
}
}
算术运算:
atomic_int value = 10;
// 原子加法(返回旧值)
int old = atomic_fetch_add(&value, 5); // value = 15, old = 10
// 原子减法
old = atomic_fetch_sub(&value, 3); // value = 12, old = 15
// 原子或运算
atomic_fetch_or(&value, 0x01);
// 原子与运算
atomic_fetch_and(&value, 0xFE);
// 原子异或运算
atomic_fetch_xor(&value, 0xFF);
无锁计数器示例
#include <stdatomic.h>
#include <threads.h>
#include <stdio.h>
atomic_int counter = 0;
int increment(void* arg) {
for (int i = 0; i < 100000; i++) {
atomic_fetch_add(&counter, 1); // 原子递增
}
return 0;
}
int main(void) {
thrd_t threads[10];
for (int i = 0; i < 10; i++) {
thrd_create(&threads[i], increment, NULL);
}
for (int i = 0; i < 10; i++) {
thrd_join(threads[i], NULL);
}
printf("最终计数: %d\n", atomic_load(&counter)); // 正确输出 1000000
return 0;
}
使用原子操作比互斥量更高效,适合简单的计数场景。
内存序
内存序定义了原子操作对内存访问顺序的约束。理解内存序对于编写正确的高效并发程序至关重要。
为什么需要内存序
现代处理器和编译器会对指令进行重排序以优化性能。单线程程序中,这种重排序不影响结果。但在多线程程序中,重排序可能导致意外行为:
// 线程 1
x = 1;
ready = 1;
// 线程 2
if (ready) {
// x 一定是 1 吗?不一定!
// 编译器或处理器可能重排序,导致 ready = 1 先于 x = 1 可见
}
内存序通过建立" happens-before "关系,确保操作按预期顺序执行。
内存序类型
C11 定义了五种内存序:
typedef enum {
memory_order_relaxed, // 宽松顺序
memory_order_consume, // 消费顺序(通常不推荐使用)
memory_order_acquire, // 获取顺序
memory_order_release, // 释放顺序
memory_order_acq_rel, // 获取-释放顺序
memory_order_seq_cst // 顺序一致性(默认)
} memory_order;
memory_order_relaxed
宽松顺序只保证原子性,不保证顺序:
atomic_int x = 0;
atomic_int y = 0;
// 线程 1
atomic_store_explicit(&x, 1, memory_order_relaxed);
atomic_store_explicit(&y, 1, memory_order_relaxed);
// 线程 2
int r1 = atomic_load_explicit(&y, memory_order_relaxed);
int r2 = atomic_load_explicit(&x, memory_order_relaxed);
// 可能 r1 = 1 而 r2 = 0(y 先于 x 被看到)
适用场景:只需要原子性,不关心顺序,如简单计数器。
memory_order_acquire 和 memory_order_release
获取-释放顺序建立同步关系:
acquire:之后的读写操作不能重排到此操作之前release:之前的读写操作不能重排到此操作之后
atomic_int ready = 0;
int data = 0;
// 线程 1(生产者)
void producer(void) {
data = 42; // 准备数据
atomic_store_explicit(&ready, 1, memory_order_release); // 发布
}
// 线程 2(消费者)
void consumer(void) {
while (atomic_load_explicit(&ready, memory_order_acquire) == 0) {
// 等待数据就绪
}
// 此时 data 一定是 42
printf("data = %d\n", data);
}
release 确保之前的写操作(data = 42)在 ready = 1 之前完成。acquire 确保之后的读操作(读取 data)在 ready == 1 之后执行。
memory_order_seq_cst
顺序一致性是最强的内存序,也是默认选项:
atomic_store(&x, 1); // 等价于 atomic_store_explicit(&x, 1, memory_order_seq_cst)
顺序一致性保证:
- 所有线程看到相同的操作顺序
- 所有操作按程序顺序执行
这是最安全的选择,但性能可能不如更宽松的内存序。
选择指南
| 内存序 | 性能 | 安全性 | 适用场景 |
|---|---|---|---|
seq_cst | 最低 | 最高 | 默认选择,简单安全 |
acq_rel | 中等 | 高 | 生产者-消费者模式 |
acquire/release | 较高 | 高 | 单向同步 |
relaxed | 最高 | 最低 | 简单计数器 |
建议:先用 seq_cst,确认正确后再优化。
内存序示例
#include <stdatomic.h>
#include <threads.h>
#include <stdio.h>
atomic_int flag = 0;
int data = 0;
int writer(void* arg) {
data = 42;
atomic_store_explicit(&flag, 1, memory_order_release);
return 0;
}
int reader(void* arg) {
while (atomic_load_explicit(&flag, memory_order_acquire) == 0) {
thrd_yield();
}
printf("data = %d\n", data); // 保证输出 42
return 0;
}
int main(void) {
thrd_t t1, t2;
thrd_create(&t1, writer, NULL);
thrd_create(&t2, reader, NULL);
thrd_join(t1, NULL);
thrd_join(t2, NULL);
return 0;
}
线程池实现
线程池是一种常用的并发模式,预先创建一组线程,重复利用它们执行任务:
#include <threads.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
typedef void (*TaskFunc)(void* arg);
typedef struct Task {
TaskFunc func;
void* arg;
struct Task* next;
} Task;
typedef struct {
Task* head;
Task* tail;
int count;
} TaskQueue;
typedef struct {
thrd_t* threads;
int thread_count;
TaskQueue queue;
mtx_t queue_mutex;
cnd_t queue_cond;
bool shutdown;
} ThreadPool;
void queue_init(TaskQueue* q) {
q->head = q->tail = NULL;
q->count = 0;
}
void queue_push(TaskQueue* q, Task* task) {
task->next = NULL;
if (q->tail == NULL) {
q->head = q->tail = task;
} else {
q->tail->next = task;
q->tail = task;
}
q->count++;
}
Task* queue_pop(TaskQueue* q) {
if (q->head == NULL) return NULL;
Task* task = q->head;
q->head = q->head->next;
if (q->head == NULL) q->tail = NULL;
q->count--;
return task;
}
int worker_thread(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (true) {
mtx_lock(&pool->queue_mutex);
while (pool->queue.count == 0 && !pool->shutdown) {
cnd_wait(&pool->queue_cond, &pool->queue_mutex);
}
if (pool->shutdown && pool->queue.count == 0) {
mtx_unlock(&pool->queue_mutex);
break;
}
Task* task = queue_pop(&pool->queue);
mtx_unlock(&pool->queue_mutex);
if (task != NULL) {
task->func(task->arg);
free(task);
}
}
return 0;
}
void pool_init(ThreadPool* pool, int thread_count) {
pool->thread_count = thread_count;
pool->threads = malloc(thread_count * sizeof(thrd_t));
pool->shutdown = false;
queue_init(&pool->queue);
mtx_init(&pool->queue_mutex, mtx_plain);
cnd_init(&pool->queue_cond);
for (int i = 0; i < thread_count; i++) {
thrd_create(&pool->threads[i], worker_thread, pool);
}
}
void pool_submit(ThreadPool* pool, TaskFunc func, void* arg) {
Task* task = malloc(sizeof(Task));
task->func = func;
task->arg = arg;
mtx_lock(&pool->queue_mutex);
queue_push(&pool->queue, task);
cnd_signal(&pool->queue_cond);
mtx_unlock(&pool->queue_mutex);
}
void pool_shutdown(ThreadPool* pool) {
mtx_lock(&pool->queue_mutex);
pool->shutdown = true;
cnd_broadcast(&pool->queue_cond);
mtx_unlock(&pool->queue_mutex);
for (int i = 0; i < pool->thread_count; i++) {
thrd_join(pool->threads[i], NULL);
}
mtx_destroy(&pool->queue_mutex);
cnd_destroy(&pool->queue_cond);
free(pool->threads);
}
// 示例任务
void print_task(void* arg) {
int id = *(int*)arg;
printf("任务 %d 在线程 %lu 中执行\n", id, thrd_current());
}
int main(void) {
ThreadPool pool;
pool_init(&pool, 4);
int task_ids[10];
for (int i = 0; i < 10; i++) {
task_ids[i] = i + 1;
pool_submit(&pool, print_task, &task_ids[i]);
}
thrd_sleep(&(struct timespec){.tv_sec = 2}, NULL);
pool_shutdown(&pool);
return 0;
}
多线程最佳实践
1. 最小化共享数据
共享数据越少,需要的同步越少,程序越不容易出错。
// 不推荐:频繁共享
for (int i = 0; i < 1000000; i++) {
mtx_lock(&mutex);
shared_counter++;
mtx_unlock(&mutex);
}
// 推荐:减少共享频率
int local_counter = 0;
for (int i = 0; i < 1000000; i++) {
local_counter++;
}
mtx_lock(&mutex);
shared_counter += local_counter;
mtx_unlock(&mutex);
2. 避免嵌套锁
嵌套锁容易导致死锁:
// 危险:可能死锁
void transfer(Account* from, Account* to, int amount) {
mtx_lock(&from->mutex);
mtx_lock(&to->mutex); // 如果另一个线程反向操作,会死锁
from->balance -= amount;
to->balance += amount;
mtx_unlock(&to->mutex);
mtx_unlock(&from->mutex);
}
// 安全:统一加锁顺序
void transfer_safe(Account* from, Account* to, int amount) {
Account* first = from < to ? from : to; // 按地址排序
Account* second = from < to ? to : from;
mtx_lock(&first->mutex);
mtx_lock(&second->mutex);
from->balance -= amount;
to->balance += amount;
mtx_unlock(&second->mutex);
mtx_unlock(&first->mutex);
}
3. 使用 RAII 模式管理锁
C 语言没有析构函数,但可以使用属性模拟:
// GCC/Clang 扩展
void cleanup_mutex(mtx_t** mutex) {
mtx_unlock(*mutex);
}
#define LOCK_MUTEX(m) \
mtx_t* __attribute__((cleanup(cleanup_mutex))) __lock __attribute__((unused)) = m; \
mtx_lock(m)
void safe_function(mtx_t* mutex) {
LOCK_MUTEX(mutex);
// 函数结束时自动解锁
}
4. 正确处理条件变量
总是使用 while 循环检查条件:
mtx_lock(&mutex);
while (!condition) { // 使用 while,不是 if
cnd_wait(&cond, &mutex);
}
// 处理...
mtx_unlock(&mutex);
5. 避免过度同步
过度使用同步会降低并发性能。评估是否真正需要同步:
// 不必要的同步
mtx_lock(&mutex);
int value = *read_only_pointer; // 只读,不需要锁
mtx_unlock(&mutex);
// 更好的做法
int value = *read_only_pointer; // 如果确定不会修改
6. 使用原子操作简化代码
对于简单操作,原子操作比互斥量更简洁:
// 使用互斥量
mtx_lock(&mutex);
counter++;
mtx_unlock(&mutex);
// 使用原子操作
atomic_fetch_add(&counter, 1);
调试多线程程序
常见问题
- 数据竞争:使用 ThreadSanitizer 检测
- 死锁:检查锁的获取顺序
- 活锁:线程持续响应但无法推进
- 资源泄漏:确保线程 join 或 detach
ThreadSanitizer
GCC 和 Clang 支持 ThreadSanitizer:
gcc -fsanitize=thread -g program.c -o program
./program
ThreadSanitizer 会检测数据竞争和死锁问题。
Helgrind(Valgrind 工具)
gcc -g program.c -o program -pthread
valgrind --tool=helgrind ./program
Helgrind 检测 POSIX 线程的错误使用。
打印调试
添加线程标识和顺序信息:
void debug_log(const char* msg) {
mtx_lock(&log_mutex);
printf("[Thread %lu] %s\n", thrd_current(), msg);
mtx_unlock(&log_mutex);
}
小结
本章介绍了 C 语言的多线程编程:
- 并发基础:进程与线程、数据竞争、同步机制
- threads.h 线程库:线程创建、管理、互斥量、条件变量
- stdatomic.h 原子操作库:原子类型、原子操作、内存序
- 高级模式:线程池、生产者-消费者
- 最佳实践:最小化共享、避免嵌套锁、正确使用条件变量
- 调试技巧:ThreadSanitizer、Helgrind
多线程编程是复杂但强大的技术。理解同步机制和内存序对于编写正确的并发程序至关重要。从简单的同步开始,逐步学习更复杂的模式。
下一章将学习 知识速查,快速查阅常用语法和函数。