RocketMQ实战:用MySQL唯一索引和Redis锁搞定消息重复消费(附完整代码)
RocketMQ消息幂等实战:MySQL与Redis双保险方案深度解析
消息中间件是现代分布式系统的核心组件,而消息重复消费问题就像悬在开发者头上的达摩克利斯剑——随时可能引发数据混乱。上周我负责的电商订单系统就遭遇了这样的危机:促销期间由于网络抖动,同一笔订单被处理了三次,险些造成库存超卖。本文将分享两种经过生产环境验证的解决方案,包含可直接复用的代码模板和避坑指南。
1. 重复消费的本质与业务影响
消息队列的"至少一次"投递机制是把双刃剑。在订单支付、库存扣减等场景下,重复消费可能导致:
- 用户账户被多次扣款
- 优惠券被重复核销
- 统计报表数据失真
最近对某金融系统的压力测试显示,在网络不稳定的情况下,重复消息率可达0.3%。这意味着日均百万级消息量的系统,每天可能产生3000条重复消息。
典型重复场景分析:
- 生产者重试:ACK丢失时(如Kafka的Producer重试)
- 消费者重启:offset未及时提交(常见于RabbitMQ)
- 负载均衡:Rebalance过程中的分区再分配(RocketMQ的痛点)
2. MySQL唯一索引方案:数据强一致之选
当业务对数据准确性要求极高时,基于数据库唯一约束的方案是最可靠的选择。我们在金融清结算系统中就采用了这种方案,核心在于利用数据库的原子性保证幂等。
2.1 完整实现方案
@Component @RocketMQMessageListener( topic = "paymentTopic", consumerGroup = "payment-group" ) public class PaymentListener implements RocketMQListener<MessageExt> { @Autowired private JdbcTemplate jdbcTemplate; // 幂等表结构示例: // CREATE TABLE `msg_idempotent` ( // `biz_id` varchar(64) NOT NULL COMMENT '业务唯一ID', // `created_at` datetime DEFAULT CURRENT_TIMESTAMP, // PRIMARY KEY (`biz_id`), // UNIQUE KEY `uk_biz_id` (`biz_id`) // ) ENGINE=InnoDB; @Override @Transactional(rollbackFor = Exception.class) public void onMessage(MessageExt message) { String bizId = message.getKeys(); try { // 先尝试插入幂等记录 int affected = jdbcTemplate.update( "INSERT INTO msg_idempotent(biz_id) VALUES(?)", bizId ); // 正常处理业务逻辑 processPayment(message.getBody()); } catch (DuplicateKeyException e) { log.warn("重复消息已过滤: {}", bizId); return; // 已处理过的消息直接返回 } } }关键优化点:
- 使用业务ID而非消息ID作为唯一键(防止不同业务消息ID冲突)
- 采用独立幂等表而非业务表(避免污染业务数据)
- 合并事务处理(保证幂等检查和业务操作原子性)
2.2 性能优化实践
在高并发场景下,我们通过以下策略将MySQL方案的TPS从200提升到1500+:
- 连接池优化:
spring: datasource: hikari: maximum-pool-size: 20 connection-timeout: 3000 - 批量插入:对批量消息先做合并插入
- 二级缓存:用本地缓存记录最近处理过的消息ID(需设置合理过期时间)
3. Redis分布式锁方案:高性能场景首选
当业务吞吐量要求极高且允许短暂不一致时,Redis方案是更好的选择。我们在秒杀系统中采用该方案,QPS可达20000+。
3.1 增强版Redisson实现
@Component @RocketMQMessageListener( topic = "seckillTopic", consumerGroup = "seckill-group" ) public class SeckillListener implements RocketMQListener<MessageExt> { @Autowired private RedissonClient redisson; @Autowired private RedisTemplate<String, Object> redisTemplate; @Override public void onMessage(MessageExt message) { String orderId = message.getKeys(); RLock lock = redisson.getLock("lock:" + orderId); try { // 尝试加锁,等待100ms,锁持有30秒 if (lock.tryLock(100, 30000, TimeUnit.MILLISECONDS)) { // 检查是否已处理 if (redisTemplate.opsForValue().get("processed:" + orderId) != null) { log.info("订单{}已处理", orderId); return; } // 处理秒杀业务 handleSeckill(orderId, message.getBody()); // 标记已处理,设置1小时过期 redisTemplate.opsForValue().set( "processed:" + orderId, "1", 1, TimeUnit.HOURS ); } } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }生产环境注意事项:
- 锁续约:业务处理时间可能超过锁默认30秒有效期
- 锁释放:必须判断当前线程是否持有锁再释放
- Redis持久化:建议开启AOF持久化防止重启丢数据
3.2 多级缓存优化策略
我们通过分级缓存将Redis方案的性能再提升40%:
- 本地缓存:Guava Cache记录最近处理记录
private LoadingCache<String, Boolean> localCache = CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(5, TimeUnit.MINUTES) .build(key -> false); - 布隆过滤器:用于快速判断新消息是否可能重复
- Lua脚本:原子化执行检查-处理-标记流程
4. 混合方案设计与容灾策略
在银行核心系统中,我们采用混合方案实现双保险:
graph TD A[消息到达] --> B{业务类型} B -->|支付类| C[MySQL幂等表] B -->|查询类| D[Redis幂等标记] C --> E[异常处理] D --> E E --> F[人工干预通道]异常处理最佳实践:
- 监控报警:对重复消息率设置阈值报警
- 死信队列:配置专门处理异常消息的消费者
@Component @RocketMQMessageListener( topic = "%DLQ%payment-group", consumerGroup = "payment-dlq-group" ) public class PaymentDLQListener implements RocketMQListener<String> { @Override public void onMessage(String message) { // 记录到数据库并触发报警 alertService.notifyAdmin(message); } } - 补偿机制:定期核对关键业务数据一致性
5. 方案选型决策树
根据三年来的实施经验,我总结出以下决策原则:
| 考量维度 | MySQL方案 | Redis方案 |
|---|---|---|
| 数据一致性要求 | ★★★★★ | ★★★☆☆ |
| 吞吐量需求 | ★★☆☆☆ (1000TPS以下) | ★★★★★ (万级TPS) |
| 实现复杂度 | ★★★☆☆ (需建表) | ★★☆☆☆ (配置简单) |
| 运维成本 | ★★★★☆ (依赖数据库) | ★★☆☆☆ (Redis更易扩展) |
| 异常恢复能力 | ★★★★★ (数据持久化) | ★★☆☆☆ (依赖缓存持久化) |
黄金法则:
- 资金交易类业务必须用MySQL方案
- 实时性要求高的读场景用Redis方案
- 关键业务系统建议两种方案同时实施
在最近一次大促中,这套混合方案成功拦截了超过12万条重复消息,保障了零资损。特别提醒:Redis方案一定要配合完善的监控体系,我们曾因Redis集群故障导致短暂幂等失效,后来通过增加本地缓存降级方案解决了这个问题。
