当前位置: 首页 > news >正文

SpringBoot+Vue2.x+MQTT+TDengine3.x搭建物联网设备管理平台

import Vue from 'vue'
import App from './App.vue'
import router from './router'
import store from './store'
import ElementUI from 'element-ui'
import 'element-ui/lib/theme-chalk/index.css'
import './assets/styles/global.css'
import * as echarts from 'echarts'
import dayjs from 'dayjs'
import websocket from './utils/websocket'// 注册 Element UI
Vue.use(ElementUI, { size: 'small' })// 全局挂载 echarts
Vue.prototype.$echarts = echarts// 全局挂载 dayjs
Vue.prototype.$dayjs = dayjs// 全局挂载 websocket
Vue.prototype.$websocket = websocketVue.config.productionTip = falsenew Vue({router,store,render: h => h(App)
}).$mount('#app')// 应用启动时初始化 WebSocket 连接(可选)
// 在需要实时数据的页面中显式连接
websocket.connect().catch(err => {console.warn('WebSocket 连接失败,将在需要时重试:', err)
})

1. 前端使用vue2.x 集合websocket 做设备信息的展示。


无标题

2.  后端使用springboot 集合,mqtt和 TDengine。

package com.device.backend.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** MQTT 配置类*/
@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.default-topic}")private String defaultTopic;@Value("${mqtt.qos:1}")private int qos;@Value("${mqtt.keep-alive-interval:60}")private int keepAliveInterval;@Value("${mqtt.connection-timeout:30}")private int connectionTimeout;/*** MQTT 客户端工厂*/@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();org.eclipse.paho.client.mqttv3.MqttConnectOptions options = new org.eclipse.paho.client.mqttv3.MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setUserName(username);options.setPassword(password.toCharArray());options.setKeepAliveInterval(keepAliveInterval);options.setConnectionTimeout(connectionTimeout);options.setAutomaticReconnect(true);options.setCleanSession(false);factory.setConnectionOptions(options);return factory;}/*** MQTT 入站消息通道*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** MQTT 入站消息适配器 (订阅)*/@Beanpublic MessageProducer mqttInbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound",mqttClientFactory(),defaultTopic,"device/+/status","device/+/data","device/+/event","device/+/heartbeat","device/+/register");adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(qos);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** MQTT 出站消息通道*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** MQTT 出站消息处理器 (发布)*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler(clientId + "-outbound", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);messageHandler.setDefaultQos(qos);return messageHandler;}
}
# MQTT 配置
mqtt:broker-url: tcp://localhost:1883client-id: device-platform-serverusername: adminpassword: publicdefault-topic: device/#qos: 1keep-alive-interval: 60connection-timeout: 30auto-reconnect: true

 配置TDengine 时序数据库, 存储设备上线,下线等数据。

package com.device.backend.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;/*** TDengine 数据库配置*/
@Configuration
public class TDengineConfig {@Value("${tdengine.driver-class-name}")private String driverClassName;@Value("${tdengine.url}")private String url;@Value("${tdengine.username}")private String username;@Value("${tdengine.password}")private String password;@Value("${tdengine.database}")private String database;/*** 获取 TDengine 连接*/public Connection getConnection() throws SQLException {try {Class.forName(driverClassName);} catch (ClassNotFoundException e) {throw new SQLException("TDengine driver not found", e);}return DriverManager.getConnection(url, username, password);}/*** 初始化 TDengine 数据库和超级表*/@Beanpublic Boolean initTDengine() {try (Connection conn = getConnection();Statement stmt = conn.createStatement()) {// 创建数据库
//            String createDbSql = "CREATE DATABASE IF NOT EXISTS " + database;
//            System.out.println("执行 SQL: " + createDbSql);
//            stmt.execute(createDbSql);
//
//            // 使用数据库
//            String useSql = "USE " + database;
//            System.out.println("执行 SQL: " + useSql);
//            stmt.execute(useSql);
//
//            // 创建设备数据超级表 (STable) - 
//            String createMetricsSql = "CREATE STABLE IF NOT EXISTS device_metrics (" +
//                    "ts TIMESTAMP, " +
//                    "temperature FLOAT, " +
//                    "humidity FLOAT, " +
//                    "voltage FLOAT, " +
//                    "current FLOAT, " +
//                    "power FLOAT, " +
//                    "battery_level INT, " +
//                    "signal_strength INT" +
//                    ") TAGS (" +
//                    "device_sn VARCHAR(100), " +
//                    "device_type VARCHAR(50), " +
//                    "device_group VARCHAR(100)" +
//                    ")";
//            System.out.println("执行 SQL: " + createMetricsSql);
//            stmt.execute(createMetricsSql);
//
//            // 创建设备事件超级表 - 
//            String createEventsSql = "CREATE STABLE IF NOT EXISTS device_events (" +
//                    "ts TIMESTAMP, " +
//                    "event_type VARCHAR(50), " +
//                    "event_content VARCHAR(500), " +
//                    "event_level VARCHAR(20)" +
//                    ") TAGS (" +
//                    "device_sn VARCHAR(100)" +
//                    ")";
//            System.out.println("执行 SQL: " + createEventsSql);
//            stmt.execute(createEventsSql);return true;} catch (Exception e) {
//            System.err.println(" TDengine 初始化失败 (请确认 TDengine 服务已启动): " + e.getMessage());
//            e.printStackTrace();
//
//            // 如果 STABLE 创建失败,尝试备选方案(普通表)
//            try (Connection conn = getConnection();
//                 Statement stmt = conn.createStatement()) {
//
//                System.out.println("尝试备选方案:使用普通表代替...");
//                stmt.execute("USE " + database);
//
//                // 创建普通表(如果 STABLE 不支持)
//                stmt.execute("CREATE TABLE IF NOT EXISTS device_metrics (" +
//                        "ts TIMESTAMP, " +
//                        "temperature FLOAT, " +
//                        "humidity FLOAT, " +
//                        "voltage FLOAT, " +
//                        "current FLOAT, " +
//                        "power FLOAT, " +
//                        "battery_level INT, " +
//                        "signal_strength INT, " +
//                        "device_sn VARCHAR(100)" +
//                        ")");
//
//                stmt.execute("CREATE TABLE IF NOT EXISTS device_events (" +
//                        "ts TIMESTAMP, " +
//                        "event_type VARCHAR(50), " +
//                        "event_content VARCHAR(500), " +
//                        "event_level VARCHAR(20), " +
//                        "device_sn VARCHAR(100)" +
//                        ")");
//
//                return true;}}public String getDatabase() {return database;}
}

 TDengine 数据库表

CREATE DATABASE IF NOT EXISTS device_ts;USE device_ts;-- 设备指标表  
CREATE STABLE IF NOT EXISTS device_metrics (ts TIMESTAMP,temperature FLOAT,humidity FLOAT,voltage FLOAT,current FLOAT,power FLOAT,battery_level INT,signal_strength INT
) TAGS (device_sn VARCHAR(100),device_type VARCHAR(50),device_group VARCHAR(100)
);-- 设备事件表 
CREATE STABLE IF NOT EXISTS device_events (ts TIMESTAMP,event_type VARCHAR(50),event_content VARCHAR(500),event_level VARCHAR(20)
) TAGS (device_sn VARCHAR(100)
);

  无标题1111

3、mqtt处理消息

package com.device.backend.mqtt;import com.device.backend.dto.DeviceMessageDTO;
import com.device.backend.service.DeviceService;
import com.device.backend.service.TDengineService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** MQTT 消息处理器* 接收来自 MQTT Broker 的设备消息并进行处理*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MqttMessageHandler {private final DeviceService deviceService;private final TDengineService tdengineService;private final SimpMessagingTemplate messagingTemplate;private final ObjectMapper objectMapper;/*** 处理 MQTT 入站消息* inputChannel 对应 MqttConfig 中配置的入站通道*/@ServiceActivator(inputChannel = "mqttInputChannel")public void handleMessage(Message<?> message) {String topic = (String) message.getHeaders().get("mqtt_receivedTopic");String payload = message.getPayload().toString();log.info("收到MQTT消息 - Topic: {}, Payload: {}", topic, payload);try {// 解析 topic: device/{sn}/{type}String[] parts = topic != null ? topic.split("/") : new String[0];if (parts.length < 3) {log.warn("无效的MQTT Topic: {}", topic);return;}String deviceSn = parts[1];String msgType = parts[2];switch (msgType) {case "register":handleRegister(deviceSn, payload);break;case "heartbeat":handleHeartbeat(deviceSn, payload);break;case "status":handleStatus(deviceSn, payload);break;case "data":handleData(deviceSn, payload);break;case "event":handleEvent(deviceSn, payload);break;default:log.warn("未知消息类型: {}", msgType);}} catch (Exception e) {log.error("处理MQTT消息异常: {}", e.getMessage(), e);}}/*** 处理设备注册消息*/private void handleRegister(String deviceSn, String payload) throws Exception {log.info("设备注册: {}", deviceSn);Map<?, Object> data = objectMapper.readValue(payload, Map.class);String ipAddress = (String) data.getOrDefault("ipAddress", "");deviceService.deviceOnline(deviceSn, ipAddress);// 保存注册事件到TDenginetdengineService.saveDeviceEvent(deviceSn, "REGISTER", "设备注册接入", "INFO");// 推送WebSocket消息给前端pushDeviceEvent(deviceSn, "REGISTER", "设备注册接入");}/*** 处理心跳消息*/private void handleHeartbeat(String deviceSn, String payload) {deviceService.deviceOnline(deviceSn, null);// 推送心跳事件pushDeviceEvent(deviceSn, "HEARTBEAT", "设备心跳");}/*** 处理状态消息*/private void handleStatus(String deviceSn, String payload) throws Exception {DeviceMessageDTO msg = objectMapper.readValue(payload, DeviceMessageDTO.class);msg.setDeviceSn(deviceSn);msg.setMsgType("STATUS");Boolean online = msg.getOnline();if (online != null) {if (online) {deviceService.deviceOnline(deviceSn, msg.getIpAddress());} else {deviceService.deviceOffline(deviceSn);}}// 推送到前端pushDeviceStatus(deviceSn, msg);}/*** 处理数据上报消息*/private void handleData(String deviceSn, String payload) throws Exception {DeviceMessageDTO msg = objectMapper.readValue(payload, DeviceMessageDTO.class);msg.setDeviceSn(deviceSn);msg.setMsgType("DATA");// 更新设备最后上报时间deviceService.deviceOnline(deviceSn, msg.getIpAddress());// 保存时序数据到 TDenginetdengineService.saveDeviceMetrics(msg);// 推送实时数据到前端pushDeviceData(deviceSn, msg);// 检查告警阈值checkAlarmThreshold(deviceSn, msg);}/*** 处理事件消息*/private void handleEvent(String deviceSn, String payload) throws Exception {Map<?, Object> data = objectMapper.readValue(payload, Map.class);String eventType = (String) data.getOrDefault("eventType", "UNKNOWN");String eventContent = (String) data.getOrDefault("content", "");String eventLevel = (String) data.getOrDefault("level", "INFO");// 保存到TDenginetdengineService.saveDeviceEvent(deviceSn, eventType, eventContent, eventLevel);// 推送前端pushDeviceEvent(deviceSn, eventType, eventContent);}/*** 检查告警阈值*/private void checkAlarmThreshold(String deviceSn, DeviceMessageDTO msg) {// 注入告警服务 (通过构造器注入)// 这里需要在类中添加告警服务的注入if (msg.getTemperature() != null && msg.getTemperature() > 80) {deviceService.updateStatus(deviceSn, "ALARM", null);tdengineService.saveDeviceEvent(deviceSn, "HIGH_TEMP","温度过高: " + msg.getTemperature() + "°C", "ERROR");pushDeviceEvent(deviceSn, "ALARM", "温度告警: " + msg.getTemperature() + "°C");}if (msg.getBatteryLevel() != null && msg.getBatteryLevel() < 10) {tdengineService.saveDeviceEvent(deviceSn, "LOW_BATTERY","电量不足: " + msg.getBatteryLevel() + "%", "WARNING");pushDeviceEvent(deviceSn, "LOW_BATTERY", "低电量告警: " + msg.getBatteryLevel() + "%");}}/*** 推送设备实时数据到前端 WebSocket*/private void pushDeviceData(String deviceSn, DeviceMessageDTO msg) {try {messagingTemplate.convertAndSend("/topic/device/data/" + deviceSn, msg);messagingTemplate.convertAndSend("/topic/device/data", msg);} catch (Exception e) {log.error("推送WebSocket消息失败: {}", e.getMessage());}}/*** 推送设备状态到前端 WebSocket*/private void pushDeviceStatus(String deviceSn, DeviceMessageDTO msg) {try {Map<String, Object> statusMsg = new HashMap<>();statusMsg.put("deviceSn", deviceSn);statusMsg.put("online", msg.getOnline());statusMsg.put("ipAddress", msg.getIpAddress());messagingTemplate.convertAndSend("/topic/device/status", statusMsg);} catch (Exception e) {log.error("推送设备状态失败: {}", e.getMessage());}}/*** 推送设备事件到前端 WebSocket*/private void pushDeviceEvent(String deviceSn, String eventType, String content) {try {Map<String, Object> event = new HashMap<>();event.put("deviceSn", deviceSn);event.put("eventType", eventType);event.put("content", content);event.put("timestamp", System.currentTimeMillis());messagingTemplate.convertAndSend("/topic/device/event", event);} catch (Exception e) {log.error("推送设备事件失败: {}", e.getMessage());}}
}

  模拟设备订阅主题, 并发布消息

无标题333

 

 至此,基本的设备管理的流程基本上完成了。总体来说就是 物联网设备在mqtt订阅和发布消息。springboot完成消息的解析和前端交互显示设备状态。使用时序数据库TDengine来存储设备上报信息。
页面向设备发送指令也是通过mqtt主题来做。设备获取到消息会执行对应的指令。

   





http://www.jsqmd.com/news/497654/

相关文章:

  • 2026年高稳定手游联运平台系统推荐指南:搭建手游平台/游戏联运平台/游戏聚合发行系统/H5联运平台系统/手游平台sdk/选择指南 - 优质品牌商家
  • Django个人主页网站搭建全指南
  • ### 2. `isTransformResponse: true` 或不设置时(默认) 返回的是转换后的数据,通常是 `res.data` 的内容:
  • 列表推导式详解与实战应用
  • 基于springboot“茶见”在线商城设计与开发(源码+精品论文+答辩PPT等资料)
  • 第 18 篇 综合项目实战:基于 RK3568 的安卓智能门禁系统,全栈开发
  • 《我从达尔文那里学到的投资知识》
  • 力扣 hot100 滑动窗口最大值 单调双端队列 java 简单题解
  • 金融交易系统高可用测试指南:构建永不宕机的安全防线
  • SGI备份还原单文件版
  • 天地图中使用html2canvas问题
  • Zen Browser:基于 Firefox 的极简开源浏览器,隐私与速度兼得
  • Linux系统文件操作简介
  • OpenClaw 核心功能解析:一文让你彻底搞懂 OpenClaw
  • Win 32 API:初步了解与应用
  • 2026年SCI论文降AI率用什么工具?实测5款后选了这个
  • 4K型护套连接器ZE0703-09(250)参数
  • hello-agent task01打卡
  • PDF文件拆分, 不限制文件大小
  • 携程任我行礼品卡回收秒变现攻略 - 京顺回收
  • 任务栏标语图片
  • 加一 - 题目笔记
  • MySQL主键设计原则与自增ID的潜在问题分析
  • 自动化测试常用函数(元素的定位)
  • 技术分享-日志链路追踪
  • 龙虾智能体不是玩具!国家安全部提醒:这3个防护步骤必做
  • (独自升级Lv.1)C++基础面试题
  • 从零学网安第四期--在kali里面制作木马程序并实现远程控制
  • 238. 除了自身以外数组的乘积
  • 自动驾驶购物车测试:超市里的交通拥堵难题——软件测试工程师的实战解构