当前位置: 首页 > news >正文

RabbitMQ快速入门

1.单机部署

虚拟机中使用Docker来安装(Rocky9,CentOS7)

下载并拉取镜像

docker pull rabbitmq:3-management

2.安装MQ

执行下面的命令来运行MQ容器:

docker run \ -e RABBITMQ_DEFAULT_USER=#你的账号# \ -e RABBITMQ_DEFAULT_PASS=#你的密码# \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management

(可选)获取虚拟机地址

ip addr

3.AMQP 依赖引入

<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

consumer的yaml配置

spring: rabbitmq: host:#你的虚拟机地址 port: 5672 virtual-host: / username: #你的账号 password: #你的密码

pulisher的yaml配置

spring: rabbitmq: host:#你的虚拟机地址 port: 5672 virtual-host: / username: #你的账号 password: #你的密码

4.访问mq控制台

输入账号密码

https://#你的虚拟机地址#:15672

登录页面:

overview

这里展示了节点详细信息,节点个数等

什么是Connections

消息发布者(生产者)和接收者(消费者)与mq建立的连接

什么是Channels

连接后需要建立的通道,可以理解为消息发送和接受的具体对象

什么是Exchange

交换机,消息接收的方式

直连交换机(Direct Exchange)

直连交换机通过精确匹配路由键(Routing Key)将消息路由到队列。消息的路由键必须与队列绑定的路由键完全一致才会被转发。适用于需要精确路由的场景,例如日志系统中根据日志级别分发消息。

  • 指定路由的routing_key =red,yellow,只接收到属性为red或yellow的队列消息
  • 路由规则:忽略某一路由,消息发送到所有绑定队列。
  • 典型用例:任务分发、日志级别过滤。
@RabbitListener(bindings = @QueueBinding( value=@Queue(name="direct.queue1"), exchange = @Exchange(name="666.direct",type= ExchangeTypes.DIRECT), key = {"red","blue"} )) public void listenDireQueue1(String message)throws Exception{ System.out.println("消费者接受到direct.queue1消息:"+"【"+message+"】"); } @RabbitListener(bindings = @QueueBinding( value=@Queue(name="direct.queue2"), exchange = @Exchange(name="666.direct",type= ExchangeTypes.DIRECT), key = {"red","yellow"} )) public void listenDireQueue2(String message)throws Exception{ System.out.println("消费者接受到direct.queue2消息:"+"【"+message+"】");
广播交换机(Fanout Exchange)

扇形交换机会将消息广播到所有绑定的队列,忽略路由键。适用于需要一对多广播的场景,例如新闻推送或事件通知。

  • 路由规则:通过绑定某一队列,消息发送到所有绑定队列。
  • 绑定队列:通过config或注解Queuebinding绑定某一队列
  • 典型用例:广播通知、事件发布。

config

@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange( "itcast.fanout" ); } //f.queue1 @Bean public Queue fanoutQueue1(){ return new Queue( "fanout.queue1" ); } //绑定队列到交换机 @Bean public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue1). to(fanoutExchange); } //f.queue2 @Bean public Queue fanoutQueue2(){ return new Queue( "fanout.queue2" ); } @Bean public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue2). to(fanoutExchange); }

listener

@RabbitListener(queues ="fanout.queue1" ) public void listenFanoutQueue1(String message)throws Exception{ System.out.println("消费者接受到fanout.queue1消息:"+"【"+message+"】"); } @RabbitListener(queues ="fanout.queue2" ) public void listenFanoutQueue2(String message)throws Exception{ System.out.println("消费者接受到fanout.queue2消息:"+"【"+message+"】"); }
主题交换机(Topic Exchange)

主题交换机通过路由键的模式匹配将消息路由到队列。路由键支持通配符(*匹配一个单词,#匹配零或多个单词),灵活性高。适用于需要基于模式的路由场景,例如多维度消息分类。

  • 路由规则:通过在routing_key中指定通配符赖匹配队列
  • # 匹配零个或一个或多个 ; * 仅匹配一个
  • china.# 可以匹配 china.news和china.news.now
  • china.* 匹配 china.news和china.user
  • 典型用例:动态路由、多条件过滤。
@RabbitListener(bindings = @QueueBinding( value=@Queue(name="topic.queue1"), exchange = @Exchange(name="666.topic",type= ExchangeTypes.TOPIC), key = {"china.#"} )) public void listenTopicQueue1(String message)throws Exception{ System.out.println("消费者接受到topic.queue1消息:"+"【"+message+"】"); } @RabbitListener(bindings = @QueueBinding( value=@Queue(name="topic.queue2"), exchange = @Exchange(name="666.topic",type= ExchangeTypes.TOPIC), key = {"#.news"} )) public void listenTopicQueue2(String message)throws Exception{ System.out.println("消费者接受到topic.queue2消息:"+"【"+message+"】");

什么是Queues

队列,消息存储的位置

  • Classic Queue:预取消息且顺序分配消息
  • Work Queue:持久化(队列 + 消息)+ 手动 ACK + 公平分发

如何实现一个简单的Work Queue

持久化(队列 + 消息)+ 手动 ACK + 公平分发

配置文件

如何公平分发,

设置预取消息为prefetch : 1

如何手动ack

设置 acknowledge-mode: manual

spring: rabbitmq: host:#你的虚拟机地址 port: 5672 virtual-host: / username: #你的账号 password: #你的密码 virtual-host: / #虚拟主机 listener: simple: acknowledge-mode: manual # 手动确认消息(生产环境必须用!) prefetch: 1 # 每次只预取1条消息 → 公平分发核心 concurrency: 1 # 每个消费者实例的并发数

声明工作队列

创建配置类,声明持久化队列(服务重启消息不丢失)

import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Work Queue 队列配置 * 无需声明交换机,使用默认交换机 */ @Configuration public class WorkQueueConfig { // 队列名称 public static final String WORK_QUEUE = "work.queue"; /** * 声明队列:持久化、非排他、非自动删除 */ @Bean public Queue workQueue() { // 参数:队列名、是否持久化、是否排他、是否自动删除 return new Queue(WORK_QUEUE, true, false, false); } }

消息生产者

发送任务消息到工作队列:

import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Service public class WorkQueueProducer { @Resource private RabbitTemplate rabbitTemplate; /** * 发送任务消息 * @param message 任务内容 */ public void sendTask(String message) { // 参数:路由键(队列名)、消息内容 rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, message); System.out.println("生产者发送消息:" + message); } }

多个消息消费者

创建两个消费者,监听同一个工作队列,模拟并行处理任务:

import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.io.IOException; @Service public class WorkQueueConsumer1 { /** * 监听工作队列 * @param message 消息内容 * @param channel 信道 * @param msg 原始消息对象 */ @RabbitListener(queues = WorkQueueConfig.WORK_QUEUE) public void consume(String message, Channel channel, Message msg) throws IOException { long deliveryTag = msg.getMessageProperties().getDeliveryTag(); try { // 模拟业务处理(耗时任务) System.out.println("消费者1 处理消息:" + message); // 模拟耗时:1秒 Thread.sleep(1000); // 【手动确认】消息处理成功 // 参数:消息标识、是否批量确认 channel.basicAck(deliveryTag, false); } catch (Exception e) { System.err.println("消费者1 处理消息失败:" + message); // 【手动拒绝】消息处理失败,重回队列 // 参数:消息标识、是否批量、是否重回队列 channel.basicNack(deliveryTag, false, true); } } }

什么是Admin

用户权限管理,这里采用多租户架构

不同用户间相互隔离,

http://www.jsqmd.com/news/661345/

相关文章:

  • 剑指offer | 2.3 数据结构相关题目
  • AI头像生成器多风格覆盖:Qwen3-32B支持23种细分美术风格Prompt生成
  • OBS多路RTMP推流插件:5大核心技术优势深度解析与实战指南
  • 2026年新房装修设计哪个好,这些品牌值得关注的干货指南 - mypinpai
  • RL4CO完全指南:用强化学习轻松解决复杂组合优化问题
  • Unity AI Navigation保姆级教程:从NavMesh烘焙到角色点击移动,5分钟搞定寻路系统
  • 盒马鲜生卡回收平台推荐:线上回收是否更靠谱? - 团团收购物卡回收
  • ViTables:突破HDF5数据可视化的边界,让十亿级表格触手可及
  • 从安装包到服务自启:Windows下Tomcat 9.0.x的两种部署姿势全解析(.exe vs .zip)
  • 聚焦理工类考生|湖北理工学院,机械工程强势,赋能未来发展 - myqiye
  • 1 5.8 屏幕键盘的使用:键盘坏了/平板触控时的“救命工具”
  • 百度网盘命令行终极指南:如何用BaiduPCS-Go实现高效文件管理
  • PHP避免进程切换开销的庖丁解牛
  • RISC-V DSP扩展指令集实战:如何用P扩展指令优化音频解码性能
  • 嵌入式现代C++工程实践——第14篇:第二次重构 —— 模板登场,编译时绑定端口和引脚
  • 3大实战场景:深度掌握ComfyUI-VideoHelperSuite的视频合成技巧
  • 权威选购指南:高性价比紫外线消毒设备推荐品牌与厂家实力对比 - 品牌推荐大师1
  • 163MusicLyrics:免费音乐歌词管理工具,3分钟搞定全网歌词下载
  • 2026 年缺陷管理系统排名参考:10 款主流 Bug 工具选型解读
  • 从Sensor到屏幕:YUV、RGB、RAW DATA三大格式的选型实战与性能权衡
  • Speech Seaco Paraformer ASR效果实测:5倍实时速率的语音识别体验
  • 从零构建企业级AI配额中台:5步完成配额策略建模、4层动态配额审计、2种跨模型配额迁移方案
  • 手把手推导:如何从DFT的复数旋转到DCT的实数余弦(含Python验证代码)
  • 终极指南:3步彻底解决Calibre中文路径乱码,完整保留你的电子书中文命名
  • 手把手教你用Verilog写一个带状态机的PID控制器(附完整测试平台代码)
  • SGBM算法调优笔记:为什么我用RGB三通道图比灰度图效果更好?(附避坑经验)
  • 收藏备用|AI Agent开发全链路实战指南
  • Docker镜像迁移实战:深入解析export/save与import/load的核心差异与应用场景
  • 无人机飞控工程师必看:惯性导航里‘b系相对i系在n系投影’到底在解决什么实际问题?
  • 3大核心功能解析:Obsidian本地AI助手如何重塑你的隐私优先知识工作流