当前位置: 首页 > news >正文

第5篇_PUBLISH不是收到就转发_Broker怎么处理QoS_PacketId和多客户端fanout

PUBLISH 看起来最简单:

收到消息,找到订阅者,转发出去。

但 Broker 真正掉线、重复、乱序、延迟,很多都藏在这里。

先给结论:

Broker 不能直接把发布者的 PacketId 原封不动转发给订阅者。
PacketId 是一条客户端连接内的事务编号,Broker fanout 时必须为每个目标连接重新分配。

当前讨论的 fanout 仍然是 PLC 侧轻量 Broker 的小规模路由,不追求通用 Broker 的海量订阅索引和跨节点分发。


一、一条 PUBLISH 进入 Broker 后发生了什么

注意最后一行:

发布者的PUBACK packetId=10是对发布者连接的确认。
订阅者收到的 PUBLISH 会使用订阅者连接自己的 PacketId。


二、PacketId 的作用域

假设发布者 A 发来:

PUBLISH PacketId = 10

Broker 要转发给 B 和 C:

连接PacketId 作用
A -> BrokerA 这条连接的入站事务
Broker -> BB 这条连接的出站事务
Broker -> CC 这条连接的出站事务

这三者不能混用。

如果 Broker 直接把10转给 B 和 C,后果可能是:

  1. 和 B/C 当前未完成事务冲突。
  2. 重发时无法判断 ACK 属于哪条投递。
  3. QoS2 四步握手状态错乱。
  4. 客户端认为收到重复或非法报文。

三、QoS0 / QoS1 / QoS2 在 Broker 侧的差异

QoS发布者到 BrokerBroker 到订阅者事务要求
QoS0收到即处理直接投递不需要 ACK
QoS1收到后回 PUBACK目标连接等待 PUBACK至少一次,可能重发
QoS2PUBREC / PUBREL / PUBCOMP目标连接独立 QoS2 闭环恰好一次语义,状态最多

Broker 里必须拆成两套事务:

事务方向作用
RxInflight处理发布者发来的 QoS1/QoS2
TxInflight处理 Broker 转发给订阅者的 QoS1/QoS2

这不是为了显得复杂,而是 PacketId 作用域决定的。


四、fanout 不能让慢客户端拖垮其他客户端

一个 PUBLISH 可能命中多个订阅者。

如果某个客户端很慢,Broker 不能阻塞整个路由链。

当前设计是:

每个客户端有自己的投递队列。

队列作用
协议响应队列CONNACK、SUBACK、PUBACK、PINGRESP 等,优先发送
普通投递队列PUBLISH fanout 后进入目标客户端
QoS 事务表记录等待 ACK 的出站消息

协议响应必须优先,因为客户端协议状态机在等它。


五、QoS2 不可怕,可怕的是状态没闭环

QoS2 的链路是:

Broker 转发给订阅者时,又是另一条 QoS2 链:

这两条链不能混在一起。

否则测试 QoS2 时就会出现:

  • 客户端显示发送成功,但订阅端不收。
  • Broker 重复发同一条。
  • 客户端过一会儿断开。
  • PacketId 表看起来“莫名其妙占满”。

六、ST 代码入口

代码入口作用
FB_MqttBroker.M_HandlePublish处理连接槽位输出的发布消息
FB_MqttBroker.M_RoutePublishNow根据路由表 fanout 到订阅者
FB_MqttBrokerConnection.M_EnqueueDelivery写入目标连接投递队列
FB_MqttBrokerTxScheduler.M_RegisterPublish注册出站 QoS 事务
FB_MqttBrokerConnection.M_AssignPacketId为目标连接分配 PacketId
FB_MqttBrokerCodec.M_BuildPublish构造发给订阅者的 PUBLISH

关键逻辑可以压缩成:

// 发布者 PacketId 只用于发布者连接的 ACK。 uiSourcePacketId := stPublishIn.uiPacketId; // fanout 到订阅者时,必须按目标连接重新分配 PacketId。 uiTargetPacketId := aConnections[uiTargetSlot].M_AssignPacketId(); aConnections[uiTargetSlot].M_EnqueueDelivery( sTopicName := stPublishIn.sTopicName, pPayload := stPublishIn.pPayload, uiPayloadLen := stPublishIn.uiPayloadLen, eQoS := eTargetQoS, uiPacketId := uiTargetPacketId);

七、现场排障表

现象优先检查可能原因
QoS1 重复消息很多TxInflight 是否正确收 PUBACK重发事务没闭环
QoS2 一发就断PUBREC/PUBREL/PUBCOMP 状态入站和出站事务混用
发布者成功但订阅者收不到投递队列水位路由命中但目标队列满
一个慢客户端影响全部各 Slot 队列是否独立fanout 同步阻塞
PacketId 冲突目标连接 PacketId 分配直接沿用发布者 PacketId

模型边界与验证路径

PUBLISH fanout 本质上是状态作用域问题。

表面上看,Broker 只是把一条消息转给多个订阅者。往上看一层,它其实要维护三类边界:发布者连接边界、订阅者连接边界、QoS 事务边界。

结论可信度依据验证路径
PacketId 是连接作用域highMQTT QoS 事务语义用两个订阅者同时接收 QoS1,观察各自 PacketId
入站 QoS 和出站 QoS 事务要分开highBroker 既是接收方又是发送方分别观察RxInflightTxInflight
慢客户端应只影响自身队列medium当前槽位队列隔离模型故意让一个客户端低速接收,观察另一个客户端延迟

这里最容易写错的不是报文格式,而是把“消息 ID”想成全局概念。MQTT 的 PacketId 不是全局消息编号,它只在当前客户端连接里有意义。这个模型边界没想清楚,QoS1 / QoS2 后面基本都会出问题。


八、这一篇你最该记住的 6 句话

  1. PUBLISH 不是收到就转发,Broker 还要做路由、队列和 QoS 事务。
  2. PacketId 是连接作用域,不是全局消息 ID。
  3. Broker 转发给订阅者时,必须按目标连接重新分配 PacketId。
  4. QoS1 / QoS2 要拆入站事务和出站事务。
  5. 协议响应队列应该优先于普通 PUBLISH 投递队列。
  6. 慢客户端必须被隔离在自己的队列里,不能拖垮整个 Broker。

下篇预告

下一篇讲 Broker 不能只会转发 PUBLISH 的三个能力:

Retain、Will、KeepAlive。

这三个功能看起来像补充项,但工业现场非常常用。


完整 ST 代码

本篇涉及的完整代码入口:

  • MqttBroker/Device/Application/POUs/FBs/FB_MqttBroker.M_HandlePublish.st
  • MqttBroker/Device/Application/POUs/FBs/FB_MqttBroker.M_RoutePublishNow.st
  • MqttBroker/Device/Application/POUs/FBs/FB_MqttBrokerConnection.M_EnqueueDelivery.st
  • MqttBroker/Device/Application/POUs/FBs/FB_MqttBrokerTxScheduler.M_RegisterPublish.st
  • MqttBroker/Device/Application/POUs/FBs/FB_MqttBrokerConnection.M_AssignPacketId.st

系列导航

  • 系列定位:第 5 篇
  • 上一篇:SUBSCRIBE 不是存个字符串:Broker 怎么维护订阅表、通配符和多客户端路由
  • 下一篇:Retain、Will、KeepAlive:工业现场为什么不能只会转发 PUBLISH

项目与资料

  • 开源项目名称:MqttBroker
  • 前置系列:MqttClient_V2_0
  • 核心关键词:PUBLISH、fanout、QoS、PacketId、投递队列

适合谁收藏

  • 正在实现 Broker 消息转发的人
  • QoS1 / QoS2 经常重复、超时、断线的人
  • 想理解 PacketId 作用域的人
  • 想把 Broker 写成可维护状态机的人
http://www.jsqmd.com/news/900833/

相关文章:

  • 陕西旅游酒店 GEO 服务市场深度调查:AI 搜索优化格局与真实服务真相
  • 你还在手动写脚本,别人已经用智能体跑完回归测试了
  • Cartographer无里程计建图实战:室内外效果对比与参数调优心得
  • AI智能体培训后可以做什么工作?这7个方向值得关注
  • GMS1.4 YYC编译的游戏,如何安全地修改游戏内文字?(附UndertaleModTool实战)
  • 2026世界杯洛杉矶SoFi体育场:50亿造价的天价足球圣殿
  • 《超简单:用 Python 让 Excel 飞起来》读书笔记:1.2.1 安装 Python 官方编程环境 IDLE
  • 2026年广州空调安装/清洗/移机/加雪种/拆装/维修/深度清洗/中央空调清洗/杀菌消毒/拆洗推荐:专业技术与省心服务口碑之选 - 品牌企业推荐师(官方)
  • 【多无人机集群控制11】鲁棒编队跟踪仿真,滑模与PID对比,MATLAB例程
  • 第6篇_Retain_Will_KeepAlive_工业现场为什么不能只会转发PUBLISH
  • 别再只用disp了!Matlab里fprintf格式化输出实战,从%f到%f\n的保姆级指南
  • 从Arduino到ESP32:搞定3.3V/5V混接通信,这几种电平转换电路你试过吗?
  • 把 ZipVoice 从 onnxruntime 移植到 MNN —— 7 个让人怀疑人生的细节
  • 别只改my.cnf了!深入解读MariaDB密码策略与general_log审计的取舍与最佳实践
  • 别再只盯着RGB了!搞懂CIE 1931 XYZ和Yxy,你的图像处理才算入门
  • ProxySQL选型实战:从手写读写分离到中间件的踩坑全记录
  • Grok生成的pdf怎么导出 “AI导出鸭”不会搞算我输!
  • ChatGPT饮食建议生成器上线倒计时:最后48小时必须完成的3项合规改造(GDPR+《互联网诊疗监管办法》双达标清单)
  • Louvain算法实战:用NetworkX和Python分析你的社交网络好友圈子
  • Win11Debloat:3分钟完成Windows 11终极优化与深度清理的免费神器
  • 到处听见韬τ定律
  • Python 入门:初识函数
  • 告别CH340!用ESP32-S3的USB CDC功能实现零成本串口打印与调试(ESP-IDF 4.4环境)
  • 从微信抢红包到数据备份:5个真实Python小项目带你玩转schedule定时任务库
  • 人工智能-现代方法(四)
  • 【ChatGPT】电子束光刻机EBL 深度拆解、爆炸图10张、信息图10张、下位机C++、上位机C#、PLC代码框架
  • 信号处理/通信算法必看:用Wirtinger导数搞定复数域梯度下降(附Python代码)
  • 从TI杯B题到毕业设计:手把手教你复刻一个自动泊车小车(附STM32/OpenMV代码)
  • 安全攻防 - 04 GMSSL 工程介绍
  • 从‘退化因子’到‘健康指标’:给你的机器人状态估计做个‘体检’