当前位置: 首页 > news >正文

EasyTransaction可靠消息机制:保证消息最终一致性的完整指南

EasyTransaction可靠消息机制:保证消息最终一致性的完整指南

【免费下载链接】EasyTransactionA distribute transaction solution(分布式事务) unified the usage of TCC , SAGA ,FMT (seata/fescar AutoCompensation), reliable message, compensate and so on;项目地址: https://gitcode.com/gh_mirrors/ea/EasyTransaction

EasyTransaction是一款强大的分布式事务解决方案,它统一了TCC、SAGA、FMT(seata/fescar自动补偿)、可靠消息、补偿等多种事务模式的使用方式。本文将重点介绍如何利用EasyTransaction的可靠消息机制,轻松实现分布式系统中的消息最终一致性,确保业务数据的准确性和可靠性。

一、什么是可靠消息机制?

可靠消息机制是分布式系统中保障数据一致性的关键技术之一。它通过一系列机制确保消息在生产、传输和消费过程中的可靠性,即使在系统出现异常的情况下,也能保证消息最终被正确处理。

在EasyTransaction中,可靠消息机制具有以下特点:

  • 事务提交后发送:消息会在本地事务成功提交后才被发送出去
  • 持久化存储:消息会进行持久化存储,防止系统崩溃导致消息丢失
  • 重试机制:提供完善的消息重试机制,确保消息最终被消费

二、可靠消息与最大努力交付消息的区别

EasyTransaction提供了两种消息投递模式,需要根据业务场景选择合适的模式:

2.1 可靠消息(Reliable Message)

可靠消息保证消息一定会被发布出去,即使在发布过程中出现异常,系统也会进行重试,直到消息成功发布。

// 可靠消息示例代码 OrderMessage orderMessage = new OrderMessage(); orderMessage.setUserId(userId); orderMessage.setAmount(money); Future<PublishResult> reliableMessage = transaction.execute(orderMessage);

可靠消息适用于对消息可靠性要求极高的场景,如金融交易、订单状态更新等核心业务流程。

2.2 最大努力交付消息(Best Effort Message)

最大努力交付消息不保证消息一定被发布出去,它不需要持久化,因此速度更快。适用于对消息可靠性要求不高的场景。

// 最大努力交付消息示例代码 NotReliableOrderMessage notReliableMessage = new NotReliableOrderMessage(); notReliableMessage.setUserId(userId); notReliableMessage.setAmount(money); transaction.execute(notReliableMessage);

三、如何使用EasyTransaction可靠消息机制

3.1 引入依赖

首先,需要在项目的pom.xml中引入EasyTransaction相关依赖。具体的依赖配置可以参考项目中的示例模块,如:

  • easytrans-demo/tcc-and-msg/pom.xml
  • easytrans-demo/log-redis/pom.xml

3.2 配置消息队列

EasyTransaction支持多种消息队列实现,如Kafka、ONS等。你需要根据项目需求选择合适的队列实现,并进行相应配置。

以Kafka为例,相关配置类位于:easytrans-queue-kafka-starter/src/main/java/com/yiqiniu/easytrans/queue/impl/kafka/KafkaQueueProperties.java

3.3 创建消息对象

创建消息对象,定义需要传递的数据字段:

public class OrderMessage implements ReliableMessagePublishRequest { private Long userId; private BigDecimal amount; // getter和setter方法 }

3.4 发送可靠消息

在业务方法中,通过EasyTransaction的事务管理器发送可靠消息:

@Transactional public void createOrder(Long userId, BigDecimal money) { // 本地业务逻辑:创建订单 Order order = new Order(); order.setUserId(userId); order.setAmount(money); orderMapper.insert(order); // 发送可靠消息 OrderMessage orderMessage = new OrderMessage(); orderMessage.setUserId(userId); orderMessage.setAmount(money); transaction.execute(orderMessage); }

3.5 消费消息

创建消息消费者,处理接收到的消息:

@Component public class OrderMessageConsumer implements EasyTransMsgListener<OrderMessage> { @Override public void onMessage(OrderMessage message) { // 处理消息:如增加用户积分、发送通知等 pointService.addPoints(message.getUserId(), message.getAmount().intValue()); } }

四、可靠消息机制的实现原理

EasyTransaction的可靠消息机制主要通过以下几个组件实现:

4.1 消息存储

消息存储相关的实现类位于:easytrans-log-database-starter/src/main/java/com/yiqiniu/easytrans/log/impl/database/

消息会先被存储到数据库中,确保在事务提交前消息不会丢失。

4.2 消息发送器

消息发送器的实现位于:easytrans-queue-kafka-starter/src/main/java/com/yiqiniu/easytrans/queue/impl/kafka/KafkaEasyTransMsgPublisherImpl.java

事务提交后,发送器会从数据库中读取消息并发送到消息队列。

4.3 消息消费者

消息消费者的实现位于:easytrans-queue-kafka-starter/src/main/java/com/yiqiniu/easytrans/queue/impl/kafka/KafkaEasyTransMsgConsumerImpl.java

消费者负责从消息队列中获取消息并调用相应的处理方法。

4.4 消息重试机制

如果消息消费失败,EasyTransaction会自动进行重试。相关的重试逻辑可以在以下类中找到:easytrans-core/src/main/java/com/yiqiniu/easytrans/recovery/ConsistentGuardianDaemon.java

五、实际应用场景

可靠消息机制在分布式系统中有广泛的应用,例如:

5.1 订单创建后发送通知

当用户创建订单后,发送可靠消息通知库存系统减少库存,通知物流系统准备发货。

5.2 支付完成后更新订单状态

支付系统完成支付后,发送可靠消息通知订单系统更新订单状态,通知积分系统增加用户积分。

5.3 跨系统数据同步

当一个系统的数据发生变更时,通过可靠消息通知其他相关系统进行数据同步。

六、使用注意事项

  1. 消息幂等性:由于消息可能会被重试,因此消息处理逻辑必须保证幂等性,即多次处理同一消息不会产生副作用。

  2. 消息顺序:EasyTransaction的可靠消息机制不保证消息的顺序,如果业务需要严格的消息顺序,需要额外处理。

  3. 消息积压:需要监控消息队列的状态,避免消息积压影响系统性能。相关的监控实现可以参考:easytrans-core/src/main/java/com/yiqiniu/easytrans/monitor/

  4. 事务边界:确保消息发送操作在事务边界内执行,以保证本地事务和消息发送的原子性。

七、总结

EasyTransaction的可靠消息机制为分布式系统提供了简单而强大的消息一致性解决方案。通过本文的介绍,你应该已经了解了可靠消息的基本概念、使用方法和实现原理。

无论是构建电商平台、支付系统还是其他分布式应用,EasyTransaction都能帮助你轻松解决消息一致性问题,提高系统的可靠性和稳定性。

如果你想深入了解更多关于EasyTransaction的功能,可以参考项目中的示例代码,如:

  • easytrans-demo/tcc-and-msg/
  • easytrans-starter/src/test/java/com/yiqiniu/easytrans/test/mockservice/order/OrderService.java

开始使用EasyTransaction,让分布式事务变得简单! 🚀

【免费下载链接】EasyTransactionA distribute transaction solution(分布式事务) unified the usage of TCC , SAGA ,FMT (seata/fescar AutoCompensation), reliable message, compensate and so on;项目地址: https://gitcode.com/gh_mirrors/ea/EasyTransaction

创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考

http://www.jsqmd.com/news/658902/

相关文章:

  • StructBERT相似度镜像免配置方案:支持离线环境无网络部署
  • DC/OS高可用性设计:Master节点故障恢复机制
  • python语法练习------题目 2:继承特性
  • Evaluate 核心组件详解:Metrics、Measurements 和 Comparisons
  • layui table单元格编辑 layui表格如何实现可编辑
  • 终极PSReadLine编辑指南:Emacs与Vi模式深度对比及高效切换技巧
  • 如何快速上手Reko:5分钟学会二进制文件反编译
  • 超越默认配置:手把手教你将自定义算法集成到MoveIt!与OMPL
  • 大规模HTML解析任务分发:gumbo-parser与ZooKeeper的完美结合指南
  • 3个关键步骤解决FanControl风扇控制问题:AMD显卡用户的完整指南
  • 嵌入式开发法律风险防控与知识产权保护实战指南
  • Polyglot词向量应用指南:137种语言的语义相似度计算
  • Qwen3-Embedding-4B部署实录:CentOS系统环境配置避坑指南
  • OpenVAS Scanner扫描插件结果数据备份压缩算法选择终极指南
  • Neural Tangents实战:10个核心函数详解与代码示例
  • 网络拓扑可视化:Easy-Topo的智能图形编辑解决方案
  • Faster-RCNN_TF核心架构解析:深入理解区域提议网络RPN
  • 如何解决DG主库执行Drop Tablespace备库未同步_STANDBY_FILE_MANAGEMENT排查
  • 伏羲天气预报科研应用:高校气象实验室快速搭建AI驱动预报验证平台
  • 终极PerceptualSimilarity社区贡献指南:如何参与LPIPS项目开发与改进
  • ThetaGang实战案例:如何用Docker每日自动运行交易
  • 如何快速上手Multitarget-tracker:5分钟入门多目标跟踪
  • 在Obsidian中高效管理B站视频的终极解决方案
  • WuliArt Qwen-Image Turbo高质量输出:JPEG高保真压缩下的色彩还原实测
  • 如何用Smithbox打造你的专属魂系游戏体验:从入门到精通的5个关键步骤
  • 2026届学术党必备的五大降AI率神器推荐
  • 技术写作者的多元变现之路:从零到月入过万的实战指南
  • 如何用解构赋值快速提取数组前几个元素到独立变量
  • Jasminum插件:中文文献管理的终极解决方案指南
  • fake2db社区贡献指南:如何为开源项目添加新的数据库支持