当前位置: 首页 > news >正文

消息队列顺序性保证实战

消息队列顺序性保证实战

一、消息顺序性概述

消息队列的顺序性是指消息按照发送顺序被消费的特性,在金融交易、订单处理等场景至关重要。

1.1 顺序性问题场景

┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Producer │────▶│ Queue │────▶│ Consumer │ │ (生产者) │ │ (队列) │ │ (消费者) │ └─────────────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ ▼ │ ┌─────────────┐ │ │ Consumer 1 │ │ │ 消费消息1 │ │ └─────────────┘ │ │ │ ▼ │ ┌─────────────┐ │ │ Consumer 2 │ │ │ 消费消息2 │ │ └─────────────┘ │ ▼ 问题:消息1和消息2顺序可能错乱

1.2 顺序性破坏原因

原因说明
多分区同一主题多个分区,消息可能发送到不同分区
多消费者多个消费者并行消费,处理速度不同
重试机制消息重试可能打乱顺序
网络延迟网络波动导致消息到达顺序变化

二、Kafka顺序性保证

2.1 单分区单消费者

apiVersion: apps/v1 kind: Deployment metadata: name: kafka-consumer spec: replicas: 1 template: spec: containers: - name: consumer image: kafka-consumer:1.0.0 env: - name: KAFKA_TOPIC value: "orders" - name: KAFKA_GROUP_ID value: "order-group" - name: KAFKA_PARTITION value: "0"

2.2 分区键策略

public class OrderProducer { @Autowired private KafkaTemplate<String, Order> kafkaTemplate; public void sendOrder(Order order) { // 使用订单ID的哈希值作为分区键 String key = String.valueOf(order.getUserId()); kafkaTemplate.send("orders", key, order); } }

2.3 自定义分区器

public class OrderPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String orderKey = (String) key; int numPartitions = cluster.partitionCountForTopic(topic); return Math.abs(orderKey.hashCode()) % numPartitions; } @Override public void close() {} @Override public void configure(Map<String, ?> configs) {} }

三、RocketMQ顺序性保证

3.1 同步发送

public class RocketMQProducer { private DefaultMQProducer producer; public void sendOrderMessage(Order order) throws Exception { Message message = new Message( "OrderTopic", "OrderTag", order.getOrderId(), JSON.toJSONBytes(order) ); // 同步发送,保证顺序 SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> queues, Message msg, Object arg) { Long orderId = (Long) arg; int index = (int) (orderId % queues.size()); return queues.get(index); } }, order.getOrderId()); } }

3.2 顺序消费

public class RocketMQConsumer { private DefaultMQPushConsumer consumer; public void consumeOrderMessages() throws Exception { consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { Order order = JSON.parseObject(msg.getBody(), Order.class); processOrder(order); } return ConsumeOrderlyStatus.SUCCESS; } }); } }

四、RabbitMQ顺序性保证

4.1 单队列单消费者

@Component public class RabbitMQConsumer { @RabbitListener(queues = "order-queue", concurrency = "1") public void consumeOrder(Order order) { processOrder(order); } }

4.2 消息优先级

public class RabbitMQProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendOrderWithPriority(Order order, int priority) { MessageProperties props = new MessageProperties(); props.setPriority(priority); Message message = new Message( JSON.toJSONBytes(order), props ); rabbitTemplate.send("order-exchange", "order-routing-key", message); } }

五、消息顺序性最佳实践

5.1 业务层面保证

public class OrderService { @Transactional public void processOrders(List<Order> orders) { // 按订单ID排序 orders.sort(Comparator.comparingLong(Order::getOrderId)); for (Order order : orders) { // 处理订单逻辑 processOrder(order); } } }

5.2 消息去重

@Component public class MessageDeduplicationService { @Autowired private RedisTemplate<String, Object> redisTemplate; private static final String PREFIX = "msg:dedupe:"; public boolean isDuplicate(String messageId) { String key = PREFIX + messageId; Boolean exists = redisTemplate.hasKey(key); if (Boolean.TRUE.equals(exists)) { return true; } redisTemplate.opsForValue().set(key, "true", 24, TimeUnit.HOURS); return false; } }

5.3 死信队列

# RabbitMQ死信队列配置 spring: rabbitmq: listener: simple: retry: enabled: true max-attempts: 3 initial-interval: 1000ms template: reply-timeout: 5000ms # 死信交换机和队列 @Configuration public class DeadLetterConfig { @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("dead-letter-queue").build(); } @Bean public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange("dead-letter-exchange").durable(true).build(); } }

六、顺序性验证

6.1 测试用例

@Test void testMessageOrder() { // 发送100条有序消息 for (int i = 0; i < 100; i++) { producer.send("test-topic", String.valueOf(i), "message-" + i); } // 收集消费结果 List<String> received = consumer.receiveAll(); // 验证顺序 for (int i = 0; i < received.size(); i++) { assertEquals("message-" + i, received.get(i)); } }

6.2 性能测试

# 使用kafka-producer-perf-test测试 kafka-producer-perf-test.sh \ --topic orders \ --num-records 100000 \ --record-size 1024 \ --throughput 10000 \ --producer-props bootstrap.servers=kafka:9092

七、总结

消息顺序性保证策略:

  1. 单分区单消费者:最简单的顺序保证方式
  2. 分区键策略:按业务键哈希分配分区
  3. 同步发送:确保消息按顺序发送
  4. 顺序消费:使用消息队列的顺序消费特性
  5. 业务层面排序:消费后再排序确保顺序

根据业务场景选择合适的顺序性保证方案。

http://www.jsqmd.com/news/893533/

相关文章:

  • 集成学习在低资源语言情感分析中的应用:以波斯语社交媒体评论为例
  • RFDoc:面向证件检测的高效二进制局部特征描述符设计与实践
  • 无标签知识蒸馏:用动态合成数据训练轻量级人脸识别模型
  • 2026雨水收集系统厂家推荐榜:消防不锈钢水箱/焊接不锈钢水箱/生活不锈钢水箱/组合式不锈钢水箱/调蓄型雨水收集系统/选择指南 - 优质品牌商家
  • 11- Claude Code 最强插件库详解:从安装到全插件用途全吃透
  • SignFormer:基于Vision Transformer的静态手语识别模型解析与实战
  • KK-HF Patch:如何解决恋活!游戏体验的三大核心痛点?
  • 构建多图记忆系统VEKTOR:让AI智能体告别金鱼综合症
  • MHmarkets:平台工具、风控与体验体系观察
  • 保姆级教程:在Windows 10/11上配置Kaggle CLI并一键提交submission.csv
  • 明日方舟游戏资源库:技术开发者与创意工作者的完整解决方案
  • 美容平台支付失败率骤降91%:Lovable多通道聚合支付网关设计(含微信/支付宝/跨境PayPal容灾切换逻辑)
  • 利用Taotoken为内容创作平台集成多模型文本生成能力
  • 基于Transformer与知识图谱的药物重定位:2型糖尿病老药新用智能发现
  • 简单三步让Zotero中文文献管理效率提升10倍:Jasminum插件完全指南
  • TwinGAN:双阶段GAN实现中国山水画风格迁移的技术解析与实践
  • 五分钟快速搭建本地AI助手:基于OpenClaw的实践指南
  • 【独家首发】中国制造业AI Agent成熟度白皮书(覆盖17个细分行业,含68家样本企业实测数据)
  • 如何快速实现VR视频转换:用VR-Reversal在普通电脑上自由探索3D内容
  • 告别编译报错!手把手教你用CMake GUI搞定Cesium For Unreal 1.22.0插件依赖库
  • MySQL命令行导出数据库
  • 2026年开源商城和 SaaS 怎么选?为什么越来越多企业开始重视“自主可控”?——真正决定企业长期上限的,从来不是“前期上线速度”,而是“未来还能不能持续演进”
  • Linux权限管理避坑指南:为什么你的新用户加不进sudo组?详解wheel组与/etc/sudoers.d
  • 在Mac本地部署离线AI助手:Llama 2模型与llama.cpp实战指南
  • triton-inference-server-ge-backend 是什么?让模型推理服务化变得如此简单
  • Gateway网关全解:OpenClow如何无缝对接大模型并实现安全熔断与限流
  • CPT Markets:从技术架构看平台运行稳定性
  • 基于文本诱导与图素训练的低资源语言TTS语言适应框架
  • AI Agent商业化失败案例复盘:10个致命错误与教训
  • Auto.js终极指南:用JavaScript轻松实现安卓手机自动化