当前位置: 首页 > news >正文

RabbitMQ—高级篇

目录

RabbitMQ高级篇

发送者可靠性-发送者重连

发送者可靠性-发送者确认

MQ可靠性-数据持久化

MQ可靠性-Lazy Queue

消费者可靠性-消费者确认

消费者可靠性-失败重试

失败消息处理策略

消费者可靠性-业务幂等性

唯一消息id


RabbitMQ高级篇

发送者可靠性-发送者重连

由于网络波动,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

注意:

🤔当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待过程中,当前线程是被阻塞的,影响业务性能

😏如果业务对性能高有要求,建议禁用重试机制,也可以考虑使用异步线程来执行发送消息的代码

发送者可靠性-发送者确认

SpirngAMQP提供了Publisher ConfirmPublisher Return两种确认机制。开启确认机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。

💖返回结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

实现发送者确认:

1、在publisher这个微服务的application.yml中添加配置:

publisher-confirm-type三种模式选择:

  • none:关闭confirm机制(默认)

  • simple:同步阻塞等待MQ的回执消息

  • correlated:MQ异步回调方式返回回执消息

2、每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

3、发送消息,指定ID、消息ConfirmCallback

MQ可靠性-数据持久化

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟

😏导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失

  • 内存空间有限,当消费者故障过处理过慢时,会导致消息积压,引发MQ阻塞

RabbitMQ实现数据持久化包括三个方面:

  • 交换机持久化

  • 队列持久化

  • 消息持久化

MQ可靠性-Lazy Queue

从RabbitMQ的3.6.0版本开始,增加了Lazy Queue的概念,也就是惰性队列

惰性队列特征:

  • 接收到消息后直接存入磁盘,不再存储到内存

  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存1部分消息到内存,最多2048条)

在3.12版本后所有队列都是Lazy Queue模式,无法更改

要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:

  • 控制台添加

  • 代码添加

  • 注解添加

RabbitMQ如何保证消息的可靠性

  • 首先通过配置让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在

  • RabbitMQ在3.6版本引入LazyQueue,并且3.12版本后会称为队列的默认模式,LayQueue会将所有消息都持久化

  • 开启持久化和生产者确认时,RabbitMQ只有在消息持久化成功后才会给生产者返回ACK回执

消费者可靠性-消费者确认

消费者确认机制是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送应一个回执,告知RabbitMQ自己消息处理状态:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息

  • nack:消息处理失败,RabbitMQ需要再次投递消息

  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式

  • none:不处理。消息投递给消费者后立刻ack,消息会立即从MQ删除。不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务侵入,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack

——当业务出现异常,根据异常判断返回不同结果:

1、如果是业务异常,会自动返回nack

2、如果是消息处理或校验异常,自动返回reject

消费者可靠性-失败重试

SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq

可以通过application.yaml文件中添加配置来开启重试机制:

失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。(默认)

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

失败消息策略实现:

1、定义接收失败消息的交换机、队列及其绑定关系

2、定义RepublishMessageRecoverer:

3、开启消费者失败重试机制:

消费者可靠性-业务幂等性

幂等是一个数学概念,用函数表达式来描述:f(x)= f(f(x))

程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的

唯一消息id

方案一,给每个消息都设置一个唯一id,利用id区分是否重复消息:

1、每一条消息都生成一个唯一的id,与消息一起投递给消费者

2、消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库

3、如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理

方案二,结合业务逻辑,基于业务本身做判断

如何保证支付服务与交易服务之间的订单状态一致性?

1、首先,支付服务会在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步

2、其次,为了保证MQ消息可靠性,采用生产者确认机制、消费者确认机制、消费者失败重试等策略,确保消息投递和处理的可靠。同时开启MQ的持久化,避免因服务宕机导致消息丢失

3、最后,在交易服务更新订单状态时做业务幂等判断,避免因消息重复消费导致订单状态异常

http://www.jsqmd.com/news/535723/

相关文章:

  • 别再让PB级大表拖垮你的GaussDB集群了!手把手教你6个实战优化技巧
  • 终极浏览器3D高斯点云编辑器:SuperSplat完整指南与5大核心优势
  • 5分钟掌握HidHide:如何轻松隐藏Windows游戏设备
  • 避坑指南:STM32 HAL库IAP升级中的常见错误与解决方案
  • Blender置换贴图终极指南:5步让3D模型瞬间拥有真实细节
  • 收藏!后端岗遇冷,大模型+算法岗成程序员新出路(小白必看)
  • 杰理之内置触摸拓扑结构【篇】
  • MFCMouseEffect:把桌面输入反馈这件事,做成一个真正可扩展的引擎
  • 前端进阶必修课:尚硅谷React全家桶实战教程全解析(附源码课件)
  • NE555定时器电路设计:从LED闪烁到电机调速的5个实用项目
  • 宜昌做养发哪个店好?黑奥秘全国千店覆盖,便捷养发更靠谱 - 美业信息观察
  • ABAQUS三维多孔材料建模:自定义与多软件导出
  • Access Advance 欢迎VDP 池新许可方,并发布独立经济分析,确认符合FRAND 原则
  • Comsol 助力全固态电池模拟:锂枝晶与裂纹扩展的奇幻之旅
  • 收藏必备!小白程序员轻松入门RAG,打造靠谱大模型应用
  • 揭秘AI_NovelGenerator:重构长篇小说创作的智能架构
  • 如何用pyLDAvis让LDA主题模型从“黑箱“变“水晶球“:3步掌握交互式可视化
  • 滑动窗口—找到字符串中所有字母异位词
  • 如何快速上手ESP-ADF:从零开始构建智能音频项目
  • Claude code-simplifier 插件深度解析:千年“屎山“代码的终极救星
  • 探索Comsol弱形式求解三维光子晶体能带
  • ChatGPT Web Share 实战:构建高效、安全的 AI 对话共享服务
  • 上位机签名脚本片段
  • DFI Retail与SymphonyAI合作,共同推动人工智能驱动的销售能力
  • ChatGPT Cookie 实战指南:安全存储与高效管理的最佳实践
  • 远程信息收集技术
  • GFLV2 (Generalized Focal Loss V2):在回归分支引入分布统计信息,提升定位质量——YOLOv8 改进实战
  • 5分钟掌握DownKyi:B站视频下载的完整解决方案
  • Aspose.Cells实战:如何优雅处理复杂Excel报表的PDF导出(含分页与缩放配置)
  • 网络入侵检测系统(NIDS)中的人工智能安全问题