当前位置: 首页 > news >正文

rabbitmq消息积压:如何快速排查与处理

线上消息队列积压了,消费者处理不过来,老板在群里@你——这时候怎么办?

今天聊点实际的,从判断积压到快速处理,讲点能直接上手的东西。

怎么判断消息积压了

RabbitMQ

# 查看队列消息数量rabbitmqctl list_queues name messages messages_ready messages_unacked# 输出大概长这样# name messages messages_ready messages_unacked# order 15234 15200 34

关键看这几个数字:

  • messages- 队列里总共有多少消息
  • messages_ready- 等待消费的消息(积压的主力)
  • messages_unacked- 已投递但还没确认的

一般来说,messages_ready超过几千就该关注了,上万就是严重积压。

Kafka

# 查看消费 lag(落后多少)kafka-consumer-groups.sh --bootstrap-server localhost:9092\--groupmy-consumer-group\--describe# 输出大概长这样# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG# my-consumer order 0 52341 67589 15248

LAG就是积压量。LAG 几百几千问题不大,上万就得处理了。

为什么会积压

常见原因就几种:

1. 消费者挂了
最常见。消费者进程没了,但生产者还在往里扔消息。

2. 消费者变慢了
比如数据库慢查询、对外接口超时。消息在那儿等着,时间一长就堆起来了。

3. 消费者数量不对
分区数固定,但消费者少了或者消费者不干活了。

4. 消费逻辑有死循环或者阻塞
消费端卡在某个地方,不消费也不报错。

快速处理方案

方案一:临时增加消费者(推荐)

如果是 Kafka,把消费者实例扩几个:

# Docker Compose 临时扩容services:consumer:deploy:replicas:5# 从 1 临时改成 5

RabbitMQ 的话,多启动几个消费者实例就行。

注意:Kafka 一个分区只能被一个消费者消费,所以扩容数量别超过分区数。

方案二:提高消费并行度

@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(ConsumerFactory<String,String>consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setConcurrency(10);// 默认1,临时调大成10returnfactory;}

RabbitMQ 的话,调整 prefetch 数量:

factory.setPrefetchCount(50);// 默认250,太大了会积压太久,太小了吞吐量上不去

方案三:跳过积压消息,先消费最新的

有时候积压太多了,处理完要很久。这时候可以先跳过旧消息:

Kafka 方案:重置 offset 到最新

# 把消费者组的 offset 重置到最新kafka-consumer-groups.sh --bootstrap-server localhost:9092\--groupmy-consumer-group\--topicorder\--reset-offsets --to-latest\--execute

注意:这个方案会丢消息,只有在业务能接受丢消息的时候才用。

方案四:临时写个脚本消费掉

直接写个简单脚本,扫队列然后批量处理:

@ServicepublicclassFastConsumerService{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidconsumeAndDiscard(StringqueueName,intlimit){for(inti=0;i<limit;i++){Messagemessage=rabbitTemplate.receive(queueName,1000);if(message==null){break;}// 空消费,直接确认。积压太多可以用这个临时方案// 但记得后续要补偿处理这些消息!}}}

再次强调:空消费会丢消息,后续一定要有补偿机制。

排查积压的根本原因

处理完积压只是治标,找到根因才是治本。

看消费者日志

消费者有没有报错?有没有超时?有没 BLOCKED?

# RabbitMQ 查看连接状态rabbitmqctl list_connections pid channels# 查看消费者是否正常rabbitmqctl list_consumers queue_name

看监控

RabbitMQ 管理界面:http://localhost:15672

Kafka 使用 Kafka Eagle 或者 JMX 监控:

# 开启 JMX KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true"

常见坑

数据库连接池打满:消费者查询数据库,连接池耗尽,所有消费线程都在等连接。

对外接口超时:消费者调第三方接口,接口响应慢,消费速度断崖式下降。

内存泄漏:消费者 OOM 了但没死透,勉强撑着但几乎不消费。

怎么防止积压

几点经验:

  1. 监控先行。LAG 超过阈值就告警,别等老板@你。
  2. 消费者要快。IO 操作异步化,能批量处理就批量。
  3. 分区/队列数要够。Kafka 分区数决定了最大并发消费数。
  4. 做好限流。上游流量突增时,MQ 要能扛住。

总结

场景推荐方案
消费者挂了重启/扩容消费者
消费太慢提高并发度、优化消费逻辑
积压太多,业务能接受丢消息重置 offset 到最新
积压太多,业务不能丢消息扩容 + 慢慢消费 + 加补偿

积压不可怕,怕的是没监控、没预案。下次遇到这种情况,希望你能从容应对。

http://www.jsqmd.com/news/630969/

相关文章:

  • ROS导航避坑指南:手把手教你调优Costmap的5个关键参数(附YAML配置详解)
  • 3DS模拟器Citra:4步实现经典游戏在PC上的完美重生
  • 7B参数模型在消费级GPU上的极限:Token生成速度优化全记录
  • 动手学深度学习——语义分割
  • C++模板元编程理论基础简介
  • 为什么92%的AI平台租户隔离形同虚设?2026奇点大会首席架构师亲授内存级隔离内核原理
  • 不用装软件!这款MicroPython浏览器 IDE :让你在手机上也能调试树莓派 Pico伪
  • 情感粒度从“喜怒哀惧”粗分类→“羞耻性犹豫”“制度性疲惫”等37维亚情绪谱系:2026奇点大会定义下一代情感分析黄金标准
  • 2026 天津复读教育服务行业天津辅仁学校白皮书 - 外贸老黄
  • 电容滤波在电源设计中的关键作用与优化策略
  • 从零搭建多租户大模型计费中台:基于Prometheus+OpenTelemetry+Apache Calcite的实时分账系统实战
  • 动手学深度学习——语义分割数据集
  • 苹果CMS10搭建电视直播站点的3个隐藏技巧(含M3U8格式处理)
  • ROS图像传输优化:如何用CompressedImage减少80%带宽消耗(附代码对比)
  • 【大模型公平性工程化落地指南】:20年AI架构师亲授3大可量化评估框架与5个避坑实战案例
  • Python网络爬虫实战
  • 2026 天津复读学校实测评测:天津辅仁学校办学全维度体验报告 - 外贸老黄
  • test 10
  • 【绝密白皮书节选】某千亿参数大模型量产项目中,如何将评估周期从14天压缩至22分钟——自动化评估引擎架构图首次解密
  • 给肿瘤学研一新生的SEER数据库‘生存指南’:从零申请账号到完成你的第一个趋势分析图表
  • 免费查AI率发现超标怎么办?这份免费降AI率攻略请收好
  • Python3.10镜像使用全解析:Jupyter和SSH两种方式,满足不同开发需求
  • 我用 AI 辅助开发了一系列小工具():文件提取工具账
  • ESP居然能当 DNS 服务器用?内含NCSI欺骗和DNS劫持实现毖
  • Linux内核中的内存分配器详解
  • 专业的东莞geo优化哪个好推荐 - 企业推荐官【官方】
  • SolidWorks2020安装与破解全流程详解
  • Win11Debloat终极指南:免费Windows系统优化工具完整教程
  • Dify平台快速部署Qwen3-ASR-1.7B语音识别模型指南
  • 告别硬编码!用Go的expr表达式引擎5分钟搞定电商促销规则动态配置