当前位置: 首页 > news >正文

pdu_mqtt.py

# -*- coding: utf-8 -*-
import json
import logging
import time
import threading
from paho.mqtt import client as mqtt_client

# MQTT 全局配置
MQTT_CONFIG = {
    'broker': '172.19.64.31',
    'port': 1883,
    'username': 'mh',
    'password': '123456',
    'pub_topic': 'geek_pdu/{pdu_id}/publish',
    'sub_topic': 'geek_pdu/{pdu_id}/subscribe',
}

# 全局 MQTT 客户端
_mqtt_client = None
_client_lock = threading.Lock()
_subscribed_pdus = set()
_subscribe_lock = threading.Lock()


def get_mqtt_client():
    """获取 MQTT 客户端单例"""
    global _mqtt_client
    if _mqtt_client is None:
        with _client_lock:
            if _mqtt_client is None:
                _mqtt_client = _init_mqtt_client()
    return _mqtt_client


def _ensure_subscribed(pdu_id, log_callback=None):
    """确保订阅了指定 PDU 的状态 Topic"""
    client = get_mqtt_client()
    if not client or not client.is_connected():
        return False
   
    with _subscribe_lock:
        if pdu_id in _subscribed_pdus:
            return True
        sub_topic = MQTT_CONFIG['sub_topic'].replace('{pdu_id}', pdu_id)
        client.subscribe(sub_topic)
        _subscribed_pdus.add(pdu_id)
        if log_callback:
            log_callback(f"📡 订阅 PDU 状态: {sub_topic}")
        logging.info(f"Subscribed to {sub_topic}")
    return True


def _init_mqtt_client():
    """初始化 MQTT 客户端"""
    client = mqtt_client.Client()
    client.username_pw_set(MQTT_CONFIG['username'], MQTT_CONFIG['password'])
   
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            logging.info("MQTT PDU client connected")
        else:
            logging.error(f"MQTT PDU client connect failed, rc={rc}")
   
    def on_message(client, userdata, msg):
        # 收到消息时只打印,不解析(避免刷屏)
        try:
            payload = msg.payload.decode()
            topic = msg.topic
            logging.debug(f"MQTT received: {topic} -> {payload}")
        except Exception as e:
            logging.error(f"MQTT on_message error: {e}")
   
    def on_disconnect(client, userdata, rc):
        logging.warning(f"MQTT PDU client disconnected, rc={rc}")
        with _subscribe_lock:
            _subscribed_pdus.clear()
   
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
   
    try:
        client.connect(MQTT_CONFIG['broker'], MQTT_CONFIG['port'], keepalive=60)
        client.loop_start()
        time.sleep(1)
    except Exception as e:
        logging.error(f"MQTT PDU client connect error: {e}")
   
    return client


def _publish(pdu_id, msg_dict, log_callback=None):
    """发送 MQTT 消息"""
    client = get_mqtt_client()
    if not client or not client.is_connected():
        error_msg = f"MQTT client not connected"
        if log_callback:
            log_callback(f"❌ {error_msg}")
        return False, error_msg
   
    pub_topic = MQTT_CONFIG['pub_topic'].replace('{pdu_id}', pdu_id)
    msg = json.dumps(msg_dict)
    result = client.publish(pub_topic, msg, qos=1)
   
    if result[0] == 0:
        if log_callback:
            log_callback(f"✅ 发送命令到 PDU {pdu_id}: {msg}")
        return True, ""
    else:
        error_msg = f"Send failed, code: {result[0]}"
        if log_callback:
            log_callback(f"❌ {error_msg}")
        return False, error_msg


def pdu_power_off(pdu_id, log_callback=None):
    """发送 PDU 断电命令"""
    _ensure_subscribed(pdu_id, log_callback)
    return _publish(pdu_id, {'type': 'event', 'key': 0}, log_callback)


def pdu_power_on(pdu_id, log_callback=None):
    """发送 PDU 上电命令"""
    _ensure_subscribed(pdu_id, log_callback)
    return _publish(pdu_id, {'type': 'event', 'key': 1}, log_callback)


def pdu_power_cycle(pdu_id, delay=5, log_callback=None):
    """
    PDU 断电后延时恢复(先断电,等待 delay 秒,再上电)
    """
    if log_callback:
        log_callback(f"执行 PDU 掉电循环 (ID: {pdu_id}, 断电等待 {delay} 秒)")
   
    # 1. 断电
    success, msg = pdu_power_off(pdu_id, log_callback)
    if not success:
        return False, msg
   
    # 2. 等待
    if log_callback:
        log_callback(f"等待 {delay} 秒...")
    time.sleep(delay)
   
    # 3. 上电
    success, msg = pdu_power_on(pdu_id, log_callback)
    if not success:
        return False, msg
   
    return True, ""
http://www.jsqmd.com/news/858338/

相关文章:

  • 告别uglifyjs!在Vue CLI项目里优雅配置terser,实现按需移除console.log
  • 别再用错按钮和开关了!WinCC flexible 2008里控制PLC输出的正确姿势(附SMART 700 IE实操)
  • 智能矩阵运营系统的流量博弈论:当1000个账号争夺有限流量时,最优调度策略是什么?
  • 为Claude Code配置Taotoken以解决密钥被封与额度不足问题
  • 热激活延迟荧光(TADF)
  • 盐城金条回收银条回收铂金项链回收克拉钻石回收婚嫁首饰回收高价多少钱一克同城价格查询上门上门估价闲置变现转让靠谱权威排行榜 - 检测回收中心
  • 2026 河池专业防水公司TOP5推荐:卫生间、外墙、楼顶、地下室渗漏专业公司推荐(2026年5月河池最新深度调研方案) - 防水百科
  • 终极指南:使用UndertaleModTool轻松修改Undertale游戏文件
  • 解锁IDM无限试用期:开源激活脚本的完整使用指南
  • 5.20上课笔记
  • CUK电路仿真结果
  • 抖音下载神器终极指南:免费批量下载视频与音乐的完整教程
  • 终极指南:5分钟掌握Poppins免费开源多语言几何字体
  • Adobe-GenP:5分钟免费解锁Adobe全家桶的终极指南
  • STM32F103RC五路循迹小车避坑指南:从GPIO配置到PWM调速的完整流程
  • 盐城千足金回收银项链回收铂金首饰回收裸钻回收闲置首饰回收高价多少钱一克同城价格查询上门上门估价闲置变现转让靠谱权威排行榜 - 检测回收中心
  • 天水金首饰回收投资银条回收铂金手镯回收30分钻石回收二手黄金回收高价多少钱一克同城价格查询上门上门估价闲置变现转让靠谱权威排行榜 - 检测回收中心
  • CookieCloud终极指南:如何实现跨设备浏览器Cookie安全同步
  • 如何高效使用Wayback Machine浏览器扩展:实用网页存档功能完全指南
  • Rust 核心理论: 高并发与异步(三)
  • 全自动量化交易工具对比:从条件单到无干预执行的三种选择
  • 场景适配论__数字孪生IOC建设中渲染技术与智能体能力的协同逻辑
  • A2A火了:Google刚出的Agent间通信协议,到底解决了什么问题
  • 天水千足金回收银项链回收铂金首饰回收裸钻回收闲置首饰回收高价多少钱一克同城价格查询上门上门估价闲置变现转让靠谱权威排行榜 - 检测回收中心
  • 扬州黄金吊坠回收同城白银回收同城铂金回收钻石首饰回收本地贵金属回收本地排名正规门店专业推荐哪家靠谱二手哪家强 - 检测回收中心
  • 3分钟掌握TripoSR:从单图到3D模型的开源革命
  • PPTist终极指南:免费在线PPT制作工具完整教程
  • 基于 RPA 自动化技术的私域机器人助手构建指南
  • AndroidScreenShare:如何免费实现跨设备实时屏幕共享与音频传输?
  • 微信批量消息发送终极指南:WeChat-mass-msg完整使用教程