当前位置: 首页 > news >正文

《多业务隔离的高性能队列分发架构:基于ConcurrentLinkedQueue的独立队列设计与实践》


文章目录

  • 一、介绍
  • 二、代码
    • SnmpMessageHandler
    • SnmpTrapListen
    • DispatcherQueueMap
    • DelegateInterface
    • ServiceHost
    • SnmpTrapBusinessLogic
  • 三、注意点
    • `注意点1`:执行流程
    • `注意点2`:dispatcherQueueMap.getDictionaryRepeaterInfo().putIfAbsent(strKey, queueRepterInfo) != null这是啥
    • `注意点3`:@FunctionalInterface 详解
    • `注意点4`:为什么使用接口回调而不是直接调用?
    • `注意点5`:为什么使用 ConcurrentLinkedQueue?
    • `注意点6`:这是啥写法?
    • `注意点7`:listener初始化时机?

一、介绍

在构建高并发分布式系统时,不同业务数据的隔离处理是一个核心挑战。本文深入剖析了一种基于 ConcurrentHashMap + ConcurrentLinkedQueue 的轻量级队列分发架构,通过业务Key与独立队列的映射机制,实现了多业务数据的物理隔离与并行处理。该架构的核心创新在于:为每个业务类型(如中转台注册信息)分配独立的无锁队列,利用 putIfAbsent 原子操作确保Key-Queue映射的线程安全,再通过专门的消费线程轮询所有队列进行差异化处理。这种设计既避免了单一队列的头阻塞效应,又实现了业务间的故障隔离——当某个业务队列积压时,不会影响其他业务的正常处理。文章将详细阐述这种架构在SNMP Trap监控系统中的实际应用,包括如何通过队列门限控制内存使用、如何利用函数式接口实现业务逻辑解耦,以及在多生产者多消费者场景下的性能优化技巧。无论你是正在设计消息中间件,还是优化现有系统的处理能力,这种基于Key的队列隔离思想都能为你提供有价值的参考。

场景描述:

在一个大型物联网平台中,接入数十万计的不同类型设备(如智能电表、环境传感器、GPS定位器等),每类设备的数据格式、处理逻辑和优先级都不同。系统需要高并发接收设备上报数据,并确保各类设备的数据隔离处理,避免相互影响。

二、代码

SnmpMessageHandler

switch(oid){caseBroadcastMachineInfo:{// 1、装载队列RptBroadcastMachineInforbmi=(RptBroadcastMachineInfo)RptProtocolHelper.createRptEventPackageByOid(oid,result);// --获得协议对象并填充内容serialNo.set(rbmi.getSerialNumber());//创建/获取序列号所在的队列queueManager.getOrCreateQueue(rbmi.getSerialNumber());QueueRepterInforepterInfoBroadcast=newQueueRepterInfo(receiveDataInfo.getPeer(),rbmi,communityData,snmpVersion,serialNo.get(),secureAgentParame.get(),contextName.get());StringstrKey=oid+rbmi.getSerialNumber();ConcurrentLinkedQueue<QueueRepterInfo>queueRepterInfo=newConcurrentLinkedQueue<>();queueRepterInfo.offer(repterInfoBroadcast);ip_serialNo.keySet().removeIf(key->ip_serialNo.get(key).equals(rbmi.getSerialNumber()));ip_serialNo.put(ipSerialnoKey,rbmi.getSerialNumber());if(ip_serialNo.size()>10000){ip_serialNo.clear();}// 2、先向字典中加入,如果加入不成功说明包含该key,则进行后续判断处理 QueueReceiveDataInforif(dispatcherQueueMap.getDictionaryRepeaterInfo().putIfAbsent(strKey,queueRepterInfo)!=null){// 成功加入字典queueRepterInfo=dispatcherQueueMap.getDictionaryRepeaterInfo().get(strKey);if(queueRepterInfo!=null){// 判断队列数量是否大于门限值,大于删除一个,如果删除不成功则继续if(queueRepterInfo.size()>=QUEUE_MAX_NUM&&queueRepterInfo.poll()==null){break;}// 入队queueRepterInfo.offer(repterInfoBroadcast);}}break;}caseBroadcastGpsInfo:...

SnmpTrapListen

@ComponentpublicclassSnmpTrapListen{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(SnmpTrapListen.class);@AutowiredprivateDispatcherQueueMapdispatcherQueueMap;// 中转台注册 分发线程privateThreaddispatcherRepeaterInfoDataThread;// 中转台GPS 分发线程privateThreaddispatcherRepeaterGpsThread;// 存储事件监听器privateList<DelegateInterface.repterInfoBroadcastDistributedHandler>repterInfoBroadcastListeners=newArrayList<>();privateList<DelegateInterface.gpsInfoBroadcastDistributedHandler>gpsInfoBroadcastListeners=newArrayList<>();publicvoidstart(){// 中转台注册队列分发dispatcherRepeaterInfoDataThread=newThread(this::dispatcherRepterInfo,"SnmpTrapListen-dispatcherRepeaterInfoDataThread");dispatcherRepeaterInfoDataThread.setDaemon(true);dispatcherRepeaterInfoDataThread.start();// 中转台GPS分发dispatcherRepeaterGpsThread=newThread(this::dispatcherGpsInfo,"SnmpTrapListen-dispatcherRepeaterGpsThread");dispatcherRepeaterGpsThread.setDaemon(true);dispatcherRepeaterGpsThread.start();}/** * 分发中转台注册信息 */publicvoiddispatcherRepterInfo(){try{QueueRepterInforepterInfoBroadcast;booleanrun=true;while(run){try{Thread.sleep(10);for(varentry:dispatcherQueueMap.getDictionaryRepeaterInfo().entrySet()){ConcurrentLinkedQueue<QueueRepterInfo>queueRepterInfo=entry.getValue();// 尝试查看队列头部的元素repterInfoBroadcast=queueRepterInfo.peek();if(repterInfoBroadcast==null){Thread.sleep(10);continue;}// 尝试从队列中删除元素queueRepterInfo.poll();// poll() 方法返回并移除头部元素if(repterInfoBroadcastListeners!=null){for(DelegateInterface.repterInfoBroadcastDistributedHandler listener:repterInfoBroadcastListeners){listener.invoke(repterInfoBroadcast);}}Thread.sleep(10);}}catch(Exceptionex){logger.error("<XnmsSnmpTrap DispatcherData One Error>:"+ex.getMessage(),ex);}}}catch(Exceptionex){logger.error("<XnmsSnmpTrap DispatcherData Two Error>:"+ex.getMessage(),ex);}}/** * 分发Gps信息 */publicvoiddispatcherGpsInfo(){}}

DispatcherQueueMap

@ComponentpublicclassDispatcherQueueMap{// 中转台上报信息缓存队列,字典用于根据中转台和任务类型进行缓存privateConcurrentHashMap<String,ConcurrentLinkedQueue<QueueReceiveDataInfor>>dictionaryDataInfo=newConcurrentHashMap<>();// 中转台Gps信息缓存队列privateConcurrentHashMap<String,ConcurrentLinkedQueue<QueueGpsInfo>>dictionaryGpsInfo=newConcurrentHashMap<>();}

DelegateInterface

publicclassDelegateInterface{// 2. 中转台信息上报@FunctionalInterfacepublicinterfacerepterInfoBroadcastDistributedHandler{voidinvoke(QueueRepterInforepterInfoBroadcast);}// 3. Gps信息上报@FunctionalInterfacepublicinterfacegpsInfoBroadcastDistributedHandler{voidinvoke(QueueGpsInfogpsInfo);}}

ServiceHost

@Component@Order(Ordered.HIGHEST_PRECEDENCE)publicclassServiceHost{privatestaticfinalLogAdapterlogger=LogFactory.getLogger(ServiceHost.class);@AutowiredSnmpTrapBusinessLogicsnmpReceiveBusinessLogic;@PostConstructpublicvoidstart()throwsSocketException,UnknownHostException{// 0、启动UDP监听snmpReceiveBusinessLogic.SnmpListen();// 1、线程处理logger.info("<ServiceHost Start> :StartFinished.");}}

SnmpTrapBusinessLogic

@ComponentpublicclassSnmpTrapBusinessLogicimplementsApplicationRunner{publicvoidSnmpListen(){try{// xnmsSnmpTrap = new SnmpTrapListen(dsiClientCommon);xnmsSnmpTrap.setSnmpTrapListen(dsiClientCommon);DelegateInterface.repterInfoBroadcastDistributedHandler repterInfoBroadcastDistributedHandler=(QueueRepterInforepterInfoBroadcast)->{xnmsSnmpTrapRepterInfoBroadcastHandler(repterInfoBroadcast);};xnmsSnmpTrap.addRepterInfoBroadcastListener(repterInfoBroadcastDistributedHandler);DelegateInterface.gpsInfoBroadcastDistributedHandler gpsInfoBroadcastDistributedHandler=(QueueGpsInfogpsInfo)->{xnmsSnmpTrapGpsInfoBroadcastDistributedHandler(gpsInfo);};xnmsSnmpTrap.addGpsInfoBroadcastListener(gpsInfoBroadcastDistributedHandler);}/** * 中转台注册,已上线更新状态 * * @param queueRepterInfo queueRepterInfo */privatevoidxnmsSnmpTrapRepterInfoBroadcastHandler(QueueRepterInfoqueueRepterInfo){doBroadcastRepeaterInfo(queueRepterInfo.getIpEndPoint(),queueRepterInfo.getRptBroadcastMachineInfo(),queueRepterInfo.getRealCommunity(),queueRepterInfo.getSnmpVersion(),queueRepterInfo.getUsmUser(),queueRepterInfo.getContextName());}/** * 上报Gps信息 * * @param queueGpsInfo queueGpsInfo */privatevoidxnmsSnmpTrapGpsInfoBroadcastDistributedHandler(QueueGpsInfoqueueGpsInfo){...}}

三、注意点

注意点1:执行流程

答案:

注意点2:dispatcherQueueMap.getDictionaryRepeaterInfo().putIfAbsent(strKey, queueRepterInfo) != null这是啥

答案:

注意点3:@FunctionalInterface 详解

答案:
这是一个注解,用于标记函数式接口。让我详细解释:

  1. 什么是函数式接口?
    函数式接口就是只有一个抽象方法的接口。
// 这是一个函数式接口(只有一个抽象方法)@FunctionalInterfacepublicinterfacerepterInfoBroadcastDistributedHandler{voidinvoke(QueueRepterInforepterInfoBroadcast);// 唯一的抽象方法}// 这不是函数式接口(有2个抽象方法)publicinterfaceWrongInterface{voidmethod1();voidmethod2();// 第二个抽象方法 ❌}
  1. @FunctionalInterface的作用
    ① 编译时检查
@FunctionalInterfacepublicinterfaceMyHandler{voidhandle(Stringdata);// void process(String data); // 如果取消注释这行,编译会报错!}// 错误:Multiple non-overriding abstract methods found

② 文档说明
告诉其他开发者:“这个接口可以用Lambda表达式实现”

  1. 为什么要有函数式接口?
    主要是为了支持Lambda表达式(Java 8引入的新特性)
    没有Lambda之前(匿名内部类):
// 传统的写法,很冗长repterInfoBroadcastListeners.add(newDelegateInterface.repterInfoBroadcastDistributedHandler(){@Overridepublicvoidinvoke(QueueRepterInfodata){System.out.println("收到数据:"+data);}});

有了Lambda之后(简洁多了):

// Lambda表达式写法repterInfoBroadcastListeners.add(data->{System.out.println("收到数据:"+data);});// 更简短的写法(如果只有一行)repterInfoBroadcastListeners.add(data->System.out.println("收到数据:"+data));
  1. Lambda表达式如何匹配函数式接口
// 这个Lambda表达式data->System.out.println("收到数据:"+data)// 相当于实现了这个接口newrepterInfoBroadcastDistributedHandler(){@Overridepublicvoidinvoke(QueueRepterInfodata){System.out.println("收到数据:"+data);}}
  1. 为什么你的代码中要用这个注解?
  • 明确意图:告诉其他开发者这个接口可以用Lambda实现
  • 防止错误:如果有人想往接口里加第二个方法,编译会报错
  • 符合标准:Java官方的函数式接口都有这个注解

总结
@FunctionalInterface 就像是一个标签,告诉Java编译器:
✅ 这个接口是专门为Lambda表达式设计的
✅ 帮我检查是不是真的只有一个抽象方法
✅ 让其他开发者知道可以用Lambda来简化代码

注意点4:为什么使用接口回调而不是直接调用?

ublicclassDelegateInterface{@FunctionalInterfacepublicinterfacerepterInfoBroadcastDistributedHandler{voidinvoke(QueueRepterInforepterInfoBroadcast);}为啥要这么定义,而不是在dispatcherRepterInfo方法中直接调用doBroadcastRepeaterInfo(queueRepterInfo.getIpEndPoint(),queueRepterInfo.getRptBroadcastMachineInfo(),queueRepterInfo.getRealCommunity(),queueRepterInfo.getSnmpVersion(),queueRepterInfo.getUsmUser(),queueRepterInfo.getContextName());}

答案:这是一个非常经典的设计模式问题!让我详细解释这样设计的好处:


注意点5:为什么使用 ConcurrentLinkedQueue?

答案:这是一个非常好的问题!让我详细分析选择 ConcurrentLinkedQueue 的原因:







注意点6:这是啥写法?

DelegateInterface.repterInfoBroadcastDistributedHandler repterInfoBroadcastDistributedHandler=(QueueRepterInforepterInfoBroadcast)->{xnmsSnmpTrapRepterInfoBroadcastHandler(repterInfoBroadcast);};xnmsSnmpTrap.addRepterInfoBroadcastListener(repterInfoBroadcastDistributedHandler);

答案:



注意点7:listener初始化时机?

DelegateInterface.repterInfoBroadcastDistributedHandler repterInfoBroadcastDistributedHandler=(QueueRepterInforepterInfoBroadcast)->{xnmsSnmpTrapRepterInfoBroadcastHandler(repterInfoBroadcast);};xnmsSnmpTrap.addRepterInfoBroadcastListener(repterInfoBroadcastDistributedHandler);

答案:ServiceHost中的start方法会在初始化时候执行,然后就会执行SnmpListen()初始化方法,它会把具体所有具体实现都添加到addGpsInfoBroadcastListener中,这样当外部snmp接收消息执行如下代码时,for循环判断不为空则直接调用invoke方法。

if(repterInfoBroadcastListeners!=null){for(DelegateInterface.repterInfoBroadcastDistributedHandler listener:repterInfoBroadcastListeners){listener.invoke(repterInfoBroadcast);}}

这是观察者模式的典型应用!让我详细解释它的作用和必要性:


http://www.jsqmd.com/news/467618/

相关文章:

  • 数字重生诉讼案中的测试启示:当情感算法遭遇伦理边界
  • 【Video Agent】(CVPR 2025)VIDEOTREE: Adaptive Tree-based Video Representation for LLM Reasoning ...
  • 群晖通过acme.sh自动化部署Let’s Encrypt证书的实践指南
  • 5G前传接口实战:O-RAN中M/C/U/S平面协议栈配置避坑指南
  • 第一篇:开篇总论——AI主导的“算法战争”:美以伊战改写现代战争规则
  • 使用acme.sh实现SSL证书自动化管理:从申请到续期全攻略
  • rocketmq5--必要知识点
  • 方法回顾--空间转录组多模态交叉分析(MIA)
  • Nuclei实战:5分钟搞定企业级漏洞扫描(附最新模板下载指南)
  • allure测试报告——项目
  • 2026过氧化氢厂推荐:工业/食品/电子级合规过氧化氢厂家最新权威排行榜 - 深度智识库
  • 深度学习赋能双色球预测:特征工程与模型融合实战解析
  • Linux alternatives / update-alternatives 使用教程
  • 物美卡回收最新操作攻略 - 猎卡回收公众号
  • 2026年四川电缆与变压器回收服务商深度测评:谁才是工业产废企业的“专业护航者”? - 深度智识库
  • 避坑指南:特征转换中SVD/LSA的5个常见误区与优化方案
  • 不仅能听还能懂:网易有道发布首个同传Agent,重塑高频信息处理场景
  • 从电话通信到数字音频:PCM编码中的μ律15折线为何在北美更流行?
  • Qt5.15.2下QML地图插件源码修改实战:解决OSM在线地图加载失败问题
  • DNANet实战解析:如何用密集嵌套注意力网络提升红外小目标检测精度
  • 泛微E9移动端集成实战:如何为不同业务配置多个Emobile7工作台(附代码修改步骤)
  • 性能测试基础概念
  • 2026陕西仿古铝瓦厂家排名|源头好厂口碑推荐,选型不踩坑 - 朴素的承诺
  • 用Colab免费GPU训练专属SDXL模型:手把手教你跑通BLIP标注+Waifu Diffusion打标
  • Acunetix漏洞扫描实战:从零配置到生成专业安全报告(附常见错误排查)
  • 2025大唐杯仿真2——车联网中的V2V与PC5技术实战解析
  • ChatGLM3-6B模型服务化:FastAPI高性能接口开发
  • 玩Pokémon GO被Ban?2024年安卓Root检测绕过全攻略(附SafetyNet通关配置)
  • RCL0923光伏协议转换器与SCU融合终端:分布式光伏群调群控的智能化实践
  • CentOS7下FFmpeg安装全攻略:从在线到离线的完整解决方案(含Nux Dextop源配置)