当前位置: 首页 > news >正文

RocketMQ系列第三篇:Java原生基础使用实操,手把手写生产者消费者Demo

文章目录

  • 一、本篇前言:理论落地,从部署到代码实操
  • 二、前置准备:项目环境必备配置
    • 1. 基础环境要求
    • 2. 导入RocketMQ核心Maven依赖
  • 三、核心基础:RocketMQ消息核心对象说明
    • 1. DefaultMQProducer:消息生产者核心类
    • 2. DefaultMQPushConsumer:消息消费者核心类
    • 3. Message:消息实体对象
  • 四、Java实操一:三种常用生产者消息发送Demo
    • 1. 同步发送消息(生产最常用,金融/订单核心业务)
    • 2. 异步发送消息(高并发吞吐业务)
    • 3. 单向发送消息(极低优先级无需确认业务)
  • 五、Java实操二:消费者订阅消费消息完整Demo
  • 六、代码运行顺序&控制台验证步骤
  • 七、新手常见踩坑问题快速排查

一、本篇前言:理论落地,从部署到代码实操

前面两篇我们已经搞定了RocketMQ核心概念工作原理单机+集群环境安装部署,服务已经稳稳跑在服务器上。环境搭好只是基础,真正开发工作中,我们都是通过Java代码对接RocketMQ,实现消息生产发送、订阅消费业务逻辑。

本篇零基础新手跟着步骤复制代码,就能快速跑通:创建Topic、发消息、收消息全链路,彻底弄懂Java和RocketMQ的基础交互逻辑,为后续SpringBoot整合、高阶消息类型使用打好编码根基。

二、前置准备:项目环境必备配置

1. 基础环境要求

  • 已搭建完成RocketMQ单机/集群环境,NameServer、Broker正常启动运行;
  • Java开发环境JDK8及以上,IDEA/Eclipse开发工具;
  • Maven项目工程(普通Java项目即可,无需Spring框架);
  • 服务器防火墙开放9876、10911端口,本地电脑能正常连通RocketMQ服务。

2. 导入RocketMQ核心Maven依赖

在pom.xml文件中引入RocketMQ官方Java客户端依赖,版本和服务端版本保持一致即可,兼容性拉满,稳定无冲突。

<!-- RocketMQ Java客户端核心依赖 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.1.4</version></dependency>

依赖刷新下载完成后,即可开始编写生产者、消费者核心代码,所有API均为官方原生标准接口,无第三方封装,简单易懂好上手。

三、核心基础:RocketMQ消息核心对象说明

写代码前先记三个核心基础对象,所有后续编码都围绕这三个对象展开,不用死记,看懂用途即可:

1. DefaultMQProducer:消息生产者核心类

负责连接RocketMQ服务、创建生产实例、发送各类业务消息,必须指定生产组名称NameServer地址,启动后才能正常投递消息。

2. DefaultMQPushConsumer:消息消费者核心类

业务开发最常用的消费者模式,消费者主动监听订阅的Topic,Broker推送消息回调处理,自动负载均衡、自动维护消费偏移量,无需手动管控消费进度,开箱即用。

3. Message:消息实体对象

消息封装载体,构造方法核心四个参数:Topic(消息主题)、Tag(消息标签)、Key(业务唯一标识)、Body(消息体,真实业务数据字节数组),精准匹配之前学的核心概念。

四、Java实操一:三种常用生产者消息发送Demo

RocketMQ原生Java API提供三种核心消息发送模式,适配不同业务场景,下面逐个编写可直接运行的完整代码,附带场景说明和详细注释。

1. 同步发送消息(生产最常用,金融/订单核心业务)

适用场景:支付下单、订单创建、资金扣款等必须保证消息发送成功的核心业务,发送消息后阻塞等待Broker返回发送结果,确认成功再执行业务后续逻辑,可靠性最高。

importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;/** * 同步消息生产者Demo * 场景:核心业务,必须确认消息发送成功 */publicclassSyncProducerDemo{publicstaticvoidmain(String[]args)throwsException{// 1. 创建生产者实例,指定生产组名称(自定义,同业务生产者组名一致)DefaultMQProducerproducer=newDefaultMQProducer("order_sync_producer_group");// 2. 设置RocketMQ NameServer地址(替换为自己服务器IP:9876)producer.setNamesrvAddr("127.0.0.1:9876");// 3. 启动生产者producer.start();System.out.println("同步生产者启动成功!");// 4. 循环发送5条测试消息for(inti=1;i<=5;i++){// 5. 构建消息实体:Topic主题、Tag标签、业务Key、消息体内容Messagemessage=newMessage("order_test_topic",// 消息主题,自定义命名"order_create_tag",// 消息标签,订单创建标签"order_key_00"+i,// 业务唯一Key,用于消息排查追踪("订单编号:00"+i+",订单创建成功").getBytes(RemotingHelper.DEFAULT_CHARSET));// 6. 同步发送消息,等待Broker返回发送结果SendResultsendResult=producer.send(message);// 打印发送结果状态、消息ID等信息System.out.println("第"+i+"条消息发送结果:"+sendResult);}// 7. 发送完成后,关闭生产者(实际项目常驻服务无需关闭)producer.shutdown();}}

2. 异步发送消息(高并发吞吐业务)

适用场景:日志埋点、短信通知、运营推送等高并发、不等待响应业务,发送消息后不阻塞主线程,通过回调接口接收发送成功或失败结果,吞吐量远高于同步发送。

importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.client.producer.SendCallback;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;/** * 异步消息生产者Demo * 场景:高并发业务,无需同步等待响应,追求吞吐量 */publicclassAsyncProducerDemo{publicstaticvoidmain(String[]args)throwsException{// 1. 创建生产者实例,指定生产组DefaultMQProducerproducer=newDefaultMQProducer("order_async_producer_group");// 2. 设置NameServer地址producer.setNamesrvAddr("127.0.0.1:9876");// 3. 启动生产者producer.start();System.out.println("异步生产者启动成功!");// 4. 循环发送5条异步消息for(inti=1;i<=5;i++){Messagemessage=newMessage("order_test_topic","order_notice_tag","notice_key_00"+i,("短信通知:用户00"+i+"支付成功").getBytes(RemotingHelper.DEFAULT_CHARSET));// 5. 异步发送,注册回调函数处理发送结果producer.send(message,newSendCallback(){// 发送成功回调@OverridepublicvoidonSuccess(SendResultsendResult){System.out.println("异步消息发送成功:"+sendResult.getMsgId());}// 发送失败回调,处理异常重试、日志记录@OverridepublicvoidonException(Throwablee){System.err.println("异步消息发送失败,异常信息:"+e.getMessage());e.printStackTrace();}});}// 异步发送无需等待,短暂休眠保证回调执行完成Thread.sleep(1000);producer.shutdown();}}

3. 单向发送消息(极低优先级无需确认业务)

适用场景:系统日志统计、简单埋点上报等无需确认发送结果、不关心是否投递成功的低优先级业务,只管发送无需响应,性能极致最高。

importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.common.message.Message;importorg.apache.rocketmq.remoting.common.RemotingHelper;/** * 单向发送生产者Demo * 场景:日志埋点、简单统计,无需确认发送结果 */publicclassOneWayProducerDemo{publicstaticvoidmain(String[]args)throwsException{DefaultMQProducerproducer=newDefaultMQProducer("log_oneway_producer_group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();System.out.println("单向生产者启动成功!");// 发送埋点日志消息Messagemessage=newMessage("log_test_topic","log_click_tag","click_key_001","用户页面点击行为埋点日志".getBytes(RemotingHelper.DEFAULT_CHARSET));// 单向发送,无返回值、无回调producer.sendOneway(message);System.out.println("单向消息发送完成,无需确认结果");producer.shutdown();}}

五、Java实操二:消费者订阅消费消息完整Demo

生产者发送消息后,必须通过消费者订阅对应Topic和Tag,才能拉取并处理业务消息。生产环境默认使用PushConsumer模式,代码如下,常驻运行持续监听消息。

importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;importorg.apache.rocketmq.common.message.MessageExt;importjava.util.List;/** * 消息消费者Demo * 订阅order_test_topic主题,消费对应消息 */publicclassDefaultConsumerDemo{publicstaticvoidmain(String[]args)throwsException{// 1. 创建消费者实例,指定消费组名称DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("order_consumer_group");// 2. 设置NameServer连接地址consumer.setNamesrvAddr("127.0.0.1:9876");// 3. 订阅需要消费的Topic和Tag,*代表订阅该主题下所有Tag消息consumer.subscribe("order_test_topic","*");// 4. 注册消息监听回调,收到消息后执行业务处理consumer.registerMessageListener((List<MessageExt>messageExtList,ConsumeConcurrentlyContextcontext)->{// 循环处理每一条消费到的消息for(MessageExtmessageExt:messageExtList){// 获取消息主题、标签、业务Key、消息体内容Stringtopic=messageExt.getTopic();Stringtag=messageExt.getTags();StringmsgKey=messageExt.getKeys();StringmsgBody=newString(messageExt.getBody());// 打印消费到的消息信息,模拟业务处理逻辑System.out.println("==========收到RocketMQ消息==========");System.out.println("消息Topic:"+topic);System.out.println("消息Tag:"+tag);System.out.println("业务Key:"+msgKey);System.out.println("消息内容:"+msgBody);System.out.println("==================================");}// 返回消费成功状态,Broker更新消费偏移量,不再重复消费returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5. 启动消费者,常驻监听消息consumer.start();System.out.println("消费者启动成功,持续监听消费消息中...");}}

消费核心关键点:代码最后返回CONSUME_SUCCESS代表消费成功,Broker记录消费位置;如果消费异常返回RECONSUME_LATER,RocketMQ会自动重试消费,重试多次失败后自动转入死信队列,和之前讲的死信概念完美对应。

六、代码运行顺序&控制台验证步骤

  1. 第一步:确保RocketMQ NameServer、Broker全部正常启动,无报错日志;
  2. 第二步:先运行消费者代码,常驻监听Topic消息,等待消息投递;
  3. 第三步:运行任意一个生产者代码,发送测试消息;
  4. 第四步:查看消费者控制台,正常打印消息内容,代表生产消费全链路通;
  5. 第五步:打开RocketMQ可视化Dashboard,查看Topic消息生产数量、消费堆积、死信情况,可视化验证运行状态。

七、新手常见踩坑问题快速排查

  • 连接不上NameServer:IP地址写错、9876端口防火墙未开放、RocketMQ服务未启动;
  • 生产者发消息失败报错:Broker未关联NameServer、autoCreateTopicEnable未开启,Topic不存在;
  • 消费者收不到消息:订阅Topic名称和生产者不一致、消费组名称重复、消费者启动晚于生产者;
  • 程序启动内存报错:本地开发无需修改JVM内存,服务端已在上篇安装时优化配置。
http://www.jsqmd.com/news/762103/

相关文章:

  • 多模态表格问答技术:原理、实现与应用场景
  • 用快马平台将awesome-design-md秒变可交互设计资源库原型
  • 通过用量看板观测API调用成本与模型消耗的实践体验
  • 基于企业微信机器人构建安全命令行工具:原理、实现与实战
  • SCALER框架:提升大语言模型复杂推理能力的强化学习方案
  • 大视觉语言模型全局感知评估:TopoPerception基准解析
  • 华为AC6507S管理口隔离实战:ping通却登不上Web/SSH的排查与修复
  • Abaqus非线性分析不收敛?从Newton-Raphson迭代原理到软件设置的避坑指南
  • 深入解析Dify-Sandbox:构建安全代码沙箱的多层隔离与Seccomp实践
  • FPGA动态时钟禁用技术原理与节能实践
  • ## 014、LangChain 中的 Tool 开发:自定义工具与第三方工具集成
  • 别再死记硬背PID公式了!用STM32 CubeMx配置FOC电机库,可视化理解P、I、D对电机响应的影响
  • 告别Windows软件臃肿:Bulk Crap Uninstaller如何帮你一键清理系统垃圾?
  • 实战对比:在自定义数据集上微调Inception-ResNet-v2 (PyTorch版),我的调参笔记与效果复盘
  • 10 分钟搞定 OpenClaw Windows 一键部署 打造专属数字员工
  • 2026年4月非标异形件定制厂商推荐:点胶螺丝、膨胀螺栓、防松螺丝、非标异形件定制、304螺丝、316螺丝、不锈钢小螺丝选择指南 - 优质品牌商家
  • 别再只盯着BERT了!用BART搞定文本摘要和对话生成,实战代码分享
  • 用Docker和Vulfocus在云服务器上快速搭建自己的渗透测试靶场(附场景编排实战)
  • SPSSAU文本分析模块初体验:手把手教你上传数据并完成第一个项目分析
  • 利用快马AI五分钟生成免费游戏合集网站原型验证创意
  • 信息熵工程化实践:从理论到日志异常检测与系统监控
  • 维普 AIGC 率太高不用愁!这几款降重工具一次解决查重率和 AI 痕迹两个难题
  • OWASP
  • ProGPT:开源大模型的高级提示词工程与管理框架实践指南
  • 从F-22到你的笔记本:揭秘‘不起眼’的吸波材料如何守护现代电子设备
  • 3分钟掌握浏览器Cookie本地导出终极方案
  • 思源笔记深度解析:本地优先与块级引用的知识管理实践
  • 2026制药行业无菌pea过滤器优质厂家推荐榜:过滤器哪家好、浙江过滤器公司、浙江过滤器厂家、海宁过滤器公司、海宁过滤器厂家选择指南 - 优质品牌商家
  • 《源·觉·知·行·事·物:生成论视域下的统一认知语法》第五章 事:行在时空中的具体化
  • Android/Linux休眠唤醒调试实战:如何定位wakelock阻止休眠的元凶?