当前位置: 首页 > news >正文

RabbitMQ_4_高级特性(1) - 详解

消息确认

消息确认机制

生产者发送消息之后,到达消费端可能会有以下两种情况:

  1. 消息处理成功
  2. 消息处理异常

RabbitMQ向消费者发送消息之后,就会把这条消息删掉,那么第二种情况就会造成消息丢失。那么如何确保消费端已经成功接受了,并正确处理了呢?

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。

ps:这个机制和我们上篇文章的发布确认有所不同,它是作用在消费端和Broker服务器之间的。

消费者再订阅队列时,可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种:

  • 自动确认:当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存/磁盘中删除,而不管消费者是否真正处理了这些消息。自动确认模式适合对于消息可靠性要求不高的场景。
  • 手动确认:当autoAck等于false时,RabbitMQ会等待消费者显式地调用Basic.Ack命令,回复确认信号后才从内存/磁盘中移去消息。这种模式适合对消息可靠性要求比较高的场景。
/*** Start a non-nolocal, non-exclusive consumer, with* a server-generated consumerTag.* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @param callback an interface to the consumer object* @return the consumerTag generated by the server* @throws java.io.IOException if an error is encountered* @see com.rabbitmq.client.AMQP.Basic.Consume* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws
IOException;

代码示例:

DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

当autoAck参数置为false时,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:意识等待投递给消费者的消息。二是已经投递给消费者,但是还米欸有收到消费者确认信号的·消息。如果RabbitMQ一致没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来那个消费者。

从RabbitMQ的管理平台上,也可以看到当前队列中的Ready和Unacked状态的消息数。

Ready:等待投递给消费者的消息数

Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数

手动确认方法

消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ也提供了不同的确认应答方式,消费者客户端可以调用与其对应的channel相关方法,共有以下三种:

1、肯定确认:Channel.basicAck(long deliveryTage,boolean multiple)

RabbitMQ已知道该消息并且成功处理消息,可以将其丢弃了。

参数说明:

(1)deliveryTag:消息的唯一标识,它是一个单调递增的64位的长整型值。deliveryTag时每个通道(Channel)独立维护的,所以在每个通道上都是唯一的。当消费者确认(ack)一条消息时,必须使用对应的通道上进行确认。

(2)multiple:是否批量确认。在某些情况下,为了减少网络流量,可以对一系列连续的deliveryTag进行批量确认。值为true则会一次性ack所有小于或等于指定deliveryTag的消息。值为false,则只确认当前指定的deliveryTag的消息。

2、否定确认:Channel.basicReject(long deliveryTag,boolean requeue)

RabbitMQ在2.0.0版本开始引入Basic.Reject这个命令,消费者客户端可以调用channel.basicReject方法来告诉RabbitMQ拒绝这个消息。

参数说明:

1、deliveryTag:参考上文。

2、requeue:表示拒绝后这条消息如何处理。如果热queue参数设置为true,则RabbitMQ会重新把这条消息存入队列中,以便可以发送给下一个订阅的消费者。如果热queue参数设置为false,则RabbitMQ会把这条消息从队列中移除,而不会把它发送给新的消费者。

3、否定确认:Channel.basicNack(long deliveryTag,boolean multiple,boolean requeue)

Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。消费者客户端可以调用channel.basicNack方法来实现。

参数介绍参考上面两个方法。

代码示例

Spring-AMQP对消息确认机制提供了三种策略:

public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}

1、AcknowledgeMode.NONE

这种模式下,消息一旦投递给消费者,不管消费者是否处理了消息,RabbitMQ就会自动确认消息,从RabbitMQ队列中移除消息。如果消费者处理消息失败,消息可能会丢失。(相当于自动确认)

2、Acknowledge.AUTO(默认)

这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息。

3、AcknowledgeMode.MANUAL

手动确认模式下,消费者必须在成功处理消息后显示调用basicAck方法来确认消息。如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。

下面我们通过代码进行演示:

主要流程:

1、配置确认机制(自动确认/手动确认)

2、生产者发送消息

3、消费端逻辑

4、测试

1、AcknowledgeMode.NONE
1、配置确认机制
spring:application:name: rabbit-extensions-demo#配置RabbitMQ的基本信息#amqp://username:password@Ip:port/virtual-hostrabbitmq:addresses: amqp://admin:admin@1主机Ip:5672/虚拟机listener:simple:acknowledge-mode: none  #消息接收确认
#            acknowledge-mode: auto
#            acknowledge-mode: manual
2、发送消息

队列,交换机配置

public class Contants {public static  final String ACK_QUEUE = "ack.queue";public static  final String ACK_EXCHANGE = "ack.exchange";
}
@Configuration
public class RabbitMQConfig {/*** 声明队列* @return*/@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Contants.ACK_QUEUE).build();}/*** 声明交换机* @return*/@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Contants.ACK_EXCHANGE).build();}/*** 声明绑定关系*/@Bean("ackBinding")public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("ack");}
}

通过接口发送消息:

@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Contants.ACK_EXCHANGE,"ack","ack consumer test...");return "消息发送成功";}
}
3、消费端逻辑
@Component
public class AckListener {
@RabbitListener(queues = Contants.ACK_QUEUE)
public void handleMessage(Message message, Channel channel) throws Exception {//消费者逻辑System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody()),message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("模拟业务逻辑处理");//    int num = 3 / 0;System.out.println("业务逻辑处理完成");}
}
4、运行程序并调用接口发送消息(先把消费者注掉)

此时,可以看到队列里面多了一条消息:

开启消费者,重新启动服务器,控制台输出:

修改代码,手动制造算数异常:

 int num = 3 / 0;

将刚才注掉的代码打开,发送消息:

可以看到控制台打印出异常,并且业务逻辑也没有正确执行,但是队列中的消息仍然消失了:

2、Acknowledge.AUTO(默认)
1、修改配置:
spring:application:name: rabbit-extensions-demo#配置RabbitMQ的基本信息#amqp://username:password@Ip:port/virtual-hostrabbitmq:addresses: amqp://admin:admin@106.52.188.165:5672/extensionlistener:simple:
#            acknowledge-mode: none  #消息接收确认acknowledge-mode: auto
#            acknowledge-mode: manual

生产者消费者,复用原来的即可。

2、发送消息

可以看到消息能够正常被消费:

手动制造算数异常,可以看到消息不断重新入队尝试重新发送给消费者:

3、AcknowledgeMode.MANUAL
1、修改配置
spring:application:name: rabbit-extensions-demo#配置RabbitMQ的基本信息#amqp://username:password@Ip:port/virtual-hostrabbitmq:addresses: amqp://admin:admin@106.52.188.165:5672/extensionlistener:simple:
#            acknowledge-mode: none  #消息接收确认
#            acknowledge-mode: autoacknowledge-mode: manual

由于此处需要使用deliveryTag,因此需要修改消费者代码获取Channel进而获取到deliveryTag。

2、修改消费者代码
@Component
public class AckListener {@RabbitListener(queues = Contants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("接收到消息:%s,deliveryTag:%d\n", new String(message.getBody()),message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("模拟业务逻辑处理");
//            int num = 3 / 0;System.out.println("业务逻辑处理完成");//肯定确认channel.basicAck(deliveryTag, false);} catch (Exception e) {//否定确认channel.basicNack(deliveryTag, false, false);}}
}

channel.basicAck(deliveryTag, false):第一个参数表示当前肯定确认从哪个deliveryTag开始,第二个参数表示是否批量确认,由于我们这里只发送一条消息所以是false。

channel.basicNack(deliveryTag, false, false):第一个参数表示当前肯定确认从哪个deliveryTag开始,第二个参数表示是否批量确认,第三个参数表示是否重新入队,这里先选择false进行观察。

注意:这里不要引错channel的包!!!

否则会报异常:

重启服务器发送消息:

可以看到消息被正常处理了,这里的deliveryTag=2,是因为我们刚才测试auto的那条消息因为重发一直在队列中,成为重启后当前channel发给消费者的第一条消息。

制造异常,重新发送消息:可以看到此时的业务并没有正确执行,但是消息也并没有被重新入队:

修改为重新入队:

catch (Exception e) {//否定确认channel.basicNack(deliveryTag, false, true);}

可以看到此时就出现了auto那样的情况,消息会不停入队并尝试重发:

http://www.jsqmd.com/news/312607/

相关文章:

  • 2026年 画室推荐排行榜:艺考画室、画室收费标准、画室排名前十位,专业师资与升学口碑深度解析
  • 2026 年净化板、净化工程、C 型钢、光伏夹芯板、光伏岩棉板五大优选品牌 赋能绿色工程高质量建设
  • 题解:P9041 [PA 2021] Fiolki 2
  • 顶尖学府与科技中心联合发布AI研究基金与学者奖项
  • 第一次申请博客,没想到很快就通过了
  • 如何一定需要使用电脑进行拍照,但是电脑像素太差怎么办
  • React Server Components (RSC) 协议中的高危漏洞:CVE-2025-55182 技术剖析
  • 实战抄作业:使用 Claude Code 将 10 万行 TypeScript 代码移植到 Rust
  • 数据驱动,优选未来:2026年1月空气治理/甲醛检测/除甲醛/空气检测/四川甲醛治理服务商选购指南
  • 2026机场与健身房商用全自动咖啡机推荐 商用设备适配高需求场景
  • 2026年 辽宁建筑资质代办服务推荐榜:涵盖升级/增项/延期/转让等全流程,专业高效助力企业合规发展
  • 手把手玩转CNN-BiLSTM-Attention分类模型
  • 设计模式学习(21) 23-20 解释器模式
  • 总结2026年企业和文化团建活动服务靠谱的十大公司
  • 2026年上海木暖地板行业口碑排名揭晓,木暖世家等品牌值得关注
  • LuatOS框架的使用
  • 2026年高性价比法律检索系统推荐,北京靠谱软件排名情况
  • 2026年苏州生鲜行业口碑排名,途一鲜规模、实力、性价比全解析
  • 2026年河北医院设计服务靠谱公司盘点,看哪家性价比高
  • 中电金信:【AI智变】化身“质检员”,AI让客服质检更智能、更高效
  • 2026年有哪些中药提取物厂家TOP5靠谱推荐 排毛球/去泪痕植物原料哪家好
  • Vmware安装contros9的linux镜像
  • 2026年度“真香”之选:爱果乐千元级人体工学椅,性价比天花板再升级
  • PackageManagerService 简析
  • 2026年辽宁资质代办服务推荐榜:监理/设计/电力/市政/水利/勘察/施工/劳务/特种工程等全类别资质专业代办,高效合规助力企业升级
  • 2026 年净化板、净化工程、C 型钢、光伏夹芯板、光伏岩棉板五大优质供应商甄选 实力品牌助力工程建设
  • 【苏州高薪急聘】自动化机械设计师:挑战柔性抓取技术新蓝海 | 省级专精特新企业,定义工业4.0末端执行器
  • 细聊国内有轨电车个性化定制,新阳光价格多少钱
  • 2026年变压器组件抓取方案选型指南
  • 如何选择丰台科技园区写字楼租赁服务商,高性价比场地在哪