跳到主要内容

并发集合

传统的 Java 集合类(如 HashMap、ArrayList)不是线程安全的。虽然可以使用 Collections.synchronizedXXX 方法包装,但这种"全表锁"的方式并发性能很差。JUC 提供了一系列高性能的并发集合,本章将详细介绍它们。

为什么需要并发集合?

传统集合的线程安全问题

HashMap 的并发问题

public class HashMapUnsafeDemo {
public static void main(String[] args) throws InterruptedException {
Map<Integer, Integer> map = new HashMap<>();

Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
map.put(i, i);
}
});

Thread t2 = new Thread(() -> {
for (int i = 10000; i < 20000; i++) {
map.put(i, i);
}
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("Map 大小: " + map.size());
}
}

在多线程环境下使用 HashMap 可能导致:

  • 数据丢失
  • 死循环(JDK 7 扩容时)
  • 数据不一致

ArrayList 的并发问题

public class ArrayListUnsafeDemo {
public static void main(String[] args) throws InterruptedException {
List<Integer> list = new ArrayList<>();

Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
list.add(i);
}
});

Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
list.add(i);
}
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("List 大小: " + list.size());
}
}

ArrayList 在多线程环境下可能导致:

  • 数组越界异常
  • 数据丢失
  • null 元素

同步包装器的问题

Collections 工具类提供了同步包装方法:

Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
List<Integer> list = Collections.synchronizedList(new ArrayList<>());

这种方式的问题:

全表锁:所有操作都需要获取同一个锁,并发性能差。

迭代需要手动同步

Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
synchronized (map) {
for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
System.out.println(entry.getKey() + " = " + entry.getValue());
}
}

JUC 的并发集合通过更细粒度的锁机制和无锁算法,提供了更好的并发性能。

ConcurrentHashMap

ConcurrentHashMap 是线程安全的哈希表,是 HashMap 的并发版本。它是 JUC 中最重要的并发集合之一,理解其实现原理对于正确使用至关重要。

JDK 7 实现:分段锁

JDK 7 中,ConcurrentHashMap 采用"分段锁"机制:

ConcurrentHashMap
├── Segment[0] (ReentrantLock) → HashEntry[]
├── Segment[1] (ReentrantLock) → HashEntry[]
├── Segment[2] (ReentrantLock) → HashEntry[]
└── ...

每个 Segment 是一个独立的哈希表,继承 ReentrantLock。不同 Segment 之间互不影响,可以并发访问。默认 16 个 Segment,理论上支持 16 个线程同时写入。

分段锁的问题

  1. Segment 数量固定,无法动态扩容
  2. 并发度受限于 Segment 数量
  3. 操作需要两次哈希(先定位 Segment,再定位桶)
  4. 每个 Segment 占用额外的内存

JDK 8 实现:CAS + synchronized

JDK 8 对 ConcurrentHashMap 进行了重大改进,彻底重构了实现方式:

ConcurrentHashMap
└── Node[] table
├── Node (链表)
└── TreeNode (红黑树)

核心改进

  1. 取消分段锁:锁粒度从 Segment 降低到桶级别,理论上支持与桶数量相同的并发度
  2. CAS 操作:无竞争时使用 CAS 插入节点,无需加锁
  3. synchronized 锁桶头:有竞争时锁住桶的头节点,比 ReentrantLock 更轻量
  4. 红黑树优化:链表长度超过 8 转为红黑树,查询效率从 O(n) 提升到 O(log n)

put 操作流程

JDK 8 中 put 操作的流程如下:

详细步骤解析

步骤 1:计算哈希

static final int HASH_BITS = 0x7fffffff;  // 最高位为 0

// spread 方法确保哈希值分布均匀
int hash = spread(key.hashCode());

static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

步骤 2:定位桶位置

int index = (tab.length - 1) & hash;

步骤 3:空桶处理

如果目标桶为空,使用 CAS 直接插入新节点:

if (tab[i] == null) {
// 使用 CAS 原子插入,无需加锁
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {
break; // 插入成功
}
// CAS 失败,说明有竞争,重新尝试
}

步骤 4:处理冲突

如果桶不为空,分三种情况处理:

synchronized (node) {  // 锁住桶的头节点
if (node instanceof TreeBin) {
// 红黑树节点,调用树的插入方法
TreeNode<K,V> p = (TreeNode<K,V>)node;
// ... 树的插入逻辑
} else {
// 链表节点,遍历插入
for (Node<K,V> e = node; ; e = e.next) {
if (e.hash == hash && e.key.equals(key)) {
// 找到相同 key,更新 value
e.value = value;
break;
}
if (e.next == null) {
// 到达链表末尾,插入新节点
e.next = new Node<K,V>(hash, key, value, null);
// 检查是否需要树化
if (binCount >= TREEIFY_THRESHOLD - 1)
treeifyBin(tab, i);
break;
}
}
}
}

为什么使用 synchronized 而不是 ReentrantLock?

这是一个常见的问题。JDK 8 选择 synchronized 有以下原因:

  1. JVM 优化:现代 JVM 对 synchronized 做了大量优化(偏向锁、轻量级锁、重量级锁),在无竞争或低竞争时性能接近 CAS

  2. 内存开销:每个 ReentrantLock 都是一个对象,需要额外的内存。ConcurrentHashMap 可能有很多桶,使用 synchronized 避免了大量 Lock 对象的开销

  3. 锁释放:ReentrantLock 需要显式 unlock,容易出错。synchronized 自动释放锁

  4. 锁粒度:synchronized 锁的是对象头,比 ReentrantLock 更轻量

扩容机制

ConcurrentHashMap 的扩容采用"多线程协同扩容"机制,这是 JDK 8 的一个重要创新。

传统扩容的问题

传统 HashMap 扩容时,需要遍历旧表所有元素并迁移到新表。如果表很大,单线程扩容会导致长时间停顿。

多线程协同扩容

ConcurrentHashMap 允许多个线程同时参与扩容:

  1. 扩容时,将旧表分成多个区间(stride)
  2. 每个线程负责一个或多个区间的迁移
  3. 线程完成自己的区间后,可以"窃取"其他区间
  4. 通过 ForwardingNode 标记已迁移的桶
// ForwardingNode 用于标记正在扩容的桶
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}

当线程访问到 ForwardingNode 时,会帮助进行扩容,而不是被动等待。

get 操作

get 操作完全无锁,通过 volatile 语义保证可见性:

public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
} else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

关键点:

  • Node 的 valnext 字段是 volatile 的,保证可见性
  • 数组访问通过 tabAt 方法,使用 volatile 读语义
  • 整个过程不加锁,读操作不会被写操作阻塞

基本使用

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put("key-" + i, i);
}
});

Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put("key-" + i, i);
}
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("Map 大小: " + map.size());
}
}

原子复合操作

ConcurrentHashMap 提供了多个原子复合操作方法:

putIfAbsent - 不存在才插入

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

map.put("a", 1);
Integer oldValue = map.putIfAbsent("a", 2);
System.out.println("旧值: " + oldValue); // 1
System.out.println("当前值: " + map.get("a")); // 1

Integer newValue = map.putIfAbsent("b", 2);
System.out.println("旧值: " + newValue); // null
System.out.println("当前值: " + map.get("b")); // 2

computeIfAbsent - 不存在则计算并插入

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

map.computeIfAbsent("key", k -> {
System.out.println("计算 value");
return k.length();
});
System.out.println(map.get("key")); // 3

map.computeIfAbsent("key", k -> {
System.out.println("不会执行");
return 100;
});
System.out.println(map.get("key")); // 3

computeIfPresent - 存在则更新

map.put("a", 1);
map.computeIfPresent("a", (k, v) -> v * 10);
System.out.println(map.get("a")); // 10

map.computeIfPresent("b", (k, v) -> v * 10);
System.out.println(map.get("b")); // null

compute - 计算

map.compute("a", (k, v) -> {
if (v == null) return 1;
return v + 1;
});
System.out.println(map.get("a")); // 1

map.compute("a", (k, v) -> v + 1);
System.out.println(map.get("a")); // 2

merge - 合并

map.put("a", 1);
map.merge("a", 2, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.get("a")); // 3

map.merge("b", 10, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.get("b")); // 10

replace - 替换

map.put("a", 1);
boolean success = map.replace("a", 1, 2);
System.out.println(success); // true
System.out.println(map.get("a")); // 2

success = map.replace("a", 1, 3);
System.out.println(success); // false

remove - 条件删除

map.put("a", 1);
boolean removed = map.remove("a", 1);
System.out.println(removed); // true
System.out.println(map.containsKey("a")); // false

实战示例:本地缓存

import java.util.concurrent.ConcurrentHashMap;

public class LocalCache {
private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();

public Object get(String key, java.util.function.Supplier<Object> loader) {
return cache.computeIfAbsent(key, k -> loader.get());
}

public void put(String key, Object value) {
cache.put(key, value);
}

public void remove(String key) {
cache.remove(key);
}

public void clear() {
cache.clear();
}

public static void main(String[] args) {
LocalCache cache = new LocalCache();

Object value = cache.get("user:1", () -> {
System.out.println("从数据库加载用户信息");
return "User-1";
});
System.out.println(value);

value = cache.get("user:1", () -> {
System.out.println("不会执行");
return "User-1-New";
});
System.out.println(value);
}
}

CopyOnWriteArrayList

CopyOnWriteArrayList 是线程安全的 List 实现,采用"写时复制"策略。

工作原理

  • 读操作:直接读取,无需加锁
  • 写操作:复制一份新数组,在新数组上修改,然后替换原数组引用
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) throws InterruptedException {
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();

Thread writer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
list.add(i);
System.out.println("添加: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread reader = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("读取: " + list);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

writer.start();
reader.start();

writer.join();
reader.join();
}
}

特点分析

优点

  • 读操作完全无锁,性能极高
  • 迭代过程中不会抛出 ConcurrentModificationException
  • 适合读多写少的场景

缺点

  • 写操作需要复制数组,内存开销大
  • 数据一致性较弱:迭代时可能看到旧数据

适用场景

  • 读操作远多于写操作
  • 集合大小较小
  • 对实时性要求不高
public class EventListenerDemo {
private final CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();

public void addListener(EventListener listener) {
listeners.add(listener);
}

public void removeListener(EventListener listener) {
listeners.remove(listener);
}

public void fireEvent(Event event) {
for (EventListener listener : listeners) {
listener.onEvent(event);
}
}
}

interface EventListener {
void onEvent(Event event);
}

class Event {}

CopyOnWriteArraySet

CopyOnWriteArraySet 是基于 CopyOnWriteArrayList 实现的线程安全 Set,保证元素唯一性。

CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();

set.add("a");
set.add("b");
set.add("a");

System.out.println(set); // [a, b]

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是基于链表的无界线程安全队列,采用 CAS 无锁算法实现。

基本使用

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
queue.offer("Item-" + i);
System.out.println("生产: Item-" + i);
}
});

Thread consumer = new Thread(() -> {
String item;
while ((item = queue.poll()) != null || !Thread.currentThread().isInterrupted()) {
if (item != null) {
System.out.println("消费: " + item);
}
if (queue.isEmpty()) {
break;
}
}
});

producer.start();
producer.join();

consumer.start();
consumer.join();
}
}

核心方法

方法说明
offer(e)入队,成功返回 true
poll()出队,队列为空返回 null
peek()查看队首元素,不删除
isEmpty()判断是否为空
size()返回元素数量(遍历统计,不精确)

特点

  • 无界队列,不会阻塞
  • 基于 CAS 实现,无锁
  • 不支持 null 元素
  • size() 方法需要遍历,时间复杂度 O(n)

BlockingQueue

BlockingQueue 是阻塞队列接口,扩展了 Queue,提供了阻塞的入队和出队操作。

核心方法

操作抛异常返回特殊值阻塞超时
入队add(e)offer(e)put(e)offer(e, timeout, unit)
出队remove()poll()take()poll(timeout, unit)
查看element()peek()--

ArrayBlockingQueue

基于数组的有界阻塞队列:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
queue.put("Item-" + i);
System.out.println("生产: Item-" + i + ", 队列大小: " + queue.size());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
String item = queue.take();
System.out.println("消费: " + item);
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

producer.start();
consumer.start();
}
}

LinkedBlockingQueue

基于链表的可选有界阻塞队列:

BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);

不指定容量时默认为 Integer.MAX_VALUE,相当于无界。

PriorityBlockingQueue

支持优先级的无界阻塞队列:

PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(
11,
Comparator.comparingInt(Task::getPriority).reversed()
);

queue.put(new Task("低优先级", 1));
queue.put(new Task("高优先级", 10));
queue.put(new Task("中优先级", 5));

while (!queue.isEmpty()) {
System.out.println(queue.take());
}

record Task(String name, int priority) {
@Override
public String toString() {
return name + " (优先级: " + priority + ")";
}

public int getPriority() {
return priority;
}
}

SynchronousQueue

不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作:

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();

Thread producer = new Thread(() -> {
try {
System.out.println("生产者准备放入数据");
queue.put("Hello");
System.out.println("生产者放入成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

Thread consumer = new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println("消费者准备获取数据");
String data = queue.take();
System.out.println("消费者获取: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

producer.start();
consumer.start();
}
}

DelayQueue

支持延迟获取的阻塞队列,元素必须实现 Delayed 接口:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();

queue.put(new DelayedTask("任务1", 3000));
queue.put(new DelayedTask("任务2", 1000));
queue.put(new DelayedTask("任务3", 5000));

System.out.println("开始时间: " + System.currentTimeMillis());

while (!queue.isEmpty()) {
DelayedTask task = queue.take();
System.out.println("执行: " + task.getName() +
", 时间: " + System.currentTimeMillis());
}
}
}

class DelayedTask implements Delayed {
private final String name;
private final long executeTime;

public DelayedTask(String name, long delayMs) {
this.name = name;
this.executeTime = System.currentTimeMillis() + delayMs;
}

public String getName() {
return name;
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
}
}

生产者-消费者模式

BlockingQueue 是实现生产者-消费者模式的最佳选择:

import java.util.concurrent.*;

public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

ExecutorService executor = Executors.newFixedThreadPool(3);

executor.submit(() -> {
for (int i = 0; i < 20; i++) {
try {
queue.put("Item-" + i);
System.out.println("生产: Item-" + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});

executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String item = queue.take();
System.out.println("消费者1 消费: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});

executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String item = queue.take();
System.out.println("消费者2 消费: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});

try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}

executor.shutdownNow();
}
}

ConcurrentSkipListMap 和 ConcurrentSkipListSet

基于跳表实现的并发有序集合:

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;

public class SkipListDemo {
public static void main(String[] args) {
ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();

map.put(3, "C");
map.put(1, "A");
map.put(2, "B");
map.put(5, "E");
map.put(4, "D");

System.out.println("有序遍历:");
map.forEach((k, v) -> System.out.println(k + " = " + v));

System.out.println("\n第一个键: " + map.firstKey());
System.out.println("最后一个键: " + map.lastKey());
System.out.println("小于3的最大键: " + map.lowerKey(3));
System.out.println("大于等于3的最小键: " + map.ceilingKey(3));

ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
set.add(5);
set.add(1);
set.add(3);
set.add(2);
set.add(4);

System.out.println("\n有序 Set: " + set);
}
}

集合选择指南

场景推荐集合
高并发 MapConcurrentHashMap
读多写少 ListCopyOnWriteArrayList
高并发队列ConcurrentLinkedQueue
生产者-消费者ArrayBlockingQueue / LinkedBlockingQueue
延迟任务DelayQueue
优先级队列PriorityBlockingQueue
有序并发 MapConcurrentSkipListMap
有序并发 SetConcurrentSkipListSet

小结

本章介绍了 JUC 提供的并发集合:

  1. ConcurrentHashMap:高性能并发哈希表,JDK 8 采用 CAS + synchronized 实现
  2. CopyOnWriteArrayList/Set:写时复制的线程安全集合,适合读多写少场景
  3. ConcurrentLinkedQueue:无锁并发队列,高性能
  4. BlockingQueue 系列:阻塞队列,支持生产者-消费者模式
  5. ConcurrentSkipListMap/Set:基于跳表的有序并发集合

选择合适的并发集合对于构建高性能并发应用至关重要。理解每种集合的特点和适用场景,才能做出正确的选择。