JUC 知识速查表
本文档提供 JUC 常用类和方法的快速参考,方便开发时查阅。
锁机制
ReentrantLock
ReentrantLock lock = new ReentrantLock(); // 非公平锁(默认)
ReentrantLock lock = new ReentrantLock(true); // 公平锁
lock.lock(); // 获取锁(阻塞)
lock.lockInterruptibly(); // 可中断获取锁
lock.tryLock(); // 尝试获取锁(立即返回布尔值)
lock.tryLock(3, TimeUnit.SECONDS); // 超时获取锁
lock.unlock(); // 释放锁(必须在finally中调用)
// 状态查询
lock.isLocked(); // 是否被锁定
lock.isHeldByCurrentThread(); // 是否由当前线程持有
lock.getQueueLength(); // 等待队列长度
lock.hasQueuedThreads(); // 是否有等待线程
lock.getHoldCount(); // 当前线程持有锁的次数
ReentrantReadWriteLock
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
// 读锁(共享锁)
readLock.lock();
try {
// 读取操作
} finally {
readLock.unlock();
}
// 写锁(排他锁)
writeLock.lock();
try {
// 写入操作
} finally {
writeLock.unlock();
}
// 锁降级:写锁 → 读锁
writeLock.lock();
try {
// 更新数据
readLock.lock(); // 获取读锁
} finally {
writeLock.unlock(); // 释放写锁,此时仍持有读锁
}
try {
// 读取数据
} finally {
readLock.unlock();
}
StampedLock
StampedLock sl = new StampedLock();
// 写锁
long stamp = sl.writeLock();
try {
// 写入操作
} finally {
sl.unlockWrite(stamp);
}
// 悲观读锁
stamp = sl.readLock();
try {
// 读取操作
} finally {
sl.unlockRead(stamp);
}
// 乐观读锁(无锁读取,性能最高)
stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
// 验证失败,升级为悲观读锁
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
// 锁转换
stamp = sl.readLock();
try {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
// 执行写操作
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
// 执行写操作
}
} finally {
sl.unlock(stamp);
}
Condition
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// 等待
lock.lock();
try {
while (条件不满足) {
condition.await(); // 无限等待
condition.await(3, TimeUnit.SECONDS); // 超时等待
condition.awaitNanos(1_000_000_000); // 纳秒超时
condition.awaitUntil(deadline); // 等待到指定时间
}
// 执行操作
} finally {
lock.unlock();
}
// 唤醒
lock.lock();
try {
condition.signal(); // 唤醒一个等待线程
condition.signalAll(); // 唤醒所有等待线程
} finally {
lock.unlock();
}
线程池
ThreadPoolExecutor 参数配置
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, // 核心线程数(常驻线程)
maximumPoolSize, // 最大线程数
keepAliveTime, // 非核心线程空闲存活时间
TimeUnit.SECONDS, // 时间单位
workQueue, // 任务队列
threadFactory, // 线程工厂(可自定义线程名称、优先级等)
handler // 拒绝策略
);
任务队列选择
| 队列类型 | 容量 | 特点 | 适用场景 |
|---|---|---|---|
| ArrayBlockingQueue | 有界 | 数组实现,公平/非公平可选 | 资源受限,防止任务堆积 |
| LinkedBlockingQueue | 可选有界 | 链表实现,默认无界 | 任务量可控的场景 |
| SynchronousQueue | 无容量 | 直接传递,不存储 | 高吞吐,任务处理快 |
| PriorityBlockingQueue | 无界 | 支持优先级排序 | 任务有优先级需求 |
| DelayQueue | 无界 | 延迟获取元素 | 定时任务、延迟处理 |
| LinkedTransferQueue | 无界 | 支持transfer操作 | 生产者消费者模式 |
拒绝策略
| 策略 | 行为 | 适用场景 |
|---|---|---|
| AbortPolicy(默认) | 抛出 RejectedExecutionException | 需要快速发现问题的场景 |
| CallerRunsPolicy | 由调用者线程执行任务 | 降级处理,减缓提交速度 |
| DiscardPolicy | 直接丢弃,不抛异常 | 允许任务丢失的场景 |
| DiscardOldestPolicy | 丢弃队列中最老的任务 | 优先处理新任务 |
| 自定义 | 实现RejectedExecutionHandler | 记录日志、持久化、告警等 |
常用方法
// 提交任务
executor.execute(Runnable); // 无返回值
Future<T> future = executor.submit(Callable); // 有返回值
Future<?> future = executor.submit(Runnable); // 返回null
// 批量执行
List<Future<T>> futures = executor.invokeAll(callables); // 执行所有
T result = executor.invokeAny(callables); // 任一成功即返回
// 关闭线程池
executor.shutdown(); // 平滑关闭,等待任务完成
List<Runnable> unfinished = executor.shutdownNow(); // 立即关闭,返回未执行任务
boolean terminated = executor.awaitTermination(60, TimeUnit.SECONDS); // 等待终止
// 状态查询
executor.isShutdown(); // 是否已调用shutdown
executor.isTerminated(); // 是否已完全终止
executor.getActiveCount(); // 活动线程数
executor.getPoolSize(); // 当前线程数
executor.getQueue().size(); // 队列大小
executor.getCompletedTaskCount(); // 已完成任务数
线程池大小配置参考
int cores = Runtime.getRuntime().availableProcessors();
// CPU密集型任务
int cpuIntensiveSize = cores + 1;
// IO密集型任务
int ioIntensiveSize = cores * 2;
// 混合型任务(waitTime为等待时间,computeTime为计算时间)
int mixedSize = (int) (cores * (1 + waitTime / computeTime));
并发集合
ConcurrentHashMap
ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>(16); // 初始容量
ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>(16, 0.75f, 4); // 容量、负载因子、并发级别
// 基本操作
map.put(K, V); // 存入
map.putAll(Map); // 批量存入
map.putIfAbsent(K, V); // 不存在才存入,返回旧值或null
map.get(K); // 获取
map.getOrDefault(K, defaultValue); // 获取或默认值
map.remove(K); // 删除
map.remove(K, V); // 条件删除(值匹配才删除)
map.replace(K, V); // 替换
map.replace(K, oldV, newV); // 条件替换
map.clear(); // 清空
map.size(); // 大小
map.isEmpty(); // 是否为空
map.containsKey(K); // 是否包含键
map.containsValue(V); // 是否包含值
// 原子复合操作
map.computeIfAbsent(K, k -> createValue(k)); // 不存在则计算并插入
map.computeIfPresent(K, (k, v) -> newValue); // 存在则更新
map.compute(K, (k, v) -> newValue); // 计算(存在或不存在都执行)
map.merge(K, V, (oldV, newV) -> mergedValue); // 合并
// 批量操作
map.forEach((k, v) -> process(k, v));
map.forEach((k, v) -> process(k, v), parallelismThreshold);
map.replaceAll((k, v) -> newValue);
map.search(parallelismThreshold, (k, v) -> searchResult);
map.reduce(parallelismThreshold, (k, v) -> value, (v1, v2) -> combined);
CopyOnWriteArrayList / CopyOnWriteArraySet
CopyOnWriteArrayList<E> list = new CopyOnWriteArrayList<>();
CopyOnWriteArraySet<E> set = new CopyOnWriteArraySet<>();
// List操作
list.add(E); // 添加(复制数组)
list.addIfAbsent(E); // 不存在才添加
list.addAllAbsent(Collection); // 批量添加不存在的元素
list.get(int); // 获取
list.set(int, E); // 设置
list.remove(int); // 删除索引
list.remove(E); // 删除元素
list.indexOf(E); // 查找索引
list.contains(E); // 是否包含
list.iterator(); // 迭代器(弱一致性,不抛ConcurrentModificationException)
// Set操作
set.add(E);
set.remove(E);
set.contains(E);
BlockingQueue 接口方法
| 操作 | 抛异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 入队 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
| 出队 | remove() | poll() | take() | poll(timeout, unit) |
| 查看 | element() | peek() | - | - |
常用 BlockingQueue 实现
// ArrayBlockingQueue - 有界数组队列
BlockingQueue<E> queue = new ArrayBlockingQueue<>(100);
BlockingQueue<E> queue = new ArrayBlockingQueue<>(100, true); // 公平模式
// LinkedBlockingQueue - 可选有界链表队列
BlockingQueue<E> queue = new LinkedBlockingQueue<>(); // 无界(Integer.MAX_VALUE)
BlockingQueue<E> queue = new LinkedBlockingQueue<>(100); // 有界
// PriorityBlockingQueue - 优先级队列
BlockingQueue<E> queue = new PriorityBlockingQueue<>(11, comparator);
// SynchronousQueue - 直接传递队列
BlockingQueue<E> queue = new SynchronousQueue<>(); // 非公平
BlockingQueue<E> queue = new SynchronousQueue<>(true); // 公平
// DelayQueue - 延迟队列
class DelayedElement implements Delayed {
private final long expireTime;
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
public int compareTo(Delayed o) {
return Long.compare(this.expireTime, ((DelayedElement) o).expireTime);
}
}
BlockingQueue<DelayedElement> queue = new DelayQueue<>();
ConcurrentLinkedQueue
ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
queue.offer(E); // 入队(永不阻塞)
queue.poll(); // 出队(队列为空返回null)
queue.peek(); // 查看队首
queue.isEmpty(); // 是否为空
queue.size(); // 大小(需遍历,O(n)复杂度)
ConcurrentSkipListMap / ConcurrentSkipListSet
// 有序并发Map
ConcurrentSkipListMap<K, V> map = new ConcurrentSkipListMap<>();
ConcurrentSkipListMap<K, V> map = new ConcurrentSkipListMap<>(comparator);
map.firstKey(); // 最小键
map.lastKey(); // 最大键
map.lowerKey(K); // 小于K的最大键
map.higherKey(K); // 大于K的最小键
map.floorKey(K); // 小于等于K的最大键
map.ceilingKey(K); // 大于等于K的最小键
map.descendingMap(); // 降序视图
map.subMap(fromK, toK); // 范围视图
// 有序并发Set
ConcurrentSkipListSet<E> set = new ConcurrentSkipListSet<>();
原子类
基本类型原子类
AtomicInteger ai = new AtomicInteger(0);
AtomicLong al = new AtomicLong(0L);
AtomicBoolean ab = new AtomicBoolean(false);
// 获取和设置
ai.get(); // 获取值
ai.set(10); // 设置值
ai.getAndSet(10); // 设置并返回旧值
ai.lazySet(10); // 延迟设置(最终一致性)
// CAS操作
ai.compareAndSet(0, 10); // CAS,成功返回true
ai.weakCompareAndSet(0, 10); // 弱CAS(可能虚假失败)
// 自增自减
ai.getAndIncrement(); // 自增返回旧值
ai.incrementAndGet(); // 自增返回新值
ai.getAndDecrement(); // 自减返回旧值
ai.decrementAndGet(); // 自减返回新值
// 加减运算
ai.getAndAdd(5); // 加返回旧值
ai.addAndGet(5); // 加返回新值
// 函数式更新
ai.updateAndGet(x -> x * 2); // 更新返回新值
ai.getAndUpdate(x -> x * 2); // 更新返回旧值
ai.accumulateAndGet(5, (x, y) -> x + y); // 累加返回新值
ai.getAndAccumulate(5, (x, y) -> x + y); // 累加返回旧值
引用类型原子类
// AtomicReference
AtomicReference<V> ref = new AtomicReference<>(initialValue);
ref.get();
ref.set(V);
ref.getAndSet(V);
ref.compareAndSet(expected, new);
ref.updateAndGet(v -> newValue);
ref.getAndUpdate(v -> newValue);
// AtomicStampedReference(解决ABA问题,带版本号)
AtomicStampedReference<V> ref = new AtomicStampedReference<>(initialValue, 0);
int[] stampHolder = new int[1];
V value = ref.get(stampHolder); // 获取值和版本号
int stamp = stampHolder[0];
ref.compareAndSet(expectedRef, newRef, expectedStamp, newStamp);
ref.attemptStamp(expectedRef, newStamp);
// AtomicMarkableReference(带布尔标记)
AtomicMarkableReference<V> ref = new AtomicMarkableReference<>(initialValue, false);
boolean[] markHolder = new boolean[1];
V value = ref.get(markHolder);
ref.compareAndSet(expectedRef, newRef, expectedMark, newMark);
数组类型原子类
AtomicIntegerArray array = new AtomicIntegerArray(10);
AtomicIntegerArray array = new AtomicIntegerArray(new int[]{1, 2, 3});
AtomicLongArray longArray = new AtomicLongArray(10);
AtomicReferenceArray<V> refArray = new AtomicReferenceArray<>(10);
// 基本操作
array.get(i);
array.set(i, value);
array.getAndSet(i, newValue);
array.compareAndSet(i, expected, newValue);
// 原子更新
array.getAndIncrement(i);
array.incrementAndGet(i);
array.getAndAdd(i, delta);
array.addAndGet(i, delta);
array.getAndUpdate(i, x -> x * 2);
字段更新器
// 要求:字段必须是volatile且可访问
class Person {
volatile String name;
volatile int age;
}
AtomicReferenceFieldUpdater<Person, String> nameUpdater =
AtomicReferenceFieldUpdater.newUpdater(Person.class, String.class, "name");
AtomicIntegerFieldUpdater<Person> ageUpdater =
AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
Person p = new Person();
nameUpdater.set(p, "张三");
ageUpdater.incrementAndGet(p);
累加器(高并发场景推荐)
// LongAdder - 高性能计数器
LongAdder adder = new LongAdder();
adder.increment(); // 自增
adder.decrement(); // 自减
adder.add(10); // 加值
adder.sum(); // 获取总和
adder.sumThenReset(); // 获取总和并重置
adder.reset(); // 重置
// DoubleAdder
DoubleAdder doubleAdder = new DoubleAdder();
doubleAdder.add(1.5);
doubleAdder.sum();
// LongAccumulator - 自定义累加函数
LongAccumulator maxAcc = new LongAccumulator(Long::max, Long.MIN_VALUE);
LongAccumulator sumAcc = new LongAccumulator(Long::sum, 0L);
maxAcc.accumulate(value); // 累加
maxAcc.get(); // 获取结果
// DoubleAccumulator
DoubleAccumulator acc = new DoubleAccumulator((x, y) -> x * y, 1.0);
acc.accumulate(2.0);
VarHandle(JDK 9+)
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
// 创建 VarHandle
MethodHandles.Lookup lookup = MethodHandles.lookup();
VarHandle handle = lookup.findVarHandle(MyClass.class, "fieldName", int.class);
// 访问私有字段
VarHandle privateHandle = MethodHandles.privateLookupIn(MyClass.class, lookup)
.findVarHandle(MyClass.class, "privateField", int.class);
// 数组 VarHandle
VarHandle arrayHandle = MethodHandles.arrayElementVarHandle(int[].class);
// 读取操作
handle.get(obj); // 普通读
handle.getVolatile(obj); // volatile 读
handle.getAcquire(obj); // acquire 读
handle.getOpaque(obj); // opaque 读
// 写入操作
handle.set(obj, value); // 普通写
handle.setVolatile(obj, value); // volatile 写
handle.setRelease(obj, value); // release 写
handle.setOpaque(obj, value); // opaque 写
// 原子操作
handle.compareAndSet(obj, expected, newValue); // CAS
handle.compareAndExchange(obj, expected, newValue); // CAS 返回旧值
handle.getAndSet(obj, newValue); // 原子设置
handle.getAndAdd(obj, delta); // 原子加
handle.getAndIncrement(obj); // 原子自增
handle.getAndBitwiseOr(obj, mask); // 原子位或
// 内存排序模式
// Plain: 无内存屏障,仅保证位原子性
// Opaque: 保证原子性和单变量顺序一致性
// Acquire: 获取语义,保证后续读不会被重排到此之前
// Release: 释放语义,保证之前写不会被重排到此之后
// Volatile: 完全的 volatile 语义
同步工具类
CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
// 等待线程
latch.await(); // 阻塞等待计数归零
latch.await(5, TimeUnit.SECONDS); // 超时等待
latch.await(5, TimeUnit.SECONDS); // 返回boolean表示是否完成
// 计数线程
latch.countDown(); // 计数减1
latch.getCount(); // 获取当前计数
// 注意:计数器只能递减,不能重置
CyclicBarrier
CyclicBarrier barrier = new CyclicBarrier(3); // 3个线程
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
// 屏障动作:所有线程到达后执行
});
// 等待
int arrivalIndex = barrier.await(); // 返回到达索引(0到parties-1)
barrier.await(5, TimeUnit.SECONDS); // 超时等待
// 状态和重置
barrier.reset(); // 重置屏障
barrier.getParties(); // 获取参与线程数
barrier.getNumberWaiting(); // 获取当前等待数
barrier.isBroken(); // 是否被破坏
Semaphore
Semaphore sem = new Semaphore(3); // 非公平(默认)
Semaphore sem = new Semaphore(3, true); // 公平模式
// 获取许可
sem.acquire(); // 获取1个许可(阻塞)
sem.acquire(2); // 获取多个许可
sem.acquireUninterruptibly(); // 不可中断获取
sem.tryAcquire(); // 尝试获取(立即返回)
sem.tryAcquire(5, TimeUnit.SECONDS); // 超时获取
// 释放许可
sem.release(); // 释放1个许可
sem.release(2); // 释放多个许可
// 状态查询
sem.availablePermits(); // 可用许可数
sem.hasQueuedThreads(); // 是否有等待线程
sem.getQueueLength(); // 等待队列长度
sem.drainPermits(); // 获取并返回所有可用许可
Exchanger
Exchanger<V> exchanger = new Exchanger<>();
// 交换数据
V received = exchanger.exchange(V); // 阻塞交换
V received = exchanger.exchange(V, 5, TimeUnit.SECONDS); // 超时交换
Phaser
Phaser phaser = new Phaser(3); // 初始注册3个参与者
Phaser phaser = new Phaser(); // 初始无参与者
// 注册
phaser.register(); // 注册1个参与者
phaser.bulkRegister(5); // 批量注册
// 到达和等待
int phase = phaser.arrive(); // 到达,不等待
phaser.arriveAndAwaitAdvance(); // 到达并等待
phaser.arriveAndDeregister(); // 到达并注销
// 等待
phaser.awaitAdvance(phase); // 等待指定阶段完成
phaser.awaitAdvanceInterruptibly(phase); // 可中断等待
// 状态
phaser.getPhase(); // 当前阶段号
phaser.getRegisteredParties(); // 注册参与者数
phaser.getArrivedParties(); // 已到达参与者数
phaser.getUnarrivedParties(); // 未到达参与者数
// 强制结束
phaser.forceTermination(); // 强制终止
phaser.isTerminated(); // 是否已终止
CompletableFuture
创建
// 有返回值的异步任务
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "result");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "result", executor);
// 无返回值的异步任务
CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> doSomething());
CompletableFuture<Void> f4 = CompletableFuture.runAsync(() -> doSomething(), executor);
// 已完成的Future
CompletableFuture<String> f5 = CompletableFuture.completedFuture("result");
// 手动完成
CompletableFuture<String> f6 = new CompletableFuture<>();
f6.complete("result"); // 正常完成
f6.completeExceptionally(new RuntimeException()); // 异常完成
链式调用
// thenApply - 转换结果
future.thenApply(s -> s.toUpperCase());
future.thenApplyAsync(s -> s.toUpperCase());
future.thenApplyAsync(s -> s.toUpperCase(), executor);
// thenAccept - 消费结果(无返回值)
future.thenAccept(s -> System.out.println(s));
// thenRun - 执行动作(不接收结果)
future.thenRun(() -> System.out.println("done"));
// thenCompose - 扁平化组合(连接两个依赖的异步任务)
future.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " more"));
// thenCombine - 组合两个独立任务
future1.thenCombine(future2, (r1, r2) -> r1 + r2);
// thenAcceptBoth - 两个任务都完成后消费
future1.thenAcceptBoth(future2, (r1, r2) -> System.out.println(r1 + r2));
// runAfterBoth - 两个任务都完成后执行
future1.runAfterBoth(future2, () -> System.out.println("both done"));
任一任务完成
// applyToEither - 任一完成则转换
future1.applyToEither(future2, s -> s.toUpperCase());
// acceptToEither - 任一完成则消费
future1.acceptToEither(future2, s -> System.out.println(s));
// runAfterEither - 任一完成则执行
future1.runAfterEither(future2, () -> System.out.println("one done"));
多任务组合
// allOf - 等待所有任务完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();
// 获取各任务结果
String r1 = f1.join();
String r2 = f2.join();
// anyOf - 任一任务完成
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
Object result = any.join();
异常处理
// exceptionally - 异常恢复
future.exceptionally(ex -> "default value");
// exceptionallyAsync - 异步异常恢复(JDK 12+)
future.exceptionallyAsync(ex -> "default value");
future.exceptionallyAsync(ex -> "default value", executor);
// exceptionallyCompose - 异常时组合新Future(JDK 12+)
future.exceptionallyCompose(ex -> backupFuture);
future.exceptionallyComposeAsync(ex -> backupFuture);
future.exceptionallyComposeAsync(ex -> backupFuture, executor);
// handle - 统一处理成功和失败
future.handle((result, ex) -> {
if (ex != null) {
return "default value";
}
return result;
});
// handleAsync - 异步统一处理
future.handleAsync((result, ex) -> result != null ? result : "default");
future.handleAsync((result, ex) -> ..., executor);
// whenComplete - 完成时回调(不改变结果)
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("异常: " + ex.getMessage());
} else {
System.out.println("结果: " + result);
}
});
// whenCompleteAsync - 异步完成时回调
future.whenCompleteAsync((result, ex) -> { ... });
超时处理
// orTimeout - 超时抛出TimeoutException
future.orTimeout(5, TimeUnit.SECONDS);
// completeOnTimeout - 超时返回默认值
future.completeOnTimeout("default", 5, TimeUnit.SECONDS);
高级方法(JDK 9+)
// copy - 返回不可修改的副本
CompletableFuture<T> copy = future.copy();
// minimalCompletionStage - 返回最小化CompletionStage(仅支持CompletionStage方法)
CompletionStage<T> stage = future.minimalCompletionStage();
// completeAsync - 异步完成
future.completeAsync(() -> computeValue());
future.completeAsync(() -> computeValue(), executor);
// delayedExecutor - 延迟执行器(用于延迟启动任务)
Executor delayed = CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS);
CompletableFuture<String> delayedFuture = CompletableFuture.supplyAsync(() -> "delayed", delayed);
// newIncompleteFuture - 子类可覆盖,返回特定类型的新Future(用于子类化)
// defaultExecutor - 子类可覆盖,更改默认执行器
获取结果
T result = future.get(); // 阻塞获取(抛检查异常)
T result = future.get(5, TimeUnit.SECONDS); // 超时获取
T result = future.join(); // 阻塞获取(抛非检查异常)
T result = future.getNow(defaultValue); // 立即获取或返回默认值
boolean done = future.isDone(); // 是否完成
boolean cancelled = future.cancel(mayInterrupt); // 取消任务
虚拟线程(Java 21+)
虚拟线程是 Java 21 的正式特性(JEP 444),无需启用预览标志。
创建虚拟线程
// 方式1:Thread API 直接创建
Thread vThread = Thread.ofVirtual().start(() -> {
System.out.println("Hello from virtual thread");
});
// 方式2:带名称前缀
Thread vThread = Thread.ofVirtual()
.name("worker-", 0)
.start(() -> task());
// 方式3:先创建后启动
Thread.Builder builder = Thread.ofVirtual().name("vt-");
Thread vThread = builder.unstarted(() -> task());
vThread.start();
// 方式4:Executors(推荐)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> task1());
executor.submit(() -> task2());
} // 自动关闭
// 判断是否为虚拟线程
boolean isVirtual = Thread.currentThread().isVirtual();
最佳实践
// ✅ 推荐:每个任务一个虚拟线程,不池化
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (String url : urls) {
executor.submit(() -> fetchUrl(url));
}
}
// ✅ 推荐:使用同步阻塞风格
public String fetchData(String url) throws Exception {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.build();
return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
}
// ✅ 推荐:使用 Semaphore 限制并发
Semaphore semaphore = new Semaphore(10); // 限制并发数为10
semaphore.acquire();
try {
callExternalApi();
} finally {
semaphore.release();
}
// ❌ 不推荐:池化虚拟线程
ExecutorService pool = Executors.newFixedThreadPool(100); // 不要这样做!
// ❌ 不推荐:在 ThreadLocal 中缓存昂贵对象
// 虚拟线程数量巨大,会导致内存问题
Pinning 问题处理
// ⚠️ 可能导致 pinning(在 synchronized 块内阻塞)
synchronized (lock) {
Thread.sleep(1000); // 虚拟线程被钉住,载体线程阻塞
}
// ✅ 改用 ReentrantLock
private final ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
Thread.sleep(1000); // 虚拟线程可以卸载,载体线程继续工作
} finally {
lock.unlock();
}
// 检测 pinning
// 启动时添加参数:-Djdk.tracePinnedThreads=full
结构化并发(预览特性)
结构化并发是预览特性,API 在不同 Java 版本有重大变化。需要启用预览标志:
javac --release 25 --enable-preview Main.java
java --enable-preview Main
Java 25+ API
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructuredTaskScope.Joiner;
// 默认策略:所有成功或任一失败
try (var scope = StructuredTaskScope.open()) {
Subtask<String> task1 = scope.fork(() -> fetchUser());
Subtask<Integer> task2 = scope.fork(() -> fetchOrder());
scope.join(); // 等待所有子任务完成
return new Response(task1.get(), task2.get());
}
// Joiner 策略
Joiner.<T>anySuccessfulResultOrThrow() // 任一成功即返回
Joiner.<T>allSuccessfulOrThrow() // 全部成功才返回(返回 Stream<Subtask>)
Joiner.awaitAll() // 等待所有完成(无论成功失败)
Joiner.awaitAllSuccessfulOrThrow() // 等待所有成功
Joiner.allUntil(predicate) // 自定义谓词
// 带配置
try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow(),
cf -> cf.withTimeout(Duration.ofSeconds(5))
.withThreadFactory(Thread.ofVirtual().factory()))) {
tasks.forEach(scope::fork);
return scope.join().map(Subtask::get).toList();
}
// 子任务状态
switch (subtask.state()) {
case SUCCESS -> T result = subtask.get();
case FAILED -> Throwable ex = subtask.exception();
case UNAVAILABLE -> { /* 尚未完成或已取消 */ }
}
Java 21-24 API
// Java 21-24 使用子类策略
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
// ShutdownOnFailure:任一失败即取消其他
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());
scope.join();
scope.throwIfFailed();
return new Response(user.resultNow(), order.resultNow());
}
// ShutdownOnSuccess:任一成功即返回
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
sources.forEach(source -> scope.fork(() -> fetch(source)));
scope.join();
return scope.result(); // 返回第一个成功的结果
}
线程安全策略速查
不可变对象
// 所有字段都是 final
public final class ImmutablePoint {
private final int x;
private final int y;
public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
public int getX() { return x; }
public int getY() { return y; }
}
ThreadLocal 使用
// 创建
ThreadLocal<SimpleDateFormat> formatter =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
// 使用
String formatted = formatter.get().format(date);
// 清理(必须!防止内存泄漏)
formatter.remove();
volatile 关键字
// 保证可见性,不保证原子性
volatile boolean running = true;
volatile int counter; // ⚠️ count++ 不是原子操作
// 适用场景:状态标志、单例双重检查
synchronized 关键字
// 同步代码块
synchronized (lock) {
// 临界区
}
// 同步实例方法(锁 this)
public synchronized void method() { }
// 同步静态方法(锁 Class 对象)
public static synchronized void staticMethod() { }
常见问题速查
死锁避免
// 1. 按顺序获取锁
// 2. 使用 tryLock 超时
// 3. 使用 Lock 的 lockInterruptibly
// 4. 使用无锁数据结构(原子类、并发集合)
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
try {
if (lock2.tryLock(1, TimeUnit.SECONDS)) {
try {
// 执行操作
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
线程安全单例
// 双重检查锁定
public class Singleton {
private static volatile Singleton instance;
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
// 枚举单例(推荐)
public enum Singleton {
INSTANCE;
public void doSomething() { }
}
// 静态内部类
public class Singleton {
private Singleton() { }
private static class Holder {
static final Singleton INSTANCE = new Singleton();
}
public static Singleton getInstance() {
return Holder.INSTANCE;
}
}
生产者消费者模式
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
// 生产者
void produce(String item) throws InterruptedException {
queue.put(item);
}
// 消费者
void consume() throws InterruptedException {
String item = queue.take();
process(item);
}