当前位置: 首页 > news >正文

分布式之RabbitMQ的使用(3)QueueBuilder - 详解

文章目录

  • 前言
  • 一、概述
  • 二、功能特点
  • 三、准备工作
    • 创建CreateRabbitMQConfig
    • 创建listening
    • 创建QueueBuilderController
  • 四、常用属性及方法介绍
    • (一)创建持久化队列
      • 语法
      • 代码编写
        • CreateRabbitMQConfig
        • QueueBuilderController
        • ListenRabbitMQConfig
        • 测试
    • (二)创建非持久化队列
    • (三)创建排他队列
    • (四) 创建自动删除队列
  • (五)设置队列长度限制
  • 五、使用步骤
    • (一)引入相关依赖
    • (二)创建队列对象
    • (三)后续操作

前言

本篇帖子是上一篇分布式之RabbitMQ的使用(3)的续写。

一、概述

QueueBuilder 是在与消息队列系统(如 RabbitMQ)集成时,用于以编程方式构建队列(Queue)的工具类或构建器模式的实现。它提供了一种便捷的方法来配置队列的各种属性,使得在应用程序中能够根据具体需求灵活创建不同类型和特性的队列。

二、功能特点

  1. 属性配置灵活性:通过一系列方法,可以轻松设置队列的关键属性,如是否持久化、是否排他、是否自动删除等,以满足不同的业务场景和消息处理要求。
  2. 与消息队列系统紧密集成:通常是特定消息队列客户端库(如 Spring AMQP 用于 Spring Boot 与RabbitMQ 集成)的一部分,能够无缝对接相应的消息队列服务,确保创建的队列在系统中正确生效。

三、准备工作

创建CreateRabbitMQConfig

新建config文件夹,并在其文件夹下创建类CreateRabbitMQConfig

package com.hsh.config;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CreateRabbitMQConfig {
}

创建listening

新建listen文件夹,并在其文件夹下创建类ListenRabbitMQConfig

package com.hsh.listen;
import org.springframework.stereotype.Component;
@Component
public class ListenRabbitMQConfig {
}

创建QueueBuilderController

在controller文件夹下创建类QueueBuilderController

package com.hsh.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/queueBuilderController")
public class QueueBuilderController {
@Autowired
private RabbitTemplate rabbitTemplate;
}

四、常用属性及方法介绍

(一)创建持久化队列

语法

方法: QueueBuilder.durable(String queueName)
说明:用于创建一个持久化队列。持久化队列在消息队列服务器重启或发生意外故障后,其队列定义(包括队列名称、属性等)以及队列中尚未被消费的消息不会丢失,会在服务器恢复正常后继续存在,可以继续被消费者消费。
语法如下示例:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();

在上述示例中,创建了一个名为 myDurableQueue 的持久化队列。

代码编写

我们以创建简单队列的持久化队列为例。

CreateRabbitMQConfig

在上面新建的CreateRabbitMQConfig中编写,用于在项目加载时创建队列

package com.hsh.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CreateRabbitMQConfig {
@Bean
public Queue myDurableQueue () {
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();
return durableQueue;
}
}
QueueBuilderController

编写QueueBuilderController,用于用户发起请求,也就是生产者。

package com.hsh.controller;
import com.hsh.pojo.Goods;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/queueBuilderController")
public class QueueBuilderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String index(){
Goods goods =new Goods();
for (int i = 0; i < 10; i++){
goods.setGoodsId(i);
rabbitTemplate.convertAndSend("myDurableQueue", goods);
}
return "发送成功";
}
}
ListenRabbitMQConfig

编写ListenRabbitMQConfig用于监听队列

package com.hsh.listen;
import com.hsh.pojo.Goods;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.stereotype.Component;
@Component
public class ListenRabbitMQConfig {
// 不在这里创建队列了 @RabbitListener(queuesToDeclare = @Queue("work"))
// 我们这个myDurableQueue在上面的 CreateRabbitMQConfig创建,这样更加灵活,也是为什么使用QueueBuilder的原因。
@RabbitListener(queues = "myDurableQueue")
public void receive(Message  message){
Goods goods = (Goods) SerializationUtils.deserialize(message.getBody());
System.out.println("消费者===========:" + goods);
}
}
测试

直接访问http://localhost:8080/queueBuilderController/send
控制台如下
在这里插入图片描述

(二)创建非持久化队列

方法: QueueBuilder.nonDurable(String queueName)
说明:创建一个非持久化队列。非持久化队列在某些情况下(如连接关闭、相关消费者进程结束等),队列及其可能剩余的消息通常会被自动删除,不具备在服务器重启等情况下的数据保留能力。
示例:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue nonDurableQueue = QueueBuilder.nonDurable("myNonDurableQueue")
.build();

这里创建了一个名为 myNonDurableQueue 的非持久化队列。

和上面创建持久化队列差不多就是把CreateRabbitMQConfig的内容换一下不再演示。

(三)创建排他队列

  • 方法: QueueBuilder.exclusive(String queueName)
  • 说明:排他队列是连接专属的,只有创建它的连接可以使用它,并且当连接关闭时,该队列会被自动删除。
  • 示例:
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    Queue exclusiveQueue = QueueBuilder.exclusive("myExclusiveQueue")
    .build();

此示例创建了一个名为 myExclusiveQueue 的排他队列。

(四) 创建自动删除队列

  • 方法: QueueBuilder.autoDelete(String queueName)
  • 说明:自动删除队列在所有消费者都取消订阅或者队列中的消息都被消费完后(满足其中一个条件即可),队列会会被自动删除。
  • 示例:
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    Queue autoDeleteQueue = QueueBuilder.autoDelete("myAutoDeleteQueue")
    .build();

创建了一个名为 myAutoDeleteQueue 的自动删除队列。

(五)设置队列长度限制

  • 方法: QueueBuilder.durable(String queueName).maxLength(int maxLength) (以在持久化队列上设置为例,非持久化等其他类型队列设置方式类似)
  • 说明:用于设置队列的最大长度限制,即队列中最多能容纳的消息数量。当队列中的消息数量达到该限制时,后续发送到该队列的消息可能会根据消息队列系统的策略进行相应处理(如被丢弃等)。
  • 示例:
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    Queue queueWithLengthLimit = QueueBuilder.durable("myQueueWithLimit")
    .maxLength(1000)
    .build();

在上述示例中,创建了一个名为 myQueueWithLimit 的持久化队列,并设置其最大长度为 1000 条消息。

五、使用步骤

(一)引入相关依赖

在使用 QueueBuilder 之前,需要确保项目中引入了与消息队列系统对应的客户端库。例如,在 SpringBoot 中集成 RabbitMQ 并使用 QueueBuilder 时,需要在项目的 pom.xml(Maven 项目)或build.gradle(Gradle 项目)中引入 Spring AMQP 依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(二)创建队列对象

根据业务需求,选择合适的 QueueBuilder 方法来创建队列对象。也就是上面的CreateRabbitMQConfig中编写如下代码例如:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
// 创建一个持久化队列
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();
// 创建一个非持久化队列
Queue nonDurableQueue = QueueBuilder.nonDurable("myNonDurableQueue")
.build();

(三)后续操作

创建好队列对象后,可以根据具体的消息队列集成场景进行后续操作,比如:

将队列绑定到交换机(在使用交换机的消息队列架构中):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 创建一个持久化队列
@Bean
public Queue myQueue() {
return QueueBuilder.durable("myQueue")
.build();
}
// 创建一个持久化的直接交换机
@Bean
public DirectExchange myDirectExchange() {
return ExchangeBuilder.directExchange("myDirectExchange")
.durable(true)
.build();
}
// 创建队列与交换机之间的绑定关系
/*
*/
@Bean
public Binding binding(DirectExchange myDirectExchange, Queue myQueue) {
return BindingBuilder.bind(myQueue)
.to(myDirectExchange)
.with("routingKey");
}
}
  • myQueue 方法使用 QueueBuilder 创建了一个名为 myQueue 的持久化队列,并通过 @Bean 注解将其注册为 Spring 容器中的一个 Bean。
  • myDirectExchange 方法使用 ExchangeBuilder 创建了一个名为myDirectExchange 的持久化直接交换机,同样通过 @Bean 注解将其注册为 Spring 容器中的一个 Bean。
  • binding 方法通过 @Bean 注解创建了一个 Binding 对象,用于将前面创建的 myQueue 队列和myDirectExchange 交换机进行绑定,并且指定了路由键为 routingKey 。Spring 会自动将已经创建好的 myDirectExchangemyQueue 这两个 Bean 作为参数传递给 binding 方法,无需手动干预。

binding方法:

  • 调用时机:Spring 在创建 Binding 这个 Bean 时会自动调用 binding 方法。因为这个方法也被标注了 @Bean 注解,所以 Spring 会识别它并按照创建 Bean 的流程来处理。
  • 参数传递: binding 方法的参数是 DirectExchange myDirectExchangeQueue myQueue 。Spring 会通过方法参数的类型来自动匹配已经在 Spring 容器中创建好的相应 Bean 实例并作为参数传递进来。也就是说,Spring 会找到之前通过 myDirectExchange 方法创建并注册的直接交换机 Bean 和通过 myQueue 方法创建并注册的队列 Bean,然后将它们分别传递给 binding 方法,以便在方法内部使用 BindingBuilder 将队列和交换机进行绑定操作。

将队列提供给消息生产者和消费者使用:
在消息生产者端,可以将消息发送到创建好的队列中。例如,在 Spring AMQP 中:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue myQueue;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(myQueue.getName(), message);
}
}

在上述代码中,通过 RabbitTemplate 将消息发送到了名为 myQueue 的队列中(这里假设 myQueue 是之前通过 QueueBuilder 创建的队列)。

在消息消费者端,可以从创建好的队列中获取消息进行处理。例如:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@RabbitListener(queues = "myQueue")
public void receiveMessage(String message) {
// 处理接收到的消息
System.out.println("Received message: " " + message);
}
}

这里通过 RabbitListener 注解监听名为 myQueue 的队列,当队列中有消息时,会调用receiveMessage 方法进行处理。

http://www.jsqmd.com/news/40569/

相关文章:

  • 2025年市面上口碑好的出国留学中介机构哪家强,全球联申/名校录取/留学就业一体化/背景提升/语言培训中介哪家好
  • 网络犯罪新手段:黑客如何利用IT技术实施货物盗窃
  • 11/14
  • 25.11.14
  • 题解:AtCoder ARC209D A_A_i
  • 代码制作数学动画 python manim jjmpeg - 何苦
  • 重组融合蛋白技术概述
  • OpenEuler安装宝塔
  • 20230827 - Balancer 攻击事件:价格操纵 + 精度丢失的经典组合拳
  • 我的标题2
  • 破解cocos creator 2.3.2, 让它支持M芯片
  • Kotlin Coroutines
  • 我的标题
  • 深入解析:软考中级-系统集成项目管理工程师**的超详细知识点笔记。
  • GeoScene Pro试用申请
  • 题解:P13573 [CCPC 2024 重庆站] Pico Park
  • 【AI智能体】Coze 提取对标账号短视频生成视频文案实战详解 - 指南
  • Java Benchmark使用
  • 实用指南:12-机器学习与大模型开发数学教程-第1章1-4 导数与几何意义
  • 基于Vue社区共享游泳馆预约高效的系统n897q36e (工具+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • docker登录容器镜像仓库
  • 吴恩达深度学习课程二: 改善深层神经网络 第三周:超参数调整,批量标准化和编程框架(一)超参数调整
  • Go-秘籍-全-
  • Kotlin中的flow、stateflow、shareflow之间的区别和各自的功能 - 教程
  • 非离散网络流——P3347 [ZJOI2015] 醉熏熏的幻想乡
  • [note] 素数判定与分解质因数
  • 不能识别adb/usb口记录 - 实践
  • 恭喜自己,挑战成功! - Ghost
  • 如何在测试覆盖不足后补充验证
  • react动态表单