消息队列顺序性保证实战
消息队列顺序性保证实战
一、消息顺序性概述
消息队列的顺序性是指消息按照发送顺序被消费的特性,在金融交易、订单处理等场景至关重要。
1.1 顺序性问题场景
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Producer │────▶│ Queue │────▶│ Consumer │ │ (生产者) │ │ (队列) │ │ (消费者) │ └─────────────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ ▼ │ ┌─────────────┐ │ │ Consumer 1 │ │ │ 消费消息1 │ │ └─────────────┘ │ │ │ ▼ │ ┌─────────────┐ │ │ Consumer 2 │ │ │ 消费消息2 │ │ └─────────────┘ │ ▼ 问题:消息1和消息2顺序可能错乱1.2 顺序性破坏原因
| 原因 | 说明 |
|---|---|
| 多分区 | 同一主题多个分区,消息可能发送到不同分区 |
| 多消费者 | 多个消费者并行消费,处理速度不同 |
| 重试机制 | 消息重试可能打乱顺序 |
| 网络延迟 | 网络波动导致消息到达顺序变化 |
二、Kafka顺序性保证
2.1 单分区单消费者
apiVersion: apps/v1 kind: Deployment metadata: name: kafka-consumer spec: replicas: 1 template: spec: containers: - name: consumer image: kafka-consumer:1.0.0 env: - name: KAFKA_TOPIC value: "orders" - name: KAFKA_GROUP_ID value: "order-group" - name: KAFKA_PARTITION value: "0"2.2 分区键策略
public class OrderProducer { @Autowired private KafkaTemplate<String, Order> kafkaTemplate; public void sendOrder(Order order) { // 使用订单ID的哈希值作为分区键 String key = String.valueOf(order.getUserId()); kafkaTemplate.send("orders", key, order); } }2.3 自定义分区器
public class OrderPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String orderKey = (String) key; int numPartitions = cluster.partitionCountForTopic(topic); return Math.abs(orderKey.hashCode()) % numPartitions; } @Override public void close() {} @Override public void configure(Map<String, ?> configs) {} }三、RocketMQ顺序性保证
3.1 同步发送
public class RocketMQProducer { private DefaultMQProducer producer; public void sendOrderMessage(Order order) throws Exception { Message message = new Message( "OrderTopic", "OrderTag", order.getOrderId(), JSON.toJSONBytes(order) ); // 同步发送,保证顺序 SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> queues, Message msg, Object arg) { Long orderId = (Long) arg; int index = (int) (orderId % queues.size()); return queues.get(index); } }, order.getOrderId()); } }3.2 顺序消费
public class RocketMQConsumer { private DefaultMQPushConsumer consumer; public void consumeOrderMessages() throws Exception { consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { Order order = JSON.parseObject(msg.getBody(), Order.class); processOrder(order); } return ConsumeOrderlyStatus.SUCCESS; } }); } }四、RabbitMQ顺序性保证
4.1 单队列单消费者
@Component public class RabbitMQConsumer { @RabbitListener(queues = "order-queue", concurrency = "1") public void consumeOrder(Order order) { processOrder(order); } }4.2 消息优先级
public class RabbitMQProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendOrderWithPriority(Order order, int priority) { MessageProperties props = new MessageProperties(); props.setPriority(priority); Message message = new Message( JSON.toJSONBytes(order), props ); rabbitTemplate.send("order-exchange", "order-routing-key", message); } }五、消息顺序性最佳实践
5.1 业务层面保证
public class OrderService { @Transactional public void processOrders(List<Order> orders) { // 按订单ID排序 orders.sort(Comparator.comparingLong(Order::getOrderId)); for (Order order : orders) { // 处理订单逻辑 processOrder(order); } } }5.2 消息去重
@Component public class MessageDeduplicationService { @Autowired private RedisTemplate<String, Object> redisTemplate; private static final String PREFIX = "msg:dedupe:"; public boolean isDuplicate(String messageId) { String key = PREFIX + messageId; Boolean exists = redisTemplate.hasKey(key); if (Boolean.TRUE.equals(exists)) { return true; } redisTemplate.opsForValue().set(key, "true", 24, TimeUnit.HOURS); return false; } }5.3 死信队列
# RabbitMQ死信队列配置 spring: rabbitmq: listener: simple: retry: enabled: true max-attempts: 3 initial-interval: 1000ms template: reply-timeout: 5000ms # 死信交换机和队列 @Configuration public class DeadLetterConfig { @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("dead-letter-queue").build(); } @Bean public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange("dead-letter-exchange").durable(true).build(); } }六、顺序性验证
6.1 测试用例
@Test void testMessageOrder() { // 发送100条有序消息 for (int i = 0; i < 100; i++) { producer.send("test-topic", String.valueOf(i), "message-" + i); } // 收集消费结果 List<String> received = consumer.receiveAll(); // 验证顺序 for (int i = 0; i < received.size(); i++) { assertEquals("message-" + i, received.get(i)); } }6.2 性能测试
# 使用kafka-producer-perf-test测试 kafka-producer-perf-test.sh \ --topic orders \ --num-records 100000 \ --record-size 1024 \ --throughput 10000 \ --producer-props bootstrap.servers=kafka:9092七、总结
消息顺序性保证策略:
- 单分区单消费者:最简单的顺序保证方式
- 分区键策略:按业务键哈希分配分区
- 同步发送:确保消息按顺序发送
- 顺序消费:使用消息队列的顺序消费特性
- 业务层面排序:消费后再排序确保顺序
根据业务场景选择合适的顺序性保证方案。
