import Vue from 'vue'
import App from './App.vue'
import router from './router'
import store from './store'
import ElementUI from 'element-ui'
import 'element-ui/lib/theme-chalk/index.css'
import './assets/styles/global.css'
import * as echarts from 'echarts'
import dayjs from 'dayjs'
import websocket from './utils/websocket'

// Register Element UI with 'small' as the default component size.
Vue.use(ElementUI, { size: 'small' })

// Expose echarts on every component instance as this.$echarts.
Vue.prototype.$echarts = echarts

// Expose dayjs on every component instance as this.$dayjs.
Vue.prototype.$dayjs = dayjs

// Expose the shared WebSocket helper on every component instance as this.$websocket.
Vue.prototype.$websocket = websocket

Vue.config.productionTip = false

// Create the root Vue instance and mount it on the #app element.
new Vue({
  router,
  store,
  render: h => h(App)
}).$mount('#app')

// Open the WebSocket connection when the application starts (optional);
// pages that need real-time data are expected to connect explicitly.
// A failed connection is only logged as a warning here.
websocket.connect().catch(err => {
  console.warn('WebSocket 连接失败,将在需要时重试:', err)
})
1. 前端使用 Vue 2.x 结合 WebSocket 做设备信息的展示。

2. 后端使用 Spring Boot 结合 MQTT 和 TDengine。
package com.device.backend.config;import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;/*** MQTT 配置类*/ @Configuration public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.default-topic}")private String defaultTopic;@Value("${mqtt.qos:1}")private int qos;@Value("${mqtt.keep-alive-interval:60}")private int keepAliveInterval;@Value("${mqtt.connection-timeout:30}")private int connectionTimeout;/*** MQTT 客户端工厂*/@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();org.eclipse.paho.client.mqttv3.MqttConnectOptions options = new org.eclipse.paho.client.mqttv3.MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setUserName(username);options.setPassword(password.toCharArray());options.setKeepAliveInterval(keepAliveInterval);options.setConnectionTimeout(connectionTimeout);options.setAutomaticReconnect(true);options.setCleanSession(false);factory.setConnectionOptions(options);return 
factory;}/*** MQTT 入站消息通道*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** MQTT 入站消息适配器 (订阅)*/@Beanpublic MessageProducer mqttInbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound",mqttClientFactory(),defaultTopic,"device/+/status","device/+/data","device/+/event","device/+/heartbeat","device/+/register");adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(qos);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** MQTT 出站消息通道*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** MQTT 出站消息处理器 (发布)*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler(clientId + "-outbound", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);messageHandler.setDefaultQos(qos);return messageHandler;} }
# MQTT 配置
mqtt:
  broker-url: tcp://localhost:1883
  client-id: device-platform-server
  username: admin
  password: public
  default-topic: device/#
  qos: 1
  keep-alive-interval: 60
  connection-timeout: 30
  auto-reconnect: true
配置 TDengine 时序数据库,用于存储设备上线、下线等数据。
package com.device.backend.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;/*** TDengine 数据库配置*/
@Configuration
public class TDengineConfig {@Value("${tdengine.driver-class-name}")private String driverClassName;@Value("${tdengine.url}")private String url;@Value("${tdengine.username}")private String username;@Value("${tdengine.password}")private String password;@Value("${tdengine.database}")private String database;/*** 获取 TDengine 连接*/public Connection getConnection() throws SQLException {try {Class.forName(driverClassName);} catch (ClassNotFoundException e) {throw new SQLException("TDengine driver not found", e);}return DriverManager.getConnection(url, username, password);}/*** 初始化 TDengine 数据库和超级表*/@Beanpublic Boolean initTDengine() {try (Connection conn = getConnection();Statement stmt = conn.createStatement()) {// 创建数据库
// String createDbSql = "CREATE DATABASE IF NOT EXISTS " + database;
// System.out.println("执行 SQL: " + createDbSql);
// stmt.execute(createDbSql);
//
// // 使用数据库
// String useSql = "USE " + database;
// System.out.println("执行 SQL: " + useSql);
// stmt.execute(useSql);
//
// // 创建设备数据超级表 (STable) -
// String createMetricsSql = "CREATE STABLE IF NOT EXISTS device_metrics (" +
// "ts TIMESTAMP, " +
// "temperature FLOAT, " +
// "humidity FLOAT, " +
// "voltage FLOAT, " +
// "current FLOAT, " +
// "power FLOAT, " +
// "battery_level INT, " +
// "signal_strength INT" +
// ") TAGS (" +
// "device_sn VARCHAR(100), " +
// "device_type VARCHAR(50), " +
// "device_group VARCHAR(100)" +
// ")";
// System.out.println("执行 SQL: " + createMetricsSql);
// stmt.execute(createMetricsSql);
//
// // 创建设备事件超级表 -
// String createEventsSql = "CREATE STABLE IF NOT EXISTS device_events (" +
// "ts TIMESTAMP, " +
// "event_type VARCHAR(50), " +
// "event_content VARCHAR(500), " +
// "event_level VARCHAR(20)" +
// ") TAGS (" +
// "device_sn VARCHAR(100)" +
// ")";
// System.out.println("执行 SQL: " + createEventsSql);
// stmt.execute(createEventsSql);return true;} catch (Exception e) {
// System.err.println(" TDengine 初始化失败 (请确认 TDengine 服务已启动): " + e.getMessage());
// e.printStackTrace();
//
// // 如果 STABLE 创建失败,尝试备选方案(普通表)
// try (Connection conn = getConnection();
// Statement stmt = conn.createStatement()) {
//
// System.out.println("尝试备选方案:使用普通表代替...");
// stmt.execute("USE " + database);
//
// // 创建普通表(如果 STABLE 不支持)
// stmt.execute("CREATE TABLE IF NOT EXISTS device_metrics (" +
// "ts TIMESTAMP, " +
// "temperature FLOAT, " +
// "humidity FLOAT, " +
// "voltage FLOAT, " +
// "current FLOAT, " +
// "power FLOAT, " +
// "battery_level INT, " +
// "signal_strength INT, " +
// "device_sn VARCHAR(100)" +
// ")");
//
// stmt.execute("CREATE TABLE IF NOT EXISTS device_events (" +
// "ts TIMESTAMP, " +
// "event_type VARCHAR(50), " +
// "event_content VARCHAR(500), " +
// "event_level VARCHAR(20), " +
// "device_sn VARCHAR(100)" +
// ")");
//
// return true;}}public String getDatabase() {return database;}
}
TDengine 数据库表
-- Create and select the time-series database.
CREATE DATABASE IF NOT EXISTS device_ts;

USE device_ts;

-- Device metrics super table: one row per metrics sample, tagged by device.
CREATE STABLE IF NOT EXISTS device_metrics (
    ts TIMESTAMP,
    temperature FLOAT,
    humidity FLOAT,
    voltage FLOAT,
    current FLOAT,
    power FLOAT,
    battery_level INT,
    signal_strength INT
) TAGS (
    device_sn VARCHAR(100),
    device_type VARCHAR(50),
    device_group VARCHAR(100)
);

-- Device events super table: one row per event, tagged by device serial number.
CREATE STABLE IF NOT EXISTS device_events (
    ts TIMESTAMP,
    event_type VARCHAR(50),
    event_content VARCHAR(500),
    event_level VARCHAR(20)
) TAGS (
    device_sn VARCHAR(100)
);

3. MQTT 处理消息
package com.device.backend.mqtt;import com.device.backend.dto.DeviceMessageDTO;
import com.device.backend.service.DeviceService;
import com.device.backend.service.TDengineService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** MQTT 消息处理器* 接收来自 MQTT Broker 的设备消息并进行处理*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MqttMessageHandler {private final DeviceService deviceService;private final TDengineService tdengineService;private final SimpMessagingTemplate messagingTemplate;private final ObjectMapper objectMapper;/*** 处理 MQTT 入站消息* inputChannel 对应 MqttConfig 中配置的入站通道*/@ServiceActivator(inputChannel = "mqttInputChannel")public void handleMessage(Message<?> message) {String topic = (String) message.getHeaders().get("mqtt_receivedTopic");String payload = message.getPayload().toString();log.info("收到MQTT消息 - Topic: {}, Payload: {}", topic, payload);try {// 解析 topic: device/{sn}/{type}String[] parts = topic != null ? topic.split("/") : new String[0];if (parts.length < 3) {log.warn("无效的MQTT Topic: {}", topic);return;}String deviceSn = parts[1];String msgType = parts[2];switch (msgType) {case "register":handleRegister(deviceSn, payload);break;case "heartbeat":handleHeartbeat(deviceSn, payload);break;case "status":handleStatus(deviceSn, payload);break;case "data":handleData(deviceSn, payload);break;case "event":handleEvent(deviceSn, payload);break;default:log.warn("未知消息类型: {}", msgType);}} catch (Exception e) {log.error("处理MQTT消息异常: {}", e.getMessage(), e);}}/*** 处理设备注册消息*/private void handleRegister(String deviceSn, String payload) throws Exception {log.info("设备注册: {}", deviceSn);Map<?, Object> data = objectMapper.readValue(payload, Map.class);String ipAddress = (String) data.getOrDefault("ipAddress", "");deviceService.deviceOnline(deviceSn, ipAddress);// 保存注册事件到TDenginetdengineService.saveDeviceEvent(deviceSn, "REGISTER", "设备注册接入", "INFO");// 推送WebSocket消息给前端pushDeviceEvent(deviceSn, "REGISTER", "设备注册接入");}/*** 处理心跳消息*/private void handleHeartbeat(String deviceSn, String payload) {deviceService.deviceOnline(deviceSn, null);// 推送心跳事件pushDeviceEvent(deviceSn, "HEARTBEAT", "设备心跳");}/*** 处理状态消息*/private void handleStatus(String deviceSn, String payload) throws Exception {DeviceMessageDTO msg = objectMapper.readValue(payload, 
DeviceMessageDTO.class);msg.setDeviceSn(deviceSn);msg.setMsgType("STATUS");Boolean online = msg.getOnline();if (online != null) {if (online) {deviceService.deviceOnline(deviceSn, msg.getIpAddress());} else {deviceService.deviceOffline(deviceSn);}}// 推送到前端pushDeviceStatus(deviceSn, msg);}/*** 处理数据上报消息*/private void handleData(String deviceSn, String payload) throws Exception {DeviceMessageDTO msg = objectMapper.readValue(payload, DeviceMessageDTO.class);msg.setDeviceSn(deviceSn);msg.setMsgType("DATA");// 更新设备最后上报时间deviceService.deviceOnline(deviceSn, msg.getIpAddress());// 保存时序数据到 TDenginetdengineService.saveDeviceMetrics(msg);// 推送实时数据到前端pushDeviceData(deviceSn, msg);// 检查告警阈值checkAlarmThreshold(deviceSn, msg);}/*** 处理事件消息*/private void handleEvent(String deviceSn, String payload) throws Exception {Map<?, Object> data = objectMapper.readValue(payload, Map.class);String eventType = (String) data.getOrDefault("eventType", "UNKNOWN");String eventContent = (String) data.getOrDefault("content", "");String eventLevel = (String) data.getOrDefault("level", "INFO");// 保存到TDenginetdengineService.saveDeviceEvent(deviceSn, eventType, eventContent, eventLevel);// 推送前端pushDeviceEvent(deviceSn, eventType, eventContent);}/*** 检查告警阈值*/private void checkAlarmThreshold(String deviceSn, DeviceMessageDTO msg) {// 注入告警服务 (通过构造器注入)// 这里需要在类中添加告警服务的注入if (msg.getTemperature() != null && msg.getTemperature() > 80) {deviceService.updateStatus(deviceSn, "ALARM", null);tdengineService.saveDeviceEvent(deviceSn, "HIGH_TEMP","温度过高: " + msg.getTemperature() + "°C", "ERROR");pushDeviceEvent(deviceSn, "ALARM", "温度告警: " + msg.getTemperature() + "°C");}if (msg.getBatteryLevel() != null && msg.getBatteryLevel() < 10) {tdengineService.saveDeviceEvent(deviceSn, "LOW_BATTERY","电量不足: " + msg.getBatteryLevel() + "%", "WARNING");pushDeviceEvent(deviceSn, "LOW_BATTERY", "低电量告警: " + msg.getBatteryLevel() + "%");}}/*** 推送设备实时数据到前端 WebSocket*/private void pushDeviceData(String deviceSn, DeviceMessageDTO msg) 
{try {messagingTemplate.convertAndSend("/topic/device/data/" + deviceSn, msg);messagingTemplate.convertAndSend("/topic/device/data", msg);} catch (Exception e) {log.error("推送WebSocket消息失败: {}", e.getMessage());}}/*** 推送设备状态到前端 WebSocket*/private void pushDeviceStatus(String deviceSn, DeviceMessageDTO msg) {try {Map<String, Object> statusMsg = new HashMap<>();statusMsg.put("deviceSn", deviceSn);statusMsg.put("online", msg.getOnline());statusMsg.put("ipAddress", msg.getIpAddress());messagingTemplate.convertAndSend("/topic/device/status", statusMsg);} catch (Exception e) {log.error("推送设备状态失败: {}", e.getMessage());}}/*** 推送设备事件到前端 WebSocket*/private void pushDeviceEvent(String deviceSn, String eventType, String content) {try {Map<String, Object> event = new HashMap<>();event.put("deviceSn", deviceSn);event.put("eventType", eventType);event.put("content", content);event.put("timestamp", System.currentTimeMillis());messagingTemplate.convertAndSend("/topic/device/event", event);} catch (Exception e) {log.error("推送设备事件失败: {}", e.getMessage());}}
}
模拟设备订阅主题, 并发布消息

至此,设备管理的基本流程就完成了。总体来说:物联网设备通过 MQTT 订阅和发布消息;Spring Boot 负责解析消息,并与前端交互以显示设备状态;时序数据库 TDengine 用于存储设备上报的信息。
页面向设备下发指令同样通过 MQTT 主题完成:设备收到消息后会执行对应的指令。
