当前位置: 首页 > news >正文

Java 并发容器详解

引言

并发容器是java.util.concurrent包的核心组件,专为多线程环境设计,替代了传统的Collections.synchronizedXxx()同步包装器。本文将深入讲解三类最常用的并发容器。


一、为什么需要并发容器

1.1 同步容器的局限

// 同步容器:所有方法加 synchronizedList<String>list=Collections.synchronizedList(newArrayList<>());// 问题 1:复合操作不原子// ❌ 先检查再操作,中间可能被其他线程修改if(!list.contains("key")){list.add("key");// 可能重复添加}// 问题 2:迭代时需手动加锁// ❌ ConcurrentModificationExceptionfor(Strings:list){// 其他线程修改 list → 异常}// ✅ 必须手动加锁synchronized(list){for(Strings:list){// 安全}}

1.2 并发容器的优势

优势说明
细粒度锁分段锁或 CAS,减少锁竞争
弱一致性迭代迭代时不抛 ConcurrentModificationException
原子复合操作putIfAbsent / computeIfAbsent 等
无锁读取ConcurrentHashMap 读操作不需要锁

二、ConcurrentHashMap

2.1 数据结构演进

Java 7:分段锁(Segment)

┌─────────────────────────────────────────────┐ │ Segment[0] │ Segment[1] │ ... │ Segment[15]│ │ ┌─────────┐ │ ┌─────────┐│ │ ┌─────────┐│ │ │ HashEntry│ │ │ HashEntry││ │ HashEntry││ │ │ 链表 │ │ │ 链表 ││ │ 链表 ││ │ └─────────┘ │ └─────────┘│ │ └─────────┘│ │ 锁1 │ 锁2 │ │ 锁16 │ └─────────────────────────────────────────────┘ 每个 Segment 一把锁,默认 16 个 Segment 并发度 = Segment 数量

Java 8+:CAS + synchronized(细粒度)

┌──────────────────────────────────────────────┐ │ Node[] table │ │ ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ │ │ N │ N │ N │ N │ N │ N │ N │ N │ │ │ └────┴────┴─┬──┴────┴────┴────┴────┴────┘ │ │ │ │ │ ┌────┴────┐ │ │ │ 链表/红黑树│ │ │ └─────────┘ │ │ │ │ 锁粒度:每个桶(数组位置)一把 synchronized 锁 │ │ 读操作:无锁(volatile + CAS) │ │ 写操作:锁单个桶 │ └──────────────────────────────────────────────┘

2.2 核心操作

ConcurrentHashMap<String,Integer>map=newConcurrentHashMap<>();// 基本操作(与 HashMap 类似)map.put("a",1);map.get("a");// 1map.remove("a");map.size();map.containsKey("a");// 原子复合操作map.putIfAbsent("b",2);// 不存在才放入map.remove("b",2);// 键值都匹配才删除map.replace("b",2,3);// 旧值匹配才替换map.computeIfAbsent("c",k->3);// 不存在则计算并放入map.computeIfPresent("c",(k,v)->v+1);// 存在则计算替换map.compute("d",(k,v)->v==null?1:v+1);// 无论如何计算map.merge("e",1,Integer::sum);// 存在则合并,不存在则放入

2.3 computeIfAbsent 的妙用

// 场景:缓存计算结果ConcurrentHashMap<String,ExpensiveResult>cache=newConcurrentHashMap<>();// ❌ 非原子操作if(!cache.containsKey(key)){cache.put(key,expensiveCompute(key));// 可能重复计算}// ✅ 原子操作ExpensiveResultresult=cache.computeIfAbsent(key,k->expensiveCompute(k));// ✅ 映射到子 Map(多级缓存)ConcurrentHashMap<String,Map<String,Integer>>multiMap=newConcurrentHashMap<>();multiMap.computeIfAbsent("group1",k->newConcurrentHashMap<>()).put("item1",100);

2.4 批量操作(Java 8+)

// forEachmap.forEach(1,(k,v)->System.out.println(k+"="+v));// 第一个参数是并行度// search:找到第一个非 null 的结果Stringfound=map.search(1,(k,v)->v>10?k:null);// reduceIntegersum=map.reduceValues(1,Integer::sum);

2.5 弱一致性

// ConcurrentHashMap 的迭代器是弱一致性的:// - 不会抛 ConcurrentModificationException// - 可能反映也可能不反映迭代开始后的修改// - 适用于"最终一致"可接受的场景ConcurrentHashMap<String,Integer>map=newConcurrentHashMap<>();map.put("a",1);map.put("b",2);// 迭代过程中其他线程可以修改for(Map.Entry<String,Integer>entry:map.entrySet()){// 不会抛异常,但可能看不到最新的修改}

2.6 常见陷阱

① size() 和 isEmpty() 是近似值

// ConcurrentHashMap 的 size() 是各桶的近似求和// 在并发修改时可能不准确intsize=map.size();// 近似值,不保证精确

② key/value 不能为 null

map.put(null,1);// ❌ NullPointerExceptionmap.put("key",null);// ❌ NullPointerException

③ computeIfAbsent 中不要修改自身 Map

// ❌ 死锁 / 无限循环(Java 8 已部分修复,但仍不推荐)map.computeIfAbsent("a",k->{map.put("b",2);// 不要在计算函数中修改同一个 mapreturn1;});

三、CopyOnWriteArrayList

3.1 核心原理

写操作:复制整个数组 → 修改副本 → 替换引用 读操作:直接读取,无锁无等待 ┌──────────────────────────────────────────┐ │ 读线程1 → array ──→ [A, B, C, D, E] │ │ 读线程2 → array ──→ [A, B, C, D, E] │ │ │ │ 写线程:add("F") │ │ 1. 复制 → [A, B, C, D, E, F] │ │ 2. 替换引用 → array 指向新数组 │ │ 3. 旧数组由 GC 回收 │ └──────────────────────────────────────────┘

3.2 基本操作

CopyOnWriteArrayList<String>list=newCopyOnWriteArrayList<>();list.add("a");// 复制+添加list.add(0,"b");// 复制+插入list.set(0,"c");// 复制+替换list.remove("a");// 复制+删除list.get(0);// 无锁读取list.size();// 无锁读取list.contains("a");// 无锁遍历

3.3 适用场景

读远多于写(如 99:1): ├── 配置列表 ├── 监听器列表(Listener) ├── 白名单/黑名单 └── 数据字典 不适合的场景: ├── 写操作频繁 → 每次复制开销巨大 ├── 列表很大 → 复制耗时 + 内存压力 └── 需要强一致性 → 读写可能不一致

3.4 CopyOnWriteArraySet

// 基于 CopyOnWriteArrayList 实现的 Set// add 时遍历检查重复,性能更差CopyOnWriteArraySet<String>set=newCopyOnWriteArraySet<>();set.add("a");set.add("b");set.contains("a");// O(n) 遍历

3.5 注意事项

注意说明
写操作开销大每次写都复制整个数组,O(n)
内存占用写期间同时存在两个数组
最终一致性迭代器使用创建时的快照,不反映后续修改
不支持 listIterator迭代器不支持 add/set/remove

四、BlockingQueue

4.1 核心接口

publicinterfaceBlockingQueue<E>extendsQueue<E>{// 抛异常booleanadd(Ee);// 队列满抛 IllegalStateExceptionEremove();// 队列空抛 NoSuchElementException// 返回布尔booleanoffer(Ee);// 队列满返回 falseEpoll();// 队列空返回 null// 阻塞等待voidput(Ee)throwsInterruptedException;// 队列满则阻塞等待Etake()throwsInterruptedException;// 队列空则阻塞等待// 超时等待booleanoffer(Ee,longtimeout,TimeUnitunit);Epoll(longtimeout,TimeUnitunit);}

4.2 四种操作行为

操作抛异常返回值阻塞超时
插入add()offer()put()offer(timeout)
移除remove()poll()take()poll(timeout)
检查element()peek()--

4.3 七种 BlockingQueue 实现

实现数据结构有界特点
ArrayBlockingQueue数组✅ 有界公平/非公平可选
LinkedBlockingQueue链表可选(默认无界)吞吐量通常高于 Array
PriorityBlockingQueue无界按优先级出队
SynchronousQueue-不存储元素,直接交付
DelayQueue无界元素到期才能出队
LinkedTransferQueue链表无界transfer 直接交付给消费者
LinkedBlockingDeque双向链表可选双端队列

4.4 ArrayBlockingQueue

// 有界阻塞队列,创建时必须指定容量BlockingQueue<Task>queue=newArrayBlockingQueue<>(1000);// 公平模式:等待最久的消费者/生产者优先BlockingQueue<Task>fairQueue=newArrayBlockingQueue<>(1000,true);// 生产者queue.put(newTask());// 队列满时阻塞queue.offer(newTask(),5,SECONDS);// 超时返回 false// 消费者Tasktask=queue.take();// 队列空时阻塞Tasktask=queue.poll(5,SECONDS);// 超时返回 null

4.5 生产者-消费者模式

publicclassProducerConsumer{privatefinalBlockingQueue<String>queue=newArrayBlockingQueue<>(100);publicvoidstart(){newThread(this::producer).start();newThread(this::consumer).start();}voidproducer(){try{while(true){Stringdata=fetchData();queue.put(data);// 队列满时自动阻塞}}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}voidconsumer(){try{while(true){Stringdata=queue.take();// 队列空时自动阻塞processData(data);}}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}}

4.6 线程池 + BlockingQueue

// 自定义线程池使用有界队列ThreadPoolExecutorexecutor=newThreadPoolExecutor(10,20,60,SECONDS,newArrayBlockingQueue<>(500),// 有界队列newNamedThreadFactory("worker"),newCallerRunsPolicy()// 队列满时由调用者线程执行);

4.7 DelayQueue —— 延迟队列

// 元素必须实现 Delayed 接口publicclassDelayedTaskimplementsDelayed{privatefinalStringname;privatefinallongexecuteTime;publicDelayedTask(Stringname,longdelayMs){this.name=name;this.executeTime=System.currentTimeMillis()+delayMs;}@OverridepubliclonggetDelay(TimeUnitunit){returnunit.convert(executeTime-System.currentTimeMillis(),MILLISECONDS);}@OverridepublicintcompareTo(Delayedo){returnLong.compare(this.executeTime,((DelayedTask)o).executeTime);}}// 使用DelayQueue<DelayedTask>delayQueue=newDelayQueue<>();delayQueue.put(newDelayedTask("task1",5000));// 5秒后执行delayQueue.put(newDelayedTask("task2",3000));// 3秒后执行DelayedTasktask=delayQueue.take();// 阻塞直到最早的任务到期

4.8 SynchronousQueue —— 零容量队列

// 不存储元素,生产者必须等待消费者接收// 适用于直接交付场景(CachedThreadPool 使用)BlockingQueue<String>queue=newSynchronousQueue<>();// 线程1:生产者queue.put("data");// 阻塞,直到有消费者来取// 线程2:消费者Stringdata=queue.take();// 阻塞,直到有生产者放入

五、其他并发容器

5.1 ConcurrentSkipListMap / ConcurrentSkipListSet

// 基于跳表的有序并发 Map/Set// 插入/删除/查找 O(log n)ConcurrentSkipListMap<String,Integer>sortedMap=newConcurrentSkipListMap<>();sortedMap.put("c",3);sortedMap.put("a",1);sortedMap.put("b",2);// 自然排序:a=1, b=2, c=3ConcurrentSkipListSet<String>sortedSet=newConcurrentSkipListSet<>();

5.2 ConcurrentLinkedQueue / ConcurrentLinkedDeque

// 无界非阻塞队列,基于 CAS 实现// 适合高并发场景,但不支持阻塞操作ConcurrentLinkedQueue<String>queue=newConcurrentLinkedQueue<>();queue.offer("a");queue.offer("b");Stringhead=queue.poll();// "a",非阻塞

六、选型指南

场景推荐原因
高并发键值存储ConcurrentHashMap分段锁/CAS,读无锁
缓存计算结果ConcurrentHashMap + computeIfAbsent原子复合操作
读多写少列表CopyOnWriteArrayList写时复制,读无锁
监听器管理CopyOnWriteArrayList注册/注销少,遍历多
生产者-消费者ArrayBlockingQueue有界 + 阻塞
任务调度DelayQueue延迟出队
有序并发 MapConcurrentSkipListMap跳表 + CAS
高并发无阻塞队列ConcurrentLinkedQueueCAS,无锁

总结

容器核心机制读性能写性能一致性
ConcurrentHashMapCAS + synchronized弱一致
CopyOnWriteArrayList写时复制快照一致
ArrayBlockingQueueReentrantLock-强一致
ConcurrentSkipListMapCAS + 跳表弱一致
http://www.jsqmd.com/news/1058496/

相关文章:

  • 数据库合规性策略建模与查询优化实战:从RLS到性能调优
  • NLP与计算语言学:从社交媒体文本分析到深度洞察的实战指南
  • Serpent攻击:macOS钥匙串权限漏洞与Apple Intelligence令牌窃取防御
  • 2026年水利水电职称评审机构哪家靠谱 水利申报老踩坑到底咋选机构 - 3158GEO
  • 2026年新发布:深度剖析环保塑胶跑道颗粒生产厂家的选择之道与行业标杆 - 品牌鉴赏官2026
  • RAG系统隐私保护:匿名化时机如何影响检索与生成效果
  • 2026菏泽漏水检测维修本地口碑防水商家榜单:厨卫/阳台/屋面/地下室渗漏水维修,持证施工+明码实价,防水补漏公司TOP5推荐 - 即刻修防水
  • FRP内网穿透安全实践:从TLS加密到流量隐匿的攻防对抗
  • STSF-Net:多模态遥感图像变化检测的创新框架
  • Java 函数式编程
  • 金融合规策略数据库设计:结构化存储与高性能查询优化实践
  • LLM驱动的文本相关性评估:从RAG到可持续性分析的工程实践
  • Linux命令-pvchange(修改物理卷属性)
  • 事件相机在视觉说话人识别中的应用:NeuroLip框架解析
  • 基于YOLOv8与RexNet-150的两阶段深度学习考试作弊检测框架实战
  • QGas工具:解决气体能源网络建模数据荒的拓扑感知数据生成方案
  • 极端天气下电力系统鲁棒调度优化实践
  • RAGognizer:基于幻觉感知微调提升大模型在RAG中的事实可靠性
  • 2026荆州漏水检测维修本地口碑防水商家榜单:厨卫/阳台/屋面/地下室渗漏水维修,持证施工+明码实价,防水补漏公司TOP5推荐 - 即刻修防水
  • AI动态简报之商业洞察篇(2026.06.21)
  • GoB插件:打破Blender与ZBrush之间的创作壁垒
  • Xournal++:如何用这款开源手写笔记软件彻底改变你的数字笔记体验?
  • 轻量级AI音乐生成模型TinyMU:2.29亿参数媲美大模型的架构与实战
  • SVGedit完全指南:5步掌握浏览器端矢量图形编辑
  • 基于RPA思想的Cassandra数据库自动化测试框架构建与实践
  • 高穹全域透视·智网自主抗毁|空基立体感知·全域精准管控
  • UniEditBench:基于知识蒸馏的统一多模态编辑评测基准
  • 基于SIVR的大语言模型幻觉检测:原理、实现与实战优化
  • 2023年图灵“贝利文件”近50万美元拍卖,揭秘其绝密“黛利拉”语音加密项目
  • LLM智能体架构设计:经验压缩谱实现记忆、技能与规则统一管理