当前位置: 首页 > news >正文

2025-07-21-Mon-T-RocketMQ

1. 核心功能

1.1 MQ介绍

RocketMQ基本组件

  • Topic: 消息归类的基本单元
  • Queue: 消息队列
  • Producer
  • Consumer
  • ConsumerGroup:
  • NameServer:可以理解为注册中心,负责更新和发现Broker
  • Broker集群:Broker 可以有一个或多个,每一个Brocker就是一个Kafka实例(RacketMQ实例)

1.2 RocketMQ环境搭建

#[Step 1] :拉取 RocketMQ 镜像
docker pull apache/rocketmq:5.1.0# [Step 2] : 创建容器共享网络 rocketmq
docker network create rocketmq# [Step 3] : 部署 NameServer 前提条件
# 创建目录
mkdir -p /data/rocketmq/nameserver/{bin,logs}# 授权文件
chmod 777 -R /data/rocketmq/nameserver/*# 创建容器
docker run -d \
--privileged=true --name rmqnamesrv \
apache/rocketmq:5.1.0 sh mqnamesrv# 拷贝启动脚本
docker cp rmqnamesrv:/home/rocketmq/rocketmq-5.1.0/bin/runserver.sh /data/rocketmq/nameserver/bin/# 删除容器 NameServer
docker rm -f rmqnamesrv# [Step 4] : 启动容器 NameServer
# 启动容器 NameServer
docker run -d --network rocketmq \
--privileged=true --restart=always \
--name rmqnamesrv -p 9876:9876 \
-v /data/rocketmq/nameserver/logs:/home/rocketmq/logs \
-v /data/rocketmq/nameserver/bin/runserver.sh:/home/rocketmq/rocketmq-5.1.0/bin/runserver.sh \
apache/rocketmq:5.1.0 sh mqnamesrv# 部分命令解释 : 
1. -e "MAX_HEAP_SIZE=256M" 设置最大堆内存和堆内存初始大小
2. -e "HEAP_NEWSIZE=128M"  设置新生代内存大小# [Step 5] : 查看 NameServer 启动日志
# 查看启动日志
docker logs -f rmqnamesrv
# 成功,则进行下一步# [Step 6] : 部署 Broker + Proxy
mkdir -p /data/rocketmq/broker/{store,logs,conf,bin}
chmod 777 -R /data/rocketmq/broker/*# [Step 7] : 创建broker.conf文件
vim /data/rocketmq/broker/conf/broker.conf
# nameServer 地址多个用;隔开 默认值null
# 例:127.0.0.1:6666;127.0.0.1:8888 
namesrvAddr = 192.168.56.2:9876
# 集群名称
brokerClusterName = DefaultCluster
# 节点名称. 主节点的名称必须要和从节点名称一致
brokerName = broker-a
# broker id节点ID, 0 表示 master, 其他的正整数表示 slave,不能小于0 
brokerId = 0
# Broker服务地址	String	内部使用填内网ip,如果是需要给外部使用填公网ip
brokerIP1 = 192.168.56.2
# Broker角色
brokerRole = ASYNC_MASTER
# 刷盘方式
flushDiskType = ASYNC_FLUSH
# 在每天的什么时间删除已经超过文件保留时间的 commit log,默认值04
deleteWhen = 04
# 以小时计算的文件保留时间 默认值72小时
fileReservedTime = 72
# 是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# 禁用 tsl
tlsTestModeEnable = false# 下面是没有注释的版本, 记得修改"namesrvAddr", "brokerIP1"的地址
# namesrvAddr = 192.168.56.2:9876
# brokerClusterName = DefaultCluster
# brokerName = broker-a
# brokerId = 0
# brokerIP1 = 192.168.56.2
# brokerRole = ASYNC_MASTER
# flushDiskType = ASYNC_FLUSH
# deleteWhen = 04
# fileReservedTime = 72
# autoCreateTopicEnable=true
# autoCreateSubscriptionGroup=true
# tlsTestModeEnable = false
# [Step 8] : 拷贝启动脚本
# 启动 Broker 容器
docker run -d \
--name rmqbroker --privileged=true \
apache/rocketmq:5.1.0 sh mqbroker# 拷贝脚本文件
docker cp rmqbroker:/home/rocketmq/rocketmq-5.1.0/bin/runbroker.sh /data/rocketmq/broker/bin# [Step 9] : 启动容器 Broker
# 删除容器 Broker
docker rm -f rmqbroker# 启动容器 Broker
docker run -d --network rocketmq \
--restart=always --name rmqbroker --privileged=true \
-p 10911:10911 -p 10909:10909 \
-v /data/rocketmq/broker/logs:/root/logs \
-v /data/rocketmq/broker/store:/root/store \
-v /data/rocketmq/broker/conf/broker.conf:/home/rocketmq/broker.conf \
-v /data/rocketmq/broker/bin/runbroker.sh:/home/rocketmq/rocketmq-5.1.0/bin/runbroker.sh \
-e "NAMESRV_ADDR=rmqnamesrv:9876" \
apache/rocketmq:5.1.0 sh mqbroker --enable-proxy -c /home/rocketmq/broker.conf# 查看启动日志
docker logs -f rmqbroker# [Step 10] : 部署RocketMQ控制台(rocketmq-dashboard)
docker pull apacherocketmq/rocketmq-dashboard:latest# [Step 11] : 启动容器 Rocketmq-dashboard
docker run -d \
--restart=always --name rmq-dashboard \
-p 8080:8080 --network rocketmq \
-e "JAVA_OPTS=-Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
apacherocketmq/rocketmq-dashboard# 日志
docker logs -f rmq-dashboard

整合springboot

1.3 RocketMQ高可用集群搭建

1.3.1 集群各个角色介绍

1.3.2 双主双从搭建

1.4 消息发送样例

1.4.1 同步消息

同步消息,发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式

1.4.2 异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知

1.4.3 单向消息

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送

1.4.4 延迟消息

消息放入mq后,过一段时间,才会被监听到,然后消费

比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存

1.4.5 顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序。

可能大家会有疑问,mq不就是FIFO吗?

rocketMq的broker的机制,导致了rocketMq会有这个问题. 因为一个broker中对应了四个queue

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。

一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消费时,同一个顺序获取到的肯定是同一个队列。

1.4.6 批量消息

Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费

1.4.7 过滤消息

Rocketmq提供消息过滤功能,通过tag或者key进行区分

我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别对待

1.4.8 事务消息

它可以被认为是一个两阶段的提交消息实现,以确保分布式系统的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。

2. 项目实战

2.1 项目背景介绍 - 电商高可用MQ实战

环境搭建

springboot

Dubbo

Zookeeper

RocketMQ

MySQL

2.2 功能分析

2.3 下单功能, 保证各个服务的数据一致性

2.4 确认下单功能,通过消息进行数据分发

2.5 整体联调

3. 高级功能和源码分析

http://www.jsqmd.com/news/48698/

相关文章:

  • 第一章 简介
  • 2025-07-13-Sun-T-AI-LangChain4j
  • P24_现有网络模型的使用及修改
  • 20232403 2025-2026-1 《网络与系统攻防技术》实验六实验报告
  • 第二讲类神经网络训练不起来
  • 【计算机网络】深入浅出DNS:网络世界的地址簿与导航系统 - 教程
  • 2025-01-24-Fri-T-如何做一个开源项目
  • 利用大语言模型分析技术支持诈骗Facebook群组的网络犯罪研究
  • 一些唐话
  • 2025-05-29-Thu-T-设计模式
  • 2025-05-27-Tue-T-JVM
  • 11-28
  • 20232421 2025-2026-1 《网络与系统攻防技术》实验六实验报告
  • 20232315 2025-2026-1 《网络与系统攻防技术》实验六实验报告
  • [CISCN 2022 华东北]duck WP
  • 20232320 2025-2026-1 《网络与系统攻防技术》实验六实验报告
  • 2025-01-14-Tue-T-实体关系图ERD
  • 《Either Way》
  • 20232424 2025-2026-1 《网络与系统攻防技术》实验六实验报告
  • 2024-11-26-Tue-T-SSM
  • HTML游戏创建:利用视频作为特效自动播放的方法
  • 第四章-Tomcat线程模型与运行方式 - 指南
  • 11-21
  • 11-25
  • 11-24
  • 2023-10-15-R-如何阅读一本书
  • 2023-10-11-T-JAVA
  • 通过SSH反向隧道让远程服务器走本地代理
  • 2023-09-19-R-金字塔原理
  • 2023-09-19-E-文章管理