当前位置: 首页 > news >正文

【RocketMQ】底层架构核心流程

1、基本概念

  • Producer(生产者)
    • 负责“发送消息”的应用。
  • Consumer(消费者)
    • 负责“消费消息”的应用。
  • Broker
    • 真正存储消息、处理请求的服务器进程。
    • Producer 和 Consumer 最终都是通过网络直接跟 Broker 打交道(RPC)。
  • NameServer
    • 只负责注册和路由发现
    • 保存“哪个 Topic 有哪些队列,这些队列在哪些 Broker 上”这种元数据。
    • Producer/Consumer 启动时会去 NameServer 拉这个路由信息。
  • Topic
    • 逻辑概念,可以理解成“一个业务场景的消息分类”。
    • 对开发者来说:发消息要指定 Topic,订阅也要指定 Topic
  • Queue(Message Queue)
    • 物理上的拆分单元:一个 Topic 会被拆成多个 Queue
    • 用途:
      • 提高吞吐:不同 Queue 可以在多台 Broker 上、被多个 Consumer 实例并行消费。
      • 做负载均衡:同一个消费组里的实例按 Queue 分摊消息。
  • Message
    • 一条真正的数据:包含 topic、body、tags、keys 等。
    • 在磁盘上最终会被顺序写入到 Broker 的 CommitLog。

2、使用方式

生产者代码:

@ResourceprivateMQProducermqProducer;@OverridepublicvoidsendDelayMsg(ImMsgBodyimMsgBody){Stringjson=JSON.toJSONString(imMsgBody);Messagemessage=newMessage();message.setBody(json.getBytes());message.setTopic(ImCoreServerProviderTopicNames.QIYU_LIVE_IM_ACK_MSG_TOPIC);//等级1 -> 1s,等级2 -> 5smessage.setDelayTimeLevel(2);try{SendResultsendResult=mqProducer.send(message);LOGGER.info("[MsgAckCheckServiceImpl] msg is {},sendResult is {}",json,sendResult);}catch(Exceptione){LOGGER.error("[MsgAckCheckServiceImpl] error is ",e);}}

消费者代码:

@OverridepublicvoidafterPropertiesSet()throwsException{DefaultMQPushConsumermqPushConsumer=newDefaultMQPushConsumer();mqPushConsumer.setVipChannelEnabled(false);//设置我们的namesrv地址mqPushConsumer.setNamesrvAddr(rocketMQConsumerProperties.getNameSrv());//声明消费组mqPushConsumer.setConsumerGroup(rocketMQConsumerProperties.getGroupName()+"_"+ImAckConsumer.class.getSimpleName());//每次只拉取一条消息mqPushConsumer.setConsumeMessageBatchMaxSize(1);mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);mqPushConsumer.subscribe(ImCoreServerProviderTopicNames.QIYU_LIVE_IM_ACK_MSG_TOPIC,"");mqPushConsumer.setMessageListener((MessageListenerConcurrently)(msgs,context)->{Stringjson=newString(msgs.get(0).getBody());ImMsgBodyimMsgBody=JSON.parseObject(json,ImMsgBody.class);intretryTimes=msgAckCheckService.getMsgAckTimes(imMsgBody.getMsgId(),imMsgBody.getUserId(),imMsgBody.getAppId());LOGGER.info("retryTimes is {},msgId is {}",retryTimes,imMsgBody.getMsgId());if(retryTimes<0){returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//只支持一次重发if(retryTimes<2){msgAckCheckService.recordMsgAck(imMsgBody,retryTimes+1);msgAckCheckService.sendDelayMsg(imMsgBody);routerHandlerService.sendMsgToClient(imMsgBody);}else{msgAckCheckService.doMsgAck(imMsgBody);}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});mqPushConsumer.start();LOGGER.info("mq消费者启动成功,namesrv is {}",rocketMQConsumerProperties.getNameSrv());}

3、NS/B/P/C 的协作流程

  • NameServer:注册中心 + 路由服务
    • NameServer先启动
    • Broker 启动后,主动去连 NameServer,把自己的信息“注册”上去
      • Broker 会定时向所有 NameS****erver发心跳/注册请求,请求体里带上:
        • 自己的地址(IP:Port)
        • BrokerName、ClusterName
        • 本机 Topic 配置(Topic 名、读/写队列数等)
    • Producer / Consumer 启动时,从 NameServer 拉取路由信息:
      • 某个 Topic 对应哪些 Broker、每个 Broker 上有几个 Queue 是属于这个topic的。总结就是:该 Topic 的 Queue 列表。例如:[Queue0@BrokerA, Queue1@BrokerA, Queue2@BrokerB, Queue3@BrokerB]
  • Producer 发消息的大致步骤:
    • 1)启动时:
      • 设置 NameServer 地址。
      • 定时从 NameServer 拉 Topic 路由。(拿到 Topic 的完整 Queue 列表
    • 2)发送时:
      • 根据路由信息,为当前 Topic 选择一个目标 Broker + Queue(可以轮询、hash 等策略)。
        • 选哪个 Queue 的规则(在 Producer 里实现,常见两种):
          • 不指定 key、不要求顺序
            • 在列表里轮询(Round-Robin)选一个,例如第一次发用 Queue0,第二次用 Queue1……这样自然就负载均衡到多个 Broker、多个 Queue。
          • 指定了 key、要顺序消息
            • key 做 hash,再对队列个数取模,得到固定下标,总是选同一个 Queue。这样同一 key 的消息总进同一个 Queue,消费端单线程消费该 Queue 就能保序。
      • 通过 Netty 向这个 Broker 发一条 RPC 请求(SEND_MESSAGE)。
      • Broker 在本地写磁盘(CommitLog),写成功后返回 SendResult。
    • 3)如果发送超时/异常:
      • RocketMQ Client 内部有重试机制(看配置),可以自动换 Queue 或换 Broker 再发。
  • Consumer 收消息的大致步骤:
    • 1)启动时:
      • 设置 NameServer 地址。
      • 设置消费组(consumerGroup)。
      • 设置要订阅的 Topic。
      • 从 NameServer 获取路由信息,知道哪些 Queue 属于这个 Topic。(拿到 Topic 的完整 Queue 列表
    • 2)负载均衡:
      • 同一消费组里的多个 Consumer 实例,会由客户端做“Queue 分配”:根据当前 Group 内在线实例数和 Topic 的 Queue 数,把 Queue 分给各个实例,每个实例只消费分到的那部分。一个 Queue 只被组内一个实例消费。(实例数 = 你部署并启动了多少个跑这段 Consumer 代码的进程)
    • 3)拉取消息:
      • RocketMQ 的“Push”模式底层其实也是定时 Pull:
      • Consumer 主动从 Broker 的某个 Queue 拉一批消息。
      • 拉到后回调你的 MessageListener。
    • 4)确认 / 重试:
      • 如果你的 Listener 返回 CONSUME_SUCCESS:客户端会更新消费位点提交给 Broker,下次从新的 offset 继续拉。
      • 如果返回 RECONSUME_LATER 或抛异常:Broker 会认为你消费失败,稍后再把这条消息投递给你(或同组其他实例),次数超过阈值就进死信队列。

4、RocketMQ 的存储设计:CommitLog + ConsumeQueue + IndexFile

CommitLog:

CommitLog是真正存消息的地方,所有 Topic 所有 Queue 的消息都混在一起顺序追加写入,这样可以保证磁盘顺序写,吞吐量很高。每个文件默认 1GB,写满就建新文件。

ConsumeQueue:

ConsumeQueue是 CommitLog 的索引,每个 Topic 的每个 Queue 有自己的 ConsumeQueue 文件。每条记录固定 20 字节,存的是 [commitLogOffset(8), msgSize(4), tagsHashCode(8)]。Consumer 消费时先读 ConsumeQueue 拿到 偏移量和大小,再去 CommitLog 读完整消息。

整体设计思路是:写走 CommitLog 保证顺序写高吞吐,读走 ConsumeQueue 保证快速定位

底层版的写->读流程

Producer 发消息(写)

Producer.send(message) ↓ Broker 收到 ↓ CommitLog.putMessage() - 加锁(或用 CAS) - 找到当前 CommitLog 文件的末尾 - 把消息序列化成二进制,追加写入 - 返回 physicalOffset(这条消息在 CommitLog 里的位置) ↓ 返回 SendResult 给 Producer ↓ (异步)ReputMessageService 扫描 CommitLog 新消息 - 写对应的 ConsumeQueue(topic + queueId) - 写 IndexFile(如果消息有 keys)

Consumer 消费(读)

Consumer.pull(topic, queueId, offset:100, MaxMsgNums:10//最多拉多少条) ↓ Broker 收到 ↓ 读 ConsumeQueue 文件 - 定位到 offset 对应的 20 字节记录 - 取出 [commitLogOffset, msgSize, tagsHashCode] - (可选)如果有 tag 过滤,先用 tagsHashCode 判断是否匹配 ↓ 读 CommitLog 文件 - 定位到 commitLogOffset 位置 - 读 msgSize 字节 - 反序列化成 Message 对象 ↓ 返回给 Consumer ↓ Consumer 处理 - Consumer 的 Listener 处理完这批消息,返回 CONSUME_SUCCESS - 本地 offset 更新为 100 + 10 = 110 - 定时5s向 Broker 提交:updateConsumerOffset(group, topic, queueId, 110)下次从110开始拉 - Broker 更新内存 + 默认每5s持久化到 consumerOffset.json consumer第一次启动: 向 Broker 查这个 Group 在 qiyu_live_im_ack_msg_topic 的 Queue 0 的 offset Broker 返回"没有记录" 根据 CONSUME_FROM_FIRST_OFFSET 配置,从 offset=0 开始消费 consumer重启后: 向 Broker 查 offset,拿到上次提交的值(比如 100) 从 offset=100 继续消费,不会重复消费 0~99

Consumer 消费 - push模式

消费者启动 ↓ 连接NameServer获取路由信息 ↓ 向Broker注册消费者 ↓ 启动各种服务线程 ↓ ┌─────────────────────────────────────┐ │ 负载均衡服务 │ │ (每20秒执行一次) │ └─────────────────────────────────────┘ ↓ 计算队列分配 (比如分配到Queue0, Queue1, Queue2) ↓ 为每个队列创建初始PullRequest ┌─────────────────────────────────────┐ │ PullRequest(Queue0, offset=100) │ │ PullRequest(Queue1, offset=200) │ │ PullRequest(Queue2, offset=300) │ └─────────────────────────────────────┘ ↓ 将PullRequest放入拉取请求队列 ↓ ┌─────────────────────────────────────┐ │ 拉取线程循环 │ │ (24小时不停工作) │ └─────────────────────────────────────┘ ↓ 从拉取请求队列取出一个PullRequest ↓ 检查流控条件 ↓ ┌─────────────────┐ ┌─────────────────┐ │ 流控检查通过 │ │ 流控检查失败 │ └─────────────────┘ └─────────────────┘ ↓ ↓ 向Broker发送拉取请求 延迟50ms后重新放入队列 ↓ ↓ ┌─────────────────────────────────────┐ │ Broker端处理(异步) │ └─────────────────────────────────────┘ ↓ 立即检查是否有消息 ↓ ┌─────────────────┐ ┌─────────────────┐ │ 有消息 │ │ 没有消息 │ └─────────────────┘ └─────────────────┘ ↓ ↓ 立即返回消息列表 挂起请求,长轮询等待 ↓ ↓ ┌─────────────────┐ │ 等待30秒内... │ │ - 有新消息到达 │ │ - 或者超时 │ └─────────────────┘ ↓ 返回结果(消息或空) ↓ ┌─────────────────────────────────────┐ │ 客户端收到响应 │ └─────────────────────────────────────┘ ↓ 解析拉取结果 ↓ ┌─────────────────┐ ┌─────────────────┐ │ 拉取到消息 │ │ 没拉取到消息 │ └─────────────────┘ └─────────────────┘ ↓ 消息存储到对应队列的本地缓存 (ProcessQueue) ↓ 按批次大小拆分消息 创建新PullRequest ↓ ↓ 创建消费任务 放入拉取请求队列 ↓ ↓ 提交给消费线程池 继续下次拉取 ↓ 立即创建新PullRequest ↓ 继续处理其他请求(拉取线程循环继续……) ↓ ┌─────────────────────────────────────┐ │ 消费线程池处理(异步) │ └─────────────────────────────────────┘ ↓ 消费线程获取任务 ↓ 执行用户消费逻辑 ↓ ┌─────────────────┐ ┌─────────────────┐ │ 消费成功 │ │ 消费失败 │ └─────────────────┘ └─────────────────┘ ↓ ↓ 发送ACK给Broker 发送NACK给Broker ↓ ↓ 更新消费进度 消息进入重试队列 详细说明各阶段: 1、拉取请求循环 拉取线程的工作循环: while (消费者运行中) { 1. 从拉取请求队列取出PullRequest 2. 检查流控条件(本地缓存是否过多) 3. 向Broker发送拉取请求 4. 处理Broker响应([msg100,msg101,msg102], nextOffset=103),这个nextBeginOffset是基于拉取位点(pullOffset)计算的 5. 创建新的PullRequest继续循环 } 值得注意的是,拉取和broker处理是异步,当broker返回结果时才会触发回调, 回调中将消息提交给消费线程池,然后立即创建新PullRequest放入队列, 拉取线程不会等待消费线程池处理完成。 broker其实就是一个服务端,然后里面有相应的拉取线程池专门处理拉取请求。 broker定时将消费位点刷盘。 拉取请求中的位点是拉取位点。拉取位点每次在拉取线程处理broker响应时更新,然后传入新的拉取请求。 重启时从broker获取消费位点作为拉取的起始位点。 补充: Rebalance分配队列 ↓ 创建PullRequest(mq=Queue0, offset=100) ↓ 放入pullRequestQueue ↓ PullMessageService取出PullRequest ↓ 向Broker发送拉取请求(Queue0, offset=100) ↓ Broker返回: [msg100, msg101, msg102], nextBeginOffset=103 ↓ 更新PullRequest.nextOffset = 103 ↓ 消息放入ProcessQueue(本地缓存) ↓ 提交消费任务 ↓ PullRequest放回pullRequestQueue(复用同一个对象) ↓ PullMessageService再次取出PullRequest ↓ 向Broker发送拉取请求(Queue0, offset=103) ↓ 循环继续... 2、 流控检查详细规则 检查以下条件: - 本地缓存消息数 > 1000条? - 本地缓存消息大小 > 100MB? - 消费进度落后 > 2000条? 任一条件满足 → 延迟拉取 所有条件不满足 → 立即拉取 流控检查的目的: 如果拉取速度 > 消费速度,本地缓存会越来越多 最终导致内存溢出 流控机制防止这种情况发生 3、Broker端长轮询机制 Broker收到拉取请求后: 1. 立即检查队列是否有新消息 2. 有消息 → 立即返回 3. 没消息 → 将请求挂起30秒 4. 30秒内有新消息 → 立即唤醒并返回 5. 30秒超时 → 返回空结果 6. 消息处理和任务创建 4、收到消息后的处理: ConsumeMessageService:负责从ProcessQueue取消息,创建消费任务,提交给线程池 1. 根据consumeMessageBatchMaxSize拆分消息 2. 每个批次(默认是1条)创建一个ConsumeRequest任务 3. 记录消费开始时间戳 4. 提交任务到消费线程池 5. 更新消息状态为"消费中" 5、超时检测机制 1. 有一个定时清理线程(每20秒执行一次) 2. 遍历ProcessQueue中所有"消费中"状态的消息 3. 计算:当前时间 - 消费开始时间 4. 如果超过15分钟,标记为"超时消息" 6、超时处理 1. 超时消息从ProcessQueue中移除 2. 构造一个"重试消息" 3. 设置延迟级别(第1次重试 = 延迟级别1) 4. 发送给Broker(注意:这时还不是发到重试队列),类似于生产者发送普通消息给broker 5. broker判断这是一个有延迟属性的消息,于是存储到 SCHEDULE_TOPIC_XXXX 的对应延迟级别队列 6. 定时任务扫描延迟队列中的消息,检查投递时间是否到达 7. 如果到达,读取消息内容,恢复真实的Topic(%RETRY%ConsumerGroup)和QueueId,重新写入CommitLog,更新%RETRY%ConsumerGroup的ConsumeQueue索引 8. 此时,消费者就能消费到重试队列的消息了 假设场景: - 消费者组:order-group(3个实例) - 业务Topic:OrderTopic(8个队列) - 重试Topic:%RETRY%order-group(4个队列) 分配结果: 消费者实例1: - OrderTopic: Queue0, Queue1, Queue2 - %RETRY%order-group: Queue0, Queue1 消费者实例2: - OrderTopic: Queue3, Queue4, Queue5 - %RETRY%order-group: Queue2 消费者实例3: - OrderTopic: Queue6, Queue7 - %RETRY%order-group: Queue3 重试消息默认是随机放到重试Topic下的任意一个队列 7、本地缓存 本地缓存ProcessQueue的核心价值: 消息状态管理:跟踪每条消息的完整生命周期 可靠性保证:消费失败可重试,宕机可恢复 流控保护:防止内存溢出,保护系统稳定 进度管理:准确计算和报告消费进度 并发协调:协调拉取线程和消费线程的工作 8、消费过程: public class ConsumeMessageConcurrentlyService { public void processConsumeResult( ConsumeConcurrentlyStatus status, ConsumeRequest consumeRequest) { // 不管返回SUCCESS还是RECONSUME_LATER // 都标记为已处理完成 for (MessageExt msg : consumeRequest.getMsgs()) { long offset = msg.getQueueOffset(); processQueue.updateStatus(offset, MessageStatus.CONSUMED); } // 根据返回状态决定后续处理 if (status == ConsumeConcurrentlyStatus.CONSUME_SUCCESS) { // 更新消费位点 updateOffset(consumeRequest);// 先更新内存,每5秒持久化到Broker } else { // 发送到重试队列 sendMessageBack(consumeRequest.getMsgs()); } } } 本地队列的状态: public class ProcessQueue { // 消息状态枚举 enum MessageStatus { WAITING, // 等待消费 CONSUMING, // 正在消费 CONSUMED, // 已消费完成 TIMEOUT // 消费超时 } // 消息状态映射表 private final Map<Long, MessageStatus> msgStatusTable = new ConcurrentHashMap<>(); }
┌─────────────────────────────────────────────────────┐ │ 完整的削峰机制 │ ├─────────────────────────────────────────────────────┤ │ 生产者高峰流量 │ │ ↓ │ │ Broker队列 (持久化存储,容量大) │ │ ↓ │ │ 拉取控制 (按消费能力拉取,不是按生产速度) │ │ ↓ │ │ 本地缓存 (内存缓存,有流控保护) │ │ ↓ │ │ 线程池队列 (任务缓存,有界队列) │ │ ↓ │ │ 消费线程池 (按实际处理能力消费) │ │ ↓ │ │ 业务处理 (最终的处理速度) │ └─────────────────────────────────────────────────────┘ 值得注意的是:RocketMQ的本地缓存设计是每个队列拥有独立的本地缓存。

Push和Pull底层机制相同,区别在于自动化程度。

核心概念:

pullBatchSize(拉取批次大小):

作用:每次从Broker拉取多少条消息

默认值:32条

配置方式:consumer.setPullBatchSize(64)

影响:网络传输效率

consumeMessageBatchMaxSize(消费批次大小):

作用:每个消费任务处理多少条消息

默认值:1条

配置方式:consumer.setConsumeMessageBatchMaxSize(10)

影响:消费线程的任务粒度

5、rocketmq延迟消息/重试消息实现原理

RocketMQ的延迟消息实现采用了临时存储 + 定时扫描 + 重新投递的机制。核心思想是将延迟消息先暂存到特殊的Topic中,通过定时任务扫描到期消息,然后重新投递到原始Topic。

  • 当Broker收到延迟消息后,会进行Topic替换:
    • 将原始Topic替换为一个特殊Topic(所有延迟级别共享同一个特殊Topic)。根据延迟级别确定QueueId,比如延迟级别3对应Queue2,然后将原始的Topic和QueueId保存在消息属性中,用于后续恢复。
    • 消息仍然写入CommitLog,基于特殊Topic和queue建立ConsumeQueue索引。
  • Broker为每个延迟级别启动独立的定时任务:(延迟消息的定时任务是直接运行在Broker服务上的,延迟消息的定时任务实际上是一个特殊的Consumer,它自己管理消费进度,自己更新进度,自己持久化进度)
    • 定时任务扫描对应Queue的ConsumeQueue文件。维护消费进度,记录扫描到ConsumeQueue的哪个位置。
    • 根据ConsumeQueue中的消息物理偏移量和消息大小读取CommitLog完整消息内容
  • 定时任务读取到消息后进行时间计算:
    • 判断当前时间是否大于消息预期投递时间,如果消息到期了则执行重新投递。
  • 重新投递:
    • 从消息属性中恢复原始的Topic和QueueId,将消息重新投递到原始Topic的指定Queue中,基于原始Topic和queue建立ConsumeQueue索引。Consumer就可以正常消费到这条延迟消息了。

6、rocketmq怎么保证消息的可靠性的?

消息在传输过程中可能在三个阶段丢失:①生产者阶段:消息发送到Broker失败;②Broker存储阶段:Broker接收到消息但存储失败;③消费者阶段:消费者接收到消息但处理失败

因此RocketMQ主要通过在这三个阶段提供可靠性保障机制来防止消息丢失。

在生产者端,通过同步发送和自动重试保证可靠性:Producer使用同步发送时,会阻塞等待Broker返回SendResult响应。如果在超时时间内没收到SendResult响应或者收到了SendResult但状态不是成功,则会自动重试,重试时会选择不同的Broker,避免单点故障。

在Broker端,通过存储和复制机制保证可靠性:刷盘策略选择同步刷盘,确保消息持久化到磁盘后才返回成功,主从复制选择同步复制,消息同步到从节点后才返回成功。同步刷盘加同步复制是最高可靠性配置,虽然性能会有所下降,但可靠性最高。

在消费者端,通过确认和重试机制保证可靠性:Consumer处理完消息后返回消费状态,成功时Consumer会向Broker发送ACK确认,失败时不发送ACK。Broker没收到Consumer的ACK确认时,会重新投递消息。超过重试次数的消息进入死信队列,防止消息彻底丢失。(生产者重试失败:直接抛异常给业务代码,需要业务层处理)

通过这三个阶段的保障机制,RocketMQ可以做到消息的高可靠性传输。在实际应用中,需要根据业务对可靠性和性能的要求,选择合适的配置策略

除了前面提到的基础可靠性机制,RocketMQ还提供了事务消息来解决分布式事务场景下的可靠性问题。

:::tips
同步刷盘/异步刷盘:

不管什么刷盘,broker收到消息后都会发送sendresult给生产者。

异步刷盘:broker收到消息后,就直接返回sendresult

同步发送/异步发送:

同步发送 = 业务线程阻塞 + IO线程工作

业务线程:发送请求 → 阻塞等待 → 被唤醒 → 返回结果,重试

IO线程:发送消息 → 接收响应 → 唤醒业务线程

异步发送 = 业务线程不阻塞 + IO线程工作

业务线程:发送请求 → 立即返回 → 继续干活

IO线程:发送消息 → 接收响应 → 执行回调,重试

1、为什么说同步发送比异步发送可靠?

因为异步发送的失败处理是在回调中,业务流程已经走完,难以保证业务一致性。适合对一致性要求不高、追求性能的场景

2、同步发送(rocketmq已经自动重试过了)的两个结果:

情况A:收到sendresult

  • SEND_OK:发送成功
  • FLUSH_DISK_TIMEOUT:刷盘超时
  • FLUSH_SLAVE_TIMEOUT:同步复制超时
  • SLAVE_NOT_AVAILABLE:从节点不可用

情况B:抛出异常(网络或系统问题)

  • 网络超时异常(网络延迟、Broker处理慢、Broker宕机)
  • 网络连接异常(Broker宕机、网络断开、防火墙)
  • Broker返回错误(Topic不存在、权限不足、Broker繁忙)

7、如何解决重复消费问题?

重复消费产生的原因:
①网络异常场景:比如消费者的业务处理逻辑执行时间过长,监听器方法一直没有返回consume_success。消费者有一个定时清理线程,每20秒检查一次本地队列中正在消费的消息。如果发现某条消息的消费时间超过15分钟,就会将其标记为超时,从本地队列移除并构造一个带有延迟属性的重试消息发送到Broker的重试队列。Broker收到后,将消息先存储到内部的延迟队列中,然后broker的定时任务会将消息从延迟队列转移到真正的重试队列,这时消费者就能拉取到这条重试消息进行消费。所以会出现这种情况:消息被重新投递,但原来的消费任务其实已经消费完或者还在消费过程中,最终导致同一条消息被处理两次。或者生产者因为网络问题没有收到sendResult重新发送也会造成重复投递。
②消费者重启场景:消费者处理消息过程中消费位点还没来得及提交,消费者就宕机重启,重启后从上次提交的没有更新的旧的offset开始消费,从而出现重复消费问题。
解决重复消费通常有三种思路:
1、发消息的时候给消息分配一个唯一id,消费者消费的时候把消息id用setnx存到redis中,如果保存成功,则说明是第一次消费,失败了说明是重复消费。给key设置过期时间,避免一直占用内存。
2、通过业务消息的唯一id,比如订单id、支付流水号,消费的时候,先查后插(需要保证原子性),或者直接利用唯一索引去避免重复消息。
3、通过业务逻辑保持幂等性。

http://www.jsqmd.com/news/494960/

相关文章:

  • STM32摇杆ADC采集与处理实战
  • Java的java.util.random实现细节
  • “密码科学与技术”:专业好不好?有哪些就业方向?读研读博有必要吗?
  • 论文中TIFF保存方法
  • LangChain的数据检索
  • 北京上门收酒,闲置老酒名酒变现难?京城亚南酒业帮您一站式解决 - 品牌排行榜单
  • 用买火车票的例子讲解Java反射的作用
  • 北京上门回收洋酒,京城亚南酒业,专业鉴定,高价回收各类洋酒 - 品牌排行榜单
  • 低空智联网技术深度拆解:从通感算一体化到Agentic AI的架构演进
  • 注塑厂批次色差真相:福尔蒂工艺映射法实现ΔE<3量产稳定
  • 2026必备!全行业通用降AI率平台 千笔·降AIGC助手 VS 万方智搜AI
  • C语言核心语法(二)
  • 卡梅德生物深度解析CTAA16(人源癌相关抗原):分子机制与科研应用
  • 大型浸水试验箱内层选用SUS304不锈钢板 - 品牌推荐大师
  • 北京上门收酒哪家靠谱?京城亚南酒业,高价回收老酒名酒当场结算 - 品牌排行榜单
  • OpenAI将Sora融入ChatGPT:机遇与挑战并存
  • 2026年口碑好的建筑脚手架厂家推荐:钢管脚手架/铝合金脚手架/高空作业脚手架厂家采购参考指南 - 行业平台推荐
  • ArcGIS Pro报错:未找到所需字段,或无法正确检索
  • 数据仓库分层体系
  • TeamClaw重磅上线!国内首个专为销售团队打造的、可管理可控制的OpenClaw企业级解决方案
  • 2026年博主亲测:广州正规美业公司最新实践分享
  • 2026年质量好的工程钢管架品牌推荐:东莞搭钢管架/东莞工程钢管架/东莞施工钢管架厂家推荐与采购指南 - 行业平台推荐
  • 从传统产品经理到AI产品经理的必备指:AI产品经理高薪招聘火爆,面试必考题全解析
  • AI海报生成工具完全指南——2026年电商运营必备平台推荐
  • 爬虫对抗:ZLibrary反爬机制实战分析技术文章大纲
  • 判断一个文件最后修改时间是否超出了指定天数
  • 某厂Java面试实录:深度解析高并发秒杀系统、Redis原子扣减、分布式锁与消息可靠性
  • 电源模块纹波测试自动化方案设计与实践
  • 贪心算法的应用
  • 电网的安全稳定裕度