当前位置: 首页 > news >正文

SpringBoot3集成RocketMq

一、简介

RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;

二、环境部署

1、编译打包

1、下载5.0版本源码包 rocketmq-all-5.0.0-source-release.zip 2、解压后进入目录,编译打包 mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U

2、修改配置

在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh

distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh

3、服务启动

1、该目录下 distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/ 2、启动NameServer sh mqnamesrv 输出日志 The Name Server boot success. serializeType=JSON 3、启动Broker+Proxy sh mqbroker -n localhost:9876 --enable-proxy 输出日志 rocketmq-proxy startup successfully 4、关闭服务 sh mqshutdown namesrv Send shutdown request to mqnamesrv(18636) OK sh mqshutdown broker Send shutdown request to mqbroker with proxy enable OK(18647)

4、控制台安装

1、下载master源码包 rocketmq-dashboard-master 2、解压后进入目录,编译打包 mvn clean package -Dmaven.test.skip=true 3、启动服务 java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar 4、输出日志 INFO main - Tomcat started on port(s): 8080 (http) with context path '' 5、访问服务:localhost:8080

三、工程搭建

1、工程结构

2、依赖管理

rocketmq-starter组件中,实际上依赖的是rocketmq-client组件的5.0版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq-starter.version}</version> </dependency>

3、配置文件

配置RocketMq服务地址,消息生产者和消费者;

rocketmq: name-server: 127.0.0.1:9876 # 生产者 producer: group: boot_group_1 # 消息发送超时时间 send-message-timeout: 3000 # 消息最大长度4M max-message-size: 4096 # 消息发送失败重试次数 retry-times-when-send-failed: 3 # 异步消息发送失败重试次数 retry-times-when-send-async-failed: 2 # 消费者 consumer: group: boot_group_1 # 每次提取的最大消息数 pull-batch-size: 5

4、配置类

在配置类中主要定义两个Bean的加载,即RocketMQTemplateDefaultMQProducer,主要是提供消息发送的能力,即生产消息;

@Configuration public class RocketMqConfig { @Value("${rocketmq.name-server}") private String nameServer; @Value("${rocketmq.producer.group}") private String producerGroup; @Value("${rocketmq.producer.send-message-timeout}") private Integer sendMsgTimeout; @Value("${rocketmq.producer.max-message-size}") private Integer maxMessageSize; @Value("${rocketmq.producer.retry-times-when-send-failed}") private Integer retryTimesWhenSendFailed ; @Value("${rocketmq.producer.retry-times-when-send-async-failed}") private Integer retryTimesWhenSendAsyncFailed ; @Bean public RocketMQTemplate rocketMqTemplate(){ RocketMQTemplate rocketMqTemplate = new RocketMQTemplate(); rocketMqTemplate.setProducer(defaultMqProducer()); return rocketMqTemplate; } @Bean public DefaultMQProducer defaultMqProducer() { DefaultMQProducer producer = new DefaultMQProducer(); producer.setNamesrvAddr(this.nameServer); producer.setProducerGroup(this.producerGroup); producer.setSendMsgTimeout(this.sendMsgTimeout); producer.setMaxMessageSize(this.maxMessageSize); producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed); return producer; } }

四、基础用法

1、消息生产

编写一个生产者接口类,分别使用RocketMQTemplateDefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情;

@RestController public class ProducerWeb { private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class); @Autowired private RocketMQTemplate rocketMqTemplate; @GetMapping("/send/msg1") public String sendMsg1 (){ try { // 构建消息主体 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg")); // 发送消息 rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody); } catch (Exception e) { e.printStackTrace(); } return "OK" ; } @Autowired private DefaultMQProducer defaultMqProducer ; @GetMapping("/send/msg2") public String sendMsg2 (){ try { // 构建消息主体 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg")); // 构建消息对象 Message message = new Message(); message.setTopic("boot-mq-topic"); message.setTags("boot-mq-tag"); message.setKeys("boot-mq-key"); message.setBody(msgBody.getBytes()); // 发送消息,打印日志 SendResult sendResult = defaultMqProducer.send(message); log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } return "OK" ; } }

2、消息消费

编写消息监听类,实现RocketMQListener接口,通过RocketMQMessageListener注解控制监听的具体信息;

@Service @RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic") public class ConsumerListener implements RocketMQListener<String> { private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class); @Override public void onMessage(String message) { log.info("\n=====\n message:{} \n=====\n",message); } }

http://www.jsqmd.com/news/745900/

相关文章:

  • 基于RAG与Slack的AI知识助手myGPTReader:从原理到部署实践
  • 2026年5月台州临海知名装修公司深度评测:谁是真正“闭眼入”的高性价比口碑之选? - 疯一样的风
  • 别再乱引JQuery了!3.4.1版本XSS漏洞实战复现与安全升级指南
  • 告别本地Chrome!用Docker和K8s部署Headless Chrome,Java远程调用实战(附完整YAML)
  • 2026年5月宁波知名装修设计公司口碑榜:品质与服务之选权威推荐 - 疯一样的风
  • Anno 1800 Mod Loader完全掌握:终极模组加载解决方案深度解析
  • Java 25 外部函数接口性能暴增背后的代价:你敢在K8s容器中启用MemorySession吗?3个OOM崩溃现场还原
  • RePKG:解锁Wallpaper Engine创意资源的专业工具
  • Python风控模型上线前必做的7项压力测试:银行级合规验证流程全公开
  • 房产中介房源系统排名
  • 靠谱住家保姆选购全指南:从需求匹配到权益保障解析 - 奔跑123
  • 从Stack Overflow错误提问看介词:你的‘in the code’和‘on the code’用对了吗?
  • 从JustTrustMe到实战:手把手教你用Xposed Hook绕过App的SSL证书校验(Android安全测试必备)
  • BaiduPCS-Go错误码速查手册:5分钟掌握常见问题解决方法
  • 5分钟搞定Waydroid:Linux上运行Android应用的终极指南
  • 2026年3月有实力的农村自建别墅施工公司推荐,农村自建别墅/自建房农村别墅/轻钢别墅,农村自建别墅改造公司选哪家 - 品牌推荐师
  • STM32CubeIDE + FreeRTOS:如何高效定制你的FreeRTOSConfig.h文件?
  • 保姆级教程:用状态控制法和直接赋值法玩转蓝桥杯单片机LED(附完整工程)
  • 保姆级教程:用树莓派4B+DHT22传感器,5分钟搞定OneNET物模型数据上云
  • 2026最新!踩过7个坑亲测,这3款一边录音一边转文字的免费神器好用到哭!
  • Spring Boot项目启动报SLF4J警告?别慌,5分钟教你用Maven排除法搞定Logback与slf4j-simple冲突
  • 手把手用Python+SI仿真工具(以Sigrity PowerSI为例)量化分析:你的PCB走线在10GHz下到底衰减了多少dB?
  • 5步掌握FileMeta:Windows文件智能管理终极方案
  • 实时字幕:小白转文字悬浮字幕功能介绍
  • YahooFinanceApi架构解析:.NET金融数据获取的技术实现与企业级应用
  • Java低代码引擎如何实现“拖拽即编译”?:深度解析AST动态解析、字节码注入与运行时沙箱三大关键技术
  • 从TypeError到高效数据处理:用列表推导式和NumPy彻底告别‘序列乘浮点’烦恼
  • 从Spring Boot到Quarkus再到Micrometer Edge Agent:Java边缘Runtime演进路线图(2024Q3最新版,含废弃技术预警)
  • 为什么你的压测结果和生产环境相差5倍?Java中间件适配测试必须校准的4个关键时序指标
  • 从零到上线:一个PHP后台+微信小程序前端的公司官网全栈开发实录