跳到主要内容

JUC 知识速查表

本文档提供 JUC 常用类和方法的快速参考。

锁机制

ReentrantLock

ReentrantLock lock = new ReentrantLock();        // 非公平锁
ReentrantLock lock = new ReentrantLock(true); // 公平锁

lock.lock(); // 获取锁
lock.lockInterruptibly(); // 可中断获取锁
lock.tryLock(); // 尝试获取锁(立即返回)
lock.tryLock(3, TimeUnit.SECONDS); // 超时获取锁
lock.unlock(); // 释放锁

lock.isLocked(); // 是否被锁定
lock.isHeldByCurrentThread(); // 是否由当前线程持有
lock.getQueueLength(); // 等待队列长度

ReentrantReadWriteLock

ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

readLock.lock(); // 获取读锁(共享)
readLock.unlock();

writeLock.lock(); // 获取写锁(排他)
writeLock.unlock();

StampedLock

StampedLock sl = new StampedLock();

long stamp = sl.writeLock(); // 写锁
sl.unlockWrite(stamp);

stamp = sl.readLock(); // 悲观读锁
sl.unlockRead(stamp);

stamp = sl.tryOptimisticRead(); // 乐观读锁
if (!sl.validate(stamp)) {
stamp = sl.readLock(); // 升级为悲观读锁
sl.unlockRead(stamp);
}

Condition

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

condition.await(); // 等待
condition.await(3, TimeUnit.SECONDS); // 超时等待
condition.signal(); // 唤醒一个
condition.signalAll(); // 唤醒所有

线程池

ThreadPoolExecutor 参数

ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, // 核心线程数
maximumPoolSize, // 最大线程数
keepAliveTime, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
workQueue, // 任务队列
threadFactory, // 线程工厂
handler // 拒绝策略
);

任务队列

队列特点
ArrayBlockingQueue有界队列
LinkedBlockingQueue可选有界
SynchronousQueue直接传递
PriorityBlockingQueue优先级队列

拒绝策略

策略行为
AbortPolicy抛异常(默认)
CallerRunsPolicy调用者执行
DiscardPolicy丢弃
DiscardOldestPolicy丢弃最老

常用方法

executor.execute(Runnable);           // 执行任务
executor.submit(Runnable); // 提交任务
executor.submit(Callable); // 提交有返回值任务
executor.shutdown(); // 平滑关闭
executor.shutdownNow(); // 立即关闭
executor.awaitTermination(60, SECONDS); // 等待终止
executor.isShutdown(); // 是否已关闭
executor.isTerminated(); // 是否已终止
executor.getActiveCount(); // 活动线程数
executor.getQueue().size(); // 队列大小

并发集合

ConcurrentHashMap

ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();

map.put(K, V); // 存入
map.putIfAbsent(K, V); // 不存在才存入
map.get(K); // 获取
map.remove(K); // 删除
map.remove(K, V); // 条件删除
map.replace(K, oldV, newV); // 条件替换
map.computeIfAbsent(K, fn); // 不存在则计算
map.computeIfPresent(K, fn); // 存在则更新
map.merge(K, V, fn); // 合并

CopyOnWriteArrayList

CopyOnWriteArrayList<E> list = new CopyOnWriteArrayList<>();

list.add(E); // 添加
list.get(int); // 获取
list.remove(int); // 删除
list.iterator(); // 迭代(弱一致性)

BlockingQueue

BlockingQueue<E> queue = new ArrayBlockingQueue<>(10);

queue.put(E); // 阻塞入队
queue.offer(E); // 非阻塞入队
queue.offer(E, 3, SECONDS); // 超时入队
queue.take(); // 阻塞出队
queue.poll(); // 非阻塞出队
queue.poll(3, SECONDS); // 超时出队
queue.peek(); // 查看队首
queue.size(); // 大小

阻塞队列实现

实现类特点
ArrayBlockingQueue有界数组队列
LinkedBlockingQueue可选有界链表队列
PriorityBlockingQueue无界优先级队列
SynchronousQueue无缓冲直接传递
DelayQueue延迟队列
LinkedTransferQueue传递队列

原子类

基本类型

AtomicInteger ai = new AtomicInteger(0);
ai.get(); // 获取
ai.set(10); // 设置
ai.getAndSet(10); // 设置并返回旧值
ai.compareAndSet(0, 10); // CAS
ai.getAndIncrement(); // 自增返回旧值
ai.incrementAndGet(); // 自增返回新值
ai.getAndDecrement(); // 自减返回旧值
ai.decrementAndGet(); // 自减返回新值
ai.getAndAdd(5); // 加返回旧值
ai.addAndGet(5); // 加返回新值
ai.updateAndGet(x -> x * 2); // 函数式更新

引用类型

AtomicReference<V> ref = new AtomicReference<>();
ref.get();
ref.set(V);
ref.compareAndSet(V, V);
ref.updateAndGet(fn);

AtomicStampedReference<V> stampedRef = new AtomicStampedReference<>();
stampedRef.getReference(); // 获取引用
stampedRef.getStamp(); // 获取版本
stampedRef.compareAndSet(V, V, stamp, newStamp);

累加器

LongAdder adder = new LongAdder();
adder.increment(); // 自增
adder.add(10); // 加
adder.sum(); // 获取总和
adder.reset(); // 重置

LongAccumulator acc = new LongAccumulator(Long::sum, 0);
acc.accumulate(10); // 累加
acc.get(); // 获取结果

同步工具

CountDownLatch

CountDownLatch latch = new CountDownLatch(3);
latch.countDown(); // 计数减1
latch.await(); // 等待计数归零
latch.await(5, SECONDS); // 超时等待
latch.getCount(); // 获取当前计数

CyclicBarrier

CyclicBarrier barrier = new CyclicBarrier(3);
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
// 屏障动作
});

barrier.await(); // 等待其他线程
barrier.await(5, SECONDS); // 超时等待
barrier.reset(); // 重置
barrier.getParties(); // 获取参与数
barrier.getNumberWaiting(); // 获取等待数

Semaphore

Semaphore sem = new Semaphore(3);
Semaphore sem = new Semaphore(3, true); // 公平

sem.acquire(); // 获取许可
sem.acquire(2); // 获取多个许可
sem.tryAcquire(); // 尝试获取
sem.tryAcquire(5, SECONDS); // 超时获取
sem.release(); // 释放许可
sem.availablePermits(); // 可用许可数

Exchanger

Exchanger<V> exchanger = new Exchanger<>();
V received = exchanger.exchange(V); // 交换
V received = exchanger.exchange(V, 5, SECONDS); // 超时交换

CompletableFuture

创建

CompletableFuture.supplyAsync(() -> value);      // 有返回值
CompletableFuture.runAsync(() -> {}); // 无返回值
CompletableFuture.completedFuture(value); // 已完成

链式调用

future.thenApply(v -> fn(v));          // 转换
future.thenAccept(v -> {}); // 消费
future.thenRun(() -> {}); // 执行
future.thenCompose(v -> anotherFuture); // 扁平化
future.thenCombine(f2, (a, b) -> fn); // 组合两个

多任务

CompletableFuture.allOf(f1, f2, f3);   // 等待所有
CompletableFuture.anyOf(f1, f2, f3); // 任一完成

异常处理

future.exceptionally(e -> defaultValue);
future.handle((v, e) -> v != null ? v : default);
future.whenComplete((v, e) -> {});

超时

future.orTimeout(5, SECONDS);
future.completeOnTimeout(defaultValue, 5, SECONDS);

线程安全策略

不可变对象

final class ImmutableValue {
private final int value;

ImmutableValue(int value) {
this.value = value;
}

int getValue() {
return value;
}
}

ThreadLocal

ThreadLocal<T> threadLocal = ThreadLocal.withInitial(() -> defaultValue);
threadLocal.set(value);
threadLocal.get();
threadLocal.remove(); // 必须清理

volatile

volatile boolean running = true;  // 保证可见性
volatile int counter; // 不保证原子性

synchronized

synchronized (lock) {
// 临界区
}

public synchronized void method() {
// 同步方法
}

线程池配置参考

CPU 密集型

int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
cores + 1, cores + 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()
);

IO 密集型

int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
cores * 2, cores * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);

混合型

int cores = Runtime.getRuntime().availableProcessors();
double ratio = (double) waitTime / computeTime;
int poolSize = (int) (cores * (1 + ratio));