当前位置: 首页 > news >正文

Rocketmq学习笔记

一、安装教程

1.创建目录

mkdir -p ~/rocketmq-docker cd ~/rocketmq-docker

2.创建brokeer.conf配置文件

cat > broker.conf << 'EOF' brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH #虚拟机地址 brokerIP1 = 192.x.x.x EOF

3.创建dokcer-compose.yml文件

cat > docker-compose.yml << 'EOF' version: '3' services: namesrv: image: apache/rocketmq:4.9.6 container_name: rmqnamesrv ports: - 9876:9876 environment: - JAVA_OPT_EXT=-Xms512m -Xmx512m command: sh mqnamesrv networks: - rocketmq broker: image: apache/rocketmq:4.9.6 container_name: rmqbroker ports: - 10909:10909 - 10911:10911 environment: - JAVA_OPT_EXT=-Xms1g -Xmx1g - NAMESRV_ADDR=namesrv:9876 volumes: - ./broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf depends_on: - namesrv networks: - rocketmq shm_size: 512m dashboard: image: apacherocketmq/rocketmq-dashboard:latest container_name: rocketmq-dashboard ports: - 8080:8080 environment: - JAVA_OPTS=-Xms512m -Xmx512m -Drocketmq.namesrv.addr=namesrv:9876 depends_on: - namesrv networks: - rocketmq networks: rocketmq: driver: bridge EOF

4.拉取镜像

docker pull apache/rocketmq:4.9.6 docker pull apacherocketmq/rocketmq-dashboard:1.0.0

5.启动服务(俩种方式,compose,run)

docker-compose up -d

6.run启动

# 1. 先创建网络 docker network create rocketmq # 2. 启动 NameServer docker run -d \ --name rmqnamesrv \ --network rocketmq \ -p 9876:9876 \ -e JAVA_OPT_EXT="-Xms512m -Xmx512m" \ apache/rocketmq:4.9.6 \ sh mqnamesrv # 3. 启动 Broker docker run -d \ --name rmqbroker \ --network rocketmq \ -p 10911:10911 \ -p 10909:10909 \ -e JAVA_OPT_EXT="-Xms1g -Xmx1g" \ -e NAMESRV_ADDR=rmqnamesrv:9876 \ -v $(pwd)/broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf \ --shm-size=512m \ apache/rocketmq:4.9.6 \ sh mqbroker -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf # 4. 启动 Dashboard docker run -d \ --name rocketmq-dashboard \ --network rocketmq \ -p 8080:8080 \ -e JAVA_OPTS="-Xms512m -Xmx512m -Drocketmq.namesrv.addr=rmqnamesrv:9876" \ apacherocketmq/rocketmq-dashboard:latest

7.访问呢客户端

http://localhost:8080

如果有重复的端口重新删除设置

# 停止并删除 docker stop rocketmq-dashboard docker rm rocketmq-dashboard # 重新启动,映射到 8087 端口 docker run -d \ --name rocketmq-dashboard \ --network rocketmq \ -p 8087:8080 \ -e JAVA_OPTS="-Xms512m -Xmx512m -Drocketmq.namesrv.addr=rmqnamesrv:9876" \ apacherocketmq/rocketmq-dashboard:latest

二、基础使用

1.同步发送

发送方order

package com.test.rocketdemo.demo; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class SimpleProducer { public static void main(String[] args) throws Exception { // 1. 创建生产者,指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("demo-producer-group"); // 2. 设置 NameServer 地址(使用你的宿主机IP) producer.setNamesrvAddr("192.168.137.1:9876"); // 3. 启动生产者 producer.start(); System.out.println("生产者启动成功!"); // 4. 发送 10 条消息 for (int i = 1; i <= 10; i++) { // 创建消息:主题、标签、消息内容 String body = "Hello RocketMQ, 这是第 " + i + " 条消息"; Message msg = new Message("DemoTopic", "TagA", body.getBytes()); // 发送消息(同步发送) SendResult sendResult = producer.send(msg); System.out.println("发送结果: " + sendResult.getSendStatus() + ", msgId: " + sendResult.getMsgId()); } // 5. 关闭生产者 producer.shutdown(); System.out.println("生产者已关闭"); } }

接收方注册监听器1.顺序的Messagelistenerorderly 2.并发的接收Messagelistenerconcurrently

package com.test.rocketdemo.demo; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; public class OrderConsumer { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, IOException { DefaultMQPushConsumer order = new DefaultMQPushConsumer("order"); order.setNamesrvAddr("192.168.137.1:9876"); order.subscribe("order","*"); // order.registerMessageListener(new MessageListenerOrderly() { // @Override // public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { // for (MessageExt msg : list) { // String body = new String(msg.getBody()); // System.out.println("收到消息: " + body // + ", msgId: " + msg.getMsgId() // + ", 标签: " + msg.getTags()); // } // // return ConsumeOrderlyStatus.SUCCESS; // } // }); order.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { String body = new String(msg.getBody()); System.out.println("收到消息: " + body + ", msgId: " + msg.getMsgId() + ", 标签: " + msg.getTags()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); order.start(); System.in.read(); System.out.println("消息接收成功---"); order.shutdown(); } }

2.广播接收

发送代码

package com.test.rocketdemo.demo.guangbo; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; public class guangboProducer { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, IOException { DefaultMQProducer order = new DefaultMQProducer("order"); order.setNamesrvAddr("192.168.137.1:9876"); order.start(); for (int i = 0; i < 5; i++) { String body="发送的消息_" + i + "_序号--" +i; Message message = new Message("order","taga",body.getBytes(StandardCharsets.UTF_8)); SendResult sendResult = order.send(message); System.out.println("消息发送成功---"+sendResult); } order.shutdown(); } }

1.群广播接收

order.setMessageModel(MessageModel.BROADCASTING);


2.集群接收(会把消息分隔开分别发送),增强性能。

package com.test.rocketdemo.demo.guangbo; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.IOException; import java.util.List; public class guangboConsumer { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, IOException { DefaultMQPushConsumer order = new DefaultMQPushConsumer("order"); order.setNamesrvAddr("192.168.137.1:9876"); order.subscribe("order","*"); order.setMessageModel(MessageModel.CLUSTERING); order.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { for (MessageExt msg : list) { String body = new String(msg.getBody()); System.out.println("收到消息: " + body + ", msgId: " + msg.getMsgId() + ", 标签: " + msg.getTags()); } return ConsumeOrderlyStatus.SUCCESS; } }); order.start(); System.out.println("消息者启动成功---"); } }

3.延迟消息

http://www.jsqmd.com/news/814417/

相关文章:

  • 国内全自动定量液体灌装机厂家实测排行:技术与交付能力对比 - 速递信息
  • 认真求推荐:2026年工业机器人采购,哪些买卖网站价格透明、服务好? - 品牌推荐大师
  • Flutter Hero 动画完全指南
  • 2026年西安画册印刷厂与活页环装定制深度横评:5大高新技术企业选购指南 - 年度推荐企业名录
  • 告别CNN!用PyG Temporal和GC-LSTM搞定动态社交网络的好友推荐(附完整代码)
  • PEG-b-PLA胶束定制服务:满足多场景纳米载体需求
  • 深度学习大师课 第 1 课:什么是深度学习?纯手写你的第一个神经网络
  • 特色体验拉满!2026安徽漂流推荐排行 四季运营/文化融合/网红打卡 - 极欧测评
  • 八大网盘直链解析完整指南:告别限速困扰,获取真实下载地址
  • 基于Next.js与Supabase构建AI智能体优先的问答竞技平台
  • 唯一客服 SCRM:独立部署的Golang企业微信SCRM源码
  • 魔兽争霸3游戏优化终极指南:3步解决帧率限制与界面显示问题
  • Android开源生态重构:从中心化控制到社区驱动的技术路径与挑战
  • 对接过百个医院项目,告诉你医院污水处理设备厂家怎么挑 - 速递信息
  • Midjourney提示词不再孤岛:如何用Notion AI自动结构化生成+同步至ComfyUI节点图+反向标注至Figma设计系统(含私有化部署避坑清单)
  • 2026年度国内流量计公司推荐权威排行榜:五大头部企业硬核实力全拆解 - 速递信息
  • 微信小程序逆向工程:wxappUnpacker技术深度解析与实战指南
  • 基于MCP协议与Gemini大模型构建智能命令行AI助手
  • 网盘直链下载助手终极指南:一键解锁八大平台高速下载限制
  • 东营油城筑家:郑春红与加西亚质感砖家装之选 - 品牌企业推荐师(官方)
  • 2026亲测!安亭正规美容院大揭秘,效果杠杠滴 - 速递信息
  • FPGA/CPLD调试实战:用嵌入式逻辑分析仪让高速数字信号“慢下来”
  • STM32F407的CAN中断到底怎么用?HAL库实战配置与常见回调函数避坑指南
  • Kubernetes智能运维助手:基于LLM的kube-copilot实战指南
  • Logisim-evolution终极指南:从数字电路新手到硬件设计高手
  • 2026年牛津布厂家推荐:东莞仁泰纺织/PVC/涤纶/尼龙/PU牛津布全品类供应 - 品牌企业推荐师(官方)
  • 在Azure DevOps Server中实现用户端原地址透传(X-Forward-For)
  • 手把手教你用Arduino UNO驱动LD3320语音模块(附完整代码与SPI避坑指南)
  • 如何优雅地从九大网盘获取真实下载地址:一个JavaScript工具的深度解析
  • Kibana启动失败?别慌!从版本兼容到防火墙,保姆级排查手册(附最新兼容性列表)