当前位置: 首页 > news >正文

mqtt消费堆积

mqtt发送主题成功,但消费主题只有一个

代码实例:

@ServiceActivator(inputChannel = "caGetConfig") @Override public void handleMessage(Message<?> message) throws MessagingException { try { String payload = (String) message.getPayload(); String topic = (String) message.getHeaders().get(MQTTRECEIVEDTOPIC); log.info("接收到MQTT消息,主题: {}, 消息内容: {}", topic, payload); try { if (StringUtils.isEmpty(payload)) { log.info("接收到空的MQTT消息,主题: {}", topic); CompletableFuture.runAsync(() -> generateVideoService.getNeMessage()); return; } if (StringUtils.isBlank(topic)) { log.info("接收到空的MQTT消息,主题: {}", topic); CompletableFuture.runAsync(() -> generateVideoService.getNeMessage()); } // 处理MQTT消息 List<String> onlineStatusNewLis = JSON.parseArray(payload, String.class); if (CollectionUtils.isNotEmpty(onlineStatusNewLis)){ generateVideoService.getNeMessageByUrl(onlineStatusNewLis) } } catch (Exception e) { log.error("分发MQTT消息失败,主题: {}, 消息: {}", topic, payload, e); } } catch (Exception e) { log.error("处理MQTT消息失败,消息: {}", message, e); } }

问题分析:

1. MQTT 消费者是单线程的

@ServiceActivator(inputChannel = "caGetConfig")基于 Spring Integration MQTT,其inputChannel默认是PublishSubscribeChannel(同步调用),即消息的接收和处理在同一线程中串行执行。只有当handleMessage()方法返回后,才能处理下一条消息。

2. 使用CompletableFuture.runAsync 异步调用

generateVideoService.getNeMessageByUrl(onlineStatusNewLis);方法执行完才能返回。如果onlineStatusNewLis列表较大,或者数据库查询较慢,这段时间内 MQTT 消费线程就会被阻塞。

如果该方法里面是一个while(!Thread.currentThread().isInterrupted())的无限循环,这个循环会在 MQTT 消费线程上执行,永远无法返回,后续所有消息都将被彻底阻塞。

添加异步编排执行,解决阻塞

// 处理MQTT消息 List<String> onlineStatusNewLis = JSON.parseArray(payload, String.class); if (CollectionUtils.isNotEmpty(onlineStatusNewLis)){ CompletableFuture.runAsync(() -> generateVideoService.getNeMessageByUrl(onlineStatusNewLis)); }
http://www.jsqmd.com/news/613881/

相关文章:

  • 劳力士官方售后服务中心网点实地考察报告(2026年4月最新实体店地址) - 亨得利官方服务中心
  • 2026年山东投流开户选购指南:3步教你省钱避坑精准投放 - 精选优质企业推荐榜
  • Pyplot在图表显示中文--配置文件法
  • 你的终端神器之Oh My Zsh吭
  • 2026沈阳凉亭评测:精选公司打造理想休闲空间,凉亭产品推荐技术实力与市场典范解析 - 品牌推荐师
  • Windows Server 配置与管理——第3章:文件系统管理
  • 智能防控,高效救援!2026北京森林消防装备展,邀您共探新趋势
  • 75.Acwing基础课第889题-简单-满足条件的01序列
  • 企业智能体,不是聊天机器人升级版
  • 2026年济南手机软件开发厂家推荐:系统软件开发/直播软件开发/行业软件开发/定制软件开发专业服务商 - 品牌推荐官
  • 2026雅思写作高效自学指南:痛点解析与优质网课精选 - 品牌2025
  • Burpsuite之暴力破解+验证码识别 | 添柴不加火敢
  • 三个角度分析AI自动写文+自动发布自媒体矩阵提效实测
  • 2026年有详细解析的雅思机考软件哪个好用?仿真模考,全解析工具推荐 - 品牌2026
  • 2026开发者App质量监控工具盘点与选型
  • 2026届最火的六大AI辅助写作工具实际效果
  • 西门子S7-1200与G150变频器的PROFIBUS通信控制实战
  • 74.Acwing基础课第888题-简单-求组合数Ⅳ
  • 2026年兰州设备搬迁指南:为何“甘肃蚂蚁搬家”成为政企首选? - 深度智识库
  • 用户画像构建的7个关键步骤(从数据收集到标签建模)
  • 湖北鑫巨达工贸有限公司:高要区多玛地弹簧 多玛电动地弹簧出售公司 - LYL仔仔
  • 2026年甘肃门禁系统公司优选 覆盖兰州及各地市工程适配需求 - 深度智识库
  • PostgreSQL 技术日报 (4月9日)|JSON 搜索架构解锁,PG 迁移时机有讲究
  • Llama-3.2V-11B-cot作品集:10个真实场景下图文推理输出效果高清对比展示
  • 2026年4月福建气动衬氟阀/衬氟管道/衬氟管件/衬氟弯头/衬氟补偿器厂家哪家好 - 2026年企业推荐榜
  • OpenClaw备份恢复方案:千问3.5-35B-A3B-FP8任务配置的迁移技巧
  • 探索NWaves:C#中的高效信号处理与音频分析实战
  • 002、Python开发环境搭建:从官网下载到安装完成
  • 2026年雅思阅读网课怎么选?高性价比线上课程与小班一对一深度指南 - 品牌2025
  • Vue + Iframe 实战:打造企业级流程配置中心揪