当前位置: 首页 > news >正文

SpringBoot 广播消息实现(发布/订阅)

在 RabbitMQ 的五大工作模式中,发布/订阅(Publish/Subscribe)广播模式是分布式系统中非常核心的通信方式。

我们日常使用的普通点对点队列,一条消息只会被一个消费者消费(竞争消费);而广播模式可以实现一条消息、多服务、多消费者同时接收,完美实现一对多通知。

缓存刷新、配置更新、全局通知、多节点日志同步、服务状态广播等场景,全部依赖 Fanout 广播模式。

一、什么是 MQ 广播(发布/订阅)模式?

1. 核心定义

广播模式基于FanoutExchange(扇形交换机)实现,核心逻辑:

生产者发送一条消息到 Fanout 交换机,所有绑定该交换机的队列,都会完整收到这条消息。

不管路由键是什么、不管队列名称,只要完成绑定,就会无条件广播投递。

2. 核心特性

  • • 无视routingKey,路由键传空、传任意值都不生效

  • • 纯广播、全量投递、一对多分发

  • • 每条消息独立进入每一个绑定队列

  • • 天然支持多服务、多节点同步通知

  • • 无匹配规则,绑定即接收

3. 适用业务场景

  • • 分布式缓存全局刷新(多节点统一清空缓存)

  • • 系统配置动态推送、热更新

  • • 全站公告、全局消息推送

  • • 微服务多节点日志采集、链路追踪

  • • 服务上下线、状态同步广播

  • • 多端消息同步(PC/APP/小程序)

二、四大交换机模式核心对比

交换机类型

匹配规则

消费模式

核心场景

Direct(直连)

完全匹配 routingKey

点对点竞争消费

订单、支付、任务处理

Topic(主题)

通配符模糊匹配

选择性多消费

日志分级、消息订阅

Fanout(广播)无视路由键,全部投递

全员订阅消费

缓存刷新、全局通知

Headers

匹配消息头参数

自定义匹配

极少使用

三、关键认知误区

1:同一个队列多消费者可以实现广播

绝对错误!

同一个队列下的多个消费者,默认是竞争消费,一条消息只会被一个消费者消费。

广播必备条件:每个消费者对应一个独立队列,全部绑定同一个 Fanout 交换机。

2:Fanout 交换机需要配置路由键

Fanout 交换机底层逻辑直接忽略 routingKey,无论发送时传什么值,都不会影响广播效果。

3:广播消息天然可靠、不会丢失

默认非持久化、自动ACK 场景下,广播消息极易丢失,生产必须做持久化+手动ACK

四、SpringBoot 完整实现

1. 基础依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2. 生产级配置文件

spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: # 手动ACK 保证广播消息不丢 acknowledge-mode: manual # 限制预取数,防止单节点消息堆积 prefetch: 5 # 开启消费重试 retry: enabled: true max-attempts: 3 initial-interval: 1000

3. 广播交换机、队列、绑定配置类

import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutBroadcastConfig { // 广播交换机名称 public static final String FANOUT_EXCHANGE = "system.fanout.broadcast.exchange"; // 三个独立消费者队列 public static final String QUEUE_CACHE_REFRESH = "queue.cache.refresh"; public static final String QUEUE_NOTICE = "queue.system.notice"; public static final String QUEUE_LOG = "queue.log.collect"; // 声明 Fanout 广播交换机:持久化、不自动删除 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE, true, false); } // 队列1:缓存刷新队列 @Bean public Queue cacheRefreshQueue() { return new Queue(QUEUE_CACHE_REFRESH, true); } // 队列2:系统通知队列 @Bean public Queue noticeQueue() { return new Queue(QUEUE_NOTICE, true); } // 队列3:日志采集队列 @Bean public Queue logQueue() { return new Queue(QUEUE_LOG, true); } // 全部绑定到广播交换机 @Bean public Binding bindingCacheRefresh(Queue cacheRefreshQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(cacheRefreshQueue).to(fanoutExchange); } @Bean public Binding bindingNotice(Queue noticeQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(noticeQueue).to(fanoutExchange); } @Bean public Binding bindingLog(Queue logQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(logQueue).to(fanoutExchange); } }

4. 广播消息生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class BroadcastProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send/broadcast") public String sendBroadcastMsg(@RequestParam String content) { // Fanout广播:路由键传空字符串 rabbitTemplate.convertAndSend( FanoutBroadcastConfig.FANOUT_EXCHANGE, "", content ); return "✅ 广播消息发送成功:" + content; } }

5. 多消费者实现(全员接收)

消费者1:缓存刷新消费者

import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class CacheRefreshConsumer { @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_CACHE_REFRESH) public void consume(String msg, Message message, Channel channel) throws IOException { try { System.out.println("【缓存服务】接收广播消息:" + msg); // 执行缓存刷新业务逻辑 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 消费失败,重回队列重试 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }

消费者2:系统通知消费者

@Component public class SystemNoticeConsumer { @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_NOTICE) public void consume(String msg, Message message, Channel channel) throws IOException { try { System.out.println("【通知服务】接收广播消息:" + msg); // 执行消息推送业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }

消费者3:日志采集消费者

@Component public class LogCollectConsumer { @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_LOG) public void consume(String msg, Message message, Channel channel) throws IOException { try { System.out.println("【日志服务】接收广播消息:" + msg); // 执行日志采集业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }

五、测试效果

访问接口:

http://localhost:8080/send/broadcast?content=全局缓存刷新通知

控制台输出:

【缓存服务】接收广播消息:全局缓存刷新通知 【通知服务】接收广播消息:全局缓存刷新通知 【日志服务】接收广播消息:全局缓存刷新通知

✅ 一条消息,多服务同时消费,广播生效!

六、总结

1. 必须开启持久化

交换机、队列全部设置持久化,防止重启丢失广播配置。

2. 强制手动ACK

广播场景多为重要通知、缓存同步,自动ACK会导致业务未执行完成消息丢失。

3. 每个服务独立队列

不同微服务必须使用独立队列,避免竞争消费,保证广播全覆盖。

4. 广播消息建议做幂等

MQ 重试、网络抖动会导致广播消息重复推送,核心业务必须基于消息ID做幂等防重。

5. 禁止设置复杂路由键

Fanout 无视路由键,统一传空字符串,保持代码规范。


写在最后

广播发布订阅模式是微服务分布式通信的重要基石,区别于传统的点对点任务消费,它主打全局通知、多节点同步、状态广播,是缓存刷新、配置热更新、系统公告等场景的最优解。

很多开发者一直混淆“竞争消费”和“广播消费”的本质,导致线上通知不全、同步失效等隐性问题。吃透 Fanout 交换机的底层原理与落地规范,能帮你彻底解决分布式多节点同步难题。

持续更新 SpringBoot、微服务、MQ 中间件、架构实战、面试刷题干货,帮你夯实技术底盘,轻松搞定工作与面试。

觉得文章有用,点赞、收藏、转发一波,持续关注,解锁更多生产级技术干货!

http://www.jsqmd.com/news/895946/

相关文章:

  • SOES:解决工业实时通信中EtherCAT从站开发的架构性挑战
  • zhouhui/distiluse-base-multilingual-cased vs 其他句子嵌入模型:10个关键指标对比
  • 极域电子教室防控制工具:如何快速解除限制,实现自由学习
  • 终极SQL代码检查指南:如何用sql-lint告别数据库开发中的低级错误
  • 为什么选择lllyasviel/flux1-dev-bnb-nf4?深入了解模型架构与核心优势
  • ChatGLM-6B-INT4 API接口开发:构建RESTful服务的完整教程
  • Unity 2020.2保姆级教程:用Obi Fluid插件5分钟搞定一个会‘粘墙’的流体特效
  • 微信消息自动转发工具:5分钟实现多群消息同步
  • SenseNova-U1社区指南:如何参与贡献与获取技术支持
  • 探索DeepSeek-V4-Pro-Base的FP8量化技术:内存效率与计算性能的完美平衡
  • 终极指南:FinancialBERT-Sentiment-Analysis模型深度解析与实战应用
  • openpilot 2025技术展望:从规则驱动到AI原生驾驶系统的范式转变
  • 绝区零一条龙:3步轻松配置全自动游戏助手,彻底解放你的双手
  • 如何快速掌握开源字体:思源宋体7步实现专业中文排版
  • MTK Camera调试实战:精准控制Log开关与Buffer Dump策略
  • 宁德时代105亿进军算力能源协同领域,能否复刻锂电产业链的利润收割模式?
  • 别再让Kettle转换里的SQL乱跑了!用‘阻塞数据’组件精准控制执行顺序的实战心得
  • 源代码论文分享|Spring Boot 社区物业管理系统!
  • 如何快速上手AceGPT-13B:5分钟完成安装与推理的完整指南
  • 我们改变不了房价, 改变不了这个社会的运行规则。但 可以改变自己
  • 智能评价助手:告别手动评价,让AI为你的京东购物体验增值
  • Keil C51代码银行中常量定位问题解决方案
  • QKeyMapper:Windows玩家的终极按键映射神器,无需重启零风险
  • InsForge测试驱动开发:Red-Green-Refactor循环完整指南
  • 零成本获取全球金融数据:AKShare开源财经数据接口库完整指南
  • bert-base-multilingual-cased:华为昇腾NPU优化的104语言BERT模型全面解析
  • 别再只用UI RawImage了!用Unity的Shader Graph为你的Minimap实现高级视觉效果(动态遮罩、迷雾战争、风格化渲染)
  • Claude Code用户如何配置Taotoken解决封号与Token不足问题
  • 技术面试文化轮深度解析:从沟通能力到组织智慧的实战指南
  • 基于Claude与Shopify API构建智能电商客服系统实战