跳到主要内容

并发集合

传统的 Java 集合类(如 HashMap、ArrayList)不是线程安全的。虽然可以使用 Collections.synchronizedXXX 方法包装,但这种"全表锁"的方式并发性能很差。JUC 提供了一系列高性能的并发集合,本章将详细介绍它们。

为什么需要并发集合?

传统集合的线程安全问题

HashMap 的并发问题

public class HashMapUnsafeDemo {
public static void main(String[] args) throws InterruptedException {
Map<Integer, Integer> map = new HashMap<>();

Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
map.put(i, i);
}
});

Thread t2 = new Thread(() -> {
for (int i = 10000; i < 20000; i++) {
map.put(i, i);
}
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("Map 大小: " + map.size());
}
}

在多线程环境下使用 HashMap 可能导致:

  • 数据丢失
  • 死循环(JDK 7 扩容时)
  • 数据不一致

ArrayList 的并发问题

public class ArrayListUnsafeDemo {
public static void main(String[] args) throws InterruptedException {
List<Integer> list = new ArrayList<>();

Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
list.add(i);
}
});

Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
list.add(i);
}
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("List 大小: " + list.size());
}
}

ArrayList 在多线程环境下可能导致:

  • 数组越界异常
  • 数据丢失
  • null 元素

同步包装器的问题

Collections 工具类提供了同步包装方法:

Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
List<Integer> list = Collections.synchronizedList(new ArrayList<>());

这种方式的问题:

全表锁:所有操作都需要获取同一个锁,并发性能差。

迭代需要手动同步

Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
synchronized (map) {
for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
System.out.println(entry.getKey() + " = " + entry.getValue());
}
}

JUC 的并发集合通过更细粒度的锁机制和无锁算法,提供了更好的并发性能。

ConcurrentHashMap

ConcurrentHashMap 是线程安全的哈希表,是 HashMap 的并发版本。

JDK 7 实现:分段锁

JDK 7 中,ConcurrentHashMap 采用"分段锁"机制:

ConcurrentHashMap
├── Segment[0] (ReentrantLock) → HashEntry[]
├── Segment[1] (ReentrantLock) → HashEntry[]
├── Segment[2] (ReentrantLock) → HashEntry[]
└── ...

每个 Segment 是一个独立的哈希表,继承 ReentrantLock。不同 Segment 之间互不影响,可以并发访问。默认 16 个 Segment,理论上支持 16 个线程同时写入。

JDK 8 实现:CAS + synchronized

JDK 8 对 ConcurrentHashMap 进行了重大改进:

ConcurrentHashMap
└── Node[] table
├── Node (链表)
└── TreeNode (红黑树)

核心改进

  1. 取消分段锁:锁粒度从 Segment 降低到桶级别
  2. CAS 操作:无竞争时使用 CAS 插入节点,无需加锁
  3. synchronized 锁桶头:有竞争时锁住桶的头节点
  4. 红黑树优化:链表长度超过 8 转为红黑树,查询效率从 O(n) 提升到 O(log n)

基本使用

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put("key-" + i, i);
}
});

Thread t2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put("key-" + i, i);
}
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("Map 大小: " + map.size());
}
}

原子复合操作

ConcurrentHashMap 提供了多个原子复合操作方法:

putIfAbsent - 不存在才插入

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

map.put("a", 1);
Integer oldValue = map.putIfAbsent("a", 2);
System.out.println("旧值: " + oldValue); // 1
System.out.println("当前值: " + map.get("a")); // 1

Integer newValue = map.putIfAbsent("b", 2);
System.out.println("旧值: " + newValue); // null
System.out.println("当前值: " + map.get("b")); // 2

computeIfAbsent - 不存在则计算并插入

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

map.computeIfAbsent("key", k -> {
System.out.println("计算 value");
return k.length();
});
System.out.println(map.get("key")); // 3

map.computeIfAbsent("key", k -> {
System.out.println("不会执行");
return 100;
});
System.out.println(map.get("key")); // 3

computeIfPresent - 存在则更新

map.put("a", 1);
map.computeIfPresent("a", (k, v) -> v * 10);
System.out.println(map.get("a")); // 10

map.computeIfPresent("b", (k, v) -> v * 10);
System.out.println(map.get("b")); // null

compute - 计算

map.compute("a", (k, v) -> {
if (v == null) return 1;
return v + 1;
});
System.out.println(map.get("a")); // 1

map.compute("a", (k, v) -> v + 1);
System.out.println(map.get("a")); // 2

merge - 合并

map.put("a", 1);
map.merge("a", 2, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.get("a")); // 3

map.merge("b", 10, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.get("b")); // 10

replace - 替换

map.put("a", 1);
boolean success = map.replace("a", 1, 2);
System.out.println(success); // true
System.out.println(map.get("a")); // 2

success = map.replace("a", 1, 3);
System.out.println(success); // false

remove - 条件删除

map.put("a", 1);
boolean removed = map.remove("a", 1);
System.out.println(removed); // true
System.out.println(map.containsKey("a")); // false

实战示例:本地缓存

import java.util.concurrent.ConcurrentHashMap;

public class LocalCache {
private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();

public Object get(String key, java.util.function.Supplier<Object> loader) {
return cache.computeIfAbsent(key, k -> loader.get());
}

public void put(String key, Object value) {
cache.put(key, value);
}

public void remove(String key) {
cache.remove(key);
}

public void clear() {
cache.clear();
}

public static void main(String[] args) {
LocalCache cache = new LocalCache();

Object value = cache.get("user:1", () -> {
System.out.println("从数据库加载用户信息");
return "User-1";
});
System.out.println(value);

value = cache.get("user:1", () -> {
System.out.println("不会执行");
return "User-1-New";
});
System.out.println(value);
}
}

CopyOnWriteArrayList

CopyOnWriteArrayList 是线程安全的 List 实现,采用"写时复制"策略。

工作原理

  • 读操作:直接读取,无需加锁
  • 写操作:复制一份新数组,在新数组上修改,然后替换原数组引用
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) throws InterruptedException {
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();

Thread writer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
list.add(i);
System.out.println("添加: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread reader = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("读取: " + list);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

writer.start();
reader.start();

writer.join();
reader.join();
}
}

特点分析

优点

  • 读操作完全无锁,性能极高
  • 迭代过程中不会抛出 ConcurrentModificationException
  • 适合读多写少的场景

缺点

  • 写操作需要复制数组,内存开销大
  • 数据一致性较弱:迭代时可能看到旧数据

适用场景

  • 读操作远多于写操作
  • 集合大小较小
  • 对实时性要求不高
public class EventListenerDemo {
private final CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();

public void addListener(EventListener listener) {
listeners.add(listener);
}

public void removeListener(EventListener listener) {
listeners.remove(listener);
}

public void fireEvent(Event event) {
for (EventListener listener : listeners) {
listener.onEvent(event);
}
}
}

interface EventListener {
void onEvent(Event event);
}

class Event {}

CopyOnWriteArraySet

CopyOnWriteArraySet 是基于 CopyOnWriteArrayList 实现的线程安全 Set,保证元素唯一性。

CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();

set.add("a");
set.add("b");
set.add("a");

System.out.println(set); // [a, b]

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是基于链表的无界线程安全队列,采用 CAS 无锁算法实现。

基本使用

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
queue.offer("Item-" + i);
System.out.println("生产: Item-" + i);
}
});

Thread consumer = new Thread(() -> {
String item;
while ((item = queue.poll()) != null || !Thread.currentThread().isInterrupted()) {
if (item != null) {
System.out.println("消费: " + item);
}
if (queue.isEmpty()) {
break;
}
}
});

producer.start();
producer.join();

consumer.start();
consumer.join();
}
}

核心方法

方法说明
offer(e)入队,成功返回 true
poll()出队,队列为空返回 null
peek()查看队首元素,不删除
isEmpty()判断是否为空
size()返回元素数量(遍历统计,不精确)

特点

  • 无界队列,不会阻塞
  • 基于 CAS 实现,无锁
  • 不支持 null 元素
  • size() 方法需要遍历,时间复杂度 O(n)

BlockingQueue

BlockingQueue 是阻塞队列接口,扩展了 Queue,提供了阻塞的入队和出队操作。

核心方法

操作抛异常返回特殊值阻塞超时
入队add(e)offer(e)put(e)offer(e, timeout, unit)
出队remove()poll()take()poll(timeout, unit)
查看element()peek()--

ArrayBlockingQueue

基于数组的有界阻塞队列:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
queue.put("Item-" + i);
System.out.println("生产: Item-" + i + ", 队列大小: " + queue.size());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
String item = queue.take();
System.out.println("消费: " + item);
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

producer.start();
consumer.start();
}
}

LinkedBlockingQueue

基于链表的可选有界阻塞队列:

BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);

不指定容量时默认为 Integer.MAX_VALUE,相当于无界。

PriorityBlockingQueue

支持优先级的无界阻塞队列:

PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(
11,
Comparator.comparingInt(Task::getPriority).reversed()
);

queue.put(new Task("低优先级", 1));
queue.put(new Task("高优先级", 10));
queue.put(new Task("中优先级", 5));

while (!queue.isEmpty()) {
System.out.println(queue.take());
}

record Task(String name, int priority) {
@Override
public String toString() {
return name + " (优先级: " + priority + ")";
}

public int getPriority() {
return priority;
}
}

SynchronousQueue

不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作:

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();

Thread producer = new Thread(() -> {
try {
System.out.println("生产者准备放入数据");
queue.put("Hello");
System.out.println("生产者放入成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

Thread consumer = new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println("消费者准备获取数据");
String data = queue.take();
System.out.println("消费者获取: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

producer.start();
consumer.start();
}
}

DelayQueue

支持延迟获取的阻塞队列,元素必须实现 Delayed 接口:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();

queue.put(new DelayedTask("任务1", 3000));
queue.put(new DelayedTask("任务2", 1000));
queue.put(new DelayedTask("任务3", 5000));

System.out.println("开始时间: " + System.currentTimeMillis());

while (!queue.isEmpty()) {
DelayedTask task = queue.take();
System.out.println("执行: " + task.getName() +
", 时间: " + System.currentTimeMillis());
}
}
}

class DelayedTask implements Delayed {
private final String name;
private final long executeTime;

public DelayedTask(String name, long delayMs) {
this.name = name;
this.executeTime = System.currentTimeMillis() + delayMs;
}

public String getName() {
return name;
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
}
}

生产者-消费者模式

BlockingQueue 是实现生产者-消费者模式的最佳选择:

import java.util.concurrent.*;

public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

ExecutorService executor = Executors.newFixedThreadPool(3);

executor.submit(() -> {
for (int i = 0; i < 20; i++) {
try {
queue.put("Item-" + i);
System.out.println("生产: Item-" + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});

executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String item = queue.take();
System.out.println("消费者1 消费: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});

executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String item = queue.take();
System.out.println("消费者2 消费: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});

try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}

executor.shutdownNow();
}
}

ConcurrentSkipListMap 和 ConcurrentSkipListSet

基于跳表实现的并发有序集合:

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;

public class SkipListDemo {
public static void main(String[] args) {
ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();

map.put(3, "C");
map.put(1, "A");
map.put(2, "B");
map.put(5, "E");
map.put(4, "D");

System.out.println("有序遍历:");
map.forEach((k, v) -> System.out.println(k + " = " + v));

System.out.println("\n第一个键: " + map.firstKey());
System.out.println("最后一个键: " + map.lastKey());
System.out.println("小于3的最大键: " + map.lowerKey(3));
System.out.println("大于等于3的最小键: " + map.ceilingKey(3));

ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
set.add(5);
set.add(1);
set.add(3);
set.add(2);
set.add(4);

System.out.println("\n有序 Set: " + set);
}
}

集合选择指南

场景推荐集合
高并发 MapConcurrentHashMap
读多写少 ListCopyOnWriteArrayList
高并发队列ConcurrentLinkedQueue
生产者-消费者ArrayBlockingQueue / LinkedBlockingQueue
延迟任务DelayQueue
优先级队列PriorityBlockingQueue
有序并发 MapConcurrentSkipListMap
有序并发 SetConcurrentSkipListSet

小结

本章介绍了 JUC 提供的并发集合:

  1. ConcurrentHashMap:高性能并发哈希表,JDK 8 采用 CAS + synchronized 实现
  2. CopyOnWriteArrayList/Set:写时复制的线程安全集合,适合读多写少场景
  3. ConcurrentLinkedQueue:无锁并发队列,高性能
  4. BlockingQueue 系列:阻塞队列,支持生产者-消费者模式
  5. ConcurrentSkipListMap/Set:基于跳表的有序并发集合

选择合适的并发集合对于构建高性能并发应用至关重要。理解每种集合的特点和适用场景,才能做出正确的选择。