当前位置: 首页 > news >正文

(七)Spring Cloud Alibaba 2023.x:RocketMQ 消息队列配置与实现

目录

前言

准备

安装RocketMq服务

下载rocketmq服务

下载rocketmq 控制台

项目集成

引入依赖

生产者服务配置

消费者服务配置

发送队列消息


前言

在微服务架构中,异步消息通信是实现系统解耦、提高性能和增强系统可靠性的重要手段。在 Spring Cloud Alibaba 生态中,RocketMQ 与 Spring Boot 深度集成,提供了开箱即用的消息通信解决方案,极大地简化了开发流程,提升了系统的扩展性和可维护性 。

准备

  • jdk17+
  • maven3.9.4+
  • idea2023
  • spring cloud: 2023.0.1.0
  • spring cloud alibaba: 2023.0.1

源码获取:GitHub - /spring-cloud-alibaba-base-demo: 基于spring cloud alibaba生态快速构建微服务脚手架

安装RocketMq服务

本地window系统安装rocketmq服务

下载rocketmq服务

当前博文版本使用的是:5.2

官方下载地址:下载 | RocketMQ

百度网盘地址:百度网盘 请输入提取码 提取码: 92h6

下载完成解压

在window系统中本地启动rocketmq服务需要配置环境变量,步骤和配置与jdk配置相似

Path变量中添加环境变量

配置完成后进入本地rocketmq的bin文件夹中

首先双击mqnamesrv.cmd启动NameServer服务: 它的作用是提供服务发现、路由信息管理、Broker注册和客户端查询等功能

NameServer服务启动成功后,启动Broker服务:它的作用是消息的存储和传递

当前文件夹cmd执行命令:mqbroker -n localhost:9876

下载rocketmq 控制台

当前博文版本使用的是:2.0.1

官方下载地址:Releases · apache/rocketmq-dashboard

百度网盘地址:百度网盘 请输入提取码 提取码: 92h6

下载完成解压

需要我们自行对项目打包获取jar包

在当前文件夹中cmd执行命令打包:

mvn clean package -Dmaven.test.skip=true

在target文件中找到生成的jar

执行命令启动

java -jar rocketmq-dashboard-2.0.1.jar

浏览器访问地址:http://localhost:8080

注:如果rocketmq的服务地址或者端口进行过调整,那么在打包前自行到项目的application.yml配置文件中更改后再打包

项目集成

根据前面的博文,目前我们已经创建了三个微服务

引入依赖

分别在子模块生产者和消费者服务的pom.xml文件中引入依赖

<!-- 引入消息队列 stream rocketmq --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
生产者服务配置

yml文件配置

spring: application: name: http-cloud-producer cloud: stream: #消息中间件 rocketmq: binder: name-server: localhost:9876 bindings: producer-out-0: producer: group: output_1 bindings: producer-out-0: destination: topic0 producer-out-1: destination: topic1

相关参数/值解释:

output_1 :自定义的消费者组名称

producer-out-0 :自定义的生产者通道名称

topic0 :自定义队列名称

生成者发送消息服务类实现

@Service public class MessageProducer { // 自动注入配置文件中绑定的通道 @Autowired private StreamBridge streamBridge; // 消息通道 public void sendMessageToOutput0(String messageContent) { Map<String, Object> headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_TAGS, "tag0"); Message<String> msg = new GenericMessage<>(messageContent, headers); streamBridge.send("producer-out-0", msg); } public void sendMessageToOutput1(String messageContent) { Map<String, Object> headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_TAGS, "tag1"); Message<String> msg = new GenericMessage<>(messageContent, headers); streamBridge.send("producer-out-1", msg); } }

定义控制层接口

@RestController public class DemoController { @Autowired private MessageProducer messageProducer; /** * 发送消息队列 topic0 * @param message * @return */ @GetMapping(value = "/test/mq/topic0") String sendMqTopic0(@RequestParam("message") String message) { messageProducer.sendMessageToOutput0(message); return "topic0消息发送成功了~~~"; } /** * 发送消息队列 topic1 * @param message * @return */ @GetMapping(value = "/test/mq/topic1") String sendMqTopic1(@RequestParam("message") String message) { messageProducer.sendMessageToOutput1(message); return "topic1消息发送成功了~~~"; } }
消费者服务配置

配置yml文件

spring: application: name: http-cloud-consumer cloud: stream: #消息中间件 rocketmq: binder: name-server: localhost:9876 bindings: consumer0-in-0: consumer: messageModel: CLUSTERING consumer1-in-0: consumer: messageModel: CLUSTERING bindings: consumer0-in-0: destination: topic0 group: clustering-consumer consumer1-in-0: destination: topic1 group: clustering-consumer

相关配置解释:

consumer0-in-0:自定义的消费者通道名称

messageModel:消费者消费模式(集群或者是广播)。本文配置的是集群模式。

topic0 :需要监听的消费者队列名称

clustering-consumer :自定义的消费者组名称

实现消费者mq消费类

@Component public class MessageConsumer { /** * 通过方法名称自动绑定到符合条件的消费者通道 * (注:当前遇到的难点是一个消费者服务只能实现一个消费方法,实现多个消费方法会使消费功能失效) * @return */ @Bean public Consumer<Message<String>> consumer1() { return msg -> { System.out.println(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg); }; } }

注:从配置可以看到,消费者监听了2个队列,但上面的MessageConsumer 类只有一个消费方法,目前版本mq会自动将配置在第一个的队列和上面的方法绑定,但是如果需要监听对多队列并进行消费,目前版本博主还未找到实现方案,后续有方案再补上。

发送队列消息

请求接口给队列topic0发送消息

查看消费者服务控制台打印成功

查看rocketMq控制台消息记录


至此Spring Cloud Alibaba集成RocketMq消息队列完成了。

http://www.jsqmd.com/news/478965/

相关文章:

  • ChatTTS 调试实战:从日志分析到性能优化的完整指南
  • 企业碳排放权交易会计信息处理规范 免费下载
  • Hunyuan-MT 7B Java面试题翻译工具:技术招聘国际化解决方案
  • C++ 中文输出乱码?一篇博客彻底搞定
  • 5个技巧让Unity资源提取效率提升10倍:告别编辑器依赖的轻量级解决方案
  • (五)Spring Cloud Alibaba 2023.x:Seata 分布式事务配置与实现
  • LangGraph.js 核心概念:State / Node / Edge 一文讲透
  • AI智能体编码 skeptic 的 Rust 性能优化实战
  • 198.arctan与arctan2鉴相器的区别
  • Claude生成式UI的逆向与利用
  • YOLOv13全网首发:CVPR2026 Transformer注意力 | BinaryAttention 1-bit注意力,推理提速100%,超越FlashAttention2
  • QQ空间记忆守护:用技术为青春时光建一座数字档案馆
  • C++】面向对象编程:继承与多态的魅力
  • Embedding和向量数据库
  • 深入剖析C++文件操作的底层机制
  • 字符串反转和统计字符串中字符类型及频次
  • [特殊字符] 龍魂系统执行日志|2026-03-13|底层宪法数据库落地·30天最后宣言·打破的根在道统不在霸权
  • 精准删除:掌握SQL中的DELETE语句
  • OmoFun 1.1.4 | 追番神器官方APP下载.官网入口
  • Python 数据结构示例
  • Tensflow学习第T1周打卡
  • test_1_2026
  • 基于单片机的蔬菜大棚数据采集系统的设计
  • 【day52】
  • 端侧AI 的定义与发展背景
  • 03.SpringAI 使用FunctionCalling实现智能客服
  • 基于单片机的土壤墒情监测系统的设计与实现
  • OpenClaw技术架构深度解析:原理、核心与源码全面解读
  • 2026年3月口碑好的压电式传感器厂家推荐TOP - 品牌推荐用户报道者
  • 【亲测】2026年OpenClaw(Clawdbot)零技术点几下秒级安装教程