跳到主要内容

同步机制

在并行计算中,同步是确保正确性的关键。CUDA 提供了多种同步机制,包括线程同步、原子操作和流同步。

线程同步

__syncthreads()

__syncthreads() 是最常用的线程同步函数,用于同步同一个 Block 内的所有线程:

__global__ void kernel(float *data) {
__shared__ float sharedData[256];

int idx = threadIdx.x;
sharedData[idx] = data[idx];

__syncthreads();

float result = sharedData[255 - idx];
data[idx] = result;
}

重要规则

  1. Block 内所有线程必须执行 __syncthreads()
  2. 不能在条件分支中调用(除非所有线程都进入该分支)
  3. 只能同步同一个 Block 内的线程

错误示例

__global__ void badSync(float *data) {
int idx = threadIdx.x;

if (idx < 128) {
data[idx] *= 2;
__syncthreads();
}
}

上面的代码会导致死锁,因为只有部分线程执行了 __syncthreads()

正确示例

__global__ void goodSync(float *data) {
int idx = threadIdx.x;

if (idx < 128) {
data[idx] *= 2;
}

__syncthreads();
}

内存屏障

CUDA 提供了多种内存屏障函数,确保内存访问的顺序性。

__threadfence_block()

确保对共享内存和全局内存的写入对 Block 内其他线程可见:

__global__ void kernel(float *data) {
int idx = threadIdx.x;

data[idx] = 1.0f;
__threadfence_block();

float val = data[(idx + 1) % blockDim.x];
}

__threadfence()

确保对全局内存的写入对所有线程可见:

__global__ void kernel(float *data, int *flag) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;

data[idx] = 1.0f;
__threadfence();

if (idx == 0) {
*flag = 1;
}
}

__threadfence_system()

确保对全局内存的写入对 CPU 和其他 GPU 可见:

__global__ void kernel(float *data, int *flag) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;

data[idx] = 1.0f;
__threadfence_system();

if (idx == 0) {
*flag = 1;
}
}

原子操作

原子操作是不可中断的操作,用于在多线程环境下安全地更新共享数据。

基本原子操作

函数说明
atomicAdd原子加
atomicSub原子减
atomicExch原子交换
atomicMin原子最小值
atomicMax原子最大值
atomicInc原子递增
atomicDec原子递减
atomicCAS原子比较并交换
atomicAnd原子与
atomicOr原子或
atomicXor原子异或

atomicAdd

__global__ void atomicAddKernel(int *counter, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < n) {
atomicAdd(counter, 1);
}
}

int main() {
int *d_counter;
cudaMalloc(&d_counter, sizeof(int));
cudaMemset(d_counter, 0, sizeof(int));

atomicAddKernel<<<100, 256>>>(d_counter, 100 * 256);

int h_counter;
cudaMemcpy(&h_counter, d_counter, sizeof(int), cudaMemcpyDeviceToHost);
printf("Counter: %d\n", h_counter);

cudaFree(d_counter);
}

atomicCAS(比较并交换)

atomicCAS 是最基础的原子操作,其他原子操作可以用它实现:

__device__ int myAtomicAdd(int *address, int value) {
int old = *address;
int assumed;

do {
assumed = old;
old = atomicCAS(address, assumed, assumed + value);
} while (old != assumed);

return old;
}

原子操作示例:直方图

__global__ void histogramKernel(int *histogram, unsigned char *data, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;

if (idx < n) {
int bin = data[idx];
atomicAdd(&histogram[bin], 1);
}
}

原子操作示例:最大值

__global__ void maxKernel(float *data, float *maxVal, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;

if (idx < n) {
atomicMax(maxVal, data[idx]);
}
}

浮点数原子操作

CUDA 支持浮点数的原子加法:

__global__ void atomicAddFloat(float *data, float value) {
atomicAdd(data, value);
}

其他浮点原子操作需要使用 atomicCAS 实现:

__device__ float atomicMaxFloat(float *address, float value) {
int *address_as_int = (int*)address;
int old = *address_as_int;
int assumed;

do {
assumed = old;
old = atomicCAS(address_as_int, assumed,
__float_as_int(fmaxf(value, __int_as_float(assumed))));
} while (old != assumed);

return __int_as_float(old);
}

Warp 级同步

CUDA 9.0 引入了 Warp 级同步原语。

__syncwarp()

同步 Warp 内的线程:

__global__ void warpSyncKernel(float *data) {
int laneId = threadIdx.x % 32;

data[threadIdx.x] = laneId;
__syncwarp();

float val = data[(threadIdx.x + 16) % 32];
}

Warp 级原语

函数说明
__shfl_sync线程间数据交换
__shfl_up_sync向上移位交换
__shfl_down_sync向下移位交换
__shfl_xor_sync异或交换
__all_sync全部为真
__any_sync任一为真
__ballot_sync位掩码投票

__shfl_sync

在 Warp 内交换数据:

__global__ void shflExample() {
int laneId = threadIdx.x % 32;
int value = laneId;

int valueFromLane0 = __shfl_sync(0xffffffff, value, 0);

printf("Lane %d: value from lane 0 = %d\n", laneId, valueFromLane0);
}

__shfl_up_sync 和 __shfl_down_sync

__global__ void scanKernel(float *data, int n) {
int laneId = threadIdx.x % 32;
int idx = blockIdx.x * blockDim.x + threadIdx.x;

float value = data[idx];

for (int offset = 1; offset < 32; offset *= 2) {
float temp = __shfl_up_sync(0xffffffff, value, offset);
if (laneId >= offset) {
value += temp;
}
}

data[idx] = value;
}

__ballot_sync

收集 Warp 内的条件结果:

__global__ void ballotExample(int *data, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;

if (idx < n) {
int condition = (data[idx] > 0);
unsigned int mask = __ballot_sync(0xffffffff, condition);

if (threadIdx.x % 32 == 0) {
printf("Warp mask: 0x%08x\n", mask);
}
}
}

协作组

CUDA 9.0 引入了协作组(Cooperative Groups),提供更灵活的线程协作方式。

基本用法

#include <cooperative_groups.h>

namespace cg = cooperative_groups;

__global__ void kernel(float *data) {
cg::thread_block block = cg::this_thread_block();

int idx = block.thread_rank();
data[idx] = idx;

cg::sync(block);

float val = data[(idx + 1) % block.size()];
}

线程组类型

类型说明
thread_block一个 Block 内的所有线程
thread_block_tile<N>固定大小的线程组
coalesced_group合并访问的线程组
grid_group整个 Grid 的线程

thread_block_tile

__global__ void tileExample(float *data) {
cg::thread_block block = cg::this_thread_block();
cg::thread_block_tile<32> tile = cg::tiled_partition<32>(block);

int idx = tile.thread_rank();
float value = data[block.thread_rank()];

value = tile.shfl(value, 0);

data[block.thread_rank()] = value;
}

Grid 同步

使用协作组实现 Grid 级同步:

#include <cooperative_groups.h>

namespace cg = cooperative_groups;

__global__ void gridSyncKernel(float *data, int n) {
cg::grid_group grid = cg::this_grid();

int idx = grid.thread_rank();
if (idx < n) {
data[idx] = idx;
}

cg::sync(grid);

if (idx < n) {
data[idx] = data[(idx + 1) % n];
}
}

int main() {
int n = 10000;
int blockSize = 256;
int gridSize = (n + blockSize - 1) / blockSize;

void *args[] = {&d_data, &n};
cudaLaunchCooperativeKernel((void*)gridSyncKernel, gridSize, blockSize, args);

cudaDeviceSynchronize();
}

流同步

cudaDeviceSynchronize()

等待设备上所有任务完成:

kernel<<<grid, block>>>(args);
cudaDeviceSynchronize();

cudaStreamSynchronize()

等待特定流中的任务完成:

cudaStream_t stream;
cudaStreamCreate(&stream);

kernel<<<grid, block, 0, stream>>>(args);
cudaStreamSynchronize(stream);

cudaStreamDestroy(stream);

cudaEventSynchronize()

使用事件进行同步:

cudaEvent_t event;
cudaEventCreate(&event);

kernel<<<grid, block>>>(args);
cudaEventRecord(event);

cudaEventSynchronize(event);

cudaEventDestroy(event);

小结

本章介绍了 CUDA 的同步机制:

  1. 线程同步__syncthreads() 同步 Block 内线程
  2. 内存屏障__threadfence_block()__threadfence()__threadfence_system()
  3. 原子操作atomicAddatomicCAS
  4. Warp 级同步__syncwarp()__shfl_sync()
  5. 协作组:灵活的线程组同步
  6. 流同步cudaDeviceSynchronize()cudaStreamSynchronize()

正确使用同步机制是编写正确 CUDA 程序的关键。下一章将介绍 流与事件,学习如何实现异步执行和任务并发。