当前位置: 首页 > news >正文

rocketmq traceId重复问题

背景

mq的traceId有重复现象,理论上不同消息消费,tracdId不应该相同,但为什么有一定的概率会出现呢?

查询代码如下:

protected ConsumeStatus consumeMsgSingle(MessageExt ext) { log.debug("AbstractMessageListener-consumeMessage() msgId:{}, body:{}", ext.getMsgId(), new String(ext.getBody())); String message = new String(ext.getBody()); //获取到key String key = RocketMQUtils.concatKey(ext.getTopic(), ext.getTags()); //根据key从handleMap里获取到我们的处理类 MessageProcessor messageProcessor = handleMap.get(key); if (Objects.isNull(messageProcessor)) { messageProcessor = handleMap.get(ext.getTopic()); } Optional.ofNullable(messageProcessor).orElseThrow(() -> new RRException(String.format("未找到消息处理类, topic:%s, tag:%s", ext.getTopic(), ext.getTags()))); Object obj = null; try { //将String类型的message反序列化成对应的对象。 obj = messageProcessor.transferMessage(message); if (obj instanceof MqMetaInfo) { MqMetaInfo meta = (MqMetaInfo) obj; MqMetaInfoConverter.fromExt(meta, ext); } generateMDC(ext); } catch (Exception e) { StringBuilder errMsg = new StringBuilder("对象反序列化失败, ") .append("messageId: ") .append(ext.getMsgId()).append("\n") .append("msgBody: ") .append(new String(ext.getBody())).append("\n") .append("messageExt ") .append(ext).append("\n") .append("stackTrace: ") .append(JSON.toJSONString(e.getStackTrace())); log.error("AbstractMessageListener-consumeMessage() error:{}, msgId:{}, message:{}, errMsg:{}" , e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString()); throw new RRException(errMsg.toString()); } //处理消息 boolean result = messageProcessor.handleMessage(obj); if (!result) { if (ext.getReconsumeTimes() > Integer.MAX_VALUE) { return ConsumeStatus.SUCCESS; } return ConsumeStatus.FAIL; } return ConsumeStatus.SUCCESS; }

generateMDC方法如下:

原因分析

可以看到如果message中有traceId,则把traceId关联到该线程,并打印出来。但发现最终该方法执行完成后未做清理traceId的动作,即RocketMq的消费者用的是线程池,而线程回收后traceId依然绑定在该线程上,如果下次有消息过来消费则会有同样traceId出现

重现

消费者

@Slf4j @Service(value = "multiConsumerDemoProcessor") public class MultiConsumerDemoProcessor implements MessageProcessor<String> { @Override public boolean handleMessage(String orderNo) { log.info("开始消费:{}", orderNo); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return true; } @Override public Class<String> getClazz() { return null; } @Override public String transferMessage(String message) { return message; } }

生产者

public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("ip"); producer.start(); for (int i = 0; i < 10; i++) try { { Message msg = new Message("multi-consumer-demo", "demo", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } //producer.shutdown(); } }
运行结果:


可以看到traceId是有重复的

解决

加上finally语句,释放traceId

解决结果

前端

http://www.jsqmd.com/news/738681/

相关文章:

  • 终极网络资源下载神器:5分钟掌握全平台素材捕获技巧
  • 在 OpenClaw Agent 工作流中接入 Taotoken 的详细配置指南
  • Mac NTFS读写痛点解决方案:Nigate工具助您节省90%跨平台文件操作时间
  • RK3318电视盒子刷Armbian系统:从硬件适配到应用部署全攻略
  • 数据迁移不求人:用Navicat导入向导,5分钟搞定MySQL/Oracle跨库数据同步
  • Taotoken账单详情与资源消耗的可追溯性体验
  • Java任务编排框架终极指南:如何快速构建高效任务管理系统?
  • ComfyUI IPAdapter Plus架构深度解析与高级配置实践指南
  • 终极窗口尺寸强制调整工具:3分钟掌握任何窗口的完全控制权
  • League Akari:英雄联盟玩家的终极本地自动化工具完整指南
  • 从图像修复到Deepfake检测:SSIM、PSNR这些老牌指标,在2024年还有用武之地吗?
  • CQO与QOC结构在NLP问答任务中的性能对比研究
  • Halcon实战:别再手动数角了!两种方法自动提取任意Region的顶点坐标(附源码)
  • FanControl终极指南:5分钟让Windows风扇控制变得如此简单
  • 【C语言FDA优化权威指南】:20年嵌入式专家首次公开FDA认证代码优化的7大黄金法则
  • 视觉语言模型在空间推理任务中的挑战与优化策略
  • NVIDIA GPU内存层次结构与MIG技术优化实践
  • 告别‘单打独斗’:CODE项目如何用协同自主算法打造无人机蜂群作战能力?
  • SCMP授权机构怎么查?中物联官方验证方法 - 众智商学院官方
  • 给SoC设计新人的Outstanding实战笔记:用AXI总线搞定Display带宽,别再只盯着公式了
  • 探索Zotero PDF Translate的3个架构突破:如何实现多引擎学术翻译生态
  • AI Agent赋能WordPress管理:clawwp开源项目实战指南
  • 别再对着Metasploitable2靶机发呆了!手把手教你用Kali Linux从21端口一路打到8787端口
  • OpenCV多摄像头开发避坑指南:如何通过VID/PID为你的USB摄像头办个‘身份证’
  • 多模态AI云端推理平台PrismerCloud:从模型部署到生产运维全解析
  • 如何用AKShare快速搭建你的量化投资数据平台?终极指南来啦!
  • 从GJB-5000A到5000B:手把手教你解读2021版软件能力成熟度模型的核心变化
  • 《空性与痕迹:自感痕迹论与全球思想史的重释——岐金兰AI元人文思想体系》
  • 如何彻底告别网盘限速:八大平台直链下载工具完全指南
  • 革命性开源字体解决方案:Bebas Neue免费商用字体的终极指南