当前位置: 首页 > news >正文

RabbitMQ学习2 RabbitMQ-Java客户端

涙はいらない このまま踊ろう

RabbitMQ-Java客户端

AMQP

Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该标准和平台、语言无关,更符合微服务中独立性的要求。

Spring AMQP,是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

入门案例

需求:

利用控制台创建simple.queue
publisher服务中,利用SpringAMQP直接向simple.queue发送消息
consumer服务中,利用SpringAMQP编写消费者,监听simple.queue的消息

JAVA项目架构如下所示:
image

首先要在依赖文件中引入SpringAMQP依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

接下来在RabbitMQ配置文件中配置RabbitMQ服务端信息

spring:rabbitmq:host: 192.168.100.128port: 5672virtual-host: /tcswuzbusername: tcswuzbpassword: 1234

编写消息发送者测试程序

// src/main/java/org/example/listeners/MQListener.java
@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToQueue() {String QueueName = "simple.queue";String Message = "Hello,tcswuzb!This is Test Queue!";rabbitTemplate.convertAndSend(QueueName,Message);}}

运行测试方法,效果如下:
image
image
image

编写消息消费者测试程序


@Component
public class MQListener {@RabbitListener(queues = "simple.queue")public void ListenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("Consumer Get Message: "+msg);}
}

运行之后,发送消息
image

Work Queues

Workqueues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息
image

编写代码如下:

@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToWorkQueue() throws InterruptedException {String QueueName = "work_test.queue";String Message;for(int i=1;i<=50;++i) {Message ="Hello,tcswuzb!This is WorkMessage"+i;rabbitTemplate.convertAndSend(QueueName,Message);Thread.sleep(20);}}
}@Component
public class MQListener {@RabbitListener(queues = "work_test.queue")public void ListenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("WorkQueue1 Consumer Get Message: "+msg);}@RabbitListener(queues = "work_test.queue")public void ListenWorkQueueMessage2(String msg) throws InterruptedException {System.out.println("WorkQueue2 Consumer Get Message: "+msg);}
}

测试结果如下,可以看出RabbitMQ对于消息是均分给了两个消费者处理而不考虑其具体状态:
image

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

使用spring.rabbitmq.listener.simple.prefetch参数,用于配置‌简单消息监听容器(Simple Message Listener Container)‌,指定单个消费者(Consumer)在发送确认(ACK)之前,最多可以从RabbitMQ Broker预取并持有多少条‌未确认(Unacknowledged)‌的消息。

因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:rabbitmq:listener:simple:prefetch: 1

同时修改队列2的处理代码,人为添加休眠从而影响其处理效率

@RabbitListener(queues = "work_test.queue")
public void ListenWorkQueueMessage2(String msg) throws InterruptedException {System.err.println("WorkQueue2 Consumer Get Message: "+msg);Thread.sleep(200);
}

处理完后效果如下:
image

Work模型的使用:

多个消费者绑定到一个队列,可以加快消息处理速度
同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:

Fanout:广播
Direct:定向
Topic:话题

交换机的作用:

接收publisher发送的消息
将消息按照规则路由到与之绑定的队列

Fanout交换机

Fanout Exchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
image

首先,设置一个交换机Exchange并且绑定队列
image

编写测试代码:

@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToExchange() throws InterruptedException {String ExchangeName = "tcswuzb.fanout_test";String Message ="Hello,tcswuzb!This is WorkMessage";rabbitTemplate.convertAndSend(ExchangeName,null,Message);}
}@Component
public class MQListener {@RabbitListener(queues = "fanout1.queue")public void ListenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("WorkQueue1 Consumer Get Message from Fanout1: "+msg);}@RabbitListener(queues = "fanout2.queue")public void ListenWorkQueueMessage2(String msg) throws InterruptedException {System.out.println("WorkQueue2 Consumer Get Message from Fanout2: "+msg);}
}

效果如下:
image

Direct交换机

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列Queue

image

设置Direct交换机,设置BindingKey绑定队列
image
编写测试代码:

@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToDirectExchange() throws InterruptedException {String ExchangeName = "Direct_test";String Message1 ="Hello,tcswuzb!This is DirectMessage with key(cdy)";String Message2 ="Hello,tcswuzb!This is DirectMessage with key(wzy)";String Message3 ="Hello,tcswuzb!This is DirectMessage with key(zjz)";rabbitTemplate.convertAndSend(ExchangeName,"cdy",Message1);rabbitTemplate.convertAndSend(ExchangeName,"wzy",Message2);rabbitTemplate.convertAndSend(ExchangeName,"zjz",Message3);}
}@Component
public class MQListener {@RabbitListener(queues = "tcswuzb.queue1")public void ListenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("Queue1 Consumer Get Message from Direct: "+msg);}@RabbitListener(queues = "tcswuzb.queue2")public void ListenWorkQueueMessage2(String msg) throws InterruptedException {System.out.println("Queue2 Consumer Get Message from Direct: "+msg);}
}

效果如下:
image

Topic交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割。

QueueExchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词
*:代指1个单词

image

设置Topic交换机,设置BindingKey绑定队列
image

编写测试代码

@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToDirectExchange() throws InterruptedException {String ExchangeName = "tcswuzb.topic";String Message1 ="Hello,tcswuzb!This is DirectMessage with key(china.vegetables)";String Message2 ="Hello,tcswuzb!This is DirectMessage with key(japan.fruits)";String Message3 ="Hello,tcswuzb!This is DirectMessage with key(USA.weather)";String Message4 ="Hello,tcswuzb!This is DirectMessage with key(India.news)";String Message5 ="Hello,tcswuzb!This is DirectMessage with key(china.news)";String Message6 ="Hello,tcswuzb!This is DirectMessage with key(japan.weather)";String Message7 ="Hello,tcswuzb!This is DirectMessage with key(china.weather)";String Message8 ="Hello,tcswuzb!This is DirectMessage with key(japan.news)";rabbitTemplate.convertAndSend(ExchangeName,"china.vegetables",Message1);rabbitTemplate.convertAndSend(ExchangeName,"japan.fruits",Message2);rabbitTemplate.convertAndSend(ExchangeName,"USA.weather",Message3);rabbitTemplate.convertAndSend(ExchangeName,"India.news",Message4);rabbitTemplate.convertAndSend(ExchangeName,"china.news",Message5);rabbitTemplate.convertAndSend(ExchangeName,"japan.weather",Message6);rabbitTemplate.convertAndSend(ExchangeName,"china.weather",Message7);rabbitTemplate.convertAndSend(ExchangeName,"japan.news",Message8);}
}@Component
public class MQListener {@RabbitListener(queues = "tcswuzb.topic_queue1")public void ListenQueueMessage1(String msg) throws InterruptedException {System.out.println("Queue1 Consumer Get Message from Direct: "+msg);}@RabbitListener(queues = "tcswuzb.topic_queue2")public void ListenQueueMessage2(String msg) throws InterruptedException {System.out.println("Queue2 Consumer Get Message from Direct: "+msg);}@RabbitListener(queues = "tcswuzb.topic_queue3")public void ListenQueueMessage3(String msg) throws InterruptedException {System.out.println("Queue3 Consumer Get Message from Direct: "+msg);}@RabbitListener(queues = "tcswuzb.topic_queue4")public void ListenQueueMessage4(String msg) throws InterruptedException {System.out.println("Queue4 Consumer Get Message from Direct: "+msg);}
}

效果如下:
image

声明队列与交换机

方法一

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

Queue:用于声明队列,可以用工厂类QueueBuilder构建
Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

通过配置类来生成,以Fanout交换机为例

@Configuration
public class RabbitMQConfiguration {@Beanpublic FanoutExchange GetFanoutExchange() {return new FanoutExchange("tcswuzb.fanout");}@Beanpublic Queue GetNewQueue1() {return new Queue("tcswuzb.test_queue1");}@Beanpublic Queue GetNewQueue2() {return new Queue("tcswuzb.test_queue2");}@Beanpublic Binding GetExchangeQueueBinding1(Queue GetNewQueue1,FanoutExchange GetFanoutExchange) {return BindingBuilder.bind(GetNewQueue1).to(GetFanoutExchange);}@Beanpublic Binding GetExchangeQueueBinding2(Queue GetNewQueue2,FanoutExchange GetFanoutExchange) {return BindingBuilder.bind(GetNewQueue2).to(GetFanoutExchange);}
}

启动后实现效果如下,注意启动启动类后一定要重新加载Bean对象
image
image

方法二

方法一直观明了,但是代码量太大,程序冗余性较高

使用@RabbitListener注解来声明队列和交换机的方式,如果没有对应的队列、交换机、绑定方式,就会直接创建

@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="queue_name"),exchange = @Exchange(name = "exchange_name",type= ExchangeTypes.DIRECT),key = {}))
public void MQListener(String msg) throws InterruptedException{System.out.println("Now Get Message is: "+msg);
}

编写测试测试代码:

// src/main/java/org/example/listeners/MQListener.java
@Component
public class MQListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="tcswuzb.test_queue3"),exchange = @Exchange(name = "tcswuzb.Direct",type= ExchangeTypes.DIRECT),key = {"Afterglow","RAS"}))public void MQListener1(String msg) throws InterruptedException{System.out.println("Now Get Message is: "+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="tcswuzb.test_queue4"),exchange = @Exchange(name = "tcswuzb.Direct",type= ExchangeTypes.DIRECT),key = {"PoppinParty","Roselia"}))public void MQListener2(String msg) throws InterruptedException{System.out.println("Now Get Message is: "+msg);}
}// src/test/java/org/example/PublisherApplicationTests.java
@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessageToDirectExchange() throws InterruptedException {String ExchangeName = "tcswuzb.Direct";String Message1 ="Hello,tcswuzb!This is DirectMessage with key(PoppinParty)";String Message2 ="Hello,tcswuzb!This is DirectMessage with key(Roselia)";String Message3 ="Hello,tcswuzb!This is DirectMessage with key(Afterglow)";String Message4 ="Hello,tcswuzb!This is DirectMessage with key(RAS)";;rabbitTemplate.convertAndSend(ExchangeName,"PoppinParty",Message1);rabbitTemplate.convertAndSend(ExchangeName,"Roselia",Message2);rabbitTemplate.convertAndSend(ExchangeName,"Afterglow",Message3);rabbitTemplate.convertAndSend(ExchangeName,"RAS",Message4);}
}

效果如下,可以看到队列、交换机、绑定方式自动创建
image
image

消息转换器

我们向队列发送一个map变量

@Test
void TestSendMapTiQueue() {Map<Number,String> Message = new HashMap<>();Message.put(1,"Honoka");Message.put(2,"Umi");String QueueName="tcswuzb.map_queue";rabbitTemplate.convertAndSend(QueueName,Message);
}

查看接收信息,发现结果解析错误
image

当发送者publisher使用RabbitTemplate.convertAndSend()发送 Java 对象时,该转换器会将对象序列化为JSON格式的字节数组,并设置消息头中的content-typeapplication/json。这使得消息在RabbitMQ管理界面或与其他非Java系统交互时具有良好的可读性和兼容性。

当消费者consumer通过@RabbitListener或其他方法接收消息时,通过使用消息转化组件JacksonJsonMessageConverter,读取消息体中的JSON数据,并根据目标类型将其反序列化为具体的Java对象。

我们需要在publisherconsumer两个模块中都添加如下依赖

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

在SpringBoot应用中,通常通过定义Bean的方式全局配置该转换器,使其自动应用于RabbitTemplate@RabbitListener

@Bean
public MessageConverter JacksonMessageConverter() {return new JacksonJsonMessageConverter();
}

重新发送消息并查看,可以看到消息已经恢复正常
image

http://www.jsqmd.com/news/705484/

相关文章:

  • 西恩士高端显微检测 液冷冷板清洁度显微镜分析 - 工业干货社
  • return 结果1, 结果2 在python中和在javascript中的区别
  • 【微服务与云原生架构】DevOps、CI/CD流水线、GitOps 系统性知识体系
  • YetAnotherKeyDisplayer完整指南:3大场景实战与5个深度定制技巧
  • 华硕笔记本终极优化指南:用G-Helper一键解决性能与色彩问题![特殊字符]
  • 开源金融研究智能体Dexter:基于AI的自动化投资分析实践
  • 制作加笔记
  • 量子Kerr非线性谐振器在机器学习核方法中的应用
  • WaveTools:为《鸣潮》玩家打造的全能游戏优化伴侣
  • Python零基础入门学习之输入与输出
  • 矩阵分解在推荐系统中的应用与实践
  • python click
  • 碳交易与需求响应双轮驱动的综合能源系统优化运行软件
  • 2026年3月可靠的上海钢结构厂家推荐,钢结构板房/设备钢平台/工业钢平台/仓库钢平台,上海钢结构生产厂家有哪些 - 品牌推荐师
  • python常见运算符及用法小结
  • 别留小尾巴/尽快剪掉小尾巴:从一次“ABA”字段重命名,谈谈“解决问题要彻底”
  • LocalGPT:本地化AI助手与3D生成器的架构解析与实践指南
  • MS2130芯片HDMI采集棒性能解析与应用指南
  • Hermes Agent 为什么最近总被反复提起?
  • IPXWrapper终极指南:让Windows 11完美运行90年代经典游戏联机
  • 液冷冷板清洁度颗粒测试设备 西恩士液冷设备优选厂商 - 工业干货社
  • VS Code MCP插件开发实战:手把手完成服务注册、工具发现、会话路由全流程(附GitHub可运行模板)
  • 服创大赛演示视频
  • 3大核心技术模块:WaveTools如何重塑《鸣潮》玩家的游戏体验
  • Flutter for OpenHarmony 引导页萌系实战指南:给新用户一份软乎乎的欢迎礼✨
  • AI智能体开发实战:AgentGym平台架构解析与自定义智能体接入指南
  • Python queue模块功能大全
  • 新手避坑指南:从URDF到MoveIt!Setup Assistant配置机械臂的完整流程
  • 终极QMC音频解密方案:快速免费解锁你的QQ音乐文件 [特殊字符]
  • Linux /tmp 目录管理