跳到主要内容

同步工具类

JUC 提供了一系列同步工具类,用于协调多个线程的执行顺序。这些工具类是并发编程中的重要组件,能够解决各种线程协调问题。

CountDownLatch

CountDownLatch(闭锁)是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。

工作原理

CountDownLatch 内部维护一个计数器,初始值为指定的数量。每次调用 countDown() 方法,计数器减 1。当计数器为 0 时,所有等待的线程被唤醒。

初始计数 = 3

线程A调用 await() ──→ 等待
线程B调用 await() ──→ 等待
线程C调用 await() ──→ 等待

线程1完成 ──→ countDown() ──→ 计数 = 2
线程2完成 ──→ countDown() ──→ 计数 = 1
线程3完成 ──→ countDown() ──→ 计数 = 0 ──→ 唤醒所有等待线程

基本使用

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int workerCount = 5;
CountDownLatch latch = new CountDownLatch(workerCount);

for (int i = 0; i < workerCount; i++) {
final int workerId = i;
new Thread(() -> {
System.out.println("Worker " + workerId + " 开始工作");
try {
Thread.sleep((long) (Math.random() * 2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Worker " + workerId + " 完成工作");
latch.countDown();
}).start();
}

System.out.println("主线程等待所有 Worker 完成...");
latch.await();
System.out.println("所有 Worker 已完成,主线程继续执行");
}
}

典型应用场景

场景一:等待多个服务启动

import java.util.concurrent.CountDownLatch;

public class ServiceStartupDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);

new Thread(new Service("数据库服务", 2000, latch)).start();
new Thread(new Service("缓存服务", 1000, latch)).start();
new Thread(new Service("消息队列服务", 1500, latch)).start();

System.out.println("等待所有服务启动...");
latch.await();
System.out.println("所有服务启动完成,系统就绪");
}
}

class Service implements Runnable {
private final String name;
private final int startupTime;
private final CountDownLatch latch;

Service(String name, int startupTime, CountDownLatch latch) {
this.name = name;
this.startupTime = startupTime;
this.latch = latch;
}

@Override
public void run() {
System.out.println(name + " 正在启动...");
try {
Thread.sleep(startupTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " 启动完成");
latch.countDown();
}
}

场景二:并发测试

import java.util.concurrent.CountDownLatch;

public class ConcurrentTestDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 10;
CountDownLatch readyLatch = new CountDownLatch(threadCount);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
final int id = i;
new Thread(() -> {
readyLatch.countDown();
try {
startLatch.await();
System.out.println("线程 " + id + " 开始执行");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("线程 " + id + " 执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
finishLatch.countDown();
}
}).start();
}

readyLatch.await();
System.out.println("所有线程就绪,开始并发测试");
startLatch.countDown();

finishLatch.await();
System.out.println("所有线程执行完成");
}
}

超时等待

boolean completed = latch.await(5, TimeUnit.SECONDS);
if (completed) {
System.out.println("所有任务在超时时间内完成");
} else {
System.out.println("等待超时,仍有任务未完成");
}

注意事项

  • CountDownLatch 的计数器只能减少,不能重置
  • countDown() 可以在任何线程调用
  • await() 可以被多个线程同时调用
  • 计数器为 0 后,后续的 await() 会立即返回

CyclicBarrier

CyclicBarrier(循环栅栏)让一组线程互相等待,直到所有线程都到达某个屏障点。

与 CountDownLatch 的区别

特性CountDownLatchCyclicBarrier
计数方向递减递增
可重用
等待者主线程等待工作线程工作线程互相等待
触发条件计数归零到达指定数量

基本使用

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
public static void main(String[] args) {
int partyCount = 3;
CyclicBarrier barrier = new CyclicBarrier(partyCount, () -> {
System.out.println("=== 所有线程到达屏障,开始下一阶段 ===");
});

for (int i = 0; i < partyCount; i++) {
final int id = i;
new Thread(() -> {
try {
for (int phase = 1; phase <= 3; phase++) {
System.out.println("线程 " + id + " 阶段 " + phase + " 准备中...");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("线程 " + id + " 阶段 " + phase + " 就绪,等待其他线程");
barrier.await();
System.out.println("线程 " + id + " 阶段 " + phase + " 开始执行");
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}

典型应用场景

场景:多线程计算汇总

import java.util.concurrent.*;

public class ParallelComputeDemo {
public static void main(String[] args) {
int threadCount = 4;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程计算完成,开始汇总结果");
});

ExecutorService executor = Executors.newFixedThreadPool(threadCount);
int[] data = new int[1000];
for (int i = 0; i < data.length; i++) {
data[i] = i + 1;
}

int chunkSize = data.length / threadCount;
int[] partialSums = new int[threadCount];

for (int i = 0; i < threadCount; i++) {
final int threadId = i;
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? data.length : start + chunkSize;

executor.submit(() -> {
int sum = 0;
for (int j = start; j < end; j++) {
sum += data[j];
}
partialSums[threadId] = sum;
System.out.println("线程 " + threadId + " 计算部分和: " + sum);

try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
});
}

executor.shutdown();
}
}

超时和重置

CyclicBarrier barrier = new CyclicBarrier(3);

try {
barrier.await(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("等待超时");
}

barrier.reset();
System.out.println("屏障已重置");

Semaphore

Semaphore(信号量)用于控制同时访问某个资源的线程数量。

工作原理

Semaphore 维护一组许可,线程通过 acquire() 获取许可,通过 release() 释放许可。如果许可不足,线程会阻塞等待。

Semaphore(3) - 初始许可数为3

线程1 acquire() ──→ 获取许可,剩余2
线程2 acquire() ──→ 获取许可,剩余1
线程3 acquire() ──→ 获取许可,剩余0
线程4 acquire() ──→ 阻塞等待
线程1 release() ──→ 释放许可,剩余1 ──→ 线程4获取许可

基本使用

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
public static void main(String[] args) {
int maxConcurrent = 3;
Semaphore semaphore = new Semaphore(maxConcurrent);

for (int i = 0; i < 10; i++) {
final int id = i;
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("线程 " + id + " 获取许可,开始执行");
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程 " + id + " 执行完成,释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}

典型应用场景

场景一:数据库连接池

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ConnectionPool {
private final Semaphore semaphore;
private final Connection[] connections;
private final boolean[] used;

public ConnectionPool(int size) {
this.semaphore = new Semaphore(size);
this.connections = new Connection[size];
this.used = new boolean[size];

for (int i = 0; i < size; i++) {
connections[i] = new Connection("Connection-" + i);
}
}

public Connection getConnection() throws InterruptedException {
semaphore.acquire();

synchronized (this) {
for (int i = 0; i < used.length; i++) {
if (!used[i]) {
used[i] = true;
return connections[i];
}
}
}
throw new IllegalStateException("无可用连接");
}

public void releaseConnection(Connection connection) {
synchronized (this) {
for (int i = 0; i < connections.length; i++) {
if (connections[i] == connection) {
used[i] = false;
semaphore.release();
return;
}
}
}
}

static class Connection {
private final String name;

Connection(String name) {
this.name = name;
}

@Override
public String toString() {
return name;
}
}

public static void main(String[] args) {
ConnectionPool pool = new ConnectionPool(3);

for (int i = 0; i < 6; i++) {
final int id = i;
new Thread(() -> {
try {
Connection conn = pool.getConnection();
System.out.println("线程 " + id + " 获取连接: " + conn);
Thread.sleep(2000);
System.out.println("线程 " + id + " 释放连接: " + conn);
pool.releaseConnection(conn);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}

场景二:限流

import java.util.concurrent.Semaphore;

public class RateLimiter {
private final Semaphore semaphore;

public RateLimiter(int maxRequests) {
this.semaphore = new Semaphore(maxRequests);
}

public boolean tryAcquire() {
return semaphore.tryAcquire();
}

public void release() {
semaphore.release();
}

public static void main(String[] args) {
RateLimiter limiter = new RateLimiter(3);

for (int i = 0; i < 10; i++) {
if (limiter.tryAcquire()) {
System.out.println("请求 " + i + " 被接受");
} else {
System.out.println("请求 " + i + " 被拒绝");
}
}
}
}

公平与非公平

Semaphore fairSemaphore = new Semaphore(3, true);   // 公平
Semaphore unfairSemaphore = new Semaphore(3, false); // 非公平(默认)

公平信号量按照请求顺序分配许可,非公平信号量允许插队。

Exchanger

Exchanger 用于两个线程之间交换数据。当一个线程调用 exchange() 方法后,会等待另一个线程也调用 exchange(),然后两个线程交换数据。

基本使用

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
String data = "来自线程A的数据";
try {
System.out.println("线程A 准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("线程A 收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
String data = "来自线程B的数据";
try {
Thread.sleep(1000);
System.out.println("线程B 准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("线程B 收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

典型应用场景

场景:生产者-消费者数据交换

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class ExchangerProducerConsumer {
public static void main(String[] args) {
Exchanger<List<String>> exchanger = new Exchanger<>();

new Thread(() -> {
List<String> buffer = new ArrayList<>();
for (int i = 0; i < 3; i++) {
buffer.clear();
for (int j = 0; j < 5; j++) {
buffer.add("数据-" + i + "-" + j);
}
System.out.println("生产者生产: " + buffer);
try {
buffer = exchanger.exchange(buffer);
System.out.println("生产者收到空缓冲区");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

new Thread(() -> {
List<String> buffer = new ArrayList<>();
for (int i = 0; i < 3; i++) {
try {
buffer = exchanger.exchange(buffer);
System.out.println("消费者消费: " + buffer);
buffer.clear();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

超时交换

try {
String result = exchanger.exchange(data, 5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("交换超时");
}

Phaser

Phaser 是 Java 7 引入的同步工具,结合了 CountDownLatch 和 CyclicBarrier 的功能,更加灵活。

基本使用

import java.util.concurrent.Phaser;

public class PhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(3);

for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
System.out.println("线程 " + id + " 阶段1开始");
phaser.arriveAndAwaitAdvance();
System.out.println("线程 " + id + " 阶段1完成");

System.out.println("线程 " + id + " 阶段2开始");
phaser.arriveAndAwaitAdvance();
System.out.println("线程 " + id + " 阶段2完成");

phaser.arriveAndDeregister();
}).start();
}
}
}

动态注册

import java.util.concurrent.Phaser;

public class DynamicPhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(1);

for (int i = 0; i < 5; i++) {
phaser.register();
final int id = i;
new Thread(() -> {
System.out.println("线程 " + id + " 到达");
phaser.arriveAndAwaitAdvance();
System.out.println("线程 " + id + " 继续");
}).start();
}

phaser.arriveAndDeregister();
}
}

核心方法

方法说明
register()注册一个参与者
arrive()到达,不等待其他线程
arriveAndAwaitAdvance()到达并等待其他线程
arriveAndDeregister()到达并注销
getPhase()获取当前阶段号
awaitAdvance(int)等待指定阶段完成

同步工具选择指南

场景推荐工具
等待多个线程完成CountDownLatch
线程互相等待(可重用)CyclicBarrier
控制并发数量Semaphore
两个线程交换数据Exchanger
多阶段任务协调Phaser

小结

本章介绍了 JUC 提供的同步工具类:

  1. CountDownLatch:一次性栅栏,用于等待一组线程完成
  2. CyclicBarrier:可循环使用的栅栏,让线程互相等待
  3. Semaphore:信号量,控制并发访问数量
  4. Exchanger:两个线程之间交换数据
  5. Phaser:多阶段任务协调,更加灵活

这些同步工具类是并发编程中协调线程执行的重要手段,理解它们的原理和适用场景,能够更好地解决线程协调问题。