当前位置: 首页 > news >正文

RabbitMq高级篇

发送者可靠性 mq可靠性 消费者可靠性 延迟消息

消息从生产者到消费者每一步都可能导致消息丢失

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

保证消息的可靠性从三方面进行入手

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

发送者可靠性

1,生产者可靠性 生产者重试机制

但是当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

具体yml文件

logging: pattern: dateformat: MM-dd HH:mm:ss:SSS level: com.itheima: DEBUG #设置com.itheima包及其子包的日志级别为DEBUG,可以输出更多的调试信息 spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: admin password: password123 connection-timeout: 1s #连接超时时间,单位为毫秒 template: retry: enabled: true #启用重试机制 multiplier: 1 #重试间隔的倍数,默认为1 这个只是连接失败的重试 max-attempts: 3 #最大重试次数,默认为3 publisher-confirm-type: none #correlated #启用消息确认机制,使用相关模式,可以通过回调函数获取确认结果 publisher-returns: false #true #启用消息返回机制,当消息无法路由到队列时会触发返回

生产者确认机制:

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

- MQ内部处理消息的进程发生了异常

- 生产者发送消息到达MQ后未找到`Exchange`

- 生产者发送消息到达MQ的`Exchange`后,未找到合适的`Queue`,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括`Publisher Confirm`和`Publisher Return`两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的**回执**。

- 当消息投递到MQ,但是路由失败时,通过**Publisher Return**返回异常信息,同时返回ack的确认信息,代表投递成功

- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

- 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

- 其它情况都会返回NACK,告知投递失败

其中`ack`和`nack`属于**Publisher Confirm**机制,`ack`是投递成功;`nack`是投递失败。而`return`则属于**Publisher Return**机制。

默认两种机制都是关闭状态,需要通过配置文件来开启。

具体的yml文件配置如上图所示

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执 已不是采用这种回调方式

配置ReturnCallBack

@Slf4j @Configuration @AllArgsConstructor public class MqConfirmConfig implements ApplicationContextAware {//实现这个ApplicationContextAware接口在进行执行setApplicationContjjext将整个上下文进行传进去 @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//手动获得到rabbittemplate的bean //配置return回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.debug("收到的消息return 回调了,exchange:{},routingKey:{},replyCode:{},replyText:{},message:{}", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage() ); } }); } }

定义ConfirmCallBack

@Test void testConfirm() throws InterruptedException { //创建cd CorrelationData 是 Spring AMQP 用来追踪消息发送状态的容器。 CorrelationData cd = new CorrelationData(UUID.randomUUID().toString()); //添加回调 cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { log.error("消息回调失败", ex); }//保证消息发送成功后,才会执行onSuccess方法,如果消息发送失败了,就会执行onFailure方法 @Override public void onSuccess(@Nullable CorrelationData.Confirm result) { log.debug("消息回调成功,结果:{}", result); if (result.isAck()) { //消息发送成功 log.debug("发送消息成功收到ack回调"); } else { //发送消息失败 log.error("发送消息失败收到ack回调,原因:{}", result.getReason()); } } }); rabbitTemplate.convertAndSend("test1.direct1", "red", "hello", cd); /* cd.getFuture():获取一个 ListenableFuture 对象。这是一个异步任务句柄,代表“等待 RabbitMQ 返回确认结果”这个未来的动作。 addCallback:给这个未来动作绑定两个回调函数: onFailure(Throwable ex): 触发时机:当发送过程本身出现异常时触发。 场景:例如网络断了、RabbitMQ 服务挂了、连接超时等,导致消息根本没能发出去,或者连确认都没收到就报错了。 注意:这不代表消息在 MQ 内部处理失败,而是通信层面的失败。 onSuccess(CorrelationData.Confirm result): 触发时机:当成功收到 RabbitMQ 的确认帧(Ack/Nack)时触发。 参数 result:包含确认的具体结果。 */ }

由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。
当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。
而如果连交换机都是错误的,则只会收到nack。

注意
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

2,MQ的可靠性

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化 在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:设置为Durable就是持久化模式,Transient就是临时模式。
  • 队列持久化 在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。
  • 消息持久化 在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties
@Test void testPageOut() { Message msg = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();//设置消息持久化 rabbitTemplate.convertAndSend("fanout.queue2", msg); /* 在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。 不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。 */ }

LazyQueue

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut.PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

3,保证消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • **none**:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • **manual**:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • **auto**:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

具体消费者的配置文件

logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: admin password: password123 listener: simple: prefetch: 1 #每次只能消费一条消息,直到处理完成后才会接收下一条消息 retry: enabled: true #启用重试机制 acknowledge-mode: auto #确认机制,none表示不需要确认,auto表示自动确认,manual表示手动确认

失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }

完整代码如下

@Configuration @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled",havingValue = "true") public class ErrorConfiguration { @Bean public DirectExchange errorExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue"); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){ return BindingBuilder.bind(errorQueue).to(errorExchange).with("error"); } @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");//重新发布 } }

保持业务幂等性

业务幂等性

何为幂等性?
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息ID

这个思路非常简单:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:

@Bean public MessageConverter messageConverter(){ // 1.定义消息转换器 Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter(); // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jjmc.setCreateMessageIds(true); return jjmc; }

业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:

@Override public void markOrderPaySuccess(Long orderId) { // 1.查询订单 Order old = getById(orderId); // 2.判断订单状态 if (old == null || old.getStatus() != 1) { // 订单不存在或者订单状态不是1,放弃处理 return; } // 3.尝试更新订单 Order order = new Order(); order.setId(orderId); order.setStatus(2); order.setPayTime(LocalDateTime.now()); updateById(order); }

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

我们可以合并上述操作为这样:

@Override public void markOrderPaySuccess(Long orderId) { // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1 lambdaUpdate() .set(Order::getStatus, 2) .set(Order::getPayTime, LocalDateTime.now()) .eq(Order::getId, orderId) .eq(Order::getStatus, 1) .update(); }

注意看,上述代码等同于这样的SQL语句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

保持原子性

兜底方案

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?
有没有其它兜底方案,能够确保订单的支付状态一致呢?

其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。

至此,消息可靠性的问题已经解决了。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息
在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

死信交换机和延迟消息
首先我们来学习一下基于死信交换机的延迟消息方案。

死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过**dead-letter-exchange**属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

@Test void testSendTTLMessage () { rabbitTemplate.convertAndSend("simple.direct", "hi", "hello", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //使用消息转换器 message.getMessageProperties().setExpiration("10000"); return message; } }); log.info("消息发送成功"); }

注意:
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayMessage(String msg){ log.info("接收到delay.queue的延迟消息:{}", msg); }

基于@Bean的方式:

package com.itheima.consumer.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class DelayExchangeConfig { @Bean public DirectExchange delayExchange(){ return ExchangeBuilder .directExchange("delay.direct") // 指定交换机类型和名称 .delayed() // 设置delay的属性为true .durable(true) // 持久化 .build(); } @Bean public Queue delayedQueue(){ return new Queue("delay.queue"); } @Bean public Binding delayQueueBinding(){ return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay"); } }

发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

@Test void testPublisherDelayMessage() { // 1.创建消息 String message = "hello, delayed message"; // 2.发送消息,利用消息后置处理器添加消息头 rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 添加延迟消息属性 message.getMessageProperties().setDelay(5000); return message; } }); }

注意:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息

订单状态同步问题

假如订单超时支付时间为30分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。
但是大多数情况下用户支付都会在1分钟内完成,我们发送的消息却要在MQ中停留30分钟,额外消耗了MQ的资源。因此,我们最好多检测几次订单支付状态,而不是在最后第30分钟才检测。
例如:我们在用户下单后的第10秒、20秒、30秒、45秒、60秒、1分30秒、2分、...30分分别设置延迟消息,如果提前发现订单已经支付,则后续的检测取消即可。
这样就可以有效避免对MQ资源的浪费了。

package com.hmall.common.domain; import com.hmall.common.utils.CollUtils; import lombok.Data; import java.util.List; @Data public class MultiDelayMessage<T> { /** * 消息体 */ private T data; /** * 记录延迟时间的集合 */ private List<Long> delayMillis; public MultiDelayMessage(T data, List<Long> delayMillis) { this.data = data; this.delayMillis = delayMillis; } public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){ return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis)); } /** * 获取并移除下一个延迟时间 * @return 队列中的第一个延迟时间 */ public Long removeNextDelay(){ return delayMillis.remove(0); } /** * 是否还有下一个延迟时间 */ public boolean hasNextDelay(){ return !delayMillis.isEmpty(); } }
http://www.jsqmd.com/news/470646/

相关文章:

  • 计算机网络(七)-- 运输层 | TCP 流量控制 拥塞控制
  • 别拿粉尘不当威胁:方盾半面罩给你的肺多一层保护
  • 砂边倒角机哪家好?2026口碑厂商深度评测,底漆打磨机/双面抛光机/侧面打磨机/全自动洗板机,砂边倒角机制造厂家排行榜 - 品牌推荐师
  • Spring Boot 中关于 Bean 加载、实例化、初始化全生命周期的扩展点
  • ROS1+VINS-fusion+RTAB-Map 程序部署记录
  • 【干货】字节大佬:教培行业销售运营全景作战地图
  • 2026年苏州口碑好的家教老师联系方式,全托补习班/一对一家教试听课/师范家教/一对一/全托一对一,家教机构联系方式 - 品牌推荐师
  • 电子商务行业内哪个环节容易遇到攻击
  • UA-Glo® 荧光法细胞活力检测试剂盒技术原理与应用
  • 五:MySQL 索引使用优化指南:何时建、怎么建、怎么用
  • 虚幻引擎资源查看工具全面解析:从新手入门到高级应用实战指南
  • 2026年知名的ALD技术工厂推荐:ALD工艺开发/ALD原子层沉积高口碑品牌推荐 - 行业平台推荐
  • 联发科牵手星链:紧急警报直连太空
  • 2026年分期乐天虹提货券回收价格表 - 京回收小程序
  • 企业AI大脑是什么?企业落地前先回答的 5 个关键问题
  • AI写专著的秘密武器,实用工具大集合,开启高效写作模式
  • 揭秘AI专著撰写工具!功能对比分析,选对工具事半功倍
  • 代码反混淆实战指南:如何用AST技术快速还原JavaScript代码
  • 2026年热门的二轴程控平面磨床公司推荐:精密台湾型高精度平面磨床品牌厂家哪家靠谱 - 行业平台推荐
  • 掌握AI专著写作工具,快速生成创新性专著,提升学术影响力
  • 2026年口碑好的打包丝公司推荐:退火丝/黑色退火丝/退火调直丝厂家推荐哪家好 - 行业平台推荐
  • ML梅花联轴器哪家研发
  • 2026 实测10款降AI率工具!知网/维普/Turnitin/QuillBot降AI率效果大比拼!
  • 网络代理相关
  • RePKG:Wallpaper Engine资源处理的效率革命工具
  • AI专著生成攻略!实用工具大盘点,助你轻松完成著作
  • 用Openclaw小龙虾自动发布小红书!!!Ubuntu
  • 如何用ExifToolGui批量重命名照片:让摄影文件管理不再繁琐
  • Python基于flask的演唱会在线票务预订平台
  • 抖音直播回放下载工具:从痛点解决到企业级应用的全攻略