当前位置: 首页 > news >正文

详细介绍:【MQ】集群部署和可靠性攻略

>MQ是消息队列,用于应用间异步通信,解耦削峰填谷,提升系统响应速度与可靠性。

目录

一、集群搭建

1.1 普通集群

1.2 镜像集群

1.3 仲裁队列

二、消息可靠性

2.1 生产者确认

2.2 消费者确认

三、拓展

3.1 死信交换机

3.2 延迟队列

3.3 惰性队列


一、集群搭建

1.1 普通集群

确保服务器的下面4个端口是开放状态

依次在三个服务器上创建对应的目录

mkdir -p /opt/rabbitmq-cluster/mq1/data

mkdir -p /opt/rabbitmq-cluster/mq1/config

创建配置文件,三个服务同理

echo "FXZMCVGLBIXZCDEMMVZQ" > /opt/rabbitmq-cluster/.erlang.cookie

chmod 600 /opt/rabbitmq-cluster/.erlang.cookie

创建rabbitmq.conf配置文件,三个服务同理

vim /opt/rabbitmq-cluster/mq1/config/rabbitmq.conf

# 允许远程访问
loopback_users.guest = false
# AMQP 端口
listeners.tcp.default = 5672
# 集群配置
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
# 集群节点类型
cluster_partition_handling = autoheal
# 日志级别
log.console.level = info

配置hosts文件

vi /etc/hosts

127.0.0.1 mq1
127.0.0.2 mq2
127.0.0.3 mq3

编辑docker-compose.yml文件

/opt/rabbitmq-cluster/mq1/docker-compose.yml

version: '3.8'
services:rabbitmq-mq1:image: rabbitmq:3.8-managementcontainer_name: mq1hostname: mq1restart: alwaysenvironment:- RABBITMQ_DEFAULT_USER=rabbit- RABBITMQ_DEFAULT_PASS=123321- RABBITMQ_ERLANG_COOKIE=FXZMCVGLBIXZCDEMMVZQports:- "5672:5672"- "15672:15672"- "4369:4369"- "25672:25672"volumes:- ./config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf- /opt/rabbitmq-cluster/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie- ./data:/var/lib/rabbitmqnetwork_mode: host

docker compose up -d

查看节点是否部署成功

docker exec -it mq1 rabbitmqctl cluster_status

至此我们的mq集群就搭建完成了

至此我们的普通集群搭建完成,普通集群之间的交换机等信息会同步,但是发送的消息不会进行同步。

我们在节点1创建一个队列,可以在其他节点看见该队列

我们在节点1发送一条数据到队列,然后去其他节点查看数据可以获取数据。

但是此时我们关闭节点1会发现,队列不可用,说明队列和消息没有同步

1.2 镜像集群

镜像队列有三种模式:exactly模式,all模式,node模式;不过官方推荐使用exactly模式,所以笔者只介绍准确模式,其余两种各位读者可自行了解。

rabbitmqctl set_policy ha-policy "^myqueue\." \ '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

对于队列副本ha-param的值,我们遵循分布式系统的最佳实践 -- (N/2)+1原则。

可能有的读者会担心难道每次启动该节点都要再执行一次这个命令吗,其实不用的,因为mq的策略是持久保持在我们挂载的数据卷目录的。

docker exec -it mq1 rabbitmqctl set_policy ha-image "^image\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

这里我们创建的队列一定要满足命令的正则匹配规则,以image开头的队列名

1.3 仲裁队列

从3.8版本后加入了仲裁队列,和镜像队列类型,但是使用更简单,推荐使用。

二、消息可靠性

2.1 生产者确认

spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@EnableRabbit
public class RabbitMqConfig {// 1. 配置消息转换器@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// 2. 配置RabbitTemplate - 2.3.9兼容写法@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置消息转换器rabbitTemplate.setMessageConverter(messageConverter());// 配置ConfirmCallback (发布确认)rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (correlationData != null) {log.info("消息ID: {}", correlationData.getId());}if (ack) {log.debug("消息发送成功 - ID: {}",correlationData != null ? correlationData.getId() : "N/A");} else {log.error("消息发送失败 - ID: {}, 原因: {}",correlationData != null ? correlationData.getId() : "N/A", cause);//TODO 异步记录到数据库}});// 配置ReturnCallback (路由失败返回)rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation");log.warn("消息路由失败[ID: {}] - Code: {}, Text: {}, Exchange: {}, RoutingKey: {}",msgId != null ? msgId : "N/A", replyCode, replyText, exchange, routingKey);//TODO 记录失败消息});// 开启强制路由rabbitTemplate.setMandatory(true);return rabbitTemplate;}
}

控制台显示的D都是表面已持久化

2.2 消费者确认

spring:rabbitmq:listener:simple:acknowledge-mode: auto

如果消费者处理消息失败,会一直requeue到队列中,可以增加本地重试机制

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

三、拓展

3.1 死信交换机

代码如下:

// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化.deadLetterExchange("dl.direct") // 指定死信交换机.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

@Bean
public Queue ttlQueue(){return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化.ttl(10000) // 设置队列的超时时间,10秒.deadLetterExchange("dl.ttl.direct") // 指定死信交换机.build();
}

3.2 延迟队列

官网地址:https://www.rabbitmq.com/community-plugins.html

3.3 惰性队列


http://www.jsqmd.com/news/78531/

相关文章:

  • 2025年托福培训机构综合推荐及高效提分指南 - 品牌测评鉴赏家
  • AI会议截止日期管理终极指南:如何高效追踪全球顶级学术会议
  • 甲基化分析工具MethylDackel:BS-seq数据处理终极指南
  • Unp4k终极指南:5步解锁Star Citizen游戏资源
  • Cowabunga:iOS个性化定制完全手册,打造属于你的独特手机界面
  • java计算机毕业设计人事管理系统的设计与实现 基于SpringBoot的教职工综合信息管理平台 面向高校的人事与薪酬一体化服务系统
  • 大模型训练新范式:Llama-Factory + 高性能GPU加速全流程实战
  • python打包exe实用工具auto-py-to-exe的操作方法
  • Klonsdif搜索TV浏览器:专为电视大屏优化的轻量级搜索工具
  • 每周AI看 | Chatgpt5.2正式上线、网易七鱼智能客服打通微信小店、特朗普签署行政令、迪士尼向OpenAI投资10亿美元
  • springboot基于vue的海产品溯源网站-来源产地_680tq4t3
  • 5步搞定HTML转PDF:零基础也能掌握的文档转换神器
  • 资源一号卫星参数详情
  • Apache Iceberg性能大揭秘:如何让你的大数据查询快如闪电?
  • Hive简介 - 实践
  • 德卡读卡器SDK:快速集成读卡器版本查询功能
  • springboot基于vue的观赏鱼养殖互助商城系统的设计与实现_1vlf0334
  • 需求管理与项目管理一体化工具选型指南:如何选择最合适的解决方案
  • 2025年下半年上海iso9001认证、iso三体系认证、CE认证、iatf16949认证、iso27001认证服务全面评测与权威推荐指南 - 2025年11月品牌推荐榜
  • 抖音AI运营工具技术实践:特赞内容矩阵架构与300%效率提升方案
  • 博德之门3脚本扩展器的隐藏技巧:从玩家到创造者的进阶之路
  • 国内水处理设备哪家好?想定制水处理设备,求推荐靠谱厂家/制造商/生产商 - 品牌推荐大师
  • Gitee-ssh推送本地代码到 Gitee
  • 【PPT模板】哪家好:2025年12月专业深度测评与排名前五推荐
  • 62
  • springboot基于vue的贵州省铜仁地区乡村振兴综合管理系统_4p67u7m6
  • 学术新人的期刊写作 “工具矩阵”:9 款 AI 如何拆解刊文的 “隐性门槛”
  • Pinyin4NET:终极中文拼音转换解决方案
  • Rust语言学习笔记
  • 11、黑客必备:脚本编写、文件压缩与存储设备管理全解析