消息队列-RabbitMq
1.概述
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信
为什么会产生消息队列?有几个原因
- 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
- 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
- MQ框架有很多,比较流行的有 RabbitMq, ActiveMq, ZeroMq, kafka, 以及阿里开源的RocketMq;
为什么用mq?
1.模块之间耦合度过高,导致一个模块宕机后,全部功能都不能用了
2.同步通讯的时间成本问题,mq可以进行消息分发异步处理
为什么要用rabbitmq?
1.activemq,ROCKETMQ,只支持java语言,kafka可以支持多们语言,rabbitmq支持多种语言
2.效率方面:activemq, rocketmq,kafka效率都是毫秒级别,rabbitmq是微妙级别
3.消息丢失,消息重复问题:rabbitmq针对消息持久化和重复问题都有比较成熟的解决方案
4.学习成本:rabbitmq非常简单
rabbitmq是由rabbit公司去研发与维护的,最终是在pivotal维护
rabbitmq严格遵循amqp协议,高级消息队列协议,帮助我们在进程之间传递异步消息
2.RabbitMq
2.1.RabbitMq简介
RabbitMQ是消息代理:它接受并转发消息。您可以将其视为邮局:将您要发布的邮件放在邮箱中时,可以确保Mailperson先生或女士最终将邮件传递给您的收件人。以此类推,RabbitMQ是一个邮政信箱,一个邮局和一个邮递员。
RabbitMQ与邮局之间的主要区别在于,它不处理纸张,而是接收,存储和转发数据消息的二进制数据。
RabbitMQ和一般的消息传递使用一些术语。
生产仅意味着发送。发送消息的程序是生产者。
队列是RabbitMQ内部的邮政信箱的名称。尽管消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中。甲队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者可以发送进入一个队列的消息,许多消费者可以尝试从一个队列接收数据。这就是我们表示队列的方式。
消费与接收具有相似的含义。一个消费者是一个程序,主要是等待接收信息。
请注意,生产者,消费者和经纪人不必位于同一主机上。实际上,在大多数应用程序中却没有。一个应用程序既可以是生产者,也可以是消费者。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
2.2 RabbitMq环境搭建
安装步骤此处省略,安装完成后,通过 rabbitmq-plugins enable rabbitmq_management 启用管理插件。
查看管理界面
通过默认账户 guest/guest 登录,登录成功则说明安装成功。
2.3 添加用户
2.3.1 添加admin用户
2.3.2 用户角色
- 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 - 监控者(monitoring)
可登陆管理控制台,同时可以查看节点的相关信息(进程数,内存使用情况,磁盘使用情况等) - 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 - 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 - 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
2.3.3 创建Virtual Hosts (虚拟主机)
选中admin用户设置权限
看到权限已加
2.3.4 管理界面中的功能
2.4 五种队列
RabbitMQ提供了多种消息模型,官网上第6种是RPC不属于常规的消息队列。
属于消息模型的是前5种:
- Hello World 模型,简单的一对一
- 工作队列模型,一个生产者将消息分发给多个消费者
- 发布/订阅模型,生产者发布消息,多个消费者同时收取
- 路由模型,生产者通过关键字发送消息给特定消费者
- 主题模型,路由模式基础上,在关键字里加入了通配符
2.4.1 简单队列
一对一的队列。生产者P生产消息放入队列(这里不是简单地直接放入队列中),消费者C消费消息,消费者和生产者是一种一对一的关系。
2.4.1.1测试demo
引入包
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency>RabbitMq连接工具类
package com.zjk.demo.common.rabbitmq.util;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/** * @author tomatoes * @date2021/3/22 * @description **/ public class RabbitMqConnectionUtils{public static Connection getConnection()throws Exception{returngetConnection("ip",5672,"adminHost","admin","admin");}public static Connection getConnection(String host, int port, String vHost, String userName, String passWord)throws Exception{//1、定义连接工厂 ConnectionFactory factory=new ConnectionFactory();//2、设置服务器地址 factory.setHost(host);//3、设置端口 factory.setPort(port);//4、设置虚拟主机、用户名、密码 factory.setVirtualHost(vHost);factory.setUsername(userName);factory.setPassword(passWord);//5、通过连接工厂获取连接 Connection connection=factory.newConnection();returnconnection;}}生产者
public class RabbitMqProducer{private final static String QUEUE_NAME="hello";public static void main(String[]args)throws Exception{//1、获取连接 Connection connection=RabbitMqConnectionUtils.getConnection();//2、声明信道 Channel channel=connection.createChannel();//3、声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//4、定义消息内容 String message="hello rabbitmq, my name is tomcatoes ";//5、发布消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent'"+ message +"'");//6、关闭通道 channel.close();//7、关闭连接 connection.close();}}消费者
public class RabbitMqConsumer{private final static String QUEUE_NAME="hello";public static void main(String[]args)throws Exception{//1、获取连接 Connection connection=RabbitMqConnectionUtils.getConnection("47.99.199.41",5672,"/","guest","guest");//2、声明通道 Channel channel=connection.createChannel();//3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//4、定义队列的消费者 QueueingConsumer queueingConsumer=new QueueingConsumer(channel);//5、监听队列 /* true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费 false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈, 如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其 发送消息,直到该消费者反馈。 */ channel.basicConsume(QUEUE_NAME, true, queueingConsumer);//6、获取消息while(true){QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();String message=new String(delivery.getBody());System.out.println(" [x] Received '"+ message +"'");}}}2.4.2 work模式
一个生产者、2个消费者。
但MQ中一个消息只能被一个消费者获取。即消息要么被C1获取,要么被C2获取。这种模式适用于类似集群,能者多劳。性能好的可以安排多消费,性能低的可以安排低消费。
但如果面对我需要多个消费者都对这一消息进行消费的需求,这种模式显然就不适用了。那就可以采用发布订阅模式。
简而言之 一个生产者,多个消费者,一个消息只能被一个消费者获取。多个消费者只有一个队列。
2.4.2.1 轮询分发策略(round robin)
使用工作队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ采用轮询分发策略将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。
生产者
public class Producer{private final static String QUEUE_NAME="work_queue";public static void main(String[]args)throws Exception{//1、获取连接 Connection connection=RabbitMqConnectionUtils.getConnection();//2、声明信道 Channel channel=connection.createChannel();//3、声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//4、定义消息内容(发布多条消息)for(int i=0;i<10;i++){String message="hello rabbitmq "+ i;//5、发布消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[z] Sent'"+ message +"'");//模拟发送消息延时,便于演示多个消费者竞争接受消息 Thread.sleep(i *10);}//6、关闭通道 channel.close();//7、关闭连接 connection.close();}}消费者1
public class Consumer1{private final static String QUEUE_NAME="work_queue";public static void main(String[]args)throws Exception{//1、获取连接 Connection connection=RabbitMqConnectionUtils.getConnection();//2、声明通道 Channel channel=connection.createChannel();//3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//同一时刻服务器只会发送一条消息给消费者 //channel.basicQos(1);//4、定义队列的消费者 QueueingConsumer queueingConsumer=new QueueingConsumer(channel);//5、监听队列,手动返回完成状态 channel.basicConsume(QUEUE_NAME, false, queueingConsumer);//6、获取消息while(true){QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();String message=new String(delivery.getBody());System.out.println(" [z] Received '"+ message +"'");//消费者1接收一条消息后休眠10毫秒 Thread.sleep(10);//返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}}}消费者2
public class Consumer2{private final static String QUEUE_NAME="work_queue";public static void main(String[]args)throws Exception{//1、获取连接 Connection connection=RabbitMqConnectionUtils.getConnection();//2、声明通道 Channel channel=connection.createChannel();//3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//同一时刻服务器只会发送一条消息给消费者 //channel.basicQos(1);//4、定义队列的消费者 QueueingConsumer queueingConsumer=new QueueingConsumer(channel);//5、监听队列,手动返回完成状态 channel.basicConsume(QUEUE_NAME, false, queueingConsumer);//6、获取消息while(true){QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();String message=new String(delivery.getBody());System.out.println(" [z] Received '"+ message +"'");//消费者2接收一条消息后休眠1000毫秒 Thread.sleep(1000);//返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}}}测试结果分析:
首先生产者一次打印从0-9条消息
消费者1消费偶数的消息
消费者2消费奇数的消息
结果分析:
消费者1和消费者2获取到的消息内容是不同的,也就是说同一个消息只能被一个消费者获取。
消费者1和消费者2分别获取奇数条消息和偶数条消息,两种获取消息的条数是一样的。
前面我们说这种模式是竞争消费者模式,一条队列被多个消费者监听,这里两个消费者,其中消费者1和消费者2在获取消息后分别休眠了10毫秒和1000毫秒,也就是说两个消费者获取消息的效率是不一样的,但是结果却是两者获得的消息条数是一样的,这根本就不构成竞争关系,那么我们应该怎么办才能让工作效率高的消费者获取消息更多,也就是消费者1获取消息更多呢?
PS:在增加一个消费者其实获取消息条数也是一样的,消费者1获取0,3,6,9,消费者2获取1,4,7,消费者3获取2,5,8
2.4.2.2 公平分发(Fair Dispatch)
有可能消费者处理消息的能力有差异(硬件设备,网络原因),我们期望处理能力强的消费者多处理消息,处理能力弱的消费者少处理消息。通过basicQos(perfetch)和autoAck配合也可以实现。
实现:
- basicQos:设置同一时刻服务器只会发perfetch**(此处为1)**条消息给消费者
- autoAck:将自动应答改为手动。就处理完一条消息后手动提交。
两个消费者类 Consumer1 与 Consumer2类中新增channel.basicQos(1);
修改后测试结果
注意:使用公平分发,必须关闭自动应答ack,然后改成手动应答方式。
2.4.3 发布订阅模式(publish/subscribe)
一个生产者发送的消息可能会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者。
注:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers。这里采用的是fanout类型。后面会详细介绍这几种交换器。
模式特点归纳:
- 一个生产者,多个消费者
- 每个消费者都有自己的队列
- 生产者没有直接将消息发送到队列,而是发送到交换机(Exchange)
- 每个队列都要绑定交换机
- 生产者发送到消息经过交换机 --> 到达队列–> 可以实现一个消息被多个消费者消费
2.4.3.1测试代码
public class Consumer1{private final static String EXCHANGE_NAME="fanout_exchange";private final static String QUEUE_NAME="fanout_exchange_queue1";public static void main(String[]args)throws Exception{Connection connection=RabbitMqConnectionUtils.getConnection();Channel channel=connection.createChannel();//声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");// 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1);DeliverCallback deliverCallback=(consumerTag, delivery)->{String message=new String(delivery.getBody(),"utf-8");System.out.println(" [z] Received '"+ message +"'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//修改为手动应答,true为自动应答,false相反 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag ->{});}}Product
public class Producer{private final static String EXCHANGE_NAME="fanout_exchange";public static void main(String[]args)throws Exception{//1、获取连接 Connection connection=RabbitMqConnectionUtils.getConnection();//2、声明信道 Channel channel=connection.createChannel();//3、声明交换器 channel.exchangeDeclare(EXCHANGE_NAME,"fanout");for(int i=0;i<10;i++){//4、创建消息 String message="hello rabbitmq"+i;//5、发布消息 channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes());System.out.println("[z] Sent'"+ message+"'");}//6、关闭通道 channel.close();//7、关闭连接 connection.close();}}测试结果
ProductConsumer1
Consumer2
2.4.4 路由模式(routing模式)
生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。也就是让消费者有选择性的接收消息。
2.4.4.1测试代码
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “delete”);
Producer
Consumer1
Consumer2
2.4.4.2测试结果
Producer
Consumer1
Consumer2
经过测试可以看出,消费者所监听的队列中只接收到指定路由key的消息。
2.4.5 主题模式(topic)
上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。
发送到topic交换的消息不能具有任意的 routing_key- 它必须是由点(.)分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。
一些有效的路由键示例:stock.usd.nyse,nyse.vmw,quick.orange.rabbit。路由密钥中可以包含任意数量的字符,最多可达255个字节。
绑定键有两个重要的特殊特性:(是用.分割的单词,而不是字符)
- *:可以替代一个单词。
- #:可以替换零个或多个单词。
2.4.5.1测试代码
Producer
public class Producer{private final static String EXCHANGE_NAME="topic_exchange";public static void main(String[]args)throws Exception{//1、获取连接 Connection connection=RabbitMqConnectionUtils.getConnection();//2、声明信道 Channel channel=connection.createChannel();//3、声明交换器 channel.exchangeDeclare(EXCHANGE_NAME,"topic");for(int i=0;i<10;i++){//4、创建消息 String message="hello rabbitmq"+ i;//5、发布消息 channel.basicPublish(EXCHANGE_NAME,"routeKey.z", null, message.getBytes());System.out.println("[z] Sent'"+ message +"'");}//6、关闭通道 channel.close();//7、关闭连接 connection.close();}}Consumer1
public class Consumer1{private final static String EXCHANGE_NAME="topic_exchange";private final static String QUEUE_NAME="topic_exchange_queue1";public static void main(String[]args)throws Exception{Connection connection=RabbitMqConnectionUtils.getConnection();Channel channel=connection.createChannel();//声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"*.*");// 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1);DeliverCallback deliverCallback=(consumerTag, delivery)->{String message=new String(delivery.getBody(),"utf-8");System.out.println(" [z] Received '"+ message +"'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//修改为手动应答,true为自动应答,false相反 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag ->{});}}Consumer2
public class Consumer2{private final static String EXCHANGE_NAME="topic_exchange";private final static String QUEUE_NAME="topic_exchange_queue2";public static void main(String[]args)throws Exception{Connection connection=RabbitMqConnectionUtils.getConnection();Channel channel=connection.createChannel();//声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"routeKey.z");// 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1);DeliverCallback deliverCallback=(consumerTag, delivery)->{String message=new String(delivery.getBody(),"utf-8");System.out.println(" [z] Received '"+ message +"'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//修改为手动应答,true为自动应答,false相反 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag ->{});}}2.4.5.2测试结果
2.5 四种交换器(Exchange)
前面五种队列模式介绍完了,但是实际上只有三种,第一种简单队列,第二种工作模式,剩下的三种都是和交换器绑定的合起来称为一种,这小节我们就来详细介绍交换器。
交换器分为四种,分别是:direct、fanout、topic和 headers。
前面三种分别对应路由模式、发布订阅模式和通配符模式,headers 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器,这里也不详细介绍。
2.5.1 direct
如果路由键完全匹配的话,消息才会被投放到相应的队列。
2.5.2 fanout
当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。
2.5.3 topic
设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符。
2.6 总结
关于 RabbitMQ 的五种队列,其实实际使用最多的是最后一种主题模式,通过模糊匹配,使得操作更加自如。那么我们总结一下有交换器参与的队列(最后三种队列)工作方式如下:
