当前位置: 首页 > news >正文

springboot集成mqtt的订阅端

环境:JDK21

springboot:3.3.5

EMQX:5.8.5

MQTT协议:5.0

引入的mqtt jar包:

<dependency> <groupId>com.hivemq</groupId> <artifactId>hivemq-mqtt-client</artifactId> <version>1.3.0</version> </dependency>

yml中mqtt的配置:

$share/my-group/test/topic:共享订阅,也就是负载均衡,$share是固定写法,my-group共享组,共享组内的消费者只能有一个消费,
mqtt: host: 192.168.1.2 port: 1883 client-id: springboot-listener-001 username: admin password: xxyy1212@ qos: 1 topics: - device/up - device/data - $share/my-group/test/topic

读取yml的配置文件

import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.List; @Component @ConfigurationProperties(prefix = "mqtt") public class MqttProperties { private String host; private int port; private String clientId; private String username; private String password; private int qos; private List<String> topics; // ✅ 列表正确读取 // 自动生成 getter setter public String getHost() {return host;} public void setHost(String host) {this.host = host;} public int getPort() {return port;} public void setPort(int port) {this.port = port;} public String getClientId() {return clientId;} public void setClientId(String clientId) {this.clientId = clientId;} public String getUsername() {return username;} public void setUsername(String username) {this.username = username;} public String getPassword() {return password;} public void setPassword(String password) {this.password = password;} public int getQos() {return qos;} public void setQos(int qos) {this.qos = qos;} public List<String> getTopics() {return topics;} public void setTopics(List<String> topics) {this.topics = topics;} }

接收消息:

1、自动检测链接状态,

2、连接中断后自动连接

package com.crazy.shopping.listener; import com.crazy.shopping.config.MqttProperties; import com.hivemq.client.mqtt.MqttClientState; import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.*; @Component public class Mqtt5MessageListener { @Autowired private MqttProperties mqttProperties; private MqttQos mqttQos; private Mqtt5AsyncClient client; private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final ExecutorService messageExecutor = new ThreadPoolExecutor( 3, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy() ); @PostConstruct public void start() { mqttQos = MqttQos.fromCode(mqttProperties.getQos()); // 每3秒检查重连 + 强制绑定监听 scheduler.scheduleAtFixedRate(this::checkAndConnect, 0, 3, TimeUnit.SECONDS); } private void checkAndConnect() { if (client != null && client.getState() == MqttClientState.CONNECTED) { return; } System.out.println("⏳ 重连中..."); // 🔥 每次重连都新建客户端(彻底解决监听丢失) client = Mqtt5Client.builder() .serverHost(mqttProperties.getHost()) .serverPort(mqttProperties.getPort()) .identifier(mqttProperties.getClientId()) .buildAsync(); // ========================================== // 🔥 关键:连接之前 先注册监听!!! // ========================================== registerMessageListener(); // 连接 client.connectWith() .cleanStart(false) .sessionExpiryInterval(86400L) .keepAlive(60) .simpleAuth() .username(mqttProperties.getUsername()) .password(mqttProperties.getPassword().getBytes(StandardCharsets.UTF_8)) .applySimpleAuth() .send() .whenComplete((ack, t) -> { if (t == null) { System.out.println("✅ 连接成功!"); subscribeAllTopics(); } }); } // ========================================== // 🔥 每次重连都重新注册监听(绝杀) // ========================================== private void registerMessageListener() { client.toAsync().publishes(MqttGlobalPublishFilter.ALL, this::handleMessage); } // 消息处理:可以进行幂等性校验结合redis--TODO private void handleMessage(Mqtt5Publish publish) { messageExecutor.submit(() -> { try { String topic = publish.getTopic().toString(); String msg = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8); System.out.println("========================================"); System.out.println("📥 收到消息 >>> " + topic); System.out.println("内容 >>> " + msg); System.out.println("========================================"); } catch (Exception e) { e.printStackTrace(); } }); } // 订阅 private void subscribeAllTopics() { for (String topic : mqttProperties.getTopics()) { client.subscribeWith() .topicFilter(topic) .qos(mqttQos) .send() .whenComplete((sub, t) -> { if (t == null) { System.out.println("✅ 已订阅:" + topic); } }); } } @PreDestroy public void close() { try { scheduler.shutdownNow(); messageExecutor.shutdown(); if (client != null && client.getState().isConnected()) { client.disconnect(); } } catch (Exception ignored) {} } }

效果:

http://www.jsqmd.com/news/603595/

相关文章:

  • 实战演练企业网络规划:基于快马平台构建三级网络技术综合项目
  • 告别宏和模板元编程地狱:用C++27静态反射10行代码替代200行SFINAE,重构遗留系统的真实迁移路径曝光
  • 输入法词库跨平台迁移的技术实现与最佳实践
  • 大模型在环境科研中的应用:数据预测与分析
  • Android Studio Gradlew JDK配置
  • 【2026最新】AIGC率从60%降至5%只需零成本?10款免费工具实测红黑榜,一键解锁知网自救通关
  • MPLS标签转发的秘密:从数据包抓取到LSP表解析(含Router-ID设置技巧)
  • ThinkPad风扇总是噪音不断?这款开源工具让你的笔记本安静如图书馆
  • 为什么Meta内部已强制切换PyTorch 3.0静态图?架构图揭示3个被忽略的通信隐藏开销,第2个导致23%训练延迟飙升!
  • 2026年4月,国内评价高的电线电缆回收厂家大盘点,中餐馆回收/电线电缆回收/酒店回收,电线电缆回收厂家哪家好 - 品牌推荐师
  • 一篇搞定2026年简历模板服务商选购,避坑+选品全说清 - 极欧测评
  • 40+ Best Open Source Android Apps
  • Qwen-Image-2512-SDNQ镜像免配置优势:无需CUDA手动配置,自动适配A10/A100
  • Speechless:微博内容永久保存的终极解决方案
  • W5500io-M模组MQTT协议接入OneNet平台实战:从零构建微信小程序物联网控制
  • CS大三生的编程修行之路
  • 别再手动发消息了!用Python脚本+Coze API,5分钟搞定一个自动问答机器人
  • 在Windows上安装安卓应用?这个5MB小工具让你告别模拟器
  • nodejs pdf包
  • TYPE3-CAAV5如何革新CATIA中的文本与投影设计流程
  • 【全网最详细】FileZilla下载:FileZilla中文版FTP客户端安装使用图解教程 - xiema
  • Java 中 String 为何被设计为不可变?
  • 基于安路FPGA与米联客FDMA IP的DDR视频缓存系统设计与源码解析
  • 从Burp到Yakit:我的抓包工具箱升级记,聊聊实战中对付APP反抓包的几个野路子
  • 2026热门主治医师机构实测报告,在职医生看完再选 - 医考机构品牌测评专家
  • AI辅助写的一段存在就更新不存在就插入
  • 思源宋体CN:零成本打造专业中文排版的7个实用技巧
  • 3个颠覆性技巧让VR-Reversal打破3D视频观看壁垒
  • OpenClaw备份方案:千问3.5-9B配置与技能的版本管理
  • GLM-4-9B-Chat-1M与YOLOv8联合应用:图文关联分析系统