并发集合
传统的 Java 集合类(如 HashMap、ArrayList)不是线程安全的。虽然可以使用 Collections.synchronizedXXX 方法包装,但这种"全表锁"的方式并发性能很差。JUC 提供了一系列高性能的并发集合,本章将详细介绍它们。
为什么需要并发集合?
传统集合的线程安全问题
HashMap 的并发问题
public class HashMapUnsafeDemo {
public static void main(String[] args) throws InterruptedException {
Map<Integer, Integer> map = new HashMap<>();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
map.put(i, i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 10000; i < 20000; i++) {
map.put(i, i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Map 大小: " + map.size());
}
}
在多线程环境下使用 HashMap 可能导致:
- 数据丢失
- 死循环(JDK 7 扩容时)
- 数据不一致
ArrayList 的并发问题
public class ArrayListUnsafeDemo {
public static void main(String[] args) throws InterruptedException {
List<Integer> list = new ArrayList<>();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
list.add(i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
list.add(i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("List 大小: " + list.size());
}
}
ArrayList 在多线程环境下可能导致:
- 数组越界异常
- 数据丢失
- null 元素
同步包装器的问题
Collections 工具类提供了同步包装方法:
Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
List<Integer> list = Collections.synchronizedList(new ArrayList<>());
这种方式的问题:
全表锁:所有操作都需要获取同一个锁,并发性能差。
迭代需要手动同步:
Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
synchronized (map) {
for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
System.out.println(entry.getKey() + " = " + entry.getValue());
}
}
JUC 的并发集合通过更细粒度的锁机制和无锁算法,提供了更好的并发性能。
ConcurrentHashMap
ConcurrentHashMap 是线程安全的哈希表,是 HashMap 的并发版本。
JDK 7 实现:分段锁
JDK 7 中,ConcurrentHashMap 采用"分段锁"机制:
ConcurrentHashMap
├── Segment[0] (ReentrantLock) → HashEntry[]
├── Segment[1] (ReentrantLock) → HashEntry[]
├── Segment[2] (ReentrantLock) → HashEntry[]
└── ...
每个 Segment 是一个独立的哈希表,继承 ReentrantLock。不同 Segment 之间互不影响,可以并发访问。默认 16 个 Segment,理论上支持 16 个线程同时写入。
JDK 8 实现:CAS + synchronized
JDK 8 对 ConcurrentHashMap 进行了重大改进:
ConcurrentHashMap
└── Node[] table
├── Node (链表)
└── TreeNode (红黑树)
核心改进:
- 取消分段锁:锁粒度从 Segment 降低到桶级别
- CAS 操作:无竞争时使用 CAS 插入节点,无需加锁
- synchronized 锁桶头:有竞争时锁住桶的头节点
- 红黑树优化:链表长度超过 8 转为红黑树,查询效率从 O(n) 提升到 O(log n)
基本使用
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put("key-" + i, i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put("key-" + i, i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Map 大小: " + map.size());
}
}
原子复合操作
ConcurrentHashMap 提供了多个原子复合操作方法:
putIfAbsent - 不存在才插入
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("a", 1);
Integer oldValue = map.putIfAbsent("a", 2);
System.out.println("旧值: " + oldValue); // 1
System.out.println("当前值: " + map.get("a")); // 1
Integer newValue = map.putIfAbsent("b", 2);
System.out.println("旧值: " + newValue); // null
System.out.println("当前值: " + map.get("b")); // 2
computeIfAbsent - 不存在则计算并插入
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.computeIfAbsent("key", k -> {
System.out.println("计算 value");
return k.length();
});
System.out.println(map.get("key")); // 3
map.computeIfAbsent("key", k -> {
System.out.println("不会执行");
return 100;
});
System.out.println(map.get("key")); // 3
computeIfPresent - 存在则更新
map.put("a", 1);
map.computeIfPresent("a", (k, v) -> v * 10);
System.out.println(map.get("a")); // 10
map.computeIfPresent("b", (k, v) -> v * 10);
System.out.println(map.get("b")); // null
compute - 计算
map.compute("a", (k, v) -> {
if (v == null) return 1;
return v + 1;
});
System.out.println(map.get("a")); // 1
map.compute("a", (k, v) -> v + 1);
System.out.println(map.get("a")); // 2
merge - 合并
map.put("a", 1);
map.merge("a", 2, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.get("a")); // 3
map.merge("b", 10, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.get("b")); // 10
replace - 替换
map.put("a", 1);
boolean success = map.replace("a", 1, 2);
System.out.println(success); // true
System.out.println(map.get("a")); // 2
success = map.replace("a", 1, 3);
System.out.println(success); // false
remove - 条件删除
map.put("a", 1);
boolean removed = map.remove("a", 1);
System.out.println(removed); // true
System.out.println(map.containsKey("a")); // false
实战示例:本地缓存
import java.util.concurrent.ConcurrentHashMap;
public class LocalCache {
private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
public Object get(String key, java.util.function.Supplier<Object> loader) {
return cache.computeIfAbsent(key, k -> loader.get());
}
public void put(String key, Object value) {
cache.put(key, value);
}
public void remove(String key) {
cache.remove(key);
}
public void clear() {
cache.clear();
}
public static void main(String[] args) {
LocalCache cache = new LocalCache();
Object value = cache.get("user:1", () -> {
System.out.println("从数据库加载用户信息");
return "User-1";
});
System.out.println(value);
value = cache.get("user:1", () -> {
System.out.println("不会执行");
return "User-1-New";
});
System.out.println(value);
}
}
CopyOnWriteArrayList
CopyOnWriteArrayList 是线程安全的 List 实现,采用"写时复制"策略。
工作原理
- 读操作:直接读取,无需加锁
- 写操作:复制一份新数组,在新数组上修改,然后替换原数组引用
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) throws InterruptedException {
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
Thread writer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
list.add(i);
System.out.println("添加: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread reader = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("读取: " + list);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
writer.start();
reader.start();
writer.join();
reader.join();
}
}
特点分析
优点:
- 读操作完全无锁,性能极高
- 迭代过程中不会抛出 ConcurrentModificationException
- 适合读多写少的场景
缺点:
- 写操作需要复制数组,内存开销大
- 数据一致性较弱:迭代时可能看到旧数据
适用场景
- 读操作远多于写操作
- 集合大小较小
- 对实时性要求不高
public class EventListenerDemo {
private final CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();
public void addListener(EventListener listener) {
listeners.add(listener);
}
public void removeListener(EventListener listener) {
listeners.remove(listener);
}
public void fireEvent(Event event) {
for (EventListener listener : listeners) {
listener.onEvent(event);
}
}
}
interface EventListener {
void onEvent(Event event);
}
class Event {}
CopyOnWriteArraySet
CopyOnWriteArraySet 是基于 CopyOnWriteArrayList 实现的线程安全 Set,保证元素唯一性。
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("a");
set.add("b");
set.add("a");
System.out.println(set); // [a, b]
ConcurrentLinkedQueue
ConcurrentLinkedQueue 是基于链表的无界线程安全队列,采用 CAS 无锁算法实现。
基本使用
import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
queue.offer("Item-" + i);
System.out.println("生产: Item-" + i);
}
});
Thread consumer = new Thread(() -> {
String item;
while ((item = queue.poll()) != null || !Thread.currentThread().isInterrupted()) {
if (item != null) {
System.out.println("消费: " + item);
}
if (queue.isEmpty()) {
break;
}
}
});
producer.start();
producer.join();
consumer.start();
consumer.join();
}
}
核心方法
| 方法 | 说明 |
|---|---|
| offer(e) | 入队,成功返回 true |
| poll() | 出队,队列为空返回 null |
| peek() | 查看队首元素,不删除 |
| isEmpty() | 判断是否为空 |
| size() | 返回元素数量(遍历统计,不精确) |
特点
- 无界队列,不会阻塞
- 基于 CAS 实现,无锁
- 不支持 null 元素
- size() 方法需要遍历,时间复杂度 O(n)
BlockingQueue
BlockingQueue 是阻塞队列接口,扩展了 Queue,提供了阻塞的入队和出队操作。
核心方法
| 操作 | 抛异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 入队 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
| 出队 | remove() | poll() | take() | poll(timeout, unit) |
| 查看 | element() | peek() | - | - |
ArrayBlockingQueue
基于数组的有界阻塞队列:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
queue.put("Item-" + i);
System.out.println("生产: Item-" + i + ", 队列大小: " + queue.size());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
String item = queue.take();
System.out.println("消费: " + item);
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
LinkedBlockingQueue
基于链表的可选有界阻塞队列:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);
不指定容量时默认为 Integer.MAX_VALUE,相当于无界。
PriorityBlockingQueue
支持优先级的无界阻塞队列:
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(
11,
Comparator.comparingInt(Task::getPriority).reversed()
);
queue.put(new Task("低优先级", 1));
queue.put(new Task("高优先级", 10));
queue.put(new Task("中优先级", 5));
while (!queue.isEmpty()) {
System.out.println(queue.take());
}
record Task(String name, int priority) {
@Override
public String toString() {
return name + " (优先级: " + priority + ")";
}
public int getPriority() {
return priority;
}
}
SynchronousQueue
不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作:
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
Thread producer = new Thread(() -> {
try {
System.out.println("生产者准备放入数据");
queue.put("Hello");
System.out.println("生产者放入成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumer = new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println("消费者准备获取数据");
String data = queue.take();
System.out.println("消费者获取: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
DelayQueue
支持延迟获取的阻塞队列,元素必须实现 Delayed 接口:
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.put(new DelayedTask("任务1", 3000));
queue.put(new DelayedTask("任务2", 1000));
queue.put(new DelayedTask("任务3", 5000));
System.out.println("开始时间: " + System.currentTimeMillis());
while (!queue.isEmpty()) {
DelayedTask task = queue.take();
System.out.println("执行: " + task.getName() +
", 时间: " + System.currentTimeMillis());
}
}
}
class DelayedTask implements Delayed {
private final String name;
private final long executeTime;
public DelayedTask(String name, long delayMs) {
this.name = name;
this.executeTime = System.currentTimeMillis() + delayMs;
}
public String getName() {
return name;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
}
}
生产者-消费者模式
BlockingQueue 是实现生产者-消费者模式的最佳选择:
import java.util.concurrent.*;
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(() -> {
for (int i = 0; i < 20; i++) {
try {
queue.put("Item-" + i);
System.out.println("生产: Item-" + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String item = queue.take();
System.out.println("消费者1 消费: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String item = queue.take();
System.out.println("消费者2 消费: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdownNow();
}
}
ConcurrentSkipListMap 和 ConcurrentSkipListSet
基于跳表实现的并发有序集合:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
public class SkipListDemo {
public static void main(String[] args) {
ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
map.put(3, "C");
map.put(1, "A");
map.put(2, "B");
map.put(5, "E");
map.put(4, "D");
System.out.println("有序遍历:");
map.forEach((k, v) -> System.out.println(k + " = " + v));
System.out.println("\n第一个键: " + map.firstKey());
System.out.println("最后一个键: " + map.lastKey());
System.out.println("小于3的最大键: " + map.lowerKey(3));
System.out.println("大于等于3的最小键: " + map.ceilingKey(3));
ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
set.add(5);
set.add(1);
set.add(3);
set.add(2);
set.add(4);
System.out.println("\n有序 Set: " + set);
}
}
集合选择指南
| 场景 | 推荐集合 |
|---|---|
| 高并发 Map | ConcurrentHashMap |
| 读多写少 List | CopyOnWriteArrayList |
| 高并发队列 | ConcurrentLinkedQueue |
| 生产者-消费者 | ArrayBlockingQueue / LinkedBlockingQueue |
| 延迟任务 | DelayQueue |
| 优先级队列 | PriorityBlockingQueue |
| 有序并发 Map | ConcurrentSkipListMap |
| 有序并发 Set | ConcurrentSkipListSet |
小结
本章介绍了 JUC 提供的并发集合:
- ConcurrentHashMap:高性能并发哈希表,JDK 8 采用 CAS + synchronized 实现
- CopyOnWriteArrayList/Set:写时复制的线程安全集合,适合读多写少场景
- ConcurrentLinkedQueue:无锁并发队列,高性能
- BlockingQueue 系列:阻塞队列,支持生产者-消费者模式
- ConcurrentSkipListMap/Set:基于跳表的有序并发集合
选择合适的并发集合对于构建高性能并发应用至关重要。理解每种集合的特点和适用场景,才能做出正确的选择。