Java 并发容器详解
引言
并发容器是java.util.concurrent包的核心组件,专为多线程环境设计,替代了传统的Collections.synchronizedXxx()同步包装器。本文将深入讲解三类最常用的并发容器。
一、为什么需要并发容器
1.1 同步容器的局限
// 同步容器:所有方法加 synchronizedList<String>list=Collections.synchronizedList(newArrayList<>());// 问题 1:复合操作不原子// ❌ 先检查再操作,中间可能被其他线程修改if(!list.contains("key")){list.add("key");// 可能重复添加}// 问题 2:迭代时需手动加锁// ❌ ConcurrentModificationExceptionfor(Strings:list){// 其他线程修改 list → 异常}// ✅ 必须手动加锁synchronized(list){for(Strings:list){// 安全}}1.2 并发容器的优势
| 优势 | 说明 |
|---|---|
| 细粒度锁 | 分段锁或 CAS,减少锁竞争 |
| 弱一致性迭代 | 迭代时不抛 ConcurrentModificationException |
| 原子复合操作 | putIfAbsent / computeIfAbsent 等 |
| 无锁读取 | ConcurrentHashMap 读操作不需要锁 |
二、ConcurrentHashMap
2.1 数据结构演进
Java 7:分段锁(Segment)
┌─────────────────────────────────────────────┐ │ Segment[0] │ Segment[1] │ ... │ Segment[15]│ │ ┌─────────┐ │ ┌─────────┐│ │ ┌─────────┐│ │ │ HashEntry│ │ │ HashEntry││ │ HashEntry││ │ │ 链表 │ │ │ 链表 ││ │ 链表 ││ │ └─────────┘ │ └─────────┘│ │ └─────────┘│ │ 锁1 │ 锁2 │ │ 锁16 │ └─────────────────────────────────────────────┘ 每个 Segment 一把锁,默认 16 个 Segment 并发度 = Segment 数量Java 8+:CAS + synchronized(细粒度)
┌──────────────────────────────────────────────┐ │ Node[] table │ │ ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ │ │ N │ N │ N │ N │ N │ N │ N │ N │ │ │ └────┴────┴─┬──┴────┴────┴────┴────┴────┘ │ │ │ │ │ ┌────┴────┐ │ │ │ 链表/红黑树│ │ │ └─────────┘ │ │ │ │ 锁粒度:每个桶(数组位置)一把 synchronized 锁 │ │ 读操作:无锁(volatile + CAS) │ │ 写操作:锁单个桶 │ └──────────────────────────────────────────────┘2.2 核心操作
ConcurrentHashMap<String,Integer>map=newConcurrentHashMap<>();// 基本操作(与 HashMap 类似)map.put("a",1);map.get("a");// 1map.remove("a");map.size();map.containsKey("a");// 原子复合操作map.putIfAbsent("b",2);// 不存在才放入map.remove("b",2);// 键值都匹配才删除map.replace("b",2,3);// 旧值匹配才替换map.computeIfAbsent("c",k->3);// 不存在则计算并放入map.computeIfPresent("c",(k,v)->v+1);// 存在则计算替换map.compute("d",(k,v)->v==null?1:v+1);// 无论如何计算map.merge("e",1,Integer::sum);// 存在则合并,不存在则放入2.3 computeIfAbsent 的妙用
// 场景:缓存计算结果ConcurrentHashMap<String,ExpensiveResult>cache=newConcurrentHashMap<>();// ❌ 非原子操作if(!cache.containsKey(key)){cache.put(key,expensiveCompute(key));// 可能重复计算}// ✅ 原子操作ExpensiveResultresult=cache.computeIfAbsent(key,k->expensiveCompute(k));// ✅ 映射到子 Map(多级缓存)ConcurrentHashMap<String,Map<String,Integer>>multiMap=newConcurrentHashMap<>();multiMap.computeIfAbsent("group1",k->newConcurrentHashMap<>()).put("item1",100);2.4 批量操作(Java 8+)
// forEachmap.forEach(1,(k,v)->System.out.println(k+"="+v));// 第一个参数是并行度// search:找到第一个非 null 的结果Stringfound=map.search(1,(k,v)->v>10?k:null);// reduceIntegersum=map.reduceValues(1,Integer::sum);2.5 弱一致性
// ConcurrentHashMap 的迭代器是弱一致性的:// - 不会抛 ConcurrentModificationException// - 可能反映也可能不反映迭代开始后的修改// - 适用于"最终一致"可接受的场景ConcurrentHashMap<String,Integer>map=newConcurrentHashMap<>();map.put("a",1);map.put("b",2);// 迭代过程中其他线程可以修改for(Map.Entry<String,Integer>entry:map.entrySet()){// 不会抛异常,但可能看不到最新的修改}2.6 常见陷阱
① size() 和 isEmpty() 是近似值
// ConcurrentHashMap 的 size() 是各桶的近似求和// 在并发修改时可能不准确intsize=map.size();// 近似值,不保证精确② key/value 不能为 null
map.put(null,1);// ❌ NullPointerExceptionmap.put("key",null);// ❌ NullPointerException③ computeIfAbsent 中不要修改自身 Map
// ❌ 死锁 / 无限循环(Java 8 已部分修复,但仍不推荐)map.computeIfAbsent("a",k->{map.put("b",2);// 不要在计算函数中修改同一个 mapreturn1;});三、CopyOnWriteArrayList
3.1 核心原理
写操作:复制整个数组 → 修改副本 → 替换引用 读操作:直接读取,无锁无等待 ┌──────────────────────────────────────────┐ │ 读线程1 → array ──→ [A, B, C, D, E] │ │ 读线程2 → array ──→ [A, B, C, D, E] │ │ │ │ 写线程:add("F") │ │ 1. 复制 → [A, B, C, D, E, F] │ │ 2. 替换引用 → array 指向新数组 │ │ 3. 旧数组由 GC 回收 │ └──────────────────────────────────────────┘3.2 基本操作
CopyOnWriteArrayList<String>list=newCopyOnWriteArrayList<>();list.add("a");// 复制+添加list.add(0,"b");// 复制+插入list.set(0,"c");// 复制+替换list.remove("a");// 复制+删除list.get(0);// 无锁读取list.size();// 无锁读取list.contains("a");// 无锁遍历3.3 适用场景
读远多于写(如 99:1): ├── 配置列表 ├── 监听器列表(Listener) ├── 白名单/黑名单 └── 数据字典 不适合的场景: ├── 写操作频繁 → 每次复制开销巨大 ├── 列表很大 → 复制耗时 + 内存压力 └── 需要强一致性 → 读写可能不一致3.4 CopyOnWriteArraySet
// 基于 CopyOnWriteArrayList 实现的 Set// add 时遍历检查重复,性能更差CopyOnWriteArraySet<String>set=newCopyOnWriteArraySet<>();set.add("a");set.add("b");set.contains("a");// O(n) 遍历3.5 注意事项
| 注意 | 说明 |
|---|---|
| 写操作开销大 | 每次写都复制整个数组,O(n) |
| 内存占用 | 写期间同时存在两个数组 |
| 最终一致性 | 迭代器使用创建时的快照,不反映后续修改 |
| 不支持 listIterator | 迭代器不支持 add/set/remove |
四、BlockingQueue
4.1 核心接口
publicinterfaceBlockingQueue<E>extendsQueue<E>{// 抛异常booleanadd(Ee);// 队列满抛 IllegalStateExceptionEremove();// 队列空抛 NoSuchElementException// 返回布尔booleanoffer(Ee);// 队列满返回 falseEpoll();// 队列空返回 null// 阻塞等待voidput(Ee)throwsInterruptedException;// 队列满则阻塞等待Etake()throwsInterruptedException;// 队列空则阻塞等待// 超时等待booleanoffer(Ee,longtimeout,TimeUnitunit);Epoll(longtimeout,TimeUnitunit);}4.2 四种操作行为
| 操作 | 抛异常 | 返回值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add() | offer() | put() | offer(timeout) |
| 移除 | remove() | poll() | take() | poll(timeout) |
| 检查 | element() | peek() | - | - |
4.3 七种 BlockingQueue 实现
| 实现 | 数据结构 | 有界 | 特点 |
|---|---|---|---|
ArrayBlockingQueue | 数组 | ✅ 有界 | 公平/非公平可选 |
LinkedBlockingQueue | 链表 | 可选(默认无界) | 吞吐量通常高于 Array |
PriorityBlockingQueue | 堆 | 无界 | 按优先级出队 |
SynchronousQueue | 无 | - | 不存储元素,直接交付 |
DelayQueue | 堆 | 无界 | 元素到期才能出队 |
LinkedTransferQueue | 链表 | 无界 | transfer 直接交付给消费者 |
LinkedBlockingDeque | 双向链表 | 可选 | 双端队列 |
4.4 ArrayBlockingQueue
// 有界阻塞队列,创建时必须指定容量BlockingQueue<Task>queue=newArrayBlockingQueue<>(1000);// 公平模式:等待最久的消费者/生产者优先BlockingQueue<Task>fairQueue=newArrayBlockingQueue<>(1000,true);// 生产者queue.put(newTask());// 队列满时阻塞queue.offer(newTask(),5,SECONDS);// 超时返回 false// 消费者Tasktask=queue.take();// 队列空时阻塞Tasktask=queue.poll(5,SECONDS);// 超时返回 null4.5 生产者-消费者模式
publicclassProducerConsumer{privatefinalBlockingQueue<String>queue=newArrayBlockingQueue<>(100);publicvoidstart(){newThread(this::producer).start();newThread(this::consumer).start();}voidproducer(){try{while(true){Stringdata=fetchData();queue.put(data);// 队列满时自动阻塞}}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}voidconsumer(){try{while(true){Stringdata=queue.take();// 队列空时自动阻塞processData(data);}}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}}4.6 线程池 + BlockingQueue
// 自定义线程池使用有界队列ThreadPoolExecutorexecutor=newThreadPoolExecutor(10,20,60,SECONDS,newArrayBlockingQueue<>(500),// 有界队列newNamedThreadFactory("worker"),newCallerRunsPolicy()// 队列满时由调用者线程执行);4.7 DelayQueue —— 延迟队列
// 元素必须实现 Delayed 接口publicclassDelayedTaskimplementsDelayed{privatefinalStringname;privatefinallongexecuteTime;publicDelayedTask(Stringname,longdelayMs){this.name=name;this.executeTime=System.currentTimeMillis()+delayMs;}@OverridepubliclonggetDelay(TimeUnitunit){returnunit.convert(executeTime-System.currentTimeMillis(),MILLISECONDS);}@OverridepublicintcompareTo(Delayedo){returnLong.compare(this.executeTime,((DelayedTask)o).executeTime);}}// 使用DelayQueue<DelayedTask>delayQueue=newDelayQueue<>();delayQueue.put(newDelayedTask("task1",5000));// 5秒后执行delayQueue.put(newDelayedTask("task2",3000));// 3秒后执行DelayedTasktask=delayQueue.take();// 阻塞直到最早的任务到期4.8 SynchronousQueue —— 零容量队列
// 不存储元素,生产者必须等待消费者接收// 适用于直接交付场景(CachedThreadPool 使用)BlockingQueue<String>queue=newSynchronousQueue<>();// 线程1:生产者queue.put("data");// 阻塞,直到有消费者来取// 线程2:消费者Stringdata=queue.take();// 阻塞,直到有生产者放入五、其他并发容器
5.1 ConcurrentSkipListMap / ConcurrentSkipListSet
// 基于跳表的有序并发 Map/Set// 插入/删除/查找 O(log n)ConcurrentSkipListMap<String,Integer>sortedMap=newConcurrentSkipListMap<>();sortedMap.put("c",3);sortedMap.put("a",1);sortedMap.put("b",2);// 自然排序:a=1, b=2, c=3ConcurrentSkipListSet<String>sortedSet=newConcurrentSkipListSet<>();5.2 ConcurrentLinkedQueue / ConcurrentLinkedDeque
// 无界非阻塞队列,基于 CAS 实现// 适合高并发场景,但不支持阻塞操作ConcurrentLinkedQueue<String>queue=newConcurrentLinkedQueue<>();queue.offer("a");queue.offer("b");Stringhead=queue.poll();// "a",非阻塞六、选型指南
| 场景 | 推荐 | 原因 |
|---|---|---|
| 高并发键值存储 | ConcurrentHashMap | 分段锁/CAS,读无锁 |
| 缓存计算结果 | ConcurrentHashMap + computeIfAbsent | 原子复合操作 |
| 读多写少列表 | CopyOnWriteArrayList | 写时复制,读无锁 |
| 监听器管理 | CopyOnWriteArrayList | 注册/注销少,遍历多 |
| 生产者-消费者 | ArrayBlockingQueue | 有界 + 阻塞 |
| 任务调度 | DelayQueue | 延迟出队 |
| 有序并发 Map | ConcurrentSkipListMap | 跳表 + CAS |
| 高并发无阻塞队列 | ConcurrentLinkedQueue | CAS,无锁 |
总结
| 容器 | 核心机制 | 读性能 | 写性能 | 一致性 |
|---|---|---|---|---|
| ConcurrentHashMap | CAS + synchronized | 优 | 良 | 弱一致 |
| CopyOnWriteArrayList | 写时复制 | 优 | 差 | 快照一致 |
| ArrayBlockingQueue | ReentrantLock | - | 良 | 强一致 |
| ConcurrentSkipListMap | CAS + 跳表 | 良 | 良 | 弱一致 |
