JUC 知识速查表
本文档提供 JUC 常用类和方法的快速参考。
锁机制
ReentrantLock
ReentrantLock lock = new ReentrantLock(); // 非公平锁
ReentrantLock lock = new ReentrantLock(true); // 公平锁
lock.lock(); // 获取锁
lock.lockInterruptibly(); // 可中断获取锁
lock.tryLock(); // 尝试获取锁(立即返回)
lock.tryLock(3, TimeUnit.SECONDS); // 超时获取锁
lock.unlock(); // 释放锁
lock.isLocked(); // 是否被锁定
lock.isHeldByCurrentThread(); // 是否由当前线程持有
lock.getQueueLength(); // 等待队列长度
ReentrantReadWriteLock
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
readLock.lock(); // 获取读锁(共享)
readLock.unlock();
writeLock.lock(); // 获取写锁(排他)
writeLock.unlock();
StampedLock
StampedLock sl = new StampedLock();
long stamp = sl.writeLock(); // 写锁
sl.unlockWrite(stamp);
stamp = sl.readLock(); // 悲观读锁
sl.unlockRead(stamp);
stamp = sl.tryOptimisticRead(); // 乐观读锁
if (!sl.validate(stamp)) {
stamp = sl.readLock(); // 升级为悲观读锁
sl.unlockRead(stamp);
}
Condition
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
condition.await(); // 等待
condition.await(3, TimeUnit.SECONDS); // 超时等待
condition.signal(); // 唤醒一个
condition.signalAll(); // 唤醒所有
线程池
ThreadPoolExecutor 参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, // 核心线程数
maximumPoolSize, // 最大线程数
keepAliveTime, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
workQueue, // 任务队列
threadFactory, // 线程工厂
handler // 拒绝策略
);
任务队列
| 队列 | 特点 |
|---|---|
| ArrayBlockingQueue | 有界队列 |
| LinkedBlockingQueue | 可选有界 |
| SynchronousQueue | 直接传递 |
| PriorityBlockingQueue | 优先级队列 |
拒绝策略
| 策略 | 行为 |
|---|---|
| AbortPolicy | 抛异常(默认) |
| CallerRunsPolicy | 调用者执行 |
| DiscardPolicy | 丢弃 |
| DiscardOldestPolicy | 丢弃最老 |
常用方法
executor.execute(Runnable); // 执行任务
executor.submit(Runnable); // 提交任务
executor.submit(Callable); // 提交有返回值任务
executor.shutdown(); // 平滑关闭
executor.shutdownNow(); // 立即关闭
executor.awaitTermination(60, SECONDS); // 等待终止
executor.isShutdown(); // 是否已关闭
executor.isTerminated(); // 是否已终止
executor.getActiveCount(); // 活动线程数
executor.getQueue().size(); // 队列大小
并发集合
ConcurrentHashMap
ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
map.put(K, V); // 存入
map.putIfAbsent(K, V); // 不存在才存入
map.get(K); // 获取
map.remove(K); // 删除
map.remove(K, V); // 条件删除
map.replace(K, oldV, newV); // 条件替换
map.computeIfAbsent(K, fn); // 不存在则计算
map.computeIfPresent(K, fn); // 存在则更新
map.merge(K, V, fn); // 合并
CopyOnWriteArrayList
CopyOnWriteArrayList<E> list = new CopyOnWriteArrayList<>();
list.add(E); // 添加
list.get(int); // 获取
list.remove(int); // 删除
list.iterator(); // 迭代(弱一致性)
BlockingQueue
BlockingQueue<E> queue = new ArrayBlockingQueue<>(10);
queue.put(E); // 阻塞入队
queue.offer(E); // 非阻塞入队
queue.offer(E, 3, SECONDS); // 超时入队
queue.take(); // 阻塞出队
queue.poll(); // 非阻塞出队
queue.poll(3, SECONDS); // 超时出队
queue.peek(); // 查看队首
queue.size(); // 大小
阻塞队列实现
| 实现类 | 特点 |
|---|---|
| ArrayBlockingQueue | 有界数组队列 |
| LinkedBlockingQueue | 可选有界链表队列 |
| PriorityBlockingQueue | 无界优先级队列 |
| SynchronousQueue | 无缓冲直接传递 |
| DelayQueue | 延迟队列 |
| LinkedTransferQueue | 传递队列 |
原子类
基本类型
AtomicInteger ai = new AtomicInteger(0);
ai.get(); // 获取
ai.set(10); // 设置
ai.getAndSet(10); // 设置并返回旧值
ai.compareAndSet(0, 10); // CAS
ai.getAndIncrement(); // 自增返回旧值
ai.incrementAndGet(); // 自增返回新值
ai.getAndDecrement(); // 自减返回旧值
ai.decrementAndGet(); // 自减返回新值
ai.getAndAdd(5); // 加返回旧值
ai.addAndGet(5); // 加返回新值
ai.updateAndGet(x -> x * 2); // 函数式更新
引用类型
AtomicReference<V> ref = new AtomicReference<>();
ref.get();
ref.set(V);
ref.compareAndSet(V, V);
ref.updateAndGet(fn);
AtomicStampedReference<V> stampedRef = new AtomicStampedReference<>();
stampedRef.getReference(); // 获取引用
stampedRef.getStamp(); // 获取版本
stampedRef.compareAndSet(V, V, stamp, newStamp);
累加器
LongAdder adder = new LongAdder();
adder.increment(); // 自增
adder.add(10); // 加
adder.sum(); // 获取总和
adder.reset(); // 重置
LongAccumulator acc = new LongAccumulator(Long::sum, 0);
acc.accumulate(10); // 累加
acc.get(); // 获取结果
同步工具
CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
latch.countDown(); // 计数减1
latch.await(); // 等待计数归零
latch.await(5, SECONDS); // 超时等待
latch.getCount(); // 获取当前计数
CyclicBarrier
CyclicBarrier barrier = new CyclicBarrier(3);
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
// 屏障动作
});
barrier.await(); // 等待其他线程
barrier.await(5, SECONDS); // 超时等待
barrier.reset(); // 重置
barrier.getParties(); // 获取参与数
barrier.getNumberWaiting(); // 获取等待数
Semaphore
Semaphore sem = new Semaphore(3);
Semaphore sem = new Semaphore(3, true); // 公平
sem.acquire(); // 获取许可
sem.acquire(2); // 获取多个许可
sem.tryAcquire(); // 尝试获取
sem.tryAcquire(5, SECONDS); // 超时获取
sem.release(); // 释放许可
sem.availablePermits(); // 可用许可数
Exchanger
Exchanger<V> exchanger = new Exchanger<>();
V received = exchanger.exchange(V); // 交换
V received = exchanger.exchange(V, 5, SECONDS); // 超时交换
CompletableFuture
创建
CompletableFuture.supplyAsync(() -> value); // 有返回值
CompletableFuture.runAsync(() -> {}); // 无返回值
CompletableFuture.completedFuture(value); // 已完成
链式调用
future.thenApply(v -> fn(v)); // 转换
future.thenAccept(v -> {}); // 消费
future.thenRun(() -> {}); // 执行
future.thenCompose(v -> anotherFuture); // 扁平化
future.thenCombine(f2, (a, b) -> fn); // 组合两个
多任务
CompletableFuture.allOf(f1, f2, f3); // 等待所有
CompletableFuture.anyOf(f1, f2, f3); // 任一完成
异常处理
future.exceptionally(e -> defaultValue);
future.handle((v, e) -> v != null ? v : default);
future.whenComplete((v, e) -> {});
超时
future.orTimeout(5, SECONDS);
future.completeOnTimeout(defaultValue, 5, SECONDS);
线程安全策略
不可变对象
final class ImmutableValue {
private final int value;
ImmutableValue(int value) {
this.value = value;
}
int getValue() {
return value;
}
}
ThreadLocal
ThreadLocal<T> threadLocal = ThreadLocal.withInitial(() -> defaultValue);
threadLocal.set(value);
threadLocal.get();
threadLocal.remove(); // 必须清理
volatile
volatile boolean running = true; // 保证可见性
volatile int counter; // 不保证原子性
synchronized
synchronized (lock) {
// 临界区
}
public synchronized void method() {
// 同步方法
}
线程池配置参考
CPU 密集型
int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
cores + 1, cores + 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()
);
IO 密集型
int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
cores * 2, cores * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
混合型
int cores = Runtime.getRuntime().availableProcessors();
double ratio = (double) waitTime / computeTime;
int poolSize = (int) (cores * (1 + ratio));