并发集合
传统的 Java 集合类(如 HashMap、ArrayList)不是线程安全的。虽然可以使用 Collections.synchronizedXXX 方法包装,但这种"全表锁"的方式并发性能很差。JUC 提供了一系列高性能的并发集合,本章将详细介绍它们。
为什么需要并发集合?
传统集合的线程安全问题
HashMap 的并发问题
public class HashMapUnsafeDemo {
public static void main(String[] args) throws InterruptedException {
Map<Integer, Integer> map = new HashMap<>();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
map.put(i, i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 10000; i < 20000; i++) {
map.put(i, i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Map 大小: " + map.size());
}
}
在多线程环境下使用 HashMap 可能导致:
- 数据丢失
- 死循环(JDK 7 扩容时)
- 数据不一致
ArrayList 的并发问题
public class ArrayListUnsafeDemo {
public static void main(String[] args) throws InterruptedException {
List<Integer> list = new ArrayList<>();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
list.add(i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
list.add(i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("List 大小: " + list.size());
}
}
ArrayList 在多线程环境下可能导致:
- 数组越界异常
- 数据丢失
- null 元素
同步包装器的问题
Collections 工具类提供了同步包装方法:
Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
List<Integer> list = Collections.synchronizedList(new ArrayList<>());
这种方式的问题:
全表锁:所有操作都需要获取同一个锁,并发性能差。
迭代需要手动同步:
Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
synchronized (map) {
for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
System.out.println(entry.getKey() + " = " + entry.getValue());
}
}
JUC 的并发集合通过更细粒度的锁机制和无锁算法,提供了更好的并发性能。
ConcurrentHashMap
ConcurrentHashMap 是线程安全的哈希表,是 HashMap 的并发版本。它是 JUC 中最重要的并发集合之一,理解其实现原理对于正确使用至关重要。
JDK 7 实现:分段锁
JDK 7 中,ConcurrentHashMap 采用"分段锁"机制:
ConcurrentHashMap
├── Segment[0] (ReentrantLock) → HashEntry[]
├── Segment[1] (ReentrantLock) → HashEntry[]
├── Segment[2] (ReentrantLock) → HashEntry[]
└── ...
每个 Segment 是一个独立的哈希表,继承 ReentrantLock。不同 Segment 之间互不影响,可以并发访问。默认 16 个 Segment,理论上支持 16 个线程同时写入。
分段锁的问题:
- Segment 数量固定,无法动态扩容
- 并发度受限于 Segment 数量
- 操作需要两次哈希(先定位 Segment,再定位桶)
- 每个 Segment 占用额外的内存
JDK 8 实现:CAS + synchronized
JDK 8 对 ConcurrentHashMap 进行了重大改进,彻底重构了实现方式:
ConcurrentHashMap
└── Node[] table
├── Node (链表)
└── TreeNode (红黑树)
核心改进:
- 取消分段锁:锁粒度从 Segment 降低到桶级别,理论上支持与桶数量相同的并发度
- CAS 操作:无竞争时使用 CAS 插入节点,无需加锁
- synchronized 锁桶头:有竞争时锁住桶的头节点,比 ReentrantLock 更轻量
- 红黑树优化:链表长度超过 8 转为红黑树,查询效率从 O(n) 提升到 O(log n)
put 操作流程
JDK 8 中 put 操作的流程如下:
详细步骤解析:
步骤 1:计算哈希
static final int HASH_BITS = 0x7fffffff; // 最高位为 0
// spread 方法确保哈希值分布均匀
int hash = spread(key.hashCode());
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
步骤 2:定位桶位置
int index = (tab.length - 1) & hash;
步骤 3:空桶处理
如果目标桶为空,使用 CAS 直接插入新节点:
if (tab[i] == null) {
// 使用 CAS 原子插入,无需加锁
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {
break; // 插入成功
}
// CAS 失败,说明有竞争,重新尝试
}
步骤 4:处理冲突
如果桶不为空,分三种情况处理:
synchronized (node) { // 锁住桶的头节点
if (node instanceof TreeBin) {
// 红黑树节点,调用树的插入方法
TreeNode<K,V> p = (TreeNode<K,V>)node;
// ... 树的插入逻辑
} else {
// 链表节点,遍历插入
for (Node<K,V> e = node; ; e = e.next) {
if (e.hash == hash && e.key.equals(key)) {
// 找到相同 key,更新 value
e.value = value;
break;
}
if (e.next == null) {
// 到达链表末尾,插入新节点
e.next = new Node<K,V>(hash, key, value, null);
// 检查是否需要树化
if (binCount >= TREEIFY_THRESHOLD - 1)
treeifyBin(tab, i);
break;
}
}
}
}
为什么使用 synchronized 而不是 ReentrantLock?
这是一个常见的问题。JDK 8 选择 synchronized 有以下原因:
-
JVM 优化:现代 JVM 对 synchronized 做了大量优化(偏向锁、轻量级锁、重量级锁),在无竞争或低竞争时性能接近 CAS
-
内存开销:每个 ReentrantLock 都是一个对象,需要额外的内存。ConcurrentHashMap 可能有很多桶,使用 synchronized 避免了大量 Lock 对象的开销
-
锁释放:ReentrantLock 需要显式 unlock,容易出错。synchronized 自动释放锁
-
锁粒度:synchronized 锁的是对象头,比 ReentrantLock 更轻量
扩容机制
ConcurrentHashMap 的扩容采用"多线程协同扩容"机制,这是 JDK 8 的一个重要创新。
传统扩容的问题:
传统 HashMap 扩容时,需要遍历旧表所有元素并迁移到新表。如果表很大,单线程扩容会导致长时间停顿。
多线程协同扩容:
ConcurrentHashMap 允许多个线程同时参与扩容:
- 扩容时,将旧表分成多个区间(stride)
- 每个线程负责一个或多个区间的迁移
- 线程完成自己的区间后,可以"窃取"其他区间
- 通过 ForwardingNode 标记已迁移的桶
// ForwardingNode 用于标记正在扩容的桶
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
当线程访问到 ForwardingNode 时,会帮助进行扩容,而不是被动等待。
get 操作
get 操作完全无锁,通过 volatile 语义保证可见性:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
} else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
关键点:
- Node 的
val和next字段是 volatile 的,保证可见性 - 数组访问通过
tabAt方法,使用 volatile 读语义 - 整个过程不加锁,读操作不会被写操作阻塞
基本使用
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put("key-" + i, i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put("key-" + i, i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Map 大小: " + map.size());
}
}
原子复合操作
ConcurrentHashMap 提供了多个原子复合操作方法:
putIfAbsent - 不存在才插入
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("a", 1);
Integer oldValue = map.putIfAbsent("a", 2);
System.out.println("旧值: " + oldValue); // 1
System.out.println("当前值: " + map.get("a")); // 1
Integer newValue = map.putIfAbsent("b", 2);
System.out.println("旧值: " + newValue); // null
System.out.println("当前值: " + map.get("b")); // 2
computeIfAbsent - 不存在则计算并插入
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.computeIfAbsent("key", k -> {
System.out.println("计算 value");
return k.length();
});
System.out.println(map.get("key")); // 3
map.computeIfAbsent("key", k -> {
System.out.println("不会执行");
return 100;
});
System.out.println(map.get("key")); // 3
computeIfPresent - 存在则更新
map.put("a", 1);
map.computeIfPresent("a", (k, v) -> v * 10);
System.out.println(map.get("a")); // 10
map.computeIfPresent("b", (k, v) -> v * 10);
System.out.println(map.get("b")); // null
compute - 计算
map.compute("a", (k, v) -> {
if (v == null) return 1;
return v + 1;
});
System.out.println(map.get("a")); // 1
map.compute("a", (k, v) -> v + 1);
System.out.println(map.get("a")); // 2
merge - 合并
map.put("a", 1);
map.merge("a", 2, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.get("a")); // 3
map.merge("b", 10, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.get("b")); // 10
replace - 替换
map.put("a", 1);
boolean success = map.replace("a", 1, 2);
System.out.println(success); // true
System.out.println(map.get("a")); // 2
success = map.replace("a", 1, 3);
System.out.println(success); // false
remove - 条件删除
map.put("a", 1);
boolean removed = map.remove("a", 1);
System.out.println(removed); // true
System.out.println(map.containsKey("a")); // false
实战示例:本地缓存
import java.util.concurrent.ConcurrentHashMap;
public class LocalCache {
private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
public Object get(String key, java.util.function.Supplier<Object> loader) {
return cache.computeIfAbsent(key, k -> loader.get());
}
public void put(String key, Object value) {
cache.put(key, value);
}
public void remove(String key) {
cache.remove(key);
}
public void clear() {
cache.clear();
}
public static void main(String[] args) {
LocalCache cache = new LocalCache();
Object value = cache.get("user:1", () -> {
System.out.println("从数据库加载用户信息");
return "User-1";
});
System.out.println(value);
value = cache.get("user:1", () -> {
System.out.println("不会执行");
return "User-1-New";
});
System.out.println(value);
}
}
CopyOnWriteArrayList
CopyOnWriteArrayList 是线程安全的 List 实现,采用"写时复制"策略。
工作原理
- 读操作:直接读取,无需加锁
- 写操作:复制一份新数组,在新数组上修改,然后替换原数组引用
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) throws InterruptedException {
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
Thread writer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
list.add(i);
System.out.println("添加: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread reader = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("读取: " + list);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
writer.start();
reader.start();
writer.join();
reader.join();
}
}
特点分析
优点:
- 读操作完全无锁,性能极高
- 迭代过程中不会抛出 ConcurrentModificationException
- 适合读多写少的场景
缺点:
- 写操作需要复制数组,内存开销大
- 数据一致性较弱:迭代时可能看到旧数据
适用场景
- 读操作远多于写操作
- 集合大小较小
- 对实时性要求不高
public class EventListenerDemo {
private final CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();
public void addListener(EventListener listener) {
listeners.add(listener);
}
public void removeListener(EventListener listener) {
listeners.remove(listener);
}
public void fireEvent(Event event) {
for (EventListener listener : listeners) {
listener.onEvent(event);
}
}
}
interface EventListener {
void onEvent(Event event);
}
class Event {}
CopyOnWriteArraySet
CopyOnWriteArraySet 是基于 CopyOnWriteArrayList 实现的线程安全 Set,保证元素唯一性。
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("a");
set.add("b");
set.add("a");
System.out.println(set); // [a, b]
ConcurrentLinkedQueue
ConcurrentLinkedQueue 是基于链表的无界线程安全队列,采用 CAS 无锁算法实现。
基本使用
import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
queue.offer("Item-" + i);
System.out.println("生产: Item-" + i);
}
});
Thread consumer = new Thread(() -> {
String item;
while ((item = queue.poll()) != null || !Thread.currentThread().isInterrupted()) {
if (item != null) {
System.out.println("消费: " + item);
}
if (queue.isEmpty()) {
break;
}
}
});
producer.start();
producer.join();
consumer.start();
consumer.join();
}
}
核心方法
| 方法 | 说明 |
|---|---|
| offer(e) | 入队,成功返回 true |
| poll() | 出队,队列为空返回 null |
| peek() | 查看队首元素,不删除 |
| isEmpty() | 判断是否为空 |
| size() | 返回元素数量(遍历统计,不精确) |
特点
- 无界队列,不会阻塞
- 基于 CAS 实现,无锁
- 不支持 null 元素
- size() 方法需要遍历,时间复杂度 O(n)
BlockingQueue
BlockingQueue 是阻塞队列接口,扩展了 Queue,提供了阻塞的入队和出队操作。
核心方法
| 操作 | 抛异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 入队 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
| 出队 | remove() | poll() | take() | poll(timeout, unit) |
| 查看 | element() | peek() | - | - |
ArrayBlockingQueue
基于数组的有界阻塞队列:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
queue.put("Item-" + i);
System.out.println("生产: Item-" + i + ", 队列大小: " + queue.size());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
String item = queue.take();
System.out.println("消费: " + item);
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
LinkedBlockingQueue
基于链表的可选有界阻塞队列:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);
不指定容量时默认为 Integer.MAX_VALUE,相当于无界。
PriorityBlockingQueue
支持优先级的无界阻塞队列:
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(
11,
Comparator.comparingInt(Task::getPriority).reversed()
);
queue.put(new Task("低优先级", 1));
queue.put(new Task("高优先级", 10));
queue.put(new Task("中优先级", 5));
while (!queue.isEmpty()) {
System.out.println(queue.take());
}
record Task(String name, int priority) {
@Override
public String toString() {
return name + " (优先级: " + priority + ")";
}
public int getPriority() {
return priority;
}
}
SynchronousQueue
不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作:
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
Thread producer = new Thread(() -> {
try {
System.out.println("生产者准备放入数据");
queue.put("Hello");
System.out.println("生产者放入成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumer = new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println("消费者准备获取数据");
String data = queue.take();
System.out.println("消费者获取: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
DelayQueue
支持延迟获取的阻塞队列,元素必须实现 Delayed 接口:
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.put(new DelayedTask("任务1", 3000));
queue.put(new DelayedTask("任务2", 1000));
queue.put(new DelayedTask("任务3", 5000));
System.out.println("开始时间: " + System.currentTimeMillis());
while (!queue.isEmpty()) {
DelayedTask task = queue.take();
System.out.println("执行: " + task.getName() +
", 时间: " + System.currentTimeMillis());
}
}
}
class DelayedTask implements Delayed {
private final String name;
private final long executeTime;
public DelayedTask(String name, long delayMs) {
this.name = name;
this.executeTime = System.currentTimeMillis() + delayMs;
}
public String getName() {
return name;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
}
}
生产者-消费者模式
BlockingQueue 是实现生产者-消费者模式的最佳选择:
import java.util.concurrent.*;
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(() -> {
for (int i = 0; i < 20; i++) {
try {
queue.put("Item-" + i);
System.out.println("生产: Item-" + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String item = queue.take();
System.out.println("消费者1 消费: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String item = queue.take();
System.out.println("消费者2 消费: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdownNow();
}
}
ConcurrentSkipListMap 和 ConcurrentSkipListSet
基于跳表实现的并发有序集合:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
public class SkipListDemo {
public static void main(String[] args) {
ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
map.put(3, "C");
map.put(1, "A");
map.put(2, "B");
map.put(5, "E");
map.put(4, "D");
System.out.println("有序遍历:");
map.forEach((k, v) -> System.out.println(k + " = " + v));
System.out.println("\n第一个键: " + map.firstKey());
System.out.println("最后一个键: " + map.lastKey());
System.out.println("小于3的最大键: " + map.lowerKey(3));
System.out.println("大于等于3的最小键: " + map.ceilingKey(3));
ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
set.add(5);
set.add(1);
set.add(3);
set.add(2);
set.add(4);
System.out.println("\n有序 Set: " + set);
}
}
集合选择指南
| 场景 | 推荐集合 |
|---|---|
| 高并发 Map | ConcurrentHashMap |
| 读多写少 List | CopyOnWriteArrayList |
| 高并发队列 | ConcurrentLinkedQueue |
| 生产者-消费者 | ArrayBlockingQueue / LinkedBlockingQueue |
| 延迟任务 | DelayQueue |
| 优先级队列 | PriorityBlockingQueue |
| 有序并发 Map | ConcurrentSkipListMap |
| 有序并发 Set | ConcurrentSkipListSet |
小结
本章介绍了 JUC 提供的并发集合:
- ConcurrentHashMap:高性能并发哈希表,JDK 8 采用 CAS + synchronized 实现
- CopyOnWriteArrayList/Set:写时复制的线程安全集合,适合读多写少场景
- ConcurrentLinkedQueue:无锁并发队列,高性能
- BlockingQueue 系列:阻塞队列,支持生产者-消费者模式
- ConcurrentSkipListMap/Set:基于跳表的有序并发集合
选择合适的并发集合对于构建高性能并发应用至关重要。理解每种集合的特点和适用场景,才能做出正确的选择。