当前位置: 首页 > news >正文

组件-RocketMQ

一、概念

消息队列中间件,消息存储在文件系统中,文件默认在用户主目录的store目录下。

1、作用

异步解耦

流量削峰

解决高并发场景下响应过慢的问题。
收集日志

2、名词

Message
消息,生产者生产与消费者消费的最小单位。
Key
生产者指定的消息的唯一标识。
Topic
每个消息必须属于某一种主题。
一个生产者可以生产不同主题的消息,而一个消费者只能消费某一个主题下的消息。
Tag
Topic是消息的一级分类,Tag相当于消息的二级分类
Queue
存储消息的物理实体。
一个队列中的消息只能被同一个消费者组中的一个消费者消费。

3、死信队列

MQ有默认的生产者重发次数与消费者重消次数,也可以通过代码配置,其中消费者消费失败后的重新消费次数默认是16次,每次的时间是递增的,跟延迟时间等级一致,当重新消费次数达到上限后,消息会进入死信队列(死信队列所在主题为:%DLQ% + 消费者组名),消息进入死信队列后依然要重新处理
解决思路:
方案一:单独创建一个消费死信队列的消费者组,去对死信队列中的消息进行消费(数据库留痕等)
方案二(推荐):消费者可以获取到该笔消息重新消费的次数,当次数达到某个临界值时,不再重新消费,对该消息进行额外处理

4、如何保证消息不丢失

生产者端:
发送失败后有默认重发机制,默认重发两次。
发送完消息后,判断发送结果,如果是失败,可以记录到数据库。再通过定时任务,找出达到重试次数的消息,特别处理。
broker端:
broker写入消息包含复制策略与刷盘策略,采用同步刷盘、同步复制策略。
消费者端:
默认有重试机制,默认16次,消费失败后会再次消费。
消费时,要等处理完业务逻辑再返回成功,如果直接返回成功,处理业务逻辑失败,也不会再次消费到该消息。

5、消息堆积了怎么办

先检查代码,是不是出现死循环之类的问题。
临时扩容消费者。

二、架构


1、Producer

生产者将消息发送到broker。
同一个生产者组下的生产者将消息发送同一个topic下的queue中。
第一步:从NameServer获取消息Topic的路由信息的请求,NameServer返回该Topic的路由表及Broker列表
第二步:Producer根据指定的Queue选择策略(轮询策略,最小投递延迟策略),从Queue列表中选出一个队列,向选择出的Queue所在的Broker发出RPC请求,将消息发送到选择出的Queue
broker收到消息后,将消息写入成功后会给producer反馈应答,否则producer会再次发送,broker写入消息包含复制策略与刷盘策略。

复制策略
同步复制:消息写入master后,master会等待slave同步数据成功后才向producer返回成功ACK。
异步复制:消息写入master后,master立即向producer返回成功ACK,无需等待slave同步数据成
功。
刷盘策略
同步刷盘:当消息持久化到broker的磁盘后才算是消息写入成功。
异步刷盘:当消息写入到broker的内存后即表示消息写入成功,无需等待消息持久化到磁盘。
最佳实践:多master多slave异步复制,master宕机后slave可以自动切换成master。

2、Consumer

消费者从broker中获取到消息并对消息进行相关业务处理。
同一个消费者组下的消费者消费同一个topic下的不同queue里的消息,所以消费者组中的消费者数量应该小于等于该topic下的queue数量,如果超出queue数量,则多出的消费者不消费消息。
支持以推(push),拉(pull)两种模式对消息进行消费。

3、Broker

存储消息的服务器。
broke默认每30s向NameServer发送心跳,未定时发送心跳会被NameServer剔除。
为了增强broker性能与吞吐量,一般集群部署。
为了解决数据不丢失,将broker集群的每个节点采用主从复制的形式进行横向扩展。

4、NameServer

Broker注册(支持Broker的动态注册与发现)与Broker发现(Producer和Conumser通过NameServer获取注册的Broker集群的路由信息)。
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取主题最新的路由。默认客户端每30秒会拉取一次最新的路由。
客户端(生产者消费者)对NameServer的选择策略:首先采用的是随机策略进行的选择,失败后采用的是轮询策略。
NameServer通常会有多个实例部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,客户端仍然可以向其它NameServer获取路由信息。

三、安装

unzip解压:rocketmq-all-4.9.2-bin-release
修改runbroker.sh、runserver.sh
修改broker.conf:brokerIP1=192.168.8.10
启动nameserver:nohup sh bin/mqnamesrv &
验证namesrv是否启动成功:tail -f ~/logs/rocketmqlogs/namesrv.log
启动broker:nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
验证broker是否启动成功:tail -f ~/logs/rocketmqlogs/broker.log
控制台:rocketmq-console-ng-1.0.1.jar,改application.properties的nameserver地址
关闭broker:sh bin/mqshutdown broker
关闭nameserver:sh bin/mqshutdown namesrv

四、消息幂等

某条消息消费一次与消费多次的效果是相同的。

1、消息重复的情况

发送时重复:消息发送到broker并持久化后出现网络中断,导致broker对producer应答失败,producer会再次发送消息。
消息时重复:消息被消费者消息后,consumer给broker反馈应答时出现网络中断,broker会认为该消息没有被成功消费,消费者便会再次消费到该消息。

2、解决方案

producer发送消息时,将该条消息的业务唯一标识作为key,消费者消费时获取到该条消息的业务唯一标识,即key。先从缓存如redis中获取key,如果key存在所以该条消息已经被消费过,无须处理。如果redis中不存在,再查询数据库如mysql,如果数据库中存在,说明该条消息被消费过了,放到redis中,消费无须处理,如果mysql中未查询中,说明该条消息第一次被消费,消费该条消息,再将key放到redis与mysql中。

五、java访问RocketMQ

1、生产者

pom.xml

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>

application.yml

rocketmq:name-server:192.168.8.10:9876producer:group:my-producer-groupsend-message-timeout:10000
// 发送普通消息(没返回值)rocketMQTemplate.convertAndSend("TopicOne","这是普通消息");// 发送同步消息(会等到收到broker返回结果才会继续往下执行),使用场景:重要通知,如支付结果SendResultresult=rocketMQTemplate.syncSend("TopicOne","这是同步消息");System.out.println(result.getSendStatus());// SEND_OK// 发送异步消息(发送成功后不需要等待broker返回结果,通过回调函数处理结果与异常),使用场景:对响应时间敏感rocketMQTemplate.asyncSend("TopicOne","这是异步消息",newSendCallback(){publicvoidonSuccess(SendResultsendResult){System.out.println("发送成功,结果:"+sendResult.getSendStatus());}publicvoidonException(Throwablee){System.out.println("发送异常:"+e);}});// 发送单向消息(不需要等待broker的反馈,没返回结果),使用场景:日志收集rocketMQTemplate.sendOneWay("TopicOne","这是单向消息");// 发送顺序消息(将消息发送到同一个队列中,实现消息的顺序消费)rocketMQTemplate.syncSendOrderly("TopicOne","1的步骤1","1");rocketMQTemplate.syncSendOrderly("TopicOne","2的步骤1","2");rocketMQTemplate.syncSendOrderly("TopicOne","2的步骤2","2");rocketMQTemplate.syncSendOrderly("TopicOne","1的步骤2","1");rocketMQTemplate.syncSendOrderly("TopicOne","1的步骤3","1");rocketMQTemplate.syncSendOrderly("TopicOne","2的步骤3","2");// 发送延时消息rocketMQTemplate.syncSend("TopicOne",MessageBuilder.withPayload("这是1秒延迟消息").build(),3000,1);rocketMQTemplate.syncSend("TopicOne",MessageBuilder.withPayload("这是5秒延迟消息").build(),3000,2);rocketMQTemplate.syncSend("TopicOne",MessageBuilder.withPayload("这是10秒延迟消息").build(),3000,3);

2、消费者

pom.xml

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>

application.yml

rocketmq:name-server:192.168.8.10:9876
@Component@RocketMQMessageListener(consumerGroup="my-consumer-group",// 消费者组topic="TopicOne",// 消费主题messageModel=MessageModel.CLUSTERING,// 消息模式,默认是集群模式,如果是广播模式,所有的消费者都能接收到consumeMode=ConsumeMode.ORDERLY)// 消费模式:顺序消费publicclassMyConsumerimplementsRocketMQListener<String>{publicvoidonMessage(Stringmessage){System.out.println("消费者收到消息:"+message);}}
http://www.jsqmd.com/news/636291/

相关文章:

  • TLD7002 vs 传统LED驱动芯片:为什么英飞凌这款芯片更适合你的灯光项目?
  • Windows下用Bat脚本批量创建文件夹的3种高效方法(解决中文乱码和空格问题)
  • WebExtensions打包与发布终极指南:从开发到上架Firefox Add-ons商店
  • vscode-browser-preview终极指南:在编辑器中直接调试网页的10个技巧
  • 如何快速掌握 Shlink REST API:从入门到精通的完整指南
  • HTML头部元信息避坑指南:提升页面性能、SEO与用户体验的关键细节
  • ADS Layout 入门实战:从零搭建你的第一个射频电路物理版图
  • 后端面试高频考点:大模型时代API设计转型必懂点
  • 你的STM32编码器代码可能白写了?聊聊HAL库定时器编码器模式怎么用
  • 7步掌握Keras-RetinaNet:从零开始的目标检测实战指南
  • 从S曲线到5次多项式:深入对比两种轨迹规划方法的MATLAB仿真与选型指南
  • 如何用jsPDF-AutoTable从HTML表格一键生成PDF文档
  • Moco最佳实践清单:10个技巧让你的Mock服务器更高效
  • 深入解析mount命令:从基础挂载到高级应用
  • 逆向实战:如何用Frida揪出Android SO里隐藏的动态注册JNI函数(附完整脚本)
  • C#怎么实现字符串全拼搜索_C#如何基于拼音首字母查询【案例】
  • [论文阅读] CVPR-2024-TransNeXt
  • 教程】锁相环PLL相位噪声仿真代码汇总:文件作用、模块噪声位置与传递函数及相噪仿真方法、CAD...
  • 500W无桥PFC开关电源设计资料详解:硬件原理与C语言源码揭秘
  • 解决PyQt5与Qt平台插件xcb的兼容性问题:从报错到成功运行
  • Postman实战:如何通过Post请求高效上传文件
  • 强化学习_07_PyTorch实现PPO-Clip算法在Pendulum-v1中的实战解析
  • 修复Adobe Premiere Pro CC 2018启动崩溃及ZXPSignLib-minimal.dll文件缺失问题
  • 魔兽世界GSE高级宏编译器完全指南:从技能管理到操作优化
  • Win11Debloat 终极指南:三步搞定Windows系统优化与隐私保护
  • 用OpenClaw重构10年Python工业物联网遗留系统:3天完成3人月工作量,代码量减少62%
  • Qiskit Tutorials社区贡献指南:如何参与量子开源项目开发
  • CodeChecker API开发指南:构建自定义分析工具和集成方案
  • 如何快速实现Mina与Rails集成:自动化资产编译和数据库迁移的终极指南
  • 从二进制到可读:objdump反汇编实战与ARM指令深度解析