当前位置: 首页 > news >正文

Spring Boot 集成 MQTT 完成消息发布与订阅

一、MQTT 是什么?

MQTT(Message Queuing Telemetry Transport)是一种 轻量级发布/订阅消息传输协议,广泛用于 物联网(IoT)设备通信。它基于 TCP/IP 协议,具有以下特点:

  • 轻量级,网络带宽占用低

  • 支持自动重连与消息重发

  • 支持 QoS(服务质量等级)保证消息可靠性

  • 典型应用场景:智能家居、工业监控、电力系统、设备遥测等

  项目Demo源码
GitHub: https://github.com/lpsqq1944900433/mqtt-demo/

Gitee: https://gitee.com/QQ1944900433/mqtt-demo

在本文中,我们将基于 Spring Boot + Eclipse Paho 客户端,实现一个最小可运行的 MQTT 消息发布与订阅系统。


二、项目结构概览

项目结构如下:

mqtt-demo├─ com.lps.mqttdemo│   ├─ config│   │   └─ MqttConfig.java           # MQTT连接配置类│   ├─ publisher│   │   └─ Publisher.java            # MQTT消息发布者│   ├─ subscriber│   │   └─ Subscriber.java           # MQTT消息订阅者│   └─ MqttDemoApplication.java      # 启动类└─ resources└─ application.yml               # 配置文件

依赖引入:

        org.eclipse.pahoorg.eclipse.paho.client.mqttv31.2.5

三、配置文件(application.yml)

# MQTT 配置
mqtt:# MQTT Broker 地址broker-url: tcp://192.168.88.178:1883# 客户端 IDclient-id: mqtt-demo-client# 订阅的主题topic: lps-test-topic# 默认 QoS 等级(0: 最多一次, 1: 至少一次, 2: 仅一次)default-qos: 1# 连接超时时间(秒)connection-timeout: 30# 保持连接间隔(秒)keep-alive-interval: 60# 是否自动重连automatic-reconnect: true# 是否清除会话clean-session: true
# Spring Boot 应用配置
spring:application:name: mqtt-demo
# 服务器端口
server:port: 8080

建议:

  • 如果你的 MQTT Broker 是 EMQXMosquitto,可以用 tcp://127.0.0.1:1883 进行测试。

  • 在生产环境中建议启用 SSL(ssl://broker地址:8883)。


四、MqttConfig 配置类详解

package com.lps.mqttdemo.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** MQTT 配置类* 用于配置 MQTT 客户端连接参数*/
@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.connection-timeout}")private int connectionTimeout;@Value("${mqtt.keep-alive-interval}")private int keepAliveInterval;@Value("${mqtt.automatic-reconnect}")private boolean automaticReconnect;@Value("${mqtt.clean-session}")private boolean cleanSession;/*** 创建 MQTT 连接选项*/@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setConnectionTimeout(connectionTimeout);options.setKeepAliveInterval(keepAliveInterval);options.setAutomaticReconnect(automaticReconnect);options.setCleanSession(cleanSession);return options;}/*** 创建 MQTT 客户端(用于发布消息)*/@Beanpublic MqttClient mqttClient(MqttConnectOptions mqttConnectOptions) throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId + "-publisher");client.connect(mqttConnectOptions);return client;}/*** 创建订阅端 MQTT 客户端*/@Bean(name = "subscriberMqttClient")public MqttClient subscriberMqttClient(MqttConnectOptions mqttConnectOptions) throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId + "-subscriber");client.connect(mqttConnectOptions);return client;}
}

说明:

  • 使用 @Bean 管理两个独立的 MQTT 客户端:发布端和订阅端;

  • MqttConnectOptions 用于配置连接参数;

  • 支持自动重连、保活机制,保证长连接稳定。


五、Subscriber(订阅者)

package com.lps.mqttdemo.subscriber;
import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/*** MQTT 消息订阅者* 在应用启动时自动订阅指定主题并接收消息*/
@Component
public class Subscriber {@Autowired@Qualifier("subscriberMqttClient")private MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;@Value("${mqtt.default-qos}")private int qos;/*** 应用启动后自动订阅主题*/@PostConstructpublic void subscribe() {try {// 设置回调处理器mqttClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {System.out.println("MQTT 连接丢失: " + cause.getMessage());// 自动重连由配置处理}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String payload = new String(message.getPayload());System.out.println("========================================");System.out.println("收到 MQTT 消息:");System.out.println("主题: " + topic);System.out.println("QoS: " + message.getQos());System.out.println("消息内容: " + payload);System.out.println("消息ID: " + message.getId());System.out.println("是否保留消息: " + message.isRetained());System.out.println("========================================");}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 发布完成回调(订阅者不需要实现)}});// 订阅主题mqttClient.subscribe(topic, qos);System.out.println("成功订阅主题: " + topic + ",QoS: " + qos);} catch (MqttException e) {System.err.println("订阅主题失败: " + e.getMessage());e.printStackTrace();}}
}

逻辑说明:

  • 使用 @PostConstruct 实现应用启动后自动订阅;

  • 实现 MqttCallback 接口监听消息;

  • 当消息到达时打印详细信息;

  • 可在此扩展业务逻辑(如存入数据库、转发到消息队列)。


六、Publisher(发布者)

package com.lps.mqttdemo.publisher;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.nio.charset.StandardCharsets;
/*** MQTT 消息发布者* 提供 REST 接口用于发布 MQTT 消息*/
@RestController
@RequestMapping("/publish")
public class Publisher {@Autowiredprivate MqttClient mqttClient;@Value("${mqtt.topic}")private String defaultTopic;@Value("${mqtt.default-qos}")private int defaultQos;/*** 发布消息到默认主题** @param message 要发布的消息内容* @return 发布结果*/@PostMappingpublic ResponseEntity publishMessage(@RequestParam String message) {return publishMessage(defaultTopic, message, defaultQos);}/*** 发布消息到指定主题** @param topic   主题名称* @param message 消息内容* @param qos     QoS 等级(0, 1, 2)* @return 发布结果*/@PostMapping("/{topic}")public ResponseEntity publishToTopic(@PathVariable String topic,@RequestParam String message,@RequestParam(required = false, defaultValue = "1") int qos) {return publishMessage(topic, message, qos);}/*** 发布消息的核心方法** @param topic   主题* @param message 消息内容* @param qos     QoS 等级* @return 发布结果*/private ResponseEntity publishMessage(String topic, String message, int qos) {try {// 确保客户端已连接if (!mqttClient.isConnected()) {mqttClient.reconnect();}// 创建 MQTT 消息MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));mqttMessage.setQos(qos);mqttMessage.setRetained(false);// 发布消息mqttClient.publish(topic, mqttMessage);return ResponseEntity.ok(String.format("消息已成功发布到主题 [%s],QoS: %d,内容: %s", topic, qos, message));} catch (MqttException e) {return ResponseEntity.status(500).body("发布消息失败: " + e.getMessage());}}
}

接口测试:

1️⃣ 发布默认主题消息

curl -X POST "http://localhost:8080/publish?message=hello_mqtt"

2️⃣ 发布自定义主题消息

curl -X POST "http://localhost:8080/publish/myTopic?message=test_msg&qos=1"

控制台输出示例:

成功订阅主题: lps-test-topic,QoS: 1 ========== 收到MQTT消息 ========== 主题: lps-test-topic QoS: 1 内容: hello_mqtt ================================


七、常见问题与优化建议

1. 客户端断开重连问题

如果 Broker 重启或网络异常,isConnected() 检查 + reconnect() 是基本方案。
更优的方案是:

  • 使用 Paho 的 MqttAsyncClient

  • 或者使用 Spring Integration MQTT 框架管理连接状态


2. 多主题订阅

mqttClient.subscribe(new String[]{"topic1", "topic2"}, new int[]{1, 1});

3. QoS 说明

等级含义场景
0最多一次(不保证送达)传感器实时数据
1至少一次(可能重复)一般消息
2仅一次(最可靠)控制指令、金融场景

4. 提升健壮性建议

  • 建议封装 MqttService 类统一管理发布/订阅逻辑;

  • 增加心跳检测与状态日志;

  • 使用 CompletableFuture 或线程池异步发布,提升吞吐;

  • 如果系统使用 RocketMQ/Kafka,可将 MQTT 作为接入层网关。


八、总结

本文完整演示了如何在 Spring Boot 中使用 Eclipse Paho 实现 MQTT 消息的发布与订阅功能。
整个流程简单明了,适合初学者快速上手,同时也能作为物联网项目的基础通信模块。

http://www.jsqmd.com/news/55703/

相关文章:

  • 2025年防尘雾森设备技术与智能雾森设备方案十大服务商排行榜
  • 智能电网用户端设备品牌TOP5权威推荐:江苏斯菲尔研发能力强
  • 优势演员-评论家(Advantage Actor-Critic,A2C)算法详解与完成
  • 2025年十大闭式冷却塔定制服务提供商排行榜,精选闭式冷却塔
  • 2025年哈尔滨中央空调销售公司排名:中央空调销售品牌推荐
  • HTML列表学习笔记
  • 2025年十大不锈钢压花板厂商排行榜,靠谱不锈钢压花板制造厂
  • 哈尔滨口碑好的家居设备专业公司TOP5权威推荐:甄选实力企业
  • 深入解析:2025-10-30日供应链安全日报:最新漏洞预警与投毒预警情报汇总
  • 光刻胶分类与特性:化学增幅i线光刻胶 CAR、金属氧化物光刻胶及EUV光刻胶前沿进展(续) - 详解
  • 2025年十大上海起重设备品牌排行榜,凯力起重设备质量可靠吗
  • 2025年度哈尔滨一站式家居设备定制服务排行榜,专业测评精选
  • NOIP2025 总结
  • 专业的的楼梯钢格板供应商推荐排行榜单?楼梯钢格板供应商 楼梯钢格板销售厂家 楼梯钢格板制造厂 楼梯钢格板生产商 楼梯钢格板厂商 楼梯钢格板企业 楼梯钢格板供货商
  • 正规的电厂钢格栅供应商推荐排行榜单?电厂钢格栅供应商 电厂钢格栅销售厂家 电厂钢格栅制造厂 电厂钢格栅生产商 电厂钢格栅厂商 电厂钢格栅企业 电厂钢格栅供货商
  • 可靠的工业铝型材供应厂家推荐排行榜?工业铝型材供应厂家 工业铝型材工厂 工业铝型材厂家 工业铝型材生产厂家 工业铝型材源头厂家 工业铝型材供应商
  • 2025年山东帽顶膜结构停车棚安装厂家推荐:学校/拉杆式膜结
  • 2025年哈尔滨家居设备服务公司排名:盛通MALL旗舰店的用
  • 专业的遮阳蓬品牌哪家靠谱?遮阳蓬品牌 遮阳蓬公司 遮阳蓬产品 遮阳蓬供应厂家 遮阳蓬工厂 遮阳蓬厂家 遮阳蓬生产厂家 遮阳蓬源头厂家
  • 《考研408数据结构》第七章(6.1~6.3图的概念、存储方式、深/广度遍历)复习笔记 - 教程
  • 国内AI独角兽公司推荐:探索智能科技前沿力量
  • 国内AI公司估值排行:行业技术实力与发展潜力观察
  • 正规的推拉雨棚品牌哪家靠谱?推拉雨棚品牌 推拉雨棚公司 推拉雨棚产品 推拉雨棚供应厂家 推拉雨棚工厂 推拉雨棚厂家 推拉雨棚生产厂家 推拉雨棚源头厂家
  • 靠谱的AI公司有哪些?行业深耕者实力盘点
  • 完整教程:人机交互的软件工程方法实验报告(黑龙江大学)
  • 国内AI公司推荐:智能获客与行业赋能的实力之选
  • 2025年口碑好的前置过滤器品牌哪家好?大白瓶前置过滤器 大胖瓶前置过滤器 小白瓶前置过滤器 大蓝瓶前置过滤器 双芯前置过滤器品牌推荐
  • 睡眠益生菌哪家好?五款热门产品深度解析
  • 上海AI创业公司排行榜:2025年创新技术与应用场景分析
  • 中国AI科技公司融资榜推荐:行业领军企业实力解析