当前位置: 首页 > news >正文

为什么有了 RocketMQ 事务消息,我们还要自研本地消息表方案?

为什么有了 RocketMQ 事务消息,我们还要自研本地消息表方案?

前言

最近在 Code Review 一个项目时,发现团队自研了一套完整的事务消息框架 4 张数据库表、定时补偿任务、分布式锁、衰减重试机制…一应俱全。

但项目依赖里明明引入了rocketmq-spring-boot-starter:2.3.1,RocketMQ 原生支持事务消息,为什么不直接用?

答案很有意思:RocketMQ 只被用作简单的实时消息推送通道,事务消息功能完全没用。

这引发了一个值得深思的问题:RocketMQ 事务消息已经很成熟了,为什么很多团队仍然选择自研本地消息表方案?


一、先搞清楚:我们要解决的是什么问题?

在分布式系统中,经常遇到这样的场景:

用户支付成功后,需要修改订单状态(写 MySQL),同时发消息给积分系统加积分(发 MQ)。

核心问题:数据库操作和 MQ 投递是两个独立的操作,无法放在同一个原子事务中。

  • 强一致性:需要 2PC(XA 协议)或 Raft,性能极差,不适合高并发场景
  • 最终一致性:BASE 理论,允许中间状态不一致,但保证最终一致

RocketMQ 事务消息和本地消息表,本质上都是实现最终一致性的方案。


二、RocketMQ 事务消息的理想与现实

理想的实现方式

RocketMQ 事务消息的设计非常优雅:

半消息 → 执行本地事务 → Commit/Rollback → 回查兜底

很多人会这样实现:

@RocketMQTransactionListenerpublicclassOrderTransactionListenerimplementsRocketMQLocalTransactionListener{@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){// 执行本地事务orderService.updateOrderStatus((OrderParam)arg);returnRocketMQLocalTransactionState.COMMIT;}@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringorderId=msg.getHeaders().get("orderId").toString();// 直接查业务表Orderorder=orderMapper.selectById(orderId);returnorder!=null?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;}}

看起来很完美,但生产环境会遇到三个关键问题。


问题一:回查时的竞态条件

场景复现:

  1. 发半消息:成功
  2. 执行本地事务:数据库死锁,事务卡了 5 秒还没提交
  3. MQ 回查:Broker 发现消息未确认,发起回查
  4. 误判:查订单表 → 查不到(事务还没 Commit)
  5. 结果:返回 ROLLBACK,MQ 撤销消息
  6. 悲剧:下一秒本地事务提交成功,订单入库了,但消息没了

核心问题:回查时无法区分"事务还在执行中"和"事务执行失败"。


问题二:业务表旧数据的干扰

更隐蔽的问题是:如果业务表里已经有旧数据(比如同一个订单 ID 之前被取消过),回查时会误判。

时间线: T1: 用户下单 ORDER_123,事务成功 T2: 用户取消订单,业务表状态改为"已取消" T3: 用户重新下单 ORDER_123(同一订单ID) T4: 本次事务失败(回滚) T5: MQ 回查 → 查业务表 → 查到 ORDER_123(旧数据)→ COMMIT(误判!)

核心问题:业务表存储的是数据的最终状态,可能被多次事务修改,回查时无法判断"这是本次事务写入的,还是之前遗留的旧数据"。


问题三:消费端的幂等陷阱

消费端的幂等检查和业务执行必须在同一个事务里,否则会出现部分成功的情况:

@TransactionalpublicvoidonMessage(Stringmessage){OrderMsgmsg=JSON.parseObject(message,OrderMsg.class);// 错误:幂等检查和业务执行分离if(idempotentMapper.exists(msg.getOrderId())){return;// 已消费}// 执行业务inventoryService.addPoints(msg.getUserId(),msg.getPoints());// 如果这里失败,业务执行了,但幂等记录没插进去idempotentMapper.insert(msg.getOrderId());}

三、重要澄清:不一定需要事务表

上面的分析可能会让人误解:“RocketMQ 事务消息必须要有事务表才能可靠工作。”

实际上不是这样。要区分三个层次:

方案1:RocketMQ 官方推荐事务消息(最常见)

很多项目根本不会单独建事务表。

@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringorderId=msg.getHeaders().get("orderId").toString();Orderorder=orderMapper.selectById(orderId);if(order!=null){returnCOMMIT;}returnROLLBACK;}

直接查业务表。

例如:

订单创建成功 → 订单表有数据 → COMMIT 订单创建失败 → 订单表没数据 → ROLLBACK

这种写法非常普遍。


为什么很多项目敢这么干?

因为业务主键通常不会复用。

例如:

订单ID: 1001 1002 1003 ...

永远唯一。

不会出现文章举的:

ORDER_123 删除 重新创建 还是ORDER_123

这种情况。

所以:

查订单表 = 查事务状态

成立。


什么时候需要事务表?

当你发现:

业务表不能准确反映事务状态

时。

例如:

场景1:业务记录可能被删除
创建订单 ↓ 消息未确认 ↓ 订单被删除 ↓ Broker回查 ↓ 查不到订单

这时就有问题。


场景2:一个业务对象会被反复修改

例如:

用户积分

用户表:

id=1 score=100

你无法通过:

select*fromuserwhereid=1

判断:

本次加10分事务到底成功没有

此时必须有:

t_transaction_log

场景3:审计要求极高

金融系统:

支付 退款 结算

通常都会留:

transaction_log

用于追踪。


大厂实际情况

一般分两类:

简单业务
订单 商品 用户注册

直接查业务表。

不建事务表。


核心交易链路
支付 资金 库存扣减

会建:

transaction_log

或者

outbox_message

表。


你可以这么理解

RocketMQ 事务消息最核心要求只有一个:

Broker回查时必须能知道本地事务最终状态

至于状态存哪里:

可以是

业务表

也可以是

事务表

RocketMQ 根本不关心。

所以:

事务消息 ≠ 必须有事务表

而是:

事务消息 = 必须有一个可靠的事务状态来源

这个来源可以是业务表,也可以是事务日志表。

很多互联网业务(订单、注册、发帖)直接查业务表就够了;只有复杂交易系统才会额外维护事务表。


四、RocketMQ 事务消息的修复方案(复杂场景)

如果你遇到了上面提到的复杂场景(业务记录可删除、反复修改、审计要求高),就需要引入事务日志表时间窗口判断

修复后的回查逻辑

@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringtxId=msg.getHeaders().get("txId").toString();// 本次事务唯一ID// 1. 查事务日志表(用事务ID,不是业务ID)intcount=logMapper.countByTxId(txId);if(count>0){// 这条日志是本次事务写入的,确认本次事务成功returnRocketMQLocalTransactionState.COMMIT;}// 2. 查不到时,需要时间窗口判断// 可能是事务还在执行中(卡住了),不能直接 RollbacklongbornTime=Long.parseLong(msg.getProperty("BORN_TIMESTAMP"));if(System.currentTimeMillis()-bornTime<5*60*1000){// 返回 UNKNOWN,告诉 MQ:我不确定,你过一会再来问returnRocketMQLocalTransactionState.UNKNOWN;}// 超过时间窗口,才判定为真正失败returnRocketMQLocalTransactionState.ROLLBACK;}

事务日志表的设计

CREATETABLEt_transaction_log(tx_idVARCHAR(64)PRIMARYKEY,-- 本次事务唯一IDbiz_idVARCHAR(64),-- 业务ID(订单号等)statusVARCHAR(16),-- SUCCESS / FAILcreated_atTIMESTAMP,INDEXidx_biz_id(biz_id));

关键点:

  • tx_id是本次事务的唯一标识,每次事务都不同
  • biz_id是业务标识,可能重复(同一订单多次操作)
  • 回查时用tx_id查询,确保查到的是"本次事务"的记录

为什么事务日志表能解决问题?

查询方式查询条件查到什么代表什么
查业务表biz_id(业务ID)业务数据的最终状态“业务表里有数据”,但可能是旧数据
查事务日志表tx_id(事务ID)本次事务的执行记录“本次事务成功写入日志”,确定性

本质区别

  • 业务表:存储业务数据的最终状态,可能被多次事务修改
  • 事务日志表:存储每次事务的执行记录,每次事务一条,互不干扰

重要补充:事务日志表并不能解决所有问题

关键点:不管是查事务日志表还是查业务表,"查不到"时都面临同样的困境。

场景查业务表查事务日志表
本次事务成功查到 → COMMIT查到 → COMMIT
本次事务失败,但业务表有旧数据查到 → COMMIT(误判!)没查到 → 需时间窗口
本次事务还在执行中(卡住)没查到 → 需时间窗口没查到 → 需时间窗口
本次事务失败(回滚)没查到 → 需时间窗口没查到 → 需时间窗口

事务日志表解决的问题

  • 消除"查到时的误判"——避免业务表旧数据的干扰
  • 确保"查到 = 本次事务成功"成为确定性判断

事务日志表无法解决的问题

  • “查不到时怎么判断”——仍然需要时间窗口机制

结论

  • 事务日志表:解决"查到时可能误判"的问题(应对重复下单用同一ID等情况)
  • 时间窗口判断:解决"查不到时的歧义"问题(区分执行中 vs 失败)

两者配合才能彻底解决竞态条件问题。单靠事务日志表或单靠时间窗口都不够。


关键结论:修复后的复杂度并不比本地消息表低

看到上面的方案,你会发现一个关键问题:

为了解决 RocketMQ 事务消息的竞态条件问题,需要引入:

  1. 事务日志表:记录每次事务的执行状态
  2. 时间窗口判断逻辑:区分"执行中"和"失败"
  3. 回查代码:查表 + 时间窗口判断
  4. Service 层改造:业务数据和事务日志在同一事务中写入

这个复杂度……并不比本地消息表方案低。

方案需要的组件复杂度
RocketMQ 事务消息(裸奔版)回查代码(查业务表)低(但不可靠)
RocketMQ 事务消息(修复版)事务日志表 + 时间窗口判断 + 回查逻辑 + Service改造
本地消息表消息表 + 定时任务 + 幂等控制

本质上,修复版的 RocketMQ 事务消息和本地消息表方案复杂度相当。

两者都需要:

  • 额外的数据库表(事务日志表 / 消息表)
  • 和业务数据在同一事务中写入
  • 额外的机制确保最终投递(回查 / 定时任务轮询)

唯一的区别

  • RocketMQ:实时投递(事务提交后立即 Commit 消息)
  • 本地消息表:轮询投递(定时任务扫描后投递)

所以,如果 RocketMQ 事务消息需要这么多额外代码才能可靠工作,那选择本地消息表也未尝不可——至少它天生就包含了这些机制。


修复后的完整代码

@Component@RocketMQTransactionListenerpublicclassOrderTransactionListenerimplementsRocketMQLocalTransactionListener{@AutowiredprivateOrderServiceorderService;@AutowiredprivateTransactionLogMapperlogMapper;/** * 执行本地事务 * 核心原则:业务数据和事务日志,必须在同一个 @Transactional 下提交 */@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){try{// 开启本地事务(包含:写业务表 + 写日志表)orderService.createOrderWithLog((OrderParam)arg);returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){returnRocketMQLocalTransactionState.ROLLBACK;}}/** * 回查接口 * 解决竞态条件和旧数据干扰问题 */@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringtxId=msg.getHeaders().get("txId").toString();// 1. 查事务日志表intcount=logMapper.countByTxId(txId);if(count>0){returnRocketMQLocalTransactionState.COMMIT;}// 2. 时间窗口判断longbornTime=Long.parseLong(msg.getProperty("BORN_TIMESTAMP"));if(System.currentTimeMillis()-bornTime<5*60*1000){returnRocketMQLocalTransactionState.UNKNOWN;}returnRocketMQLocalTransactionState.ROLLBACK;}}// Service 层@TransactionalpublicvoidcreateOrderWithLog(OrderParamparam){StringtxId=param.getTxId();// 写业务表orderMapper.insert(order);// 写事务日志表(同一事务)logMapper.insert(newTransactionLog(txId:txId,bizId:order.getId(),status:"SUCCESS"));// 事务提交:要么两个都成功,要么两个都失败}

五、看到这里你会发现一个问题

为了解决 RocketMQ 事务消息的竞态问题,我们引入了:

  • 事务日志表:记录每次事务的执行状态
  • 时间窗口判断:区分"执行中"和"失败"

这个复杂度…并不比本地消息表低


六、本地消息表:经典但可靠的方案

核心原理

本地消息表的思路更直接:把"发消息"这个动作本身变成数据库操作,和业务数据放在同一个事务中。

@Transactionalpublicvoidregister(Useruser){// 1. 写入用户表(业务操作)userMapper.insert(user);// 2. 写入本地消息表(和业务数据在同一事务中)msgMapper.insert(newMsg(exchange:"user.welcome",routingKey:"user."+user.getId(),body:JSON.toJSONString(user),status:"INIT"));// 事务提交后,两个操作都已持久化}

然后有一个后台任务轮询:

@Scheduled(fixedDelay=10000)// 每 10 秒扫描一次publicvoidretryFailedMessages(){List<Msg>msgs=msgMapper.selectByStatus("INIT");for(Msgmsg:msgs){try{mqProducer.send(msg.getExchange(),msg.getRoutingKey(),msg.getBody());msg.setStatus("SUCCESS");}catch(Exceptione){msg.setStatus("FAIL");msg.setFailCount(msg.getFailCount()+1);}msgMapper.update(msg);}}

为什么能解决问题?

场景结果一致性
步骤 1 成功,步骤 2 失败整个事务回滚,用户和消息都没写入一致
步骤 1、2 都成功,但事务提交前宕机事务回滚,数据都没写入一致
事务成功,MQ 投递失败后台任务不断重试,直到成功最终一致

项目中的实际实现

我分析的项目中,自研框架的核心实现:

// DefaultMsgSender.javaif(hasTransaction){// 1. 消息先写入 t_msg 表(与业务数据在同一事务中)msgService.batchInsert(...);// 2. 注册事务同步回调 → 事务提交后才投递TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronization(){@OverridepublicvoidafterCompletion(intstatus){if(status==STATUS_COMMITTED){// 事务提交后异步投递消息mqExecutor.execute(()->deliverMsg(msgPOList));}}});}

关键设计:

  • 消息和业务数据在同一事务中 → 原子性保证
  • SpringTransactionSynchronization→ 事务提交后才投递
  • 补偿 Job 扫表重试 → 兜底机制

七、两种方案的对比

维度RocketMQ 事务消息(裸奔版)RocketMQ 事务消息(修复版)本地消息表
实现原理半消息 → 本地事务 → Commit/Rollback → 回查半消息 + 事务日志表 + 时间窗口回查业务数据 + 消息记录同事务 → 定时任务轮询投递
竞态条件无法处理通过 UNKNOWN 状态解决不存在(消息先入库)
实时性低(取决于轮询频率)
通用性仅 RocketMQ仅 RocketMQ任何 MQ(RabbitMQ、Kafka 等)
复杂度中(需要事务日志表)高(需要定时任务、幂等控制)
性能开销中(多写一条日志)高(定时轮询数据库)
可控性高(自定义重试策略)
可观测性高(状态都在数据库)

八、本质分析

看到对比表,你会发现一个有趣的结论:

RocketMQ 事务消息的"修复版",本质上就是把事务日志表换成了本地消息表。

两者都是:

  • 在本地事务中写入一条记录(业务数据 + 事务日志/消息记录)
  • 通过数据库事务保证原子性
  • 通过额外的机制确保最终投递(回查 vs 轮询)

唯一的区别:

  • RocketMQ:实时投递(事务提交后立即 Commit 消息)
  • 本地消息表:轮询投递(定时任务扫描后投递)

九、什么时候选择本地消息表?

1. 已有其他 MQ(RabbitMQ、Kafka 等)

RabbitMQ原生不支持事务消息,只能通过本地消息表实现等价功能。

RocketMQ 是后期引入的?此时自研框架已经稳定运行,没必要迁移。


2. 需要精细控制重试策略

项目需要处理多个业务模块的异步事件:点赞、收藏、签到、红包、文章审核…每个场景对重试策略的要求不同。

本地消息表支持:

  • 衰减重试:1s、5s、30s、1min、5min、10min…
  • 自定义幂等键:每个消费者可以自定义
  • 手动补偿:直接操作数据库,灵活干预

3. 需要高度可观测性

所有消息状态都在数据库里:

  • 运维人员可以直接查询消息投递状态
  • 监控系统可以基于数据库表做告警
  • 出问题时可以手动修复或重试

十、消费端的原子幂等(通用)

无论用哪种方案,消费端都必须实现原子幂等

错误做法

@TransactionalpublicvoidonMessage(Stringmessage){OrderMsgmsg=JSON.parseObject(message,OrderMsg.class);// 幂等检查和业务执行分离if(idempotentMapper.exists(msg.getOrderId())){return;}inventoryService.addPoints(msg.getUserId(),msg.getPoints());// 失败时,业务执行了,但幂等记录没插进去idempotentMapper.insert(msg.getOrderId());}

正确做法

@Transactional(rollbackFor=Exception.class)publicvoidonMessage(Stringmessage){OrderMsgmsg=JSON.parseObject(message,OrderMsg.class);// 利用数据库唯一索引做原子幂等挡板try{idempotentMapper.insertBarrier(msg.getOrderId());}catch(DuplicateKeyExceptione){// 插入报错,说明消费过了,直接返回return;}// 执行业务// 如果报错,事务会回滚,幂等记录也撤销,下次重试能进来inventoryService.addPoints(msg.getUserId(),msg.getPoints());}

十一、生产级兜底方案

无论选择哪种方案,都必须承认系统的不确定性,建立兜底机制。

1. 日志表/消息表爆炸了怎么办?

每天几百万条记录,一个月表就废了。

  • 错误:DELETE FROM log WHERE time < ...(间隙锁阻塞写入)
  • 正确:MySQL 分区表,按天分区,清理时直接DROP PARTITION

2. 死信队列怎么处理?

  • 告警:DLQ 进消息,立刻推送到开发群
  • 重投平台:一键"原样重投"或"修改参数后重投"
  • 最终底线:人工数据库修复

3. 消息积压怎么办?

  • 监控告警:设置积压阈值
  • 临时扩容:增加消费者实例
  • 降级策略:非核心业务暂停投递

十二、总结

RocketMQ 事务消息 vs 本地消息表

选择 RocketMQ 事务消息选择本地消息表
项目从零开始,没有历史包袱已有 RabbitMQ 等其他 MQ
团队熟悉 RocketMQ 生态需要精细控制重试策略
追求实时性需要高度可观测性
希望减少代码量需要跨 MQ 通用方案

技术决策的关键点

  1. 定性:明确是最终一致性,不是强一致性
  2. 严谨:RocketMQ 需要事务日志表 + 时间窗口;本地消息表本身已包含这些机制
  3. 落地:消费端实现原子幂等
  4. 兜底:建立死信告警 + 批量重投机制

结论

没有最好的架构,只有最适合场景的方案。

RocketMQ 事务消息和本地消息表,本质上是两种不同的实现路径,最终效果等价–都是确保"业务操作和消息投递要么都成功,要么都不发生"。

如果 RocketMQ 事务消息需要引入事务日志表才能可靠工作,那复杂度已经和本地消息表相当了。这时候,选择的关键在于:

  • 你的项目用什么 MQ?
  • 你需要什么样的可控性和可观测性?
  • 你的团队有什么技术积累?

选择适合自己场景的方案,比追求"最新最酷"的技术更重要。

http://www.jsqmd.com/news/1049040/

相关文章:

  • 2026年6月公告:宝玑中国区官方维修门店地址优化升级,最新服务热线全新启用 - 亨得利中国服务中心
  • Percona XtraBackup实战:从零构建MySQL生产级备份恢复策略
  • 2026 年大庆市厨卫屋顶防水修缮三家对比测评 吉修匠 99.8 分稳居榜首 - 吉修匠
  • NVMe存储优化:深入解析PCIe电源管理机制与实战调优
  • 从旋转不变到精准定位:深入解析ESPRIT算法的原理与实现
  • 2026东莞黄江装修公司哪家好?本地业主实测推荐 - liuminghui
  • 开户许可证丢了登报怎么线上办理?全流程指南 - 速递信息
  • 微信怎么发起活动报名?云众评选3步搞定 - 微信投票小程序
  • AudioSet强标签发布:从“声音版ImageNet”到“帧级标注”的音频研究新纪元
  • VisualGDB 6.0:解锁Visual Studio跨平台嵌入式与Linux开发新体验
  • 深圳隔音窗品牌哪家靠谱?|静华轩隔音窗|适配住宅、高校、星级酒店、专业录音棚、商务会议室、直播室、家庭KTV、企业办公、全品类居家户型全场景降噪 - 维小达科技
  • 2026 南京主流考研辅导机构综合实力横向对比测评分析 - 小艾信息发布
  • 2026年度留学生论文辅导机构综合实力测评榜单——论文辅导哪家好且靠谱? - 艾德思Editsprings
  • 窗口尺寸太固执?用WindowResizer轻松掌控任意程序窗口
  • 特种行业许可证丢失怎么登报?2026最新办理流程 - 速递信息
  • 2026 年吉林市厨卫屋顶防水修缮三家对比测评 吉修匠 99.8 分稳居榜首 - 吉修匠
  • OpenClaw部署实战:AI工具链落地的最后一公里
  • 实地走访记录|2026年伯爵官方维修门店地址及电话最新统计 - 亨得利中国服务中心
  • 2026 去水印小程序 TOP4 实测横评:第一名安清去水印,轻量无广告首选 - 时时资讯
  • 企业境外投资证书丢失怎么登报?2026最新办理流程 - 速递信息
  • 2026 国内论文辅导机构行业盘点:5 家实测机构与甄选攻略 - 艾德思Editsprings
  • 2026 630~650分段人工智能AI专业985高校适配指南:中南大学人工智能领域专业实力解析 - 温茶叙旧
  • 暗黑破坏神2存档编辑器:5分钟免费修改角色装备与属性
  • 2026 清水印工具 TOP4 横评,小程序零广告首选 - 时时资讯
  • 从Keil仿真到逻辑分析仪:嵌入式调试的双重验证实战
  • 深圳添价收|万国IWC手表专业回收门店全指南 - 薛定谔的梨花猫
  • 2026 年长春厨卫屋顶防水修缮三家对比测评 吉修匠 99.8 分稳居榜首 - 吉修匠
  • 2026年烟台青少年信奥编程培训权威推荐 - 谁都没有我好看
  • 食品经营许可证丢了登报怎么线上办理?正规办理渠道与流程 - 速递信息
  • 微信公众号怎么发起投票?详细制作教程送上,西瓜评选+云帆投票+腾讯投票,全场景对比测评 - 投票小程序