涙はいらない このまま踊ろう
RabbitMQ-Java客户端
AMQP
Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该标准和平台、语言无关,更符合微服务中独立性的要求。
Spring AMQP,是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
入门案例
需求:
利用控制台创建
simple.queue
在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue的消息
JAVA项目架构如下所示:

首先要在依赖文件中引入SpringAMQP依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
接下来在RabbitMQ配置文件中配置RabbitMQ服务端信息
spring:rabbitmq:host: 192.168.100.128port: 5672virtual-host: /tcswuzbusername: tcswuzbpassword: 1234
编写消息发送者测试程序
// src/main/java/org/example/listeners/MQListener.java
@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToQueue() {String QueueName = "simple.queue";String Message = "Hello,tcswuzb!This is Test Queue!";rabbitTemplate.convertAndSend(QueueName,Message);}}
运行测试方法,效果如下:



编写消息消费者测试程序
@Component
public class MQListener {@RabbitListener(queues = "simple.queue")public void ListenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("Consumer Get Message: "+msg);}
}
运行之后,发送消息

Work Queues
Workqueues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

编写代码如下:
@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToWorkQueue() throws InterruptedException {String QueueName = "work_test.queue";String Message;for(int i=1;i<=50;++i) {Message ="Hello,tcswuzb!This is WorkMessage"+i;rabbitTemplate.convertAndSend(QueueName,Message);Thread.sleep(20);}}
}@Component
public class MQListener {@RabbitListener(queues = "work_test.queue")public void ListenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("WorkQueue1 Consumer Get Message: "+msg);}@RabbitListener(queues = "work_test.queue")public void ListenWorkQueueMessage2(String msg) throws InterruptedException {System.out.println("WorkQueue2 Consumer Get Message: "+msg);}
}
测试结果如下,可以看出RabbitMQ对于消息是均分给了两个消费者处理而不考虑其具体状态:

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
使用spring.rabbitmq.listener.simple.prefetch参数,用于配置简单消息监听容器(Simple Message Listener Container),指定单个消费者(Consumer)在发送确认(ACK)之前,最多可以从RabbitMQ Broker预取并持有多少条未确认(Unacknowledged)的消息。
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:
spring:rabbitmq:listener:simple:prefetch: 1
同时修改队列2的处理代码,人为添加休眠从而影响其处理效率
@RabbitListener(queues = "work_test.queue")
public void ListenWorkQueueMessage2(String msg) throws InterruptedException {System.err.println("WorkQueue2 Consumer Get Message: "+msg);Thread.sleep(200);
}
处理完后效果如下:

Work模型的使用:
多个消费者绑定到一个队列,可以加快消息处理速度
同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
交换机
真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
Fanout:广播
Direct:定向
Topic:话题
交换机的作用:
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
Fanout交换机
Fanout Exchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

首先,设置一个交换机Exchange并且绑定队列

编写测试代码:
@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToExchange() throws InterruptedException {String ExchangeName = "tcswuzb.fanout_test";String Message ="Hello,tcswuzb!This is WorkMessage";rabbitTemplate.convertAndSend(ExchangeName,null,Message);}
}@Component
public class MQListener {@RabbitListener(queues = "fanout1.queue")public void ListenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("WorkQueue1 Consumer Get Message from Fanout1: "+msg);}@RabbitListener(queues = "fanout2.queue")public void ListenWorkQueueMessage2(String msg) throws InterruptedException {System.out.println("WorkQueue2 Consumer Get Message from Fanout2: "+msg);}
}
效果如下:

Direct交换机
Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
每一个
Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列Queue

设置Direct交换机,设置BindingKey绑定队列

编写测试代码:
@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToDirectExchange() throws InterruptedException {String ExchangeName = "Direct_test";String Message1 ="Hello,tcswuzb!This is DirectMessage with key(cdy)";String Message2 ="Hello,tcswuzb!This is DirectMessage with key(wzy)";String Message3 ="Hello,tcswuzb!This is DirectMessage with key(zjz)";rabbitTemplate.convertAndSend(ExchangeName,"cdy",Message1);rabbitTemplate.convertAndSend(ExchangeName,"wzy",Message2);rabbitTemplate.convertAndSend(ExchangeName,"zjz",Message3);}
}@Component
public class MQListener {@RabbitListener(queues = "tcswuzb.queue1")public void ListenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("Queue1 Consumer Get Message from Direct: "+msg);}@RabbitListener(queues = "tcswuzb.queue2")public void ListenWorkQueueMessage2(String msg) throws InterruptedException {System.out.println("Queue2 Consumer Get Message from Direct: "+msg);}
}
效果如下:

Topic交换机
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指1个单词

设置Topic交换机,设置BindingKey绑定队列

编写测试代码
@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToDirectExchange() throws InterruptedException {String ExchangeName = "tcswuzb.topic";String Message1 ="Hello,tcswuzb!This is DirectMessage with key(china.vegetables)";String Message2 ="Hello,tcswuzb!This is DirectMessage with key(japan.fruits)";String Message3 ="Hello,tcswuzb!This is DirectMessage with key(USA.weather)";String Message4 ="Hello,tcswuzb!This is DirectMessage with key(India.news)";String Message5 ="Hello,tcswuzb!This is DirectMessage with key(china.news)";String Message6 ="Hello,tcswuzb!This is DirectMessage with key(japan.weather)";String Message7 ="Hello,tcswuzb!This is DirectMessage with key(china.weather)";String Message8 ="Hello,tcswuzb!This is DirectMessage with key(japan.news)";rabbitTemplate.convertAndSend(ExchangeName,"china.vegetables",Message1);rabbitTemplate.convertAndSend(ExchangeName,"japan.fruits",Message2);rabbitTemplate.convertAndSend(ExchangeName,"USA.weather",Message3);rabbitTemplate.convertAndSend(ExchangeName,"India.news",Message4);rabbitTemplate.convertAndSend(ExchangeName,"china.news",Message5);rabbitTemplate.convertAndSend(ExchangeName,"japan.weather",Message6);rabbitTemplate.convertAndSend(ExchangeName,"china.weather",Message7);rabbitTemplate.convertAndSend(ExchangeName,"japan.news",Message8);}
}@Component
public class MQListener {@RabbitListener(queues = "tcswuzb.topic_queue1")public void ListenQueueMessage1(String msg) throws InterruptedException {System.out.println("Queue1 Consumer Get Message from Direct: "+msg);}@RabbitListener(queues = "tcswuzb.topic_queue2")public void ListenQueueMessage2(String msg) throws InterruptedException {System.out.println("Queue2 Consumer Get Message from Direct: "+msg);}@RabbitListener(queues = "tcswuzb.topic_queue3")public void ListenQueueMessage3(String msg) throws InterruptedException {System.out.println("Queue3 Consumer Get Message from Direct: "+msg);}@RabbitListener(queues = "tcswuzb.topic_queue4")public void ListenQueueMessage4(String msg) throws InterruptedException {System.out.println("Queue4 Consumer Get Message from Direct: "+msg);}
}
效果如下:

声明队列与交换机
方法一
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
Queue:用于声明队列,可以用工厂类
QueueBuilder构建
Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
通过配置类来生成,以Fanout交换机为例
@Configuration
public class RabbitMQConfiguration {@Beanpublic FanoutExchange GetFanoutExchange() {return new FanoutExchange("tcswuzb.fanout");}@Beanpublic Queue GetNewQueue1() {return new Queue("tcswuzb.test_queue1");}@Beanpublic Queue GetNewQueue2() {return new Queue("tcswuzb.test_queue2");}@Beanpublic Binding GetExchangeQueueBinding1(Queue GetNewQueue1,FanoutExchange GetFanoutExchange) {return BindingBuilder.bind(GetNewQueue1).to(GetFanoutExchange);}@Beanpublic Binding GetExchangeQueueBinding2(Queue GetNewQueue2,FanoutExchange GetFanoutExchange) {return BindingBuilder.bind(GetNewQueue2).to(GetFanoutExchange);}
}
启动后实现效果如下,注意启动启动类后一定要重新加载Bean对象


方法二
方法一直观明了,但是代码量太大,程序冗余性较高
使用@RabbitListener注解来声明队列和交换机的方式,如果没有对应的队列、交换机、绑定方式,就会直接创建
@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="queue_name"),exchange = @Exchange(name = "exchange_name",type= ExchangeTypes.DIRECT),key = {}))
public void MQListener(String msg) throws InterruptedException{System.out.println("Now Get Message is: "+msg);
}
编写测试测试代码:
// src/main/java/org/example/listeners/MQListener.java
@Component
public class MQListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="tcswuzb.test_queue3"),exchange = @Exchange(name = "tcswuzb.Direct",type= ExchangeTypes.DIRECT),key = {"Afterglow","RAS"}))public void MQListener1(String msg) throws InterruptedException{System.out.println("Now Get Message is: "+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="tcswuzb.test_queue4"),exchange = @Exchange(name = "tcswuzb.Direct",type= ExchangeTypes.DIRECT),key = {"PoppinParty","Roselia"}))public void MQListener2(String msg) throws InterruptedException{System.out.println("Now Get Message is: "+msg);}
}// src/test/java/org/example/PublisherApplicationTests.java
@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToDirectExchange() throws InterruptedException {String ExchangeName = "tcswuzb.Direct";String Message1 ="Hello,tcswuzb!This is DirectMessage with key(PoppinParty)";String Message2 ="Hello,tcswuzb!This is DirectMessage with key(Roselia)";String Message3 ="Hello,tcswuzb!This is DirectMessage with key(Afterglow)";String Message4 ="Hello,tcswuzb!This is DirectMessage with key(RAS)";;rabbitTemplate.convertAndSend(ExchangeName,"PoppinParty",Message1);rabbitTemplate.convertAndSend(ExchangeName,"Roselia",Message2);rabbitTemplate.convertAndSend(ExchangeName,"Afterglow",Message3);rabbitTemplate.convertAndSend(ExchangeName,"RAS",Message4);}
}
效果如下,可以看到队列、交换机、绑定方式自动创建


消息转换器
我们向队列发送一个map变量
@Test
void TestSendMapTiQueue() {Map<Number,String> Message = new HashMap<>();Message.put(1,"Honoka");Message.put(2,"Umi");String QueueName="tcswuzb.map_queue";rabbitTemplate.convertAndSend(QueueName,Message);
}
查看接收信息,发现结果解析错误

当发送者publisher使用RabbitTemplate.convertAndSend()发送 Java 对象时,该转换器会将对象序列化为JSON格式的字节数组,并设置消息头中的content-type为 application/json。这使得消息在RabbitMQ管理界面或与其他非Java系统交互时具有良好的可读性和兼容性。
当消费者consumer通过@RabbitListener或其他方法接收消息时,通过使用消息转化组件JacksonJsonMessageConverter,读取消息体中的JSON数据,并根据目标类型将其反序列化为具体的Java对象。
我们需要在publisher和consumer两个模块中都添加如下依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
在SpringBoot应用中,通常通过定义Bean的方式全局配置该转换器,使其自动应用于RabbitTemplate和@RabbitListener
@Bean
public MessageConverter JacksonMessageConverter() {return new JacksonJsonMessageConverter();
}
重新发送消息并查看,可以看到消息已经恢复正常

