当前位置: 首页 > news >正文

RabbitMQ 从入门到实战!一文搞懂核心交换机 + Spring Boot 整合,附完整代码

🔥个人主页:代码不加冰(欢迎来访)
🎬作者简介:java后端学习者
❄️个人专栏:LeetCode刷题日记 , 苍穹外卖日记,SSM框架深入,JavaWeb,
命运的结局尽可永在,不屈的挑战却不可须臾或缺!

前言:

大家好我是代码不加冰,前面我们安装完成 了RabbitMQ,并对控制台操作进行了大致的了解,通过一个案例来进行实践,这里我们继续深入学习一下。

摘要:

消息队列是分布式系统的核心组件,而 RabbitMQ 凭借其稳定性和灵活性成为首选。本文从零开始,手把手带你掌握 RabbitMQ 的三大核心交换机:

📢 Fanout 广播模式:支付成功后同时通知订单、积分、短信系统,一呼百应;
🎯 Direct 直连模式:ERROR 日志发钉钉告警、INFO 日志写文件归档,精准路由;
🔀 Topic 主题模式:外卖平台通过order.#user.*等通配符实现灵活订阅,微服务解耦利器。

文章提供完整的 Spring Boot 整合代码,涵盖RabbitTemplate发送消息、@RabbitListener注解消费、YAML 配置详解,以及 Work Queue 工作队列的公平分发机制。通过快递分拣、日志处理、事件总线等真实场景类比,让复杂概念一目了然。

无论你是新手还是进阶者,这篇保姆级教程都能让你快速上手 RabbitMQ,写出生产级别的消息驱动代码!

前置知识:

AMQP = Advanced Message Queuing Protocol(高级消息队列协议)

简单说:它是一种标准化的"消息传递规则",让不同的程序之间可以跨语言、跨平台地通信。

通俗理解

把 AMQP 想象成快递行业的统一标准

场景快递行业AMQP
标准规范快递包裹有统一尺寸、面单格式消息有固定格式和规则
发送方你(寄件人)生产者(Producer)
中转站快递分拣中心交换机(Exchange)
目的地你的地址/快递柜队列(Queue)
接收方收件人消费者(Consumer)

有了这个标准:顺丰寄出的包裹,中通能送到;同样的道理,Python 写的生产者发的消息,Java 写的消费者能收到。

有 AMQP 时(统一标准):
text Python程序 ──→ │ Java程序 ──→ │ AMQP协议 ──→ 任何支持AMQP的消息中间件 Go程序 ──→ │
AMQP 的核心概念
AMQP 概念RabbitMQ 实现作用
Producer生产者代码发消息的一方
Consumer消费者代码收消息的一方
Exchange交换机(Direct、Topic、Fanout等)决定消息去哪个队列
Queue队列存消息的地方
Binding绑定关系把交换机和队列连接起来
Virtual Host虚拟主机隔离环境,类似数据库的schema
Spring AMQP 是什么

Spring AMQP是 Spring 框架对 AMQP 协议的封装和增强,让你用 Spring 的方式操作 RabbitMQ,而不需要直接写原生的 AMQP 代码。

Spring AMQP = Spring 团队写的 RabbitMQ 便捷工具包


Spring AMQP 的核心组件
组件作用原生 AMQP 对应
RabbitTemplate发送消息的工具类Channel.basicPublish
@RabbitListener监听队列、自动消费手写 Consumer 循环
MessageConverter自动转 Java 对象为消息手动序列化
RabbitAdmin自动声明队列/交换机手动调用 queueDeclare

java客户端入门案例:

1、创建 Spring Boot 项目

在 IDEA 中:File → New → Project → Spring Initializr

依赖选择:

  • Spring Web(可选)

  • Spring Boot DevTools(可选)


2、pom.xml 依赖

xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.14</version> <relativePath/> </parent> <groupId>com.example</groupId> <artifactId>rabbitmq-springboot-demo</artifactId> <version>1.0.0</version> <properties> <java.version>11</java.version> </properties> <dependencies> <!-- Spring Boot AMQP Starter(包含RabbitMQ) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- Spring Boot Web(可选,用于测试) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Lombok(可选,简化代码) --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- 测试依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>

3、application.yml 配置文件

yaml spring: rabbitmq: # 基础连接配置 host: localhost port: 5672 username: admin password: 123456 virtual-host: / # 连接池配置(可选) connection-timeout: 5000 # 连接超时(毫秒) # 生产者确认机制(可选) publisher-confirm-type: correlated # 开启发送确认 publisher-returns: true # 开启消息返回 # 消费者配置(可选) listener: simple: acknowledge-mode: auto # 自动确认 concurrency: 3 # 最小消费者数 max-concurrency: 10 # 最大消费者数 prefetch: 1 # 每次拉取消息数 retry: enabled: true # 开启重试 max-attempts: 3 # 最大重试次数 initial-interval: 1000 # 重试间隔(毫秒) # 自定义配置(项目相关) rabbitmq: queue: name: hello_queue exchange: name: hello_exchange routing: key: hello_routing_key

4、配置类(读取YAML)

方式一:使用 @ConfigurationProperties

java import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Data @Component @ConfigurationProperties(prefix = "rabbitmq") public class RabbitMQProperties { private QueueConfig queue = new QueueConfig(); private ExchangeConfig exchange = new ExchangeConfig(); private RoutingConfig routing = new RoutingConfig(); @Data public static class QueueConfig { private String name; } @Data public static class ExchangeConfig { private String name; } @Data public static class RoutingConfig { private String key; } }

5、RabbitMQ 配置类

java import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Value("${rabbitmq.queue.name}") private String queueName; @Value("${rabbitmq.exchange.name}") private String exchangeName; @Value("${rabbitmq.routing.key}") private String routingKey; /** * 创建连接工厂 */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtualHost); // 开启发布确认 factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); factory.setPublisherReturns(true); return factory; } /** * 创建 RabbitTemplate(用于发送消息) */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); // 设置强制消息路由失败后返回 template.setMandatory(true); return template; } /** * 创建 RabbitAdmin(用于声明队列、交换机、绑定) */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } /** * 声明队列 */ @Bean public Queue queue() { // 参数:name, durable(持久化), exclusive(独占), autoDelete(自动删除) return new Queue(queueName, true, false, false); } /** * 声明直连交换机 */ @Bean public DirectExchange exchange() { return new DirectExchange(exchangeName, true, false); } /** * 绑定队列到交换机 */ @Bean public Binding binding(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(routingKey); } }

6、生产者服务(发送消息)

java import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Slf4j @Service @RequiredArgsConstructor public class MessageProducer { private final RabbitTemplate rabbitTemplate; @Value("${rabbitmq.exchange.name}") private String exchangeName; @Value("${rabbitmq.routing.key}") private String routingKey; /** * 发送消息 */ public void sendMessage(String message) { // 生成唯一ID用于追踪 CorrelationData correlationId = new CorrelationData(java.util.UUID.randomUUID().toString()); log.info("📤 准备发送消息: {}", message); // 发送消息 rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationId); // 设置发送确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("✅ 消息发送成功! correlationId: {}", correlationData.getId()); } else { log.error("❌ 消息发送失败! cause: {}", cause); } }); log.info("📤 消息已发送到交换机: {}, 路由键: {}", exchangeName, routingKey); } /** * 发送对象消息(自动转JSON) */ public void sendObject(Object object) { rabbitTemplate.convertAndSend(exchangeName, routingKey, object); log.info("📤 对象消息已发送: {}", object); } }

7、消费者服务(接收消息)

java import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Slf4j @Service public class MessageConsumer { /** * 监听队列,消费消息 */ @RabbitListener(queues = "${rabbitmq.queue.name}") public void receiveMessage(String message) { log.info("📩 收到消息: {}", message); // 处理业务逻辑 processMessage(message); } /** * 处理消息的业务逻辑 */ private void processMessage(String message) { // 模拟业务处理 log.info("🔧 正在处理消息: {}", message); // 这里可以添加你的业务代码 } /** * 监听队列,消费对象消息 */ @RabbitListener(queues = "${rabbitmq.queue.name}") public void receiveObject(Object object) { log.info("📦 收到对象消息: {}", object); } }

8、测试 Controller(方便测试)

java import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/api/message") @RequiredArgsConstructor public class MessageController { private final MessageProducer messageProducer; @PostMapping("/send") public String sendMessage(@RequestParam String content) { messageProducer.sendMessage(content); return "消息已发送: " + content; } @PostMapping("/send/json") public String sendJson(@RequestBody User user) { messageProducer.sendObject(user); return "用户信息已发送: " + user.getName(); } // 测试用实体类 static class User { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "User{name='" + name + "', age=" + age + "}"; } } }

9、启动类

java import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitMQApplication { public static void main(String[] args) { SpringApplication.run(RabbitMQApplication.class, args); System.out.println("🚀 RabbitMQ Demo 启动成功!"); System.out.println("📡 访问: http://localhost:8080/api/message/send?content=hello"); } }

以上就是简单的入门案例,然后我们继续深入一点:

Work Queue(工作队列模型)

Work Queue是 RabbitMQ 中一个队列,多个消费者的模式,消息会被平均分配给各个消费者。


模型图解
text ┌─────────────┐ │ Producer │ │ (生产者) │ └──────┬──────┘ │ ▼ ┌─────────────┐ │ Queue │ │ Task Queue │ │ [m1][m2][m3]│ └──────┬──────┘ │ ┌────────────┼────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │Consumer1 │ │Consumer2 │ │Consumer3 │ │ (C1) │ │ (C2) │ │ (C3) │ └──────────┘ └──────────┘ └──────────┘ 核心特点:
  • 一个队列,多个消费者

  • 消息默认轮询分发(Round-Robin)

  • 每个消息只能被一个消费者消费

常见面试题
问题答案
Work Queue 的特点?一个队列,多个消费者,消息只能被一个消费者消费
轮询分发的问题?处理快的消费者和慢的消费者工作量相同,整体效率低
如何解决轮询分发问题?basicQos(1)+ 手动确认
消息确认的作用?保证消息不丢失,处理失败可重新入队

Fanout 交换机(广播交换机)

Fanout 交换机:把收到的消息广播到所有绑定的队列,不管 Routing Key 是什么。

核心特点
特点说明
广播模式消息发送给所有绑定的队列
忽略 Routing KeyRouting Key 写什么都被忽略
一对多一个消息 → 多个消费者同时收到

业务场景

用户支付成功后,需要同时通知多个系统:

text 用户支付100元 │ ▼ 支付成功事件 ──Fanout──→ 同时发给3个系统 │ ├──→ 订单系统:更新订单状态为"已支付" ├──→ 积分系统:给用户加100积分 └──→ 短信系统:发短信通知用户


Direct 交换机(直连交换机)

核心特点

  • 精确匹配:Routing Key 必须完全相等

  • 一对一或一对多(多个队列可以用同一个 Routing Key)

text 应用产生日志 │ ▼ Direct Exchange (log_exchange) │ ├── Routing Key = "ERROR" ──→ 错误队列 ──→ 运维人员发钉钉告警 ├── Routing Key = "WARN" ──→ 警告队列 ──→ 存入数据库待查 └── Routing Key = "INFO" ──→ 信息队列 ──→ 写入文件归档

业务场景

一个电商系统,不同级别的日志需要不同处理:

为什么用 Direct

  • ✅ ERROR 消息只给告警系统

  • ✅ WARN 消息只给分析系统

  • ✅ INFO 消息只给归档系统

  • 精确匹配,不会发错


Topic 交换机(主题交换机)

核心特点
  • 模糊匹配:支持通配符*#

  • 更灵活的路由规则

通配符规则
通配符含义示例
*匹配一个log.*.error→ 匹配log.app.error
#匹配零个或多个log.#→ 匹配log.app.error,log.app,log.app.service.error
分隔符.分隔单词user.order.create

业务场景

一个外卖平台,各种事件需要灵活订阅:

text 事件类型(Routing Key 设计): ├── order.created # 订单创建 ├── order.cancelled # 订单取消 ├── order.delivered # 订单送达 ├── user.registered # 用户注册 ├── user.login # 用户登录 ├── payment.success # 支付成功 ├── payment.refund # 退款成功 ├── rider.accepted # 骑手接单 ├── rider.arrived # 骑手到店 └── rider.completed # 骑手送达
不同服务的订阅规则
服务订阅规则匹配到的事件
短信服务order.#user.#所有订单和用户相关事件
统计分析#.created#.cancelled所有创建和取消事件
骑手APPrider.*所有骑手相关事件
财务服务payment.#所有支付相关事件
风控服务*.cancelledpayment.refund取消事件和退款事件
http://www.jsqmd.com/news/945387/

相关文章:

  • 3个关键问题+5个核心功能:为什么GanttProject是免费开源项目管理的最佳选择?
  • 2026年近期,陕西地区液体包装机平台推荐哪家?这份综合指南为您解析 - 2026年企业资讯
  • PHY电压对网变内部CMC位置的“隐形指挥”
  • 性能与价格的双重平衡:主流UNS S17400厂商横向评测 - 品牌2026
  • 3分钟快速上手:零基础打造你的AI游戏瞄准助手终极指南
  • 维普查重愈发严苛,适配维普的 AI 论文写作工具怎么挑选?【2026 深度盘点实测指南】
  • 额度对半砍?腾讯、字节员工发现,大模型Token额度正在“降本增效”
  • 基于分布式智能采样与MRF推理的隐私保护交通感知系统
  • AI热潮下一二级市场合并:VC像PE、天使在消失,投资风格巨变!
  • ssm智能卤菜销售平台(10157)
  • 2026年自动剪辑系统怎么用AI实现:从素材处理到成片输出的自动化落地指南 - 广州矩阵架构科技公司
  • 2026年 搪瓷钢板厂家优选榜单:地铁站/隧道/隔音/外墙/双曲弧/木纹/电镀/穿孔搪瓷钢板源头品牌深度解析 - 品牌企业推荐师(官方)
  • 别再让YOLOv8自动选模型了!手把手教你自定义best.pt的评判标准(附权重修改代码)
  • 大气层自定义固件:释放Nintendo Switch全部潜力的开源解决方案
  • 从零到精通:Jellyfin MetaShark插件完整配置与故障排除指南
  • 5分钟搞定抖音内容保存:这个开源工具让你轻松收藏喜欢的视频和直播
  • 2026年基建配套海运集装箱实测评测:桐乡,平湖,湖州,桐乡打包集装箱/桐乡活动板房集装箱/桐乡海运集装箱/桐乡焊接集装箱/选择指南 - 优质品牌商家
  • 理工科论文避坑指南:能精准生成公式图表、参考文献真实可溯源的 5 款 AI 工具实测盘点
  • 【AI推荐系统实战指南】:20年专家亲授5大AI工具与推荐引擎无缝整合的黄金法则
  • Win Server 2019远程桌面多用户登录踩坑实录:从RDPWrap配置到组策略避坑
  • 2026年大型空调主机拆除靠谱公司排名 - myqiye
  • 杰理之打开广播,会报死机【篇】
  • YOLOv5猫狗检测实战:除了训练,你的模型部署和优化思路准备好了吗?
  • 终极指南:如何使用Attu轻松管理你的Milvus向量数据库
  • 深入解析jsdiff:JavaScript文本差异比对的终极解决方案
  • GitHub 上 Stars 最多的 6 个开源 AI 工具:让 AI Agent 更强大
  • 如何有效规避 AutoGPT 架构深度剖析大模型应用中的提示词注入与安全越狱漏洞
  • 重庆家庭水管漏水维修可靠公司排行实测盘点:重庆家庭水管漏水检测维修上门/重庆检测漏水检测/重庆水管漏水检测维修/选择指南 - 优质品牌商家
  • 企业级MR平台AI赋能升级路径(2024 Gartner验证的3层架构模型)
  • AI Agent Harness Engineering 在金融领域的十大应用场景