当前位置: 首页 > news >正文

记一次生产环境MQ队列积压150W问题分析与解决方案

# MQ队列积压150W+问题分析与解决方案报告 ## 一、背景描述 ### 1.1 问题现象 - **队列积压量**:150W+ 消息 - **影响范围**:消息消费延迟严重,队列持续增长 - **风险等级**:🔴 **高危** - 存在MQ服务器内存溢出及宕机风险 ### 1.2 根因分析

┌─────────────────────────────────────────────────────────┐
│ 问题架构图示 │
├─────────────────────────────────────────────────────────┤
│ Producer ──► [MQ队列: 150W+积压] ◄── Consumer │
│ (生产者) 消息无差别投递 (消费者端过滤) │
│ │
│ ❌ 问题:过滤逻辑后置,导致无效消息大量堆积 │
│ │
│ 消费者处理流程: │
│ 接收消息 → 计算MD5 → 查重判断 → 重复则丢弃 │
│ ↑___________________________________________↓ │
│ (高CPU消耗操作在消费端执行) │
└─────────────────────────────────────────────────────────┘

| 维度 | 现状问题 | 理想状态 | |:---|:---|:---| | **过滤位置** | 消费者端执行 | 生产者端执行 | | **资源消耗** | 150W+次MD5计算 | 0次无效消息投递 | | **队列压力** | 无效消息占用存储 | 仅有效消息入队 | | **消费延迟** | 严重延迟 | 实时处理 | --- ## 二、MQ管理端操作简介 ### 2.1 常用管理工具 | 工具 | 访问方式 | 核心功能 | |:---|:---|:---| | **RabbitMQ Management** | `http://host:15672` | 可视化监控、队列管理 | | **RocketMQ Console** | 部署Web控制台 | Topic/ConsumerGroup管理 | | **Kafka UI (Kowl/AKHQ)** | 独立部署 | 分区监控、消息查询 | ### 2.2 关键监控指标 ```bash # RabbitMQ 命令行查看队列深度 rabbitmqctl list_queues name messages_ready messages_unacknowledged # 输出示例 # name messages_ready messages_unacknowledged # task_queue 1523421 0

2.3 积压应急操作(⚠️ 谨慎执行)

操作命令/路径适用场景
查看队列状态Queues → 队列名 → Get messages诊断消息内容
Purge清空队列Queues → 队列名 → Purge本次采用
Delete删除队列Queues → 队列名 → Delete重建队列
消费速率监控Overview → Message rates评估处理能力

Purge操作截图示意

RabbitMQ Management → Queues → [队列名] → [Purge Messages] 按钮 → 确认"Are you sure?"

三、解决方案

3.1 临时方案:Purge清空队列

执行步骤
# 1. 确认积压队列名称rabbitmqctl list_queues|greptask# 2. 评估影响(可选:备份部分消息)# 通过管理界面导出或消费端采样# 3. 执行Purge(管理界面或API)curl-uuser:pass-XDELETE http://mq-host:15672/api/queues/%2f/task_queue/contents# 4. 验证清理结果rabbitmqctl list_queues name messages_ready
风险控制
风险点应对措施
误删有效消息业务低峰期执行;提前通知业务方
消息丢失不可恢复明确接受临时方案的数据损失
消费者空转临时降低消费者实例数

3.2 长久方案:前置过滤逻辑

架构改造
┌─────────────────────────────────────────────────────────┐ │ 优化后架构图示 │ ├─────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 任务生产 │───►│ MD5查重服务 │───►│ MQ队列 │ │ │ │ (Producer) │ │ (新增) │ │ (精简有效) │ │ │ └─────────────┘ └─────────────┘ └──────┬──────┘ │ │ ↑______________________________│ │ │ │ │ 重复任务直接过滤 │ ▼ │ │ └──────────────────────────────┘ ┌─────────────┐│ │ │ Consumer ││ │ │ (纯业务处理) ││ │ └─────────────┘│ │ │ │ ✅ 收益:MD5计算前置,无效消息0入队,队列压力降低90%+ │ └─────────────────────────────────────────────────────────┘
代码改造示例
// ========== 改造前:消费者端过滤(问题代码) ==========@ComponentpublicclassTaskConsumer{@RabbitListener(queues="task_queue")publicvoidconsume(Messagemessage){StringfilePath=parseMessage(message);// ❌ 问题:高耗操作在消费端,无效消息已占用队列Stringmd5=calculateMd5(filePath);// 150W+次执行if(md5Cache.exists(md5)){log.warn("重复文件,丢弃: {}",filePath);return;// 消息已投递,资源已浪费}processBusiness(filePath);// 实际业务}}// ========== 改造后:生产者端过滤 ==========@ServicepublicclassTaskProducer{@AutowiredprivateMd5Servicemd5Service;@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidproduceTask(StringfilePath){// ✅ 优化:入队前完成过滤Stringmd5=md5Service.calculateMd5(filePath);if(md5Service.isDuplicate(md5)){log.info("重复文件,跳过投递: {}",filePath);return;// 直接过滤,不占用MQ资源}// 仅有效消息入队TaskMessagemsg=newTaskMessage(filePath,md5);rabbitTemplate.convertAndSend("task_exchange","task_routing",msg);}}@ComponentpublicclassOptimizedConsumer{@RabbitListener(queues="task_queue")publicvoidconsume(TaskMessagemessage){// ✅ 消费端专注业务,无需重复计算MD5processBusiness(message.getFilePath());}}
配套优化:MD5查重服务
@ServicepublicclassMd5Service{// 方案1:Redis Set(推荐,O(1)查询)@AutowiredprivateStringRedisTemplateredisTemplate;publicbooleanisDuplicate(Stringmd5){Booleanadded=redisTemplate.opsForSet().add("md5:set",md5);return!Boolean.TRUE.equals(added);// 已存在返回true}// 方案2:BloomFilter(超大规模,允许微量误判)@AutowiredprivateRBloomFilter<String>bloomFilter;publicbooleanmightDuplicate(Stringmd5){if(!bloomFilter.contains(md5)){bloomFilter.add(md5);returnfalse;// 一定不重复}returntrue;// 可能重复,需二次确认}}

四、方案对比与收益

指标改造前临时方案长久方案
队列积压150W+持续增长清零维持低位
MD5计算次数150W+/批次-有效任务数
MQ存储压力极高缓解极低
消费延迟小时级恢复秒级
数据一致性最终一致可能丢失最终一致
实施成本-5分钟2-3天开发

五、实施时间线

Day 0 ──┬── 问题发现,队列积压150W+ │ Day 0 ──┼── [紧急] 执行Purge临时方案 ✅ 14:00 │ │ Day 1-2 ──┼── 开发长久方案:MD5查重服务 │ Day 3 ──┼── 联调测试,灰度发布 │ Day 4 ──┴── 全量上线,监控验证 ✅

六、经验总结

6.1 设计原则

“过滤前置,计算后置”— 昂贵操作尽量靠近数据源

6.2 监控建议

// 增加生产者端指标监控MeterRegistryregistry=...;registry.counter("mq.produce.filtered","reason","duplicate").increment();registry.counter("mq.produce.success").increment();

6.3 防护机制

  • 生产者端:限流 + 重复校验
  • MQ层:队列长度告警(阈值:10W)
  • 消费者端:消费速率监控 + 自动扩容

http://www.jsqmd.com/news/512889/

相关文章:

  • 云原生PLM为何能成为企业新宠?深度解析其核心优势与未来演进之路
  • Hive学习记录第一章
  • VOOHU——工业级千兆网络变压器选型要点:从宽温要求到封装选择
  • 计算机系统基础知识(七):软件篇之数据库系统详解
  • AI 学习测试文章
  • Java 集合框架工具类与性能优化实战
  • 英伟达旗舰发布年代图,H200,B200,B300参数对比图。 #AI计算#英伟达GPU
  • 跨境电商的下半场:从“引流”到“截流”,重构推特获客逻辑
  • 实战为王·数据说话,2026 AI超级员工真实效能测评
  • LeetCode hot100-238 除了自身以外数组的乘积
  • Ollie‘s EDI 对接指南:折扣零售巨头的供应链合规要点
  • 企业AI实战训练营培训总结:
  • labview异步调用子程序并运行子程序。 实现异步调用,不会卡死线程。 适合多个界面间相互跳转切换
  • Win11无线网卡驱动突然罢工?别急着重装系统,试试这个Intel官网驱动直装法
  • 3年数据分析从业者、统计专业背景:数据分析师工作具体要求及CDA二级备考经验
  • 实测椒图 AI:前端 / 设计 / 电商都能用的一站式图像工具,Nano Banana Pro 模型太稳了
  • 《Spring AI + 大模型全栈实战》学习手册系列 · 专题四:《Ollama 模型管理与调优:让 AI 模型在低配服务器上流畅运行》
  • 错误弹窗记录
  • 51单片机PID算法控制无刷直流电机Proteus仿真探索
  • Docker 环境下 Redis Lua 脚本部署与执行
  • 2026年厦门二手房装修公司推荐:厦门六区施工案例详解与环保材料选用指南 - 品牌推荐
  • 支付宝立减金别过期才想起,精选变现完整流程 - 淘淘收小程序
  • 避坑指南:Dify智能文档助手开发中常见的5个文件处理问题及解决方案
  • Windows家庭版升级专业版全攻略
  • gorm 中的Updates Update, Save,Create , UpdateColumn 区别与联系
  • AI时代,你的全球业务需要一张“会思考”的网络
  • Unity网络基础UDP客户端
  • Cortex-M3 异常处理机制的设计哲学
  • 25大数据 3-1 字符串函数
  • 102类农业害虫图像识别数据集分享(适用于YOLO系列深度学习分类检测任务)