TCC in Practice: A Distributed Transaction Solution
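1. Overview of Distributed Transactions
In a distributed system, a transaction can span multiple services or databases. Traditional ACID transactions, which are scoped to a single database, cannot be applied directly, so a dedicated distributed transaction solution is needed.
1.1 Challenges of Distributed Transactions
| Challenge | Description |
|---|---|
| Network latency | Cross-service calls incur network latency and can time out |
| Data consistency | Multiple data sources must be kept consistent |
| Failure recovery | A partial failure requires rollback or compensation |
| Performance impact | Distributed locks and coordination add overhead |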
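1.2 Common Solutions
| Solution | Characteristics | Suitable scenarios |
|---|---|---|
| 2PC | Strong consistency, blocking | Very strict data consistency requirements |
| TCC | Eventual consistency, non-blocking | High-concurrency scenarios |
| Saga | Event-driven, long-running transactions | Business process orchestration |
| Message queue | Eventual consistency, asynchronous | Decoupling services |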
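2. How TCC Works
TCC (Try-Confirm-Cancel) is a distributed transaction solution implemented at the business layer: each participating service exposes its own Try, Confirm, and Cancel operations instead of relying on the database to coordinate the transaction.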
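To make the control flow concrete, here is a minimal in-memory sketch of it. The `Participant` interface, `RecordingParticipant`, and `TccFlow` are hypothetical names invented for this illustration, and the sketch assumes a failed Try reserves nothing, so only participants whose Try succeeded are cancelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical participant contract, used only for this sketch. */
interface Participant {
    boolean tryReserve(); // Try: check and reserve resources
    void confirm();       // Confirm: consume the reservation
    void cancel();        // Cancel: release the reservation
}

/** Toy participant that records every call it receives. */
class RecordingParticipant implements Participant {
    private final String name;
    private final boolean tryOutcome;
    private final List<String> log;

    RecordingParticipant(String name, boolean tryOutcome, List<String> log) {
        this.name = name;
        this.tryOutcome = tryOutcome;
        this.log = log;
    }

    @Override public boolean tryReserve() { log.add(name + ":try"); return tryOutcome; }
    @Override public void confirm() { log.add(name + ":confirm"); }
    @Override public void cancel() { log.add(name + ":cancel"); }
}

class TccFlow {
    /**
     * Runs Try on each participant in order. If every Try succeeds, Confirm is
     * called on all of them; otherwise Cancel is called on those that reserved.
     */
    static boolean execute(List<? extends Participant> participants) {
        List<Participant> reserved = new ArrayList<>();
        for (Participant p : participants) {
            if (!p.tryReserve()) {
                for (Participant r : reserved) {
                    r.cancel();
                }
                return false;
            }
            reserved.add(p);
        }
        for (Participant p : participants) {
            p.confirm();
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean committed = execute(Arrays.asList(
                new RecordingParticipant("order", true, log),
                new RecordingParticipant("account", false, log)));
        System.out.println(committed + " " + log);
    }
}
```

A production coordinator would also persist its decision and usually sends Cancel to a participant whose Try timed out, because that Try may in fact have succeeded.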
2.1 The Three TCC Phases
```text
┌─────────────────────────────────────────────────────────────┐
│                   Transaction Coordinator                   │
└─────────────────────────────────────────────────────────────┘
                             │
         ┌───────────────────┼───────────────────┐
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│    Try phase    │ │  Confirm phase  │ │  Cancel phase   │
│    (reserve)    │ │    (commit)     │ │    (release)    │
└─────────────────┘ └─────────────────┘ └─────────────────┘
```
2.2 TCC State Machine
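```java
public enum TccStatus {
    TRYING("Trying"),
    CONFIRMING("Confirming"),
    CANCELING("Canceling"),
    SUCCESS("Success"),
    FAILED("Failed");

    // Human-readable label; the constants above need this field and constructor to compile
    private final String description;

    TccStatus(String description) { this.description = description; }

    public String getDescription() { return description; }
}
```
3. TCC Implementation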
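The implementations in this section move a transaction through the statuses defined in 2.2. The article does not spell out which transitions are legal, so the table in the sketch below is an assumption: a transaction leaves `TRYING` for either `CONFIRMING` or `CANCELING`, and the two terminal statuses accept no further change. `StatusTransitions` is a hypothetical helper, and the enum is redeclared without labels to keep the example self-contained.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Same constants as in 2.2, redeclared so the sketch compiles on its own. */
enum TccStatus { TRYING, CONFIRMING, CANCELING, SUCCESS, FAILED }

/** Hypothetical helper that encodes the assumed legal transitions. */
final class StatusTransitions {
    private static final Map<TccStatus, Set<TccStatus>> ALLOWED = new EnumMap<>(TccStatus.class);

    static {
        ALLOWED.put(TccStatus.TRYING, EnumSet.of(TccStatus.CONFIRMING, TccStatus.CANCELING));
        ALLOWED.put(TccStatus.CONFIRMING, EnumSet.of(TccStatus.SUCCESS));
        ALLOWED.put(TccStatus.CANCELING, EnumSet.of(TccStatus.FAILED));
        ALLOWED.put(TccStatus.SUCCESS, EnumSet.noneOf(TccStatus.class)); // terminal
        ALLOWED.put(TccStatus.FAILED, EnumSet.noneOf(TccStatus.class));  // terminal
    }

    private StatusTransitions() {
    }

    static boolean canTransition(TccStatus from, TccStatus to) {
        return ALLOWED.get(from).contains(to);
    }

    /** Returns the new status, or throws if the move is not in the table. */
    static TccStatus transition(TccStatus from, TccStatus to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Illegal transition: " + from + " -> " + to);
        }
        return to;
    }
}
```

Checking transitions in one place makes it harder for a late or duplicated message to move a finished transaction back into an active status.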
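3.1 TCC Interface Definition
```java
public interface TccTransaction {

    // @TccMethod is not defined in this article; it is assumed to be a project-level
    // annotation that links the Try method to its Confirm and Cancel methods by name.
    @TccMethod(confirmMethod = "confirm", cancelMethod = "cancel")
    void tryExecute(TccContext context);

    void confirm(TccContext context);

    void cancel(TccContext context);
}
```
3.2 Order Service TCC Implementation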
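```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Assumes TccContext exposes typed accessors for userId, amount, and orderId
// in addition to the fields shown in 4.2.
@Service
public class OrderTccService implements TccTransaction {

    @Autowired
    private OrderRepository orderRepository;

    // Try: create the order in PENDING state so it is reserved but not yet effective
    @Override
    public void tryExecute(TccContext context) {
        Order order = Order.builder()
                .userId(context.getUserId())
                .amount(context.getAmount())
                .status("PENDING")
                .build();
        orderRepository.save(order);
        context.setOrderId(order.getId());
        context.setStatus(TccStatus.TRYING.name());
    }

    // Confirm: the PENDING check makes a repeated call a no-op (idempotent)
    @Override
    public void confirm(TccContext context) {
        Order order = orderRepository.findById(context.getOrderId()).orElse(null);
        if (order != null && "PENDING".equals(order.getStatus())) {
            order.setStatus("SUCCESS");
            orderRepository.save(order);
        }
        context.setStatus(TccStatus.SUCCESS.name());
    }

    // Cancel: only a PENDING order is cancelled, so a repeated call is also a no-op
    @Override
    public void cancel(TccContext context) {
        Order order = orderRepository.findById(context.getOrderId()).orElse(null);
        if (order != null && "PENDING".equals(order.getStatus())) {
            order.setStatus("CANCELLED");
            orderRepository.save(order);
        }
        context.setStatus(TccStatus.FAILED.name());
    }
}
```
3.3 Account Service TCC Implementation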
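```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Note: confirm and cancel below are not idempotent on their own; a repeated call
// would adjust the amounts twice. Guard them with a per-transaction check such as
// the one in 5.1 before retrying them.
@Service
public class AccountTccService implements TccTransaction {

    @Autowired
    private AccountRepository accountRepository;

    // Try: move the amount from the available balance into the frozen amount
    @Override
    @Transactional
    public void tryExecute(TccContext context) {
        Account account = accountRepository.findByUserId(context.getUserId());
        if (account.getBalance() < context.getAmount()) {
            throw new InsufficientBalanceException("Insufficient balance");
        }
        account.setFreezeAmount(account.getFreezeAmount() + context.getAmount());
        account.setBalance(account.getBalance() - context.getAmount());
        accountRepository.save(account);
        context.setStatus(TccStatus.TRYING.name());
    }

    // Confirm: the frozen amount is actually spent
    @Override
    @Transactional
    public void confirm(TccContext context) {
        Account account = accountRepository.findByUserId(context.getUserId());
        account.setFreezeAmount(account.getFreezeAmount() - context.getAmount());
        accountRepository.save(account);
        context.setStatus(TccStatus.SUCCESS.name());
    }

    // Cancel: return the frozen amount to the available balance
    @Override
    @Transactional
    public void cancel(TccContext context) {
        Account account = accountRepository.findByUserId(context.getUserId());
        account.setBalance(account.getBalance() + context.getAmount());
        account.setFreezeAmount(account.getFreezeAmount() - context.getAmount());
        accountRepository.save(account);
        context.setStatus(TccStatus.FAILED.name());
    }
}
```
4. Transaction Coordinator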
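Because the coordinator persists a status for each transaction, that status can be used to decide how to resume a transaction that was interrupted midway. The sketch below shows one possible decision rule. `RecoveryPolicy` and `RecoveryAction` are hypothetical names, and the rule itself is an assumption: a stale `TRYING` transaction is cancelled, while `CONFIRMING` and `CANCELING` are always driven forward rather than reversed.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical actions a recovery job could take for one stored transaction. */
enum RecoveryAction { NONE, CANCEL, RETRY_CONFIRM, RETRY_CANCEL }

/** Hypothetical policy that maps a stored status and age to a recovery action. */
final class RecoveryPolicy {
    private final Duration tryTimeout;

    RecoveryPolicy(Duration tryTimeout) {
        this.tryTimeout = tryTimeout;
    }

    /** The status strings are the TccStatus constant names from 2.2. */
    RecoveryAction decide(String status, Instant createdAt, Instant now) {
        switch (status) {
            case "TRYING":
                // Still within the timeout: the caller may yet commit or roll back
                boolean expired = Duration.between(createdAt, now).compareTo(tryTimeout) > 0;
                return expired ? RecoveryAction.CANCEL : RecoveryAction.NONE;
            case "CONFIRMING":
                // The decision to commit was already made, so keep confirming
                return RecoveryAction.RETRY_CONFIRM;
            case "CANCELING":
                return RecoveryAction.RETRY_CANCEL;
            default:
                // SUCCESS and FAILED are terminal
                return RecoveryAction.NONE;
        }
    }

    public static void main(String[] args) {
        RecoveryPolicy policy = new RecoveryPolicy(Duration.ofMinutes(5));
        Instant now = Instant.now();
        System.out.println(policy.decide("TRYING", now.minus(Duration.ofMinutes(10)), now));
    }
}
```

Passing `now` in as a parameter, instead of reading the clock inside `decide`, keeps the rule easy to unit test.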
4.1 Coordinator Design
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class TccCoordinator {

    private static final Logger log = LoggerFactory.getLogger(TccCoordinator.class);

    @Autowired
    private TransactionRepository transactionRepository;

    @Autowired
    private ApplicationContext applicationContext;

    // Record the transaction and its participants before any Try call is made
    @Transactional
    public String startTransaction(List<TccParticipant> participants) {
        String transactionId = UUID.randomUUID().toString();
        Transaction transaction = Transaction.builder()
                .transactionId(transactionId)
                .status(TccStatus.TRYING.name())
                .participants(JsonUtil.toJson(participants))
                .build();
        transactionRepository.save(transaction);
        return transactionId;
    }

    public void commit(String transactionId) {
        Transaction transaction = transactionRepository.findById(transactionId).orElse(null);
        if (transaction == null) {
            return;
        }
        // Persist the decision first so that an interrupted commit can be resumed
        transaction.setStatus(TccStatus.CONFIRMING.name());
        transactionRepository.save(transaction);

        List<TccParticipant> participants = JsonUtil.fromJson(
                transaction.getParticipants(),
                new TypeReference<List<TccParticipant>>() {}
        );
        for (TccParticipant participant : participants) {
            try {
                TccTransaction service = applicationContext.getBean(
                        participant.getServiceName(), TccTransaction.class
                );
                TccContext context = new TccContext();
                context.setTransactionId(transactionId);
                context.setParams(participant.getParams());
                service.confirm(context);
            } catch (Exception e) {
                // Earlier participants may already have confirmed, so cancelling now could
                // leave the data inconsistent. Keep the status at CONFIRMING and let the
                // caller or a background job retry (Confirm must be idempotent).
                throw new TransactionCommitException("Commit failed", e);
            }
        }
        transaction.setStatus(TccStatus.SUCCESS.name());
        transactionRepository.save(transaction);
    }

    public void rollback(String transactionId) {
        Transaction transaction = transactionRepository.findById(transactionId).orElse(null);
        if (transaction == null) {
            return;
        }
        List<TccParticipant> participants = JsonUtil.fromJson(
                transaction.getParticipants(),
                new TypeReference<List<TccParticipant>>() {}
        );
        boolean allCancelled = true;
        for (TccParticipant participant : participants) {
            try {
                TccTransaction service = applicationContext.getBean(
                        participant.getServiceName(), TccTransaction.class
                );
                TccContext context = new TccContext();
                context.setTransactionId(transactionId);
                context.setParams(participant.getParams());
                service.cancel(context);
            } catch (Exception e) {
                // Log and continue with the remaining participants; handle manually later
                allCancelled = false;
                log.error("Cancel failed: transaction={}, participant={}",
                        transactionId, participant.getServiceName(), e);
            }
        }
        // Mark FAILED only when every participant was cancelled; otherwise stay in CANCELING
        transaction.setStatus(allCancelled ? TccStatus.FAILED.name() : TccStatus.CANCELING.name());
        transactionRepository.save(transaction);
    }
}
```
4.2 TCC Context
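```java
import java.util.HashMap;
import java.util.Map;

public class TccContext {

    private String transactionId;
    private String status;
    // Free-form business parameters passed between phases
    private Map<String, Object> params = new HashMap<>();

    public void setParam(String key, Object value) {
        params.put(key, value);
    }

    // Returns null if the key is absent; throws ClassCastException on a type mismatch
    public <T> T getParam(String key, Class<T> clazz) {
        return clazz.cast(params.get(key));
    }

    // getters and setters omitted
}
```
5. Exception Handling and Retries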
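Idempotency and retries depend on each other: a retry is only safe when repeating the operation has no additional effect. The framework-free sketch below combines the two. `IdempotentExecutor` and `Retry` are hypothetical helpers, and the in-memory key set stands in for a durable store such as a database table or Redis.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: runs an action at most once per key. */
final class IdempotentExecutor {
    private final Set<String> completed = ConcurrentHashMap.newKeySet();

    /** Returns true if the action ran now, false if the key was already completed. */
    synchronized boolean runOnce(String key, Runnable action) {
        if (completed.contains(key)) {
            return false;
        }
        action.run();       // if this throws, the key stays unmarked so a retry can run it again
        completed.add(key);
        return true;
    }
}

/** Hypothetical helper: retries an action with a fixed pause between attempts. */
final class Retry {
    private Retry() {
    }

    static void withFixedBackoff(int maxAttempts, long backoffMillis, Runnable action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // out of attempts: surface the last failure
                }
                pause(backoffMillis);
            }
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during backoff", ie);
        }
    }
}
```

A Confirm call could then be wrapped as `Retry.withFixedBackoff(3, 1000, () -> executor.runOnce(txId + ":confirm", confirmAction))`, where `txId` and `confirmAction` are placeholders for the transaction ID and the business operation.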
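5.1 Ensuring Idempotency
```java
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

// Registered as a bean so that the StringRedisTemplate can be injected
@Component
public class IdempotentUtil {

    private static final String PREFIX = "tcc:idempotent:";

    @Autowired
    private StringRedisTemplate redisTemplate;

    // Atomically sets the key only if it is absent; returns true for the first caller only
    public boolean checkAndLock(String key, long expireSeconds) {
        Boolean result = redisTemplate.opsForValue().setIfAbsent(
                PREFIX + key, "locked", expireSeconds, TimeUnit.SECONDS
        );
        return Boolean.TRUE.equals(result);
    }

    // Removes the marker, for example when the guarded operation failed and may be retried
    public void release(String key) {
        redisTemplate.delete(PREFIX + key);
    }
}
```
5.2 Retry Mechanism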
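```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryConfig {

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // Wait 1000 ms between attempts
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        // At most 3 attempts in total, including the first call
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }
}
```
5.3 Compensation Task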
```java
// Requires @EnableScheduling on a configuration class for @Scheduled to take effect
@Component
public class CompensationTask {

    @Autowired
    private TransactionRepository transactionRepository;

    @Autowired
    private TccCoordinator coordinator;

    // Runs 60 seconds after the previous run finishes
    @Scheduled(fixedDelay = 60000)
    public void processPendingTransactions() {
        List<Transaction> pendingTransactions = transactionRepository.findByStatus(
                TccStatus.TRYING.name()
        );
        for (Transaction transaction : pendingTransactions) {
            // Roll back transactions that have stayed in TRYING beyond the timeout
            if (isTimeout(transaction.getCreateTime())) {
                coordinator.rollback(transaction.getTransactionId());
            }
        }
    }

    private boolean isTimeout(Date createTime) {
        long diff = System.currentTimeMillis() - createTime.getTime();
        return diff > 5 * 60 * 1000; // 5-minute timeout
    }
}
```
6. Seata TCC Mode
6.1 Adding the Dependency
```xml
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.6.1</version>
</dependency>
```
6.2 Configuration File
```yaml
seata:
  enabled: true
  application-id: order-service
  tx-service-group: my_tx_group
  config:
    type: nacos
    nacos:
      server-addr: localhost:8848
      group: SEATA_GROUP
      namespace:
  registry:
    type: nacos
    nacos:
      server-addr: localhost:8848
      group: SEATA_GROUP
```
6.3 Seata TCC Implementation
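```java
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

// @LocalTCC is a type-level annotation; the Try method is marked with
// @TwoPhaseBusinessAction, which names the Confirm and Cancel methods.
@LocalTCC
public interface OrderTccAction {

    @TwoPhaseBusinessAction(name = "orderTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(BusinessActionContext context,
                    @BusinessActionContextParameter(paramName = "orderId") String orderId);

    boolean commit(BusinessActionContext context);

    boolean rollback(BusinessActionContext context);
}

@Service
public class OrderTccServiceImpl implements OrderTccAction {

    @Autowired
    private OrderRepository orderRepository;

    @Override
    public boolean prepare(BusinessActionContext context, String orderId) {
        Order order = new Order();
        order.setOrderId(orderId);
        order.setStatus("PREPARE");
        orderRepository.save(order);
        return true;
    }

    @Override
    public boolean commit(BusinessActionContext context) {
        // orderId was stored in the context via @BusinessActionContextParameter
        String orderId = (String) context.getActionContext("orderId");
        Order order = orderRepository.findByOrderId(orderId);
        if (order != null && "PREPARE".equals(order.getStatus())) {
            order.setStatus("COMMITTED");
            orderRepository.save(order);
        }
        return true;
    }

    @Override
    public boolean rollback(BusinessActionContext context) {
        String orderId = (String) context.getActionContext("orderId");
        Order order = orderRepository.findByOrderId(orderId);
        if (order != null && "PREPARE".equals(order.getStatus())) {
            order.setStatus("ROLLED_BACK");
            orderRepository.save(order);
        }
        return true;
    }
}
```
7. Best Practices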
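7.1 Design Principles
- Idempotency: Confirm and Cancel operations must be idempotent
- Compensability: Cancel must correctly undo the changes made in the Try phase
- Resource isolation: the Try phase should reserve (lock) resources to prevent concurrency problems
- Timeouts: set a reasonable timeout that automatically triggers Cancel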
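The first three principles can be illustrated with a small in-memory account. `FreezeLedger` is a hypothetical class written for this sketch. It assumes every call carries a transaction ID and that a per-transaction record is what makes Confirm and Cancel safe to repeat. It also records a Cancel that arrives before its Try, so that a late Try cannot reserve funds afterwards.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory account that tracks one reservation per transaction ID. */
final class FreezeLedger {
    private enum Phase { TRIED, CONFIRMED, CANCELLED }

    private long balance;
    private long frozen;
    private final Map<String, Phase> phases = new HashMap<>();
    private final Map<String, Long> amounts = new HashMap<>();

    FreezeLedger(long initialBalance) {
        this.balance = initialBalance;
    }

    /** Try: reserve the amount. A repeated Try succeeds only while the reservation is pending. */
    synchronized boolean tryFreeze(String txId, long amount) {
        Phase phase = phases.get(txId);
        if (phase != null) {
            return phase == Phase.TRIED; // duplicate Try, or a Try arriving after Cancel
        }
        if (balance < amount) {
            return false;
        }
        balance -= amount;
        frozen += amount;
        phases.put(txId, Phase.TRIED);
        amounts.put(txId, amount);
        return true;
    }

    /** Confirm: spend the reserved amount. Repeated calls have no further effect. */
    synchronized void confirm(String txId) {
        if (phases.get(txId) != Phase.TRIED) {
            return;
        }
        frozen -= amounts.get(txId);
        phases.put(txId, Phase.CONFIRMED);
    }

    /** Cancel: return the reserved amount. Repeated calls have no further effect. */
    synchronized void cancel(String txId) {
        Phase phase = phases.get(txId);
        if (phase == null) {
            phases.put(txId, Phase.CANCELLED); // Cancel before Try: remember it, change nothing
            return;
        }
        if (phase != Phase.TRIED) {
            return;
        }
        long amount = amounts.get(txId);
        balance += amount;
        frozen -= amount;
        phases.put(txId, Phase.CANCELLED);
    }

    synchronized long getBalance() { return balance; }

    synchronized long getFrozen() { return frozen; }
}
```

In a real service the phase record would live in the same database as the account and be updated in the same local transaction as the balance change.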
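7.2 Points to Note
| Item | Description |
|---|---|
| Network partitions | Plan for how consistency is preserved when the network is partitioned |
| Database transactions | The Try phase should run inside a database transaction to guarantee atomicity |
| Logging | Record detailed transaction logs to make troubleshooting easier |
| Monitoring and alerting | Monitor transaction success, failure, and timeout rates |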
7.3 Performance Optimization
```java
// Use batch operations to reduce the number of database round trips
@Transactional
public void batchConfirm(List<String> transactionIds) {
    List<Transaction> transactions = transactionRepository.findAllById(transactionIds);
    for (Transaction transaction : transactions) {
        // Update the status in memory; saveAll below persists all changes together
        transaction.setStatus(TccStatus.SUCCESS.name());
    }
    transactionRepository.saveAll(transactions);
}
```
8. Summary
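TCC is a flexible distributed transaction solution that suits high-concurrency scenarios. With well-designed Try, Confirm, and Cancel phases, it provides eventual consistency while keeping performance good.
Key points:
- TCC is a distributed transaction solution implemented at the business layer
- Confirm and Cancel must be idempotent
- Timeout and retry mechanisms must be implemented
- Frameworks such as Seata can simplify the implementation
In practice, choose the distributed transaction solution that fits the business scenario, balancing consistency, performance, and complexity.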
