同步工具类
JUC 提供了一系列同步工具类,用于协调多个线程的执行顺序。这些工具类是并发编程中的重要组件,能够解决各种线程协调问题。
CountDownLatch
CountDownLatch(闭锁)是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。
工作原理
CountDownLatch 内部维护一个计数器,初始值为指定的数量。每次调用 countDown() 方法,计数器减 1。当计数器为 0 时,所有等待的线程被唤醒。
初始计数 = 3
线程A调用 await() ──→ 等待
线程B调用 await() ──→ 等待
线程C调用 await() ──→ 等待
线程1完成 ──→ countDown() ──→ 计数 = 2
线程2完成 ──→ countDown() ──→ 计数 = 1
线程3完成 ──→ countDown() ──→ 计数 = 0 ──→ 唤醒所有等待线程
基本使用
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int workerCount = 5;
CountDownLatch latch = new CountDownLatch(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
new Thread(() -> {
System.out.println("Worker " + workerId + " 开始工作");
try {
Thread.sleep((long) (Math.random() * 2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Worker " + workerId + " 完成工作");
latch.countDown();
}).start();
}
System.out.println("主线程等待所有 Worker 完成...");
latch.await();
System.out.println("所有 Worker 已完成,主线程继续执行");
}
}
典型应用场景
场景一:等待多个服务启动
import java.util.concurrent.CountDownLatch;
public class ServiceStartupDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
new Thread(new Service("数据库服务", 2000, latch)).start();
new Thread(new Service("缓存服务", 1000, latch)).start();
new Thread(new Service("消息队列服务", 1500, latch)).start();
System.out.println("等待所有服务启动...");
latch.await();
System.out.println("所有服务启动完成,系统就绪");
}
}
class Service implements Runnable {
private final String name;
private final int startupTime;
private final CountDownLatch latch;
Service(String name, int startupTime, CountDownLatch latch) {
this.name = name;
this.startupTime = startupTime;
this.latch = latch;
}
@Override
public void run() {
System.out.println(name + " 正在启动...");
try {
Thread.sleep(startupTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " 启动完成");
latch.countDown();
}
}
场景二:并发测试
import java.util.concurrent.CountDownLatch;
public class ConcurrentTestDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 10;
CountDownLatch readyLatch = new CountDownLatch(threadCount);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int id = i;
new Thread(() -> {
readyLatch.countDown();
try {
startLatch.await();
System.out.println("线程 " + id + " 开始执行");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("线程 " + id + " 执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
finishLatch.countDown();
}
}).start();
}
readyLatch.await();
System.out.println("所有线程就绪,开始并发测试");
startLatch.countDown();
finishLatch.await();
System.out.println("所有线程执行完成");
}
}
超时等待
boolean completed = latch.await(5, TimeUnit.SECONDS);
if (completed) {
System.out.println("所有任务在超时时间内完成");
} else {
System.out.println("等待超时,仍有任务未完成");
}
注意事项
- CountDownLatch 的计数器只能减少,不能重置
- countDown() 可以在任何线程调用
- await() 可以被多个线程同时调用
- 计数器为 0 后,后续的 await() 会立即返回
CyclicBarrier
CyclicBarrier(循环栅栏)让一组线程互相等待,直到所有线程都到达某个屏障点。
与 CountDownLatch 的区别
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 计数方向 | 递减 | 递增 |
| 可重用 | 否 | 是 |
| 等待者 | 主线程等待工作线程 | 工作线程互相等待 |
| 触发条件 | 计数归零 | 到达指定数量 |
基本使用
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int partyCount = 3;
CyclicBarrier barrier = new CyclicBarrier(partyCount, () -> {
System.out.println("=== 所有线程到达屏障,开始下一阶段 ===");
});
for (int i = 0; i < partyCount; i++) {
final int id = i;
new Thread(() -> {
try {
for (int phase = 1; phase <= 3; phase++) {
System.out.println("线程 " + id + " 阶段 " + phase + " 准备中...");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("线程 " + id + " 阶段 " + phase + " 就绪,等待其他线程");
barrier.await();
System.out.println("线程 " + id + " 阶段 " + phase + " 开始执行");
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
典型应用场景
场景:多线程计算汇总
import java.util.concurrent.*;
public class ParallelComputeDemo {
public static void main(String[] args) {
int threadCount = 4;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程计算完成,开始汇总结果");
});
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
int[] data = new int[1000];
for (int i = 0; i < data.length; i++) {
data[i] = i + 1;
}
int chunkSize = data.length / threadCount;
int[] partialSums = new int[threadCount];
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? data.length : start + chunkSize;
executor.submit(() -> {
int sum = 0;
for (int j = start; j < end; j++) {
sum += data[j];
}
partialSums[threadId] = sum;
System.out.println("线程 " + threadId + " 计算部分和: " + sum);
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
超时和重置
CyclicBarrier barrier = new CyclicBarrier(3);
try {
barrier.await(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("等待超时");
}
barrier.reset();
System.out.println("屏障已重置");
Semaphore
Semaphore(信号量)用于控制同时访问某个资源的线程数量。
工作原理
Semaphore 维护一组许可,线程通过 acquire() 获取许可,通过 release() 释放许可。如果许可不足,线程会阻塞等待。
Semaphore(3) - 初始许可数为3
线程1 acquire() ──→ 获取许可,剩余2
线程2 acquire() ──→ 获取许可,剩余1
线程3 acquire() ──→ 获取许可,剩余0
线程4 acquire() ──→ 阻塞等待
线程1 release() ──→ 释放许可,剩余1 ──→ 线程4获取许可
基本使用
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
int maxConcurrent = 3;
Semaphore semaphore = new Semaphore(maxConcurrent);
for (int i = 0; i < 10; i++) {
final int id = i;
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("线程 " + id + " 获取许可,开始执行");
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程 " + id + " 执行完成,释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}
典型应用场景
场景一:数据库连接池
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class ConnectionPool {
private final Semaphore semaphore;
private final Connection[] connections;
private final boolean[] used;
public ConnectionPool(int size) {
this.semaphore = new Semaphore(size);
this.connections = new Connection[size];
this.used = new boolean[size];
for (int i = 0; i < size; i++) {
connections[i] = new Connection("Connection-" + i);
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire();
synchronized (this) {
for (int i = 0; i < used.length; i++) {
if (!used[i]) {
used[i] = true;
return connections[i];
}
}
}
throw new IllegalStateException("无可用连接");
}
public void releaseConnection(Connection connection) {
synchronized (this) {
for (int i = 0; i < connections.length; i++) {
if (connections[i] == connection) {
used[i] = false;
semaphore.release();
return;
}
}
}
}
static class Connection {
private final String name;
Connection(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
public static void main(String[] args) {
ConnectionPool pool = new ConnectionPool(3);
for (int i = 0; i < 6; i++) {
final int id = i;
new Thread(() -> {
try {
Connection conn = pool.getConnection();
System.out.println("线程 " + id + " 获取连接: " + conn);
Thread.sleep(2000);
System.out.println("线程 " + id + " 释放连接: " + conn);
pool.releaseConnection(conn);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
场景二:限流
import java.util.concurrent.Semaphore;
public class RateLimiter {
private final Semaphore semaphore;
public RateLimiter(int maxRequests) {
this.semaphore = new Semaphore(maxRequests);
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public void release() {
semaphore.release();
}
public static void main(String[] args) {
RateLimiter limiter = new RateLimiter(3);
for (int i = 0; i < 10; i++) {
if (limiter.tryAcquire()) {
System.out.println("请求 " + i + " 被接受");
} else {
System.out.println("请求 " + i + " 被拒绝");
}
}
}
}
公平与非公平
Semaphore fairSemaphore = new Semaphore(3, true); // 公平
Semaphore unfairSemaphore = new Semaphore(3, false); // 非公平(默认)
公平信号量按照请求顺序分配许可,非公平信号量允许插队。
Exchanger
Exchanger 用于两个线程之间交换数据。当一个线程调用 exchange() 方法后,会等待另一个线程也调用 exchange(),然后两个线程交换数据。
基本使用
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
String data = "来自线程A的数据";
try {
System.out.println("线程A 准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("线程A 收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
String data = "来自线程B的数据";
try {
Thread.sleep(1000);
System.out.println("线程B 准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("线程B 收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
典型应用场景
场景:生产者-消费者数据交换
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
public class ExchangerProducerConsumer {
public static void main(String[] args) {
Exchanger<List<String>> exchanger = new Exchanger<>();
new Thread(() -> {
List<String> buffer = new ArrayList<>();
for (int i = 0; i < 3; i++) {
buffer.clear();
for (int j = 0; j < 5; j++) {
buffer.add("数据-" + i + "-" + j);
}
System.out.println("生产者生产: " + buffer);
try {
buffer = exchanger.exchange(buffer);
System.out.println("生产者收到空缓冲区");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
List<String> buffer = new ArrayList<>();
for (int i = 0; i < 3; i++) {
try {
buffer = exchanger.exchange(buffer);
System.out.println("消费者消费: " + buffer);
buffer.clear();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
超时交换
try {
String result = exchanger.exchange(data, 5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("交换超时");
}
Phaser
Phaser 是 Java 7 引入的同步工具,结合了 CountDownLatch 和 CyclicBarrier 的功能,更加灵活。
基本使用
import java.util.concurrent.Phaser;
public class PhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
System.out.println("线程 " + id + " 阶段1开始");
phaser.arriveAndAwaitAdvance();
System.out.println("线程 " + id + " 阶段1完成");
System.out.println("线程 " + id + " 阶段2开始");
phaser.arriveAndAwaitAdvance();
System.out.println("线程 " + id + " 阶段2完成");
phaser.arriveAndDeregister();
}).start();
}
}
}
动态注册
import java.util.concurrent.Phaser;
public class DynamicPhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(1);
for (int i = 0; i < 5; i++) {
phaser.register();
final int id = i;
new Thread(() -> {
System.out.println("线程 " + id + " 到达");
phaser.arriveAndAwaitAdvance();
System.out.println("线程 " + id + " 继续");
}).start();
}
phaser.arriveAndDeregister();
}
}
核心方法
| 方法 | 说明 |
|---|---|
| register() | 注册一个参与者 |
| arrive() | 到达,不等待其他线程 |
| arriveAndAwaitAdvance() | 到达并等待其他线程 |
| arriveAndDeregister() | 到达并注销 |
| getPhase() | 获取当前阶段号 |
| awaitAdvance(int) | 等待指定阶段完成 |
同步工具选择指南
| 场景 | 推荐工具 |
|---|---|
| 等待多个线程完成 | CountDownLatch |
| 线程互相等待(可重用) | CyclicBarrier |
| 控制并发数量 | Semaphore |
| 两个线程交换数据 | Exchanger |
| 多阶段任务协调 | Phaser |
小结
本章介绍了 JUC 提供的同步工具类:
- CountDownLatch:一次性栅栏,用于等待一组线程完成
- CyclicBarrier:可循环使用的栅栏,让线程互相等待
- Semaphore:信号量,控制并发访问数量
- Exchanger:两个线程之间交换数据
- Phaser:多阶段任务协调,更加灵活
这些同步工具类是并发编程中协调线程执行的重要手段,理解它们的原理和适用场景,能够更好地解决线程协调问题。