跳到主要内容

JUC 知识速查表

本文档提供 JUC 常用类和方法的快速参考,方便开发时查阅。

锁机制

ReentrantLock

ReentrantLock lock = new ReentrantLock();        // 非公平锁(默认)
ReentrantLock lock = new ReentrantLock(true); // 公平锁

lock.lock(); // 获取锁(阻塞)
lock.lockInterruptibly(); // 可中断获取锁
lock.tryLock(); // 尝试获取锁(立即返回布尔值)
lock.tryLock(3, TimeUnit.SECONDS); // 超时获取锁
lock.unlock(); // 释放锁(必须在finally中调用)

// 状态查询
lock.isLocked(); // 是否被锁定
lock.isHeldByCurrentThread(); // 是否由当前线程持有
lock.getQueueLength(); // 等待队列长度
lock.hasQueuedThreads(); // 是否有等待线程
lock.getHoldCount(); // 当前线程持有锁的次数

ReentrantReadWriteLock

ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

// 读锁(共享锁)
readLock.lock();
try {
// 读取操作
} finally {
readLock.unlock();
}

// 写锁(排他锁)
writeLock.lock();
try {
// 写入操作
} finally {
writeLock.unlock();
}

// 锁降级:写锁 → 读锁
writeLock.lock();
try {
// 更新数据
readLock.lock(); // 获取读锁
} finally {
writeLock.unlock(); // 释放写锁,此时仍持有读锁
}
try {
// 读取数据
} finally {
readLock.unlock();
}

StampedLock

StampedLock sl = new StampedLock();

// 写锁
long stamp = sl.writeLock();
try {
// 写入操作
} finally {
sl.unlockWrite(stamp);
}

// 悲观读锁
stamp = sl.readLock();
try {
// 读取操作
} finally {
sl.unlockRead(stamp);
}

// 乐观读锁(无锁读取,性能最高)
stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
// 验证失败,升级为悲观读锁
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}

// 锁转换
stamp = sl.readLock();
try {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
// 执行写操作
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
// 执行写操作
}
} finally {
sl.unlock(stamp);
}

Condition

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

// 等待
lock.lock();
try {
while (条件不满足) {
condition.await(); // 无限等待
condition.await(3, TimeUnit.SECONDS); // 超时等待
condition.awaitNanos(1_000_000_000); // 纳秒超时
condition.awaitUntil(deadline); // 等待到指定时间
}
// 执行操作
} finally {
lock.unlock();
}

// 唤醒
lock.lock();
try {
condition.signal(); // 唤醒一个等待线程
condition.signalAll(); // 唤醒所有等待线程
} finally {
lock.unlock();
}

线程池

ThreadPoolExecutor 参数配置

ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, // 核心线程数(常驻线程)
maximumPoolSize, // 最大线程数
keepAliveTime, // 非核心线程空闲存活时间
TimeUnit.SECONDS, // 时间单位
workQueue, // 任务队列
threadFactory, // 线程工厂(可自定义线程名称、优先级等)
handler // 拒绝策略
);

任务队列选择

队列类型容量特点适用场景
ArrayBlockingQueue有界数组实现,公平/非公平可选资源受限,防止任务堆积
LinkedBlockingQueue可选有界链表实现,默认无界任务量可控的场景
SynchronousQueue无容量直接传递,不存储高吞吐,任务处理快
PriorityBlockingQueue无界支持优先级排序任务有优先级需求
DelayQueue无界延迟获取元素定时任务、延迟处理
LinkedTransferQueue无界支持transfer操作生产者消费者模式

拒绝策略

策略行为适用场景
AbortPolicy(默认)抛出 RejectedExecutionException需要快速发现问题的场景
CallerRunsPolicy由调用者线程执行任务降级处理,减缓提交速度
DiscardPolicy直接丢弃,不抛异常允许任务丢失的场景
DiscardOldestPolicy丢弃队列中最老的任务优先处理新任务
自定义实现RejectedExecutionHandler记录日志、持久化、告警等

常用方法

// 提交任务
executor.execute(Runnable); // 无返回值
Future<T> future = executor.submit(Callable); // 有返回值
Future<?> future = executor.submit(Runnable); // 返回null

// 批量执行
List<Future<T>> futures = executor.invokeAll(callables); // 执行所有
T result = executor.invokeAny(callables); // 任一成功即返回

// 关闭线程池
executor.shutdown(); // 平滑关闭,等待任务完成
List<Runnable> unfinished = executor.shutdownNow(); // 立即关闭,返回未执行任务
boolean terminated = executor.awaitTermination(60, TimeUnit.SECONDS); // 等待终止

// 状态查询
executor.isShutdown(); // 是否已调用shutdown
executor.isTerminated(); // 是否已完全终止
executor.getActiveCount(); // 活动线程数
executor.getPoolSize(); // 当前线程数
executor.getQueue().size(); // 队列大小
executor.getCompletedTaskCount(); // 已完成任务数

线程池大小配置参考

int cores = Runtime.getRuntime().availableProcessors();

// CPU密集型任务
int cpuIntensiveSize = cores + 1;

// IO密集型任务
int ioIntensiveSize = cores * 2;

// 混合型任务(waitTime为等待时间,computeTime为计算时间)
int mixedSize = (int) (cores * (1 + waitTime / computeTime));

并发集合

ConcurrentHashMap

ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>(16); // 初始容量
ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>(16, 0.75f, 4); // 容量、负载因子、并发级别

// 基本操作
map.put(K, V); // 存入
map.putAll(Map); // 批量存入
map.putIfAbsent(K, V); // 不存在才存入,返回旧值或null
map.get(K); // 获取
map.getOrDefault(K, defaultValue); // 获取或默认值
map.remove(K); // 删除
map.remove(K, V); // 条件删除(值匹配才删除)
map.replace(K, V); // 替换
map.replace(K, oldV, newV); // 条件替换
map.clear(); // 清空
map.size(); // 大小
map.isEmpty(); // 是否为空
map.containsKey(K); // 是否包含键
map.containsValue(V); // 是否包含值

// 原子复合操作
map.computeIfAbsent(K, k -> createValue(k)); // 不存在则计算并插入
map.computeIfPresent(K, (k, v) -> newValue); // 存在则更新
map.compute(K, (k, v) -> newValue); // 计算(存在或不存在都执行)
map.merge(K, V, (oldV, newV) -> mergedValue); // 合并

// 批量操作
map.forEach((k, v) -> process(k, v));
map.forEach((k, v) -> process(k, v), parallelismThreshold);
map.replaceAll((k, v) -> newValue);
map.search(parallelismThreshold, (k, v) -> searchResult);
map.reduce(parallelismThreshold, (k, v) -> value, (v1, v2) -> combined);

CopyOnWriteArrayList / CopyOnWriteArraySet

CopyOnWriteArrayList<E> list = new CopyOnWriteArrayList<>();
CopyOnWriteArraySet<E> set = new CopyOnWriteArraySet<>();

// List操作
list.add(E); // 添加(复制数组)
list.addIfAbsent(E); // 不存在才添加
list.addAllAbsent(Collection); // 批量添加不存在的元素
list.get(int); // 获取
list.set(int, E); // 设置
list.remove(int); // 删除索引
list.remove(E); // 删除元素
list.indexOf(E); // 查找索引
list.contains(E); // 是否包含
list.iterator(); // 迭代器(弱一致性,不抛ConcurrentModificationException)

// Set操作
set.add(E);
set.remove(E);
set.contains(E);

BlockingQueue 接口方法

操作抛异常返回特殊值阻塞超时
入队add(e)offer(e)put(e)offer(e, timeout, unit)
出队remove()poll()take()poll(timeout, unit)
查看element()peek()--

常用 BlockingQueue 实现

// ArrayBlockingQueue - 有界数组队列
BlockingQueue<E> queue = new ArrayBlockingQueue<>(100);
BlockingQueue<E> queue = new ArrayBlockingQueue<>(100, true); // 公平模式

// LinkedBlockingQueue - 可选有界链表队列
BlockingQueue<E> queue = new LinkedBlockingQueue<>(); // 无界(Integer.MAX_VALUE)
BlockingQueue<E> queue = new LinkedBlockingQueue<>(100); // 有界

// PriorityBlockingQueue - 优先级队列
BlockingQueue<E> queue = new PriorityBlockingQueue<>(11, comparator);

// SynchronousQueue - 直接传递队列
BlockingQueue<E> queue = new SynchronousQueue<>(); // 非公平
BlockingQueue<E> queue = new SynchronousQueue<>(true); // 公平

// DelayQueue - 延迟队列
class DelayedElement implements Delayed {
private final long expireTime;

public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}

public int compareTo(Delayed o) {
return Long.compare(this.expireTime, ((DelayedElement) o).expireTime);
}
}
BlockingQueue<DelayedElement> queue = new DelayQueue<>();

ConcurrentLinkedQueue

ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();

queue.offer(E); // 入队(永不阻塞)
queue.poll(); // 出队(队列为空返回null)
queue.peek(); // 查看队首
queue.isEmpty(); // 是否为空
queue.size(); // 大小(需遍历,O(n)复杂度)

ConcurrentSkipListMap / ConcurrentSkipListSet

// 有序并发Map
ConcurrentSkipListMap<K, V> map = new ConcurrentSkipListMap<>();
ConcurrentSkipListMap<K, V> map = new ConcurrentSkipListMap<>(comparator);

map.firstKey(); // 最小键
map.lastKey(); // 最大键
map.lowerKey(K); // 小于K的最大键
map.higherKey(K); // 大于K的最小键
map.floorKey(K); // 小于等于K的最大键
map.ceilingKey(K); // 大于等于K的最小键
map.descendingMap(); // 降序视图
map.subMap(fromK, toK); // 范围视图

// 有序并发Set
ConcurrentSkipListSet<E> set = new ConcurrentSkipListSet<>();

原子类

基本类型原子类

AtomicInteger ai = new AtomicInteger(0);
AtomicLong al = new AtomicLong(0L);
AtomicBoolean ab = new AtomicBoolean(false);

// 获取和设置
ai.get(); // 获取值
ai.set(10); // 设置值
ai.getAndSet(10); // 设置并返回旧值
ai.lazySet(10); // 延迟设置(最终一致性)

// CAS操作
ai.compareAndSet(0, 10); // CAS,成功返回true
ai.weakCompareAndSet(0, 10); // 弱CAS(可能虚假失败)

// 自增自减
ai.getAndIncrement(); // 自增返回旧值
ai.incrementAndGet(); // 自增返回新值
ai.getAndDecrement(); // 自减返回旧值
ai.decrementAndGet(); // 自减返回新值

// 加减运算
ai.getAndAdd(5); // 加返回旧值
ai.addAndGet(5); // 加返回新值

// 函数式更新
ai.updateAndGet(x -> x * 2); // 更新返回新值
ai.getAndUpdate(x -> x * 2); // 更新返回旧值
ai.accumulateAndGet(5, (x, y) -> x + y); // 累加返回新值
ai.getAndAccumulate(5, (x, y) -> x + y); // 累加返回旧值

引用类型原子类

// AtomicReference
AtomicReference<V> ref = new AtomicReference<>(initialValue);
ref.get();
ref.set(V);
ref.getAndSet(V);
ref.compareAndSet(expected, new);
ref.updateAndGet(v -> newValue);
ref.getAndUpdate(v -> newValue);

// AtomicStampedReference(解决ABA问题,带版本号)
AtomicStampedReference<V> ref = new AtomicStampedReference<>(initialValue, 0);
int[] stampHolder = new int[1];
V value = ref.get(stampHolder); // 获取值和版本号
int stamp = stampHolder[0];
ref.compareAndSet(expectedRef, newRef, expectedStamp, newStamp);
ref.attemptStamp(expectedRef, newStamp);

// AtomicMarkableReference(带布尔标记)
AtomicMarkableReference<V> ref = new AtomicMarkableReference<>(initialValue, false);
boolean[] markHolder = new boolean[1];
V value = ref.get(markHolder);
ref.compareAndSet(expectedRef, newRef, expectedMark, newMark);

数组类型原子类

AtomicIntegerArray array = new AtomicIntegerArray(10);
AtomicIntegerArray array = new AtomicIntegerArray(new int[]{1, 2, 3});
AtomicLongArray longArray = new AtomicLongArray(10);
AtomicReferenceArray<V> refArray = new AtomicReferenceArray<>(10);

// 基本操作
array.get(i);
array.set(i, value);
array.getAndSet(i, newValue);
array.compareAndSet(i, expected, newValue);

// 原子更新
array.getAndIncrement(i);
array.incrementAndGet(i);
array.getAndAdd(i, delta);
array.addAndGet(i, delta);
array.getAndUpdate(i, x -> x * 2);

字段更新器

// 要求:字段必须是volatile且可访问

class Person {
volatile String name;
volatile int age;
}

AtomicReferenceFieldUpdater<Person, String> nameUpdater =
AtomicReferenceFieldUpdater.newUpdater(Person.class, String.class, "name");
AtomicIntegerFieldUpdater<Person> ageUpdater =
AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");

Person p = new Person();
nameUpdater.set(p, "张三");
ageUpdater.incrementAndGet(p);

累加器(高并发场景推荐)

// LongAdder - 高性能计数器
LongAdder adder = new LongAdder();
adder.increment(); // 自增
adder.decrement(); // 自减
adder.add(10); // 加值
adder.sum(); // 获取总和
adder.sumThenReset(); // 获取总和并重置
adder.reset(); // 重置

// DoubleAdder
DoubleAdder doubleAdder = new DoubleAdder();
doubleAdder.add(1.5);
doubleAdder.sum();

// LongAccumulator - 自定义累加函数
LongAccumulator maxAcc = new LongAccumulator(Long::max, Long.MIN_VALUE);
LongAccumulator sumAcc = new LongAccumulator(Long::sum, 0L);
maxAcc.accumulate(value); // 累加
maxAcc.get(); // 获取结果

// DoubleAccumulator
DoubleAccumulator acc = new DoubleAccumulator((x, y) -> x * y, 1.0);
acc.accumulate(2.0);

VarHandle(JDK 9+)

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// 创建 VarHandle
MethodHandles.Lookup lookup = MethodHandles.lookup();
VarHandle handle = lookup.findVarHandle(MyClass.class, "fieldName", int.class);

// 访问私有字段
VarHandle privateHandle = MethodHandles.privateLookupIn(MyClass.class, lookup)
.findVarHandle(MyClass.class, "privateField", int.class);

// 数组 VarHandle
VarHandle arrayHandle = MethodHandles.arrayElementVarHandle(int[].class);

// 读取操作
handle.get(obj); // 普通读
handle.getVolatile(obj); // volatile 读
handle.getAcquire(obj); // acquire 读
handle.getOpaque(obj); // opaque 读

// 写入操作
handle.set(obj, value); // 普通写
handle.setVolatile(obj, value); // volatile 写
handle.setRelease(obj, value); // release 写
handle.setOpaque(obj, value); // opaque 写

// 原子操作
handle.compareAndSet(obj, expected, newValue); // CAS
handle.compareAndExchange(obj, expected, newValue); // CAS 返回旧值
handle.getAndSet(obj, newValue); // 原子设置
handle.getAndAdd(obj, delta); // 原子加
handle.getAndIncrement(obj); // 原子自增
handle.getAndBitwiseOr(obj, mask); // 原子位或

// 内存排序模式
// Plain: 无内存屏障,仅保证位原子性
// Opaque: 保证原子性和单变量顺序一致性
// Acquire: 获取语义,保证后续读不会被重排到此之前
// Release: 释放语义,保证之前写不会被重排到此之后
// Volatile: 完全的 volatile 语义

同步工具类

CountDownLatch

CountDownLatch latch = new CountDownLatch(3);

// 等待线程
latch.await(); // 阻塞等待计数归零
latch.await(5, TimeUnit.SECONDS); // 超时等待
latch.await(5, TimeUnit.SECONDS); // 返回boolean表示是否完成

// 计数线程
latch.countDown(); // 计数减1
latch.getCount(); // 获取当前计数

// 注意:计数器只能递减,不能重置

CyclicBarrier

CyclicBarrier barrier = new CyclicBarrier(3);  // 3个线程
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
// 屏障动作:所有线程到达后执行
});

// 等待
int arrivalIndex = barrier.await(); // 返回到达索引(0到parties-1)
barrier.await(5, TimeUnit.SECONDS); // 超时等待

// 状态和重置
barrier.reset(); // 重置屏障
barrier.getParties(); // 获取参与线程数
barrier.getNumberWaiting(); // 获取当前等待数
barrier.isBroken(); // 是否被破坏

Semaphore

Semaphore sem = new Semaphore(3);             // 非公平(默认)
Semaphore sem = new Semaphore(3, true); // 公平模式

// 获取许可
sem.acquire(); // 获取1个许可(阻塞)
sem.acquire(2); // 获取多个许可
sem.acquireUninterruptibly(); // 不可中断获取
sem.tryAcquire(); // 尝试获取(立即返回)
sem.tryAcquire(5, TimeUnit.SECONDS); // 超时获取

// 释放许可
sem.release(); // 释放1个许可
sem.release(2); // 释放多个许可

// 状态查询
sem.availablePermits(); // 可用许可数
sem.hasQueuedThreads(); // 是否有等待线程
sem.getQueueLength(); // 等待队列长度
sem.drainPermits(); // 获取并返回所有可用许可

Exchanger

Exchanger<V> exchanger = new Exchanger<>();

// 交换数据
V received = exchanger.exchange(V); // 阻塞交换
V received = exchanger.exchange(V, 5, TimeUnit.SECONDS); // 超时交换

Phaser

Phaser phaser = new Phaser(3);          // 初始注册3个参与者
Phaser phaser = new Phaser(); // 初始无参与者

// 注册
phaser.register(); // 注册1个参与者
phaser.bulkRegister(5); // 批量注册

// 到达和等待
int phase = phaser.arrive(); // 到达,不等待
phaser.arriveAndAwaitAdvance(); // 到达并等待
phaser.arriveAndDeregister(); // 到达并注销

// 等待
phaser.awaitAdvance(phase); // 等待指定阶段完成
phaser.awaitAdvanceInterruptibly(phase); // 可中断等待

// 状态
phaser.getPhase(); // 当前阶段号
phaser.getRegisteredParties(); // 注册参与者数
phaser.getArrivedParties(); // 已到达参与者数
phaser.getUnarrivedParties(); // 未到达参与者数

// 强制结束
phaser.forceTermination(); // 强制终止
phaser.isTerminated(); // 是否已终止

CompletableFuture

创建

// 有返回值的异步任务
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "result");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "result", executor);

// 无返回值的异步任务
CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> doSomething());
CompletableFuture<Void> f4 = CompletableFuture.runAsync(() -> doSomething(), executor);

// 已完成的Future
CompletableFuture<String> f5 = CompletableFuture.completedFuture("result");

// 手动完成
CompletableFuture<String> f6 = new CompletableFuture<>();
f6.complete("result"); // 正常完成
f6.completeExceptionally(new RuntimeException()); // 异常完成

链式调用

// thenApply - 转换结果
future.thenApply(s -> s.toUpperCase());
future.thenApplyAsync(s -> s.toUpperCase());
future.thenApplyAsync(s -> s.toUpperCase(), executor);

// thenAccept - 消费结果(无返回值)
future.thenAccept(s -> System.out.println(s));

// thenRun - 执行动作(不接收结果)
future.thenRun(() -> System.out.println("done"));

// thenCompose - 扁平化组合(连接两个依赖的异步任务)
future.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " more"));

// thenCombine - 组合两个独立任务
future1.thenCombine(future2, (r1, r2) -> r1 + r2);

// thenAcceptBoth - 两个任务都完成后消费
future1.thenAcceptBoth(future2, (r1, r2) -> System.out.println(r1 + r2));

// runAfterBoth - 两个任务都完成后执行
future1.runAfterBoth(future2, () -> System.out.println("both done"));

任一任务完成

// applyToEither - 任一完成则转换
future1.applyToEither(future2, s -> s.toUpperCase());

// acceptToEither - 任一完成则消费
future1.acceptToEither(future2, s -> System.out.println(s));

// runAfterEither - 任一完成则执行
future1.runAfterEither(future2, () -> System.out.println("one done"));

多任务组合

// allOf - 等待所有任务完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();
// 获取各任务结果
String r1 = f1.join();
String r2 = f2.join();

// anyOf - 任一任务完成
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
Object result = any.join();

异常处理

// exceptionally - 异常恢复
future.exceptionally(ex -> "default value");

// exceptionallyAsync - 异步异常恢复(JDK 12+)
future.exceptionallyAsync(ex -> "default value");
future.exceptionallyAsync(ex -> "default value", executor);

// exceptionallyCompose - 异常时组合新Future(JDK 12+)
future.exceptionallyCompose(ex -> backupFuture);
future.exceptionallyComposeAsync(ex -> backupFuture);
future.exceptionallyComposeAsync(ex -> backupFuture, executor);

// handle - 统一处理成功和失败
future.handle((result, ex) -> {
if (ex != null) {
return "default value";
}
return result;
});

// handleAsync - 异步统一处理
future.handleAsync((result, ex) -> result != null ? result : "default");
future.handleAsync((result, ex) -> ..., executor);

// whenComplete - 完成时回调(不改变结果)
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("异常: " + ex.getMessage());
} else {
System.out.println("结果: " + result);
}
});

// whenCompleteAsync - 异步完成时回调
future.whenCompleteAsync((result, ex) -> { ... });

超时处理

// orTimeout - 超时抛出TimeoutException
future.orTimeout(5, TimeUnit.SECONDS);

// completeOnTimeout - 超时返回默认值
future.completeOnTimeout("default", 5, TimeUnit.SECONDS);

高级方法(JDK 9+)

// copy - 返回不可修改的副本
CompletableFuture<T> copy = future.copy();

// minimalCompletionStage - 返回最小化CompletionStage(仅支持CompletionStage方法)
CompletionStage<T> stage = future.minimalCompletionStage();

// completeAsync - 异步完成
future.completeAsync(() -> computeValue());
future.completeAsync(() -> computeValue(), executor);

// delayedExecutor - 延迟执行器(用于延迟启动任务)
Executor delayed = CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS);
CompletableFuture<String> delayedFuture = CompletableFuture.supplyAsync(() -> "delayed", delayed);

// newIncompleteFuture - 子类可覆盖,返回特定类型的新Future(用于子类化)
// defaultExecutor - 子类可覆盖,更改默认执行器

获取结果

T result = future.get();                           // 阻塞获取(抛检查异常)
T result = future.get(5, TimeUnit.SECONDS); // 超时获取
T result = future.join(); // 阻塞获取(抛非检查异常)
T result = future.getNow(defaultValue); // 立即获取或返回默认值
boolean done = future.isDone(); // 是否完成
boolean cancelled = future.cancel(mayInterrupt); // 取消任务

虚拟线程(Java 21+)

虚拟线程是 Java 21 的正式特性(JEP 444),无需启用预览标志。

创建虚拟线程

// 方式1:Thread API 直接创建
Thread vThread = Thread.ofVirtual().start(() -> {
System.out.println("Hello from virtual thread");
});

// 方式2:带名称前缀
Thread vThread = Thread.ofVirtual()
.name("worker-", 0)
.start(() -> task());

// 方式3:先创建后启动
Thread.Builder builder = Thread.ofVirtual().name("vt-");
Thread vThread = builder.unstarted(() -> task());
vThread.start();

// 方式4:Executors(推荐)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> task1());
executor.submit(() -> task2());
} // 自动关闭

// 判断是否为虚拟线程
boolean isVirtual = Thread.currentThread().isVirtual();

最佳实践

// ✅ 推荐:每个任务一个虚拟线程,不池化
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (String url : urls) {
executor.submit(() -> fetchUrl(url));
}
}

// ✅ 推荐:使用同步阻塞风格
public String fetchData(String url) throws Exception {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.build();
return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
}

// ✅ 推荐:使用 Semaphore 限制并发
Semaphore semaphore = new Semaphore(10); // 限制并发数为10
semaphore.acquire();
try {
callExternalApi();
} finally {
semaphore.release();
}

// ❌ 不推荐:池化虚拟线程
ExecutorService pool = Executors.newFixedThreadPool(100); // 不要这样做!

// ❌ 不推荐:在 ThreadLocal 中缓存昂贵对象
// 虚拟线程数量巨大,会导致内存问题

Pinning 问题处理

// ⚠️ 可能导致 pinning(在 synchronized 块内阻塞)
synchronized (lock) {
Thread.sleep(1000); // 虚拟线程被钉住,载体线程阻塞
}

// ✅ 改用 ReentrantLock
private final ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
Thread.sleep(1000); // 虚拟线程可以卸载,载体线程继续工作
} finally {
lock.unlock();
}

// 检测 pinning
// 启动时添加参数:-Djdk.tracePinnedThreads=full

结构化并发(预览特性)

结构化并发是预览特性,API 在不同 Java 版本有重大变化。需要启用预览标志:

javac --release 25 --enable-preview Main.java
java --enable-preview Main

Java 25+ API

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructuredTaskScope.Joiner;

// 默认策略:所有成功或任一失败
try (var scope = StructuredTaskScope.open()) {
Subtask<String> task1 = scope.fork(() -> fetchUser());
Subtask<Integer> task2 = scope.fork(() -> fetchOrder());
scope.join(); // 等待所有子任务完成
return new Response(task1.get(), task2.get());
}

// Joiner 策略
Joiner.<T>anySuccessfulResultOrThrow() // 任一成功即返回
Joiner.<T>allSuccessfulOrThrow() // 全部成功才返回(返回 Stream<Subtask>)
Joiner.awaitAll() // 等待所有完成(无论成功失败)
Joiner.awaitAllSuccessfulOrThrow() // 等待所有成功
Joiner.allUntil(predicate) // 自定义谓词

// 带配置
try (var scope = StructuredTaskScope.open(
Joiner.<String>allSuccessfulOrThrow(),
cf -> cf.withTimeout(Duration.ofSeconds(5))
.withThreadFactory(Thread.ofVirtual().factory()))) {
tasks.forEach(scope::fork);
return scope.join().map(Subtask::get).toList();
}

// 子任务状态
switch (subtask.state()) {
case SUCCESS -> T result = subtask.get();
case FAILED -> Throwable ex = subtask.exception();
case UNAVAILABLE -> { /* 尚未完成或已取消 */ }
}

Java 21-24 API

// Java 21-24 使用子类策略
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

// ShutdownOnFailure:任一失败即取消其他
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());
scope.join();
scope.throwIfFailed();
return new Response(user.resultNow(), order.resultNow());
}

// ShutdownOnSuccess:任一成功即返回
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
sources.forEach(source -> scope.fork(() -> fetch(source)));
scope.join();
return scope.result(); // 返回第一个成功的结果
}

线程安全策略速查

不可变对象

// 所有字段都是 final
public final class ImmutablePoint {
private final int x;
private final int y;

public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}

public int getX() { return x; }
public int getY() { return y; }
}

ThreadLocal 使用

// 创建
ThreadLocal<SimpleDateFormat> formatter =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

// 使用
String formatted = formatter.get().format(date);

// 清理(必须!防止内存泄漏)
formatter.remove();

volatile 关键字

// 保证可见性,不保证原子性
volatile boolean running = true;
volatile int counter; // ⚠️ count++ 不是原子操作

// 适用场景:状态标志、单例双重检查

synchronized 关键字

// 同步代码块
synchronized (lock) {
// 临界区
}

// 同步实例方法(锁 this)
public synchronized void method() { }

// 同步静态方法(锁 Class 对象)
public static synchronized void staticMethod() { }

常见问题速查

死锁避免

// 1. 按顺序获取锁
// 2. 使用 tryLock 超时
// 3. 使用 Lock 的 lockInterruptibly
// 4. 使用无锁数据结构(原子类、并发集合)

if (lock1.tryLock(1, TimeUnit.SECONDS)) {
try {
if (lock2.tryLock(1, TimeUnit.SECONDS)) {
try {
// 执行操作
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}

线程安全单例

// 双重检查锁定
public class Singleton {
private static volatile Singleton instance;

public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}

// 枚举单例(推荐)
public enum Singleton {
INSTANCE;

public void doSomething() { }
}

// 静态内部类
public class Singleton {
private Singleton() { }

private static class Holder {
static final Singleton INSTANCE = new Singleton();
}

public static Singleton getInstance() {
return Holder.INSTANCE;
}
}

生产者消费者模式

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

// 生产者
void produce(String item) throws InterruptedException {
queue.put(item);
}

// 消费者
void consume() throws InterruptedException {
String item = queue.take();
process(item);
}