同步机制
在并行计算中,同步是确保正确性的关键。CUDA 提供了多种同步机制,包括线程同步、原子操作和流同步。
线程同步
__syncthreads()
__syncthreads() 是最常用的线程同步函数,用于同步同一个 Block 内的所有线程:
__global__ void kernel(float *data) {
__shared__ float sharedData[256];
int idx = threadIdx.x;
sharedData[idx] = data[idx];
__syncthreads();
float result = sharedData[255 - idx];
data[idx] = result;
}
重要规则:
- Block 内所有线程必须执行
__syncthreads() - 不能在条件分支中调用(除非所有线程都进入该分支)
- 只能同步同一个 Block 内的线程
错误示例
__global__ void badSync(float *data) {
int idx = threadIdx.x;
if (idx < 128) {
data[idx] *= 2;
__syncthreads();
}
}
上面的代码会导致死锁,因为只有部分线程执行了 __syncthreads()。
正确示例
__global__ void goodSync(float *data) {
int idx = threadIdx.x;
if (idx < 128) {
data[idx] *= 2;
}
__syncthreads();
}
内存屏障
CUDA 提供了多种内存屏障函数,确保内存访问的顺序性。
__threadfence_block()
确保对共享内存和全局内存的写入对 Block 内其他线程可见:
__global__ void kernel(float *data) {
int idx = threadIdx.x;
data[idx] = 1.0f;
__threadfence_block();
float val = data[(idx + 1) % blockDim.x];
}
__threadfence()
确保对全局内存的写入对所有线程可见:
__global__ void kernel(float *data, int *flag) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
data[idx] = 1.0f;
__threadfence();
if (idx == 0) {
*flag = 1;
}
}
__threadfence_system()
确保对全局内存的写入对 CPU 和其他 GPU 可见:
__global__ void kernel(float *data, int *flag) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
data[idx] = 1.0f;
__threadfence_system();
if (idx == 0) {
*flag = 1;
}
}
原子操作
原子操作是不可中断的操作,用于在多线程环境下安全地更新共享数据。
基本原子操作
| 函数 | 说明 |
|---|---|
atomicAdd | 原子加 |
atomicSub | 原子减 |
atomicExch | 原子交换 |
atomicMin | 原子最小值 |
atomicMax | 原子最大值 |
atomicInc | 原子递增 |
atomicDec | 原子递减 |
atomicCAS | 原子比较并交换 |
atomicAnd | 原子与 |
atomicOr | 原子或 |
atomicXor | 原子异或 |
atomicAdd
__global__ void atomicAddKernel(int *counter, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < n) {
atomicAdd(counter, 1);
}
}
int main() {
int *d_counter;
cudaMalloc(&d_counter, sizeof(int));
cudaMemset(d_counter, 0, sizeof(int));
atomicAddKernel<<<100, 256>>>(d_counter, 100 * 256);
int h_counter;
cudaMemcpy(&h_counter, d_counter, sizeof(int), cudaMemcpyDeviceToHost);
printf("Counter: %d\n", h_counter);
cudaFree(d_counter);
}
atomicCAS(比较并交换)
atomicCAS 是最基础的原子操作,其他原子操作可以用它实现:
__device__ int myAtomicAdd(int *address, int value) {
int old = *address;
int assumed;
do {
assumed = old;
old = atomicCAS(address, assumed, assumed + value);
} while (old != assumed);
return old;
}
原子操作示例:直方图
__global__ void histogramKernel(int *histogram, unsigned char *data, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < n) {
int bin = data[idx];
atomicAdd(&histogram[bin], 1);
}
}
原子操作示例:最大值
__global__ void maxKernel(float *data, float *maxVal, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < n) {
atomicMax(maxVal, data[idx]);
}
}
浮点数原子操作
CUDA 支持浮点数的原子加法:
__global__ void atomicAddFloat(float *data, float value) {
atomicAdd(data, value);
}
其他浮点原子操作需要使用 atomicCAS 实现:
__device__ float atomicMaxFloat(float *address, float value) {
int *address_as_int = (int*)address;
int old = *address_as_int;
int assumed;
do {
assumed = old;
old = atomicCAS(address_as_int, assumed,
__float_as_int(fmaxf(value, __int_as_float(assumed))));
} while (old != assumed);
return __int_as_float(old);
}
Warp 级同步
CUDA 9.0 引入了 Warp 级同步原语。
__syncwarp()
同步 Warp 内的线程:
__global__ void warpSyncKernel(float *data) {
int laneId = threadIdx.x % 32;
data[threadIdx.x] = laneId;
__syncwarp();
float val = data[(threadIdx.x + 16) % 32];
}
Warp 级原语
| 函数 | 说明 |
|---|---|
__shfl_sync | 线程间数据交换 |
__shfl_up_sync | 向上移位交换 |
__shfl_down_sync | 向下移位交换 |
__shfl_xor_sync | 异或交换 |
__all_sync | 全部为真 |
__any_sync | 任一为真 |
__ballot_sync | 位掩码投票 |
__shfl_sync
在 Warp 内交换数据:
__global__ void shflExample() {
int laneId = threadIdx.x % 32;
int value = laneId;
int valueFromLane0 = __shfl_sync(0xffffffff, value, 0);
printf("Lane %d: value from lane 0 = %d\n", laneId, valueFromLane0);
}
__shfl_up_sync 和 __shfl_down_sync
__global__ void scanKernel(float *data, int n) {
int laneId = threadIdx.x % 32;
int idx = blockIdx.x * blockDim.x + threadIdx.x;
float value = data[idx];
for (int offset = 1; offset < 32; offset *= 2) {
float temp = __shfl_up_sync(0xffffffff, value, offset);
if (laneId >= offset) {
value += temp;
}
}
data[idx] = value;
}
__ballot_sync
收集 Warp 内的条件结果:
__global__ void ballotExample(int *data, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < n) {
int condition = (data[idx] > 0);
unsigned int mask = __ballot_sync(0xffffffff, condition);
if (threadIdx.x % 32 == 0) {
printf("Warp mask: 0x%08x\n", mask);
}
}
}
协作组
CUDA 9.0 引入了协作组(Cooperative Groups),提供更灵活的线程协作方式。
基本用法
#include <cooperative_groups.h>
namespace cg = cooperative_groups;
__global__ void kernel(float *data) {
cg::thread_block block = cg::this_thread_block();
int idx = block.thread_rank();
data[idx] = idx;
cg::sync(block);
float val = data[(idx + 1) % block.size()];
}
线程组类型
| 类型 | 说明 |
|---|---|
thread_block | 一个 Block 内的所有线程 |
thread_block_tile<N> | 固定大小的线程组 |
coalesced_group | 合并访问的线程组 |
grid_group | 整个 Grid 的线程 |
thread_block_tile
__global__ void tileExample(float *data) {
cg::thread_block block = cg::this_thread_block();
cg::thread_block_tile<32> tile = cg::tiled_partition<32>(block);
int idx = tile.thread_rank();
float value = data[block.thread_rank()];
value = tile.shfl(value, 0);
data[block.thread_rank()] = value;
}
Grid 同步
使用协作组实现 Grid 级同步:
#include <cooperative_groups.h>
namespace cg = cooperative_groups;
__global__ void gridSyncKernel(float *data, int n) {
cg::grid_group grid = cg::this_grid();
int idx = grid.thread_rank();
if (idx < n) {
data[idx] = idx;
}
cg::sync(grid);
if (idx < n) {
data[idx] = data[(idx + 1) % n];
}
}
int main() {
int n = 10000;
int blockSize = 256;
int gridSize = (n + blockSize - 1) / blockSize;
void *args[] = {&d_data, &n};
cudaLaunchCooperativeKernel((void*)gridSyncKernel, gridSize, blockSize, args);
cudaDeviceSynchronize();
}
流同步
cudaDeviceSynchronize()
等待设备上所有任务完成:
kernel<<<grid, block>>>(args);
cudaDeviceSynchronize();
cudaStreamSynchronize()
等待特定流中的任务完成:
cudaStream_t stream;
cudaStreamCreate(&stream);
kernel<<<grid, block, 0, stream>>>(args);
cudaStreamSynchronize(stream);
cudaStreamDestroy(stream);
cudaEventSynchronize()
使用事件进行同步:
cudaEvent_t event;
cudaEventCreate(&event);
kernel<<<grid, block>>>(args);
cudaEventRecord(event);
cudaEventSynchronize(event);
cudaEventDestroy(event);
小结
本章介绍了 CUDA 的同步机制:
- 线程同步:
__syncthreads()同步 Block 内线程 - 内存屏障:
__threadfence_block()、__threadfence()、__threadfence_system() - 原子操作:
atomicAdd、atomicCAS等 - Warp 级同步:
__syncwarp()、__shfl_sync()等 - 协作组:灵活的线程组同步
- 流同步:
cudaDeviceSynchronize()、cudaStreamSynchronize()
正确使用同步机制是编写正确 CUDA 程序的关键。下一章将介绍 流与事件,学习如何实现异步执行和任务并发。