EasyTransaction可靠消息机制:保证消息最终一致性的完整指南
EasyTransaction可靠消息机制:保证消息最终一致性的完整指南
【免费下载链接】EasyTransactionA distribute transaction solution(分布式事务) unified the usage of TCC , SAGA ,FMT (seata/fescar AutoCompensation), reliable message, compensate and so on;项目地址: https://gitcode.com/gh_mirrors/ea/EasyTransaction
EasyTransaction是一款强大的分布式事务解决方案,它统一了TCC、SAGA、FMT(seata/fescar自动补偿)、可靠消息、补偿等多种事务模式的使用方式。本文将重点介绍如何利用EasyTransaction的可靠消息机制,轻松实现分布式系统中的消息最终一致性,确保业务数据的准确性和可靠性。
一、什么是可靠消息机制?
可靠消息机制是分布式系统中保障数据一致性的关键技术之一。它通过一系列机制确保消息在生产、传输和消费过程中的可靠性,即使在系统出现异常的情况下,也能保证消息最终被正确处理。
在EasyTransaction中,可靠消息机制具有以下特点:
- 事务提交后发送:消息会在本地事务成功提交后才被发送出去
- 持久化存储:消息会进行持久化存储,防止系统崩溃导致消息丢失
- 重试机制:提供完善的消息重试机制,确保消息最终被消费
二、可靠消息与最大努力交付消息的区别
EasyTransaction提供了两种消息投递模式,需要根据业务场景选择合适的模式:
2.1 可靠消息(Reliable Message)
可靠消息保证消息一定会被发布出去,即使在发布过程中出现异常,系统也会进行重试,直到消息成功发布。
// 可靠消息示例代码 OrderMessage orderMessage = new OrderMessage(); orderMessage.setUserId(userId); orderMessage.setAmount(money); Future<PublishResult> reliableMessage = transaction.execute(orderMessage);可靠消息适用于对消息可靠性要求极高的场景,如金融交易、订单状态更新等核心业务流程。
2.2 最大努力交付消息(Best Effort Message)
最大努力交付消息不保证消息一定被发布出去,它不需要持久化,因此速度更快。适用于对消息可靠性要求不高的场景。
// 最大努力交付消息示例代码 NotReliableOrderMessage notReliableMessage = new NotReliableOrderMessage(); notReliableMessage.setUserId(userId); notReliableMessage.setAmount(money); transaction.execute(notReliableMessage);三、如何使用EasyTransaction可靠消息机制
3.1 引入依赖
首先,需要在项目的pom.xml中引入EasyTransaction相关依赖。具体的依赖配置可以参考项目中的示例模块,如:
- easytrans-demo/tcc-and-msg/pom.xml
- easytrans-demo/log-redis/pom.xml
3.2 配置消息队列
EasyTransaction支持多种消息队列实现,如Kafka、ONS等。你需要根据项目需求选择合适的队列实现,并进行相应配置。
以Kafka为例,相关配置类位于:easytrans-queue-kafka-starter/src/main/java/com/yiqiniu/easytrans/queue/impl/kafka/KafkaQueueProperties.java
3.3 创建消息对象
创建消息对象,定义需要传递的数据字段:
public class OrderMessage implements ReliableMessagePublishRequest { private Long userId; private BigDecimal amount; // getter和setter方法 }3.4 发送可靠消息
在业务方法中,通过EasyTransaction的事务管理器发送可靠消息:
@Transactional public void createOrder(Long userId, BigDecimal money) { // 本地业务逻辑:创建订单 Order order = new Order(); order.setUserId(userId); order.setAmount(money); orderMapper.insert(order); // 发送可靠消息 OrderMessage orderMessage = new OrderMessage(); orderMessage.setUserId(userId); orderMessage.setAmount(money); transaction.execute(orderMessage); }3.5 消费消息
创建消息消费者,处理接收到的消息:
@Component public class OrderMessageConsumer implements EasyTransMsgListener<OrderMessage> { @Override public void onMessage(OrderMessage message) { // 处理消息:如增加用户积分、发送通知等 pointService.addPoints(message.getUserId(), message.getAmount().intValue()); } }四、可靠消息机制的实现原理
EasyTransaction的可靠消息机制主要通过以下几个组件实现:
4.1 消息存储
消息存储相关的实现类位于:easytrans-log-database-starter/src/main/java/com/yiqiniu/easytrans/log/impl/database/
消息会先被存储到数据库中,确保在事务提交前消息不会丢失。
4.2 消息发送器
消息发送器的实现位于:easytrans-queue-kafka-starter/src/main/java/com/yiqiniu/easytrans/queue/impl/kafka/KafkaEasyTransMsgPublisherImpl.java
事务提交后,发送器会从数据库中读取消息并发送到消息队列。
4.3 消息消费者
消息消费者的实现位于:easytrans-queue-kafka-starter/src/main/java/com/yiqiniu/easytrans/queue/impl/kafka/KafkaEasyTransMsgConsumerImpl.java
消费者负责从消息队列中获取消息并调用相应的处理方法。
4.4 消息重试机制
如果消息消费失败,EasyTransaction会自动进行重试。相关的重试逻辑可以在以下类中找到:easytrans-core/src/main/java/com/yiqiniu/easytrans/recovery/ConsistentGuardianDaemon.java
五、实际应用场景
可靠消息机制在分布式系统中有广泛的应用,例如:
5.1 订单创建后发送通知
当用户创建订单后,发送可靠消息通知库存系统减少库存,通知物流系统准备发货。
5.2 支付完成后更新订单状态
支付系统完成支付后,发送可靠消息通知订单系统更新订单状态,通知积分系统增加用户积分。
5.3 跨系统数据同步
当一个系统的数据发生变更时,通过可靠消息通知其他相关系统进行数据同步。
六、使用注意事项
消息幂等性:由于消息可能会被重试,因此消息处理逻辑必须保证幂等性,即多次处理同一消息不会产生副作用。
消息顺序:EasyTransaction的可靠消息机制不保证消息的顺序,如果业务需要严格的消息顺序,需要额外处理。
消息积压:需要监控消息队列的状态,避免消息积压影响系统性能。相关的监控实现可以参考:easytrans-core/src/main/java/com/yiqiniu/easytrans/monitor/
事务边界:确保消息发送操作在事务边界内执行,以保证本地事务和消息发送的原子性。
七、总结
EasyTransaction的可靠消息机制为分布式系统提供了简单而强大的消息一致性解决方案。通过本文的介绍,你应该已经了解了可靠消息的基本概念、使用方法和实现原理。
无论是构建电商平台、支付系统还是其他分布式应用,EasyTransaction都能帮助你轻松解决消息一致性问题,提高系统的可靠性和稳定性。
如果你想深入了解更多关于EasyTransaction的功能,可以参考项目中的示例代码,如:
- easytrans-demo/tcc-and-msg/
- easytrans-starter/src/test/java/com/yiqiniu/easytrans/test/mockservice/order/OrderService.java
开始使用EasyTransaction,让分布式事务变得简单! 🚀
【免费下载链接】EasyTransactionA distribute transaction solution(分布式事务) unified the usage of TCC , SAGA ,FMT (seata/fescar AutoCompensation), reliable message, compensate and so on;项目地址: https://gitcode.com/gh_mirrors/ea/EasyTransaction
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考
