当前位置: 首页 > news >正文

消息队列--RocketMQ 架构设计与优化

系列导读:本篇将深入讲解 RocketMQ 架构设计与生产环境最佳实践。


文章目录

    • 一、RocketMQ 架构
      • 1.1 核心组件
      • 1.2 组件说明
      • 1.3 与 Kafka 对比
    • 二、集群部署
      • 2.1 集群架构
      • 2.2 Docker 部署
      • 2.3 Broker 配置
    • 三、生产者最佳实践
      • 3.1 配置
      • 3.2 发送消息
    • 四、消费者最佳实践
      • 4.1 消费配置
      • 4.2 消费消息
    • 五、事务消息
      • 5.1 事务消息流程
      • 5.2 实现代码
    • 总结

一、RocketMQ 架构

1.1 核心组件

┌─────────────────────────────────────────────────────────────┐ │ RocketMQ 架构 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Producer ──► NameServer ──► Broker ──► Consumer │ │ │ │ │ │ │ ▼ │ │ │ CommitLog │ │ │ ConsumeQueue │ │ │ │ │ └─── 路由注册 ───► Topic 路由信息 │ │ │ └─────────────────────────────────────────────────────────────┘

1.2 组件说明

组件说明
NameServer路由注册中心,轻量级
Broker消息服务器,存储消息
Producer消息生产者
Consumer消息消费者

1.3 与 Kafka 对比

特性RocketMQKafka
吞吐量十万级百万级
延迟ms级ms级
事务消息支持不支持
延迟消息支持不支持
消息过滤支持不支持

二、集群部署

2.1 集群架构

┌─────────────────────────────────────────────────────────────┐ │ RocketMQ 集群 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ │ │ │ └────────────────┼────────────────┘ │ │ │ │ │ ┌────────────────┼────────────────┐ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Broker-Master│ │ Broker-Master│ │ Broker-Master│ │ │ │ (主) │ │ (主) │ │ (主) │ │ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │Broker-Slave │ │Broker-Slave │ │Broker-Slave │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘

2.2 Docker 部署

version:'3'services:namesrv:image:apache/rocketmq:5.1.0container_name:rmqnamesrvports:-"9876:9876"command:sh mqnamesrvbroker:image:apache/rocketmq:5.1.0container_name:rmqbrokerports:-"10911:10911"-"10909:10909"environment:NAMESRV_ADDR:"namesrv:9876"command:sh mqbroker-c /opt/rocketmq/conf/broker.confvolumes:-./broker.conf:/opt/rocketmq/conf/broker.conf-./logs:/opt/rocketmq/logs-./store:/opt/rocketmq/store

2.3 Broker 配置

# broker.conf brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 namesrvAddr = 192.168.1.100:9876;192.168.1.101:9876 defaultTopicQueueNums = 4 autoCreateTopicEnable = true autoCreateSubscriptionGroup = true listenPort = 10911 deleteWhen = 04 fileReservedTime = 48 mapedFileSizeCommitLog = 1073741824 mapedFileSizeConsumeQueue = 300000 diskMaxUsedSpaceRatio = 88 storePathRootDir = /opt/rocketmq/store maxMessageSize = 65536

三、生产者最佳实践

3.1 配置

@ConfigurationpublicclassRocketMQConfig{@BeanpublicDefaultMQProducerproducer(){DefaultMQProducerproducer=newDefaultMQProducer("order-producer-group");producer.setNamesrvAddr("192.168.1.100:9876;192.168.1.101:9876");producer.setRetryTimesWhenSendFailed(3);producer.setSendMsgTimeout(3000);producer.setMaxMessageSize(4*1024*1024);// 4MBreturnproducer;}}

3.2 发送消息

@ServicepublicclassOrderProducer{@AutowiredprivateDefaultMQProducerproducer;// 同步发送publicSendResultsendSync(Orderorder)throwsException{Messagemessage=newMessage("order-topic","order-created",JSON.toJSONString(order).getBytes());returnproducer.send(message);}// 异步发送publicvoidsendAsync(Orderorder){Messagemessage=newMessage("order-topic","order-created",JSON.toJSONString(order).getBytes());producer.send(message,newSendCallback(){@OverridepublicvoidonSuccess(SendResultresult){log.info("发送成功: {}",result);}@OverridepublicvoidonException(Throwablee){log.error("发送失败",e);}});}// 延迟消息publicvoidsendDelay(Orderorder,intdelayLevel){Messagemessage=newMessage("order-topic",JSON.toJSONString(order).getBytes());message.setDelayTimeLevel(delayLevel);// 1-18producer.send(message);}}

四、消费者最佳实践

4.1 消费配置

@ConfigurationpublicclassRocketMQConsumerConfig{@BeanpublicDefaultMQPushConsumerconsumer(){DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("order-consumer-group");consumer.setNamesrvAddr("192.168.1.100:9876;192.168.1.101:9876");consumer.setConsumeThreadMin(10);consumer.setConsumeThreadMax(20);consumer.setConsumeMessageBatchMaxSize(10);returnconsumer;}}

4.2 消费消息

@ComponentpublicclassOrderConsumer{@PostConstructpublicvoidstart()throwsMQClientException{DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("order-consumer-group");consumer.setNamesrvAddr("192.168.1.100:9876");consumer.subscribe("order-topic","order-created");consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>msgs,ConsumeConcurrentlyContextcontext){for(MessageExtmsg:msgs){try{Orderorder=JSON.parseObject(newString(msg.getBody()),Order.class);processOrder(order);}catch(Exceptione){log.error("消费失败",e);returnConsumeConcurrentlyStatus.RECONSUME_LATER;}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}

五、事务消息

5.1 事务消息流程

1. 发送半消息(Half Message) 2. 执行本地事务 3. 提交或回滚消息 4. 事务回查机制

5.2 实现代码

@ServicepublicclassOrderTransactionProducer{@AutowiredprivateTransactionMQProducerproducer;publicvoidsendTransactionMessage(Orderorder){Messagemessage=newMessage("order-topic",JSON.toJSONString(order).getBytes());producer.sendMessageInTransaction(message,order);}}// 事务监听器@ComponentpublicclassOrderTransactionListenerimplementsTransactionListener{@AutowiredprivateOrderServiceorderService;@OverridepublicLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){try{Orderorder=(Order)arg;orderService.createOrder(order);returnLocalTransactionState.COMMIT_MESSAGE;}catch(Exceptione){returnLocalTransactionState.ROLLBACK_MESSAGE;}}@OverridepublicLocalTransactionStatecheckLocalTransaction(MessageExtmsg){// 事务回查Orderorder=JSON.parseObject(newString(msg.getBody()),Order.class);if(orderService.exists(order.getId())){returnLocalTransactionState.COMMIT_MESSAGE;}returnLocalTransactionState.ROLLBACK_MESSAGE;}}

总结

RocketMQ 架构:NameServer、Broker
集群部署:主从架构、配置优化
生产者实践:同步、异步、延迟消息
消费者实践:并发消费、批量消费
事务消息:半消息、事务回查

下篇预告:消息队列选型对比指南


作者:刘~浪地球
系列:消息队列(三)
更新时间:2026-04-13

http://www.jsqmd.com/news/634376/

相关文章:

  • Qt Creator + OpenCV 4.x 处理大图不崩溃?手把手教你从32位迁移到64位环境(附MinGW-w64编译避坑指南)
  • 中石化加油卡回收2026新价格,回收注意以下几点 - 猎卡回收公众号
  • HTML到DOCX转换技术深度解析:企业级文档生成解决方案
  • 从像素到点云:线激光三维重建的完整技术链路解析
  • 手把手教你用Verilog实现SLVS-EC接口接收器(附完整代码解析)
  • 如何轻松下载macOS完整安装包:Download Full Installer终极指南
  • 终极指南:如何一键解锁联想拯救者Insyde BIOS隐藏设置
  • 专业无线安全分析:Universal Radio Hacker完全实战指南
  • 2026物业小程序开发公司推荐,麦冬科技定制化服务深度解析(附带联系方式) - 品牌2025
  • 5步构建高效无线充电系统:从零到竞赛级的完整指南
  • 阶段零:AI四大核心应用场景
  • 5个超实用技巧:掌握猫抓浏览器扩展的核心玩法
  • 模型调度失衡导致P99延迟飙升2.3秒,深度解析Agent编排器的4级负载均衡重构方案
  • AnimateAnyone项目配置优化:3种高效方法实现稳定运行与性能提升
  • Buzz项目深度GPU加速指南:从零配置到性能优化
  • 2026 年广东省内佛山高端翡翠镶嵌设计六大品牌排名及解析 - 十大品牌榜
  • BLE与WiFi技术演进对比:从室内定位到物联网应用
  • Jasper Gold C2RTL:如何高效验证RTL与C模型的一致性
  • Python运行环境故障排查:从‘Can‘t find a default Python‘到完美修复
  • MATLAB科研图表终极指南:用export_fig实现完美学术图像输出 [特殊字符]
  • 第一章《网络信息安全概述》
  • FreakStudio萍
  • 难转染细胞救星:Polysciences PEI MAX与PEI 25K选型指南|曼博生物官方独家代理 - 上海曼博生物
  • 2026实战|AI生成代码工具选型与避坑指南(附实操案例)
  • 快速部署MBTI 人格测试网站App | 附源码
  • APK-Installer:Windows原生运行安卓应用的终极解决方案指南
  • 物联网浏览器(IoTBrowser)-js开发人脸识别椎
  • RoboSense RS-LIDAR-16实战指南:从可视化工具到数据解析全流程
  • 快速开发小程序公司:2026年北京麦冬科技定制服务解析(附带联系方式) - 品牌2025
  • 告别手绘!用Midjourney的‘局部重绘’和‘自定义缩放’功能,精细调整你的地质示意图