RabbitMQ快速入门
1.单机部署
虚拟机中使用Docker来安装(Rocky9,CentOS7)
下载并拉取镜像
docker pull rabbitmq:3-management2.安装MQ
执行下面的命令来运行MQ容器:
docker run \ -e RABBITMQ_DEFAULT_USER=#你的账号# \ -e RABBITMQ_DEFAULT_PASS=#你的密码# \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management(可选)获取虚拟机地址
ip addr3.AMQP 依赖引入
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>consumer的yaml配置
spring: rabbitmq: host:#你的虚拟机地址 port: 5672 virtual-host: / username: #你的账号 password: #你的密码pulisher的yaml配置
spring: rabbitmq: host:#你的虚拟机地址 port: 5672 virtual-host: / username: #你的账号 password: #你的密码4.访问mq控制台
输入账号密码
https://#你的虚拟机地址#:15672登录页面:
overview
这里展示了节点详细信息,节点个数等
什么是Connections
消息发布者(生产者)和接收者(消费者)与mq建立的连接
什么是Channels
连接后需要建立的通道,可以理解为消息发送和接受的具体对象
什么是Exchange
交换机,消息接收的方式
直连交换机(Direct Exchange)
直连交换机通过精确匹配路由键(Routing Key)将消息路由到队列。消息的路由键必须与队列绑定的路由键完全一致才会被转发。适用于需要精确路由的场景,例如日志系统中根据日志级别分发消息。
- 指定路由的routing_key =red,yellow,只接收到属性为red或yellow的队列消息
- 路由规则:忽略某一路由,消息发送到所有绑定队列。
- 典型用例:任务分发、日志级别过滤。
@RabbitListener(bindings = @QueueBinding( value=@Queue(name="direct.queue1"), exchange = @Exchange(name="666.direct",type= ExchangeTypes.DIRECT), key = {"red","blue"} )) public void listenDireQueue1(String message)throws Exception{ System.out.println("消费者接受到direct.queue1消息:"+"【"+message+"】"); } @RabbitListener(bindings = @QueueBinding( value=@Queue(name="direct.queue2"), exchange = @Exchange(name="666.direct",type= ExchangeTypes.DIRECT), key = {"red","yellow"} )) public void listenDireQueue2(String message)throws Exception{ System.out.println("消费者接受到direct.queue2消息:"+"【"+message+"】");广播交换机(Fanout Exchange)
扇形交换机会将消息广播到所有绑定的队列,忽略路由键。适用于需要一对多广播的场景,例如新闻推送或事件通知。
- 路由规则:通过绑定某一队列,消息发送到所有绑定队列。
- 绑定队列:通过config或注解Queuebinding绑定某一队列
- 典型用例:广播通知、事件发布。
config
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange( "itcast.fanout" ); } //f.queue1 @Bean public Queue fanoutQueue1(){ return new Queue( "fanout.queue1" ); } //绑定队列到交换机 @Bean public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue1). to(fanoutExchange); } //f.queue2 @Bean public Queue fanoutQueue2(){ return new Queue( "fanout.queue2" ); } @Bean public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue2). to(fanoutExchange); }listener
@RabbitListener(queues ="fanout.queue1" ) public void listenFanoutQueue1(String message)throws Exception{ System.out.println("消费者接受到fanout.queue1消息:"+"【"+message+"】"); } @RabbitListener(queues ="fanout.queue2" ) public void listenFanoutQueue2(String message)throws Exception{ System.out.println("消费者接受到fanout.queue2消息:"+"【"+message+"】"); }主题交换机(Topic Exchange)
主题交换机通过路由键的模式匹配将消息路由到队列。路由键支持通配符(*匹配一个单词,#匹配零或多个单词),灵活性高。适用于需要基于模式的路由场景,例如多维度消息分类。
- 路由规则:通过在
routing_key中指定通配符赖匹配队列 - # 匹配零个或一个或多个 ; * 仅匹配一个
china.# 可以匹配 china.news和china.news.nowchina.* 匹配 china.news和china.user- 典型用例:动态路由、多条件过滤。
@RabbitListener(bindings = @QueueBinding( value=@Queue(name="topic.queue1"), exchange = @Exchange(name="666.topic",type= ExchangeTypes.TOPIC), key = {"china.#"} )) public void listenTopicQueue1(String message)throws Exception{ System.out.println("消费者接受到topic.queue1消息:"+"【"+message+"】"); } @RabbitListener(bindings = @QueueBinding( value=@Queue(name="topic.queue2"), exchange = @Exchange(name="666.topic",type= ExchangeTypes.TOPIC), key = {"#.news"} )) public void listenTopicQueue2(String message)throws Exception{ System.out.println("消费者接受到topic.queue2消息:"+"【"+message+"】");什么是Queues
队列,消息存储的位置
- Classic Queue:预取消息且顺序分配消息
- Work Queue:持久化(队列 + 消息)+ 手动 ACK + 公平分发
如何实现一个简单的Work Queue
持久化(队列 + 消息)+ 手动 ACK + 公平分发
配置文件
如何公平分发,
设置预取消息为prefetch : 1
如何手动ack
设置 acknowledge-mode: manual
spring: rabbitmq: host:#你的虚拟机地址 port: 5672 virtual-host: / username: #你的账号 password: #你的密码 virtual-host: / #虚拟主机 listener: simple: acknowledge-mode: manual # 手动确认消息(生产环境必须用!) prefetch: 1 # 每次只预取1条消息 → 公平分发核心 concurrency: 1 # 每个消费者实例的并发数声明工作队列
创建配置类,声明持久化队列(服务重启消息不丢失)
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Work Queue 队列配置 * 无需声明交换机,使用默认交换机 */ @Configuration public class WorkQueueConfig { // 队列名称 public static final String WORK_QUEUE = "work.queue"; /** * 声明队列:持久化、非排他、非自动删除 */ @Bean public Queue workQueue() { // 参数:队列名、是否持久化、是否排他、是否自动删除 return new Queue(WORK_QUEUE, true, false, false); } }消息生产者
发送任务消息到工作队列:
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Service public class WorkQueueProducer { @Resource private RabbitTemplate rabbitTemplate; /** * 发送任务消息 * @param message 任务内容 */ public void sendTask(String message) { // 参数:路由键(队列名)、消息内容 rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, message); System.out.println("生产者发送消息:" + message); } }多个消息消费者
创建两个消费者,监听同一个工作队列,模拟并行处理任务:
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.io.IOException; @Service public class WorkQueueConsumer1 { /** * 监听工作队列 * @param message 消息内容 * @param channel 信道 * @param msg 原始消息对象 */ @RabbitListener(queues = WorkQueueConfig.WORK_QUEUE) public void consume(String message, Channel channel, Message msg) throws IOException { long deliveryTag = msg.getMessageProperties().getDeliveryTag(); try { // 模拟业务处理(耗时任务) System.out.println("消费者1 处理消息:" + message); // 模拟耗时:1秒 Thread.sleep(1000); // 【手动确认】消息处理成功 // 参数:消息标识、是否批量确认 channel.basicAck(deliveryTag, false); } catch (Exception e) { System.err.println("消费者1 处理消息失败:" + message); // 【手动拒绝】消息处理失败,重回队列 // 参数:消息标识、是否批量、是否重回队列 channel.basicNack(deliveryTag, false, true); } } }什么是Admin
用户权限管理,这里采用多租户架构
不同用户间相互隔离,
