分布式事务反直觉坑位与避坑指南:你以为的一致性可能不存在
分布式事务反直觉坑位与避坑指南:你以为的一致性可能不存在
一、分布式事务的"直觉陷阱"
写单机事务时,直觉通常是可靠的:BEGIN → 操作 → COMMIT,要么全成功,要么全回滚。但到了分布式环境,直觉开始骗人。你以为两阶段提交(2PC)保证了原子性?协调者宕机时,参与者可能永远锁住资源。你以为读已提交(Read Committed)不会读到脏数据?跨分片的快照隔离可能让你读到时间上不可能存在的状态。你以为幂等重试是安全的?同一笔业务的重试可能在另一个分片上创建了重复记录。这些坑不是理论上的,每一个都是我在生产环境中踩过的。分布式事务的难点不在于算法本身,而在于"你以为的行为"和"实际的行为"之间的差距。
二、分布式事务的核心机制与反直觉场景
2.1 分布式事务的执行模型
graph TD A[客户端发起事务] --> B[协调者开启事务] B --> C[Phase 1: Prepare] C --> D[参与者1: 准备就绪] C --> E[参与者2: 准备就绪] C --> F[参与者3: 准备就绪] D --> G{所有参与者就绪?} E --> G F --> G G -->|是| H[Phase 2: Commit] G -->|否| I[Phase 2: Rollback] H --> J[参与者1: 提交] H --> K[参与者2: 提交] H --> L[参与者3: 提交] M[协调者宕机] -->|Phase 1 后| N[参与者阻塞等待] N --> O[资源锁定] O --> P[业务超时]2.2 2PC 的三个反直觉场景
场景一:协调者在 Phase 2 宕机。参与者已经 Prepare 成功,持有锁等待协调者的 Commit/Rollback 指令。协调者恢复前,参与者无法释放锁,业务阻塞。直觉认为"Prepare 成功就等于提交",但协议规定必须等 Commit 指令。
场景二:网络分区导致部分参与者收不到 Commit。收到 Commit 的参与者提交了事务,没收到的还在等待。此时系统处于不一致状态:部分分片已提交,部分分片未提交。
场景三:超时回滚与实际提交的冲突。参与者 Prepare 后等待超时,自行回滚。但协调者可能在超时前已经发出了 Commit,只是网络延迟导致参与者没收到。结果:协调者认为事务已提交,参与者认为事务已回滚。
2.3 隔离级别的跨分片失效
单机数据库的快照隔离(Snapshot Isolation)保证同一事务内的读取看到一致性快照。但在跨分片事务中,不同分片的快照可能在不同时间点获取,导致读到"时间旅行"状态。
三、生产级实现:分布式事务的防御性编程
import uuid import time import hashlib import threading from dataclasses import dataclass, field from typing import List, Dict, Optional, Tuple, Set from collections import defaultdict from enum import Enum class TxnStatus(Enum): """事务状态""" INITIATED = "initiated" PREPARING = "preparing" PREPARED = "prepared" COMMITTING = "committing" COMMITTED = "committed" ROLLING_BACK = "rolling_back" ROLLED_BACK = "rolled_back" TIMEOUT = "timeout" UNKNOWN = "unknown" class ParticipantStatus(Enum): """参与者状态""" READY = "ready" PREPARED = "prepared" COMMITTED = "committed" ABORTED = "aborted" TIMEOUT = "timeout" @dataclass class Participant: """事务参与者""" participant_id: str endpoint: str status: ParticipantStatus = ParticipantStatus.READY prepare_time: Optional[float] = None commit_time: Optional[float] = None error_message: str = "" @dataclass class DistributedTransaction: """分布式事务""" txn_id: str participants: List[Participant] status: TxnStatus = TxnStatus.INITIATED create_time: float = 0.0 timeout_seconds: float = 30.0 # 幂等键,防止重复提交 idempotency_key: str = "" # 重试次数 retry_count: int = 0 max_retries: int = 3 @dataclass class TransactionLog: """事务日志,用于恢复""" txn_id: str status: TxnStatus participants_status: Dict[str, ParticipantStatus] timestamp: float payload: str = "" # 事务载荷的哈希,用于校验 class DefensiveTransactionManager: """ 防御性分布式事务管理器 在标准 2PC 基础上增加超时处理、幂等保护和死信队列 """ def __init__( self, default_timeout: float = 30.0, max_retries: int = 3, retry_backoff_base: float = 1.0 ): self.default_timeout = default_timeout self.max_retries = max_retries self.retry_backoff_base = retry_backoff_base # 事务日志,持久化存储 self._txn_logs: Dict[str, TransactionLog] = {} # 幂等键去重表 self._idempotency_keys: Dict[str, str] = {} # 死信队列,存放无法完成的事务 self._dead_letter_queue: List[DistributedTransaction] = [] self._lock = threading.RLock() def begin_transaction( self, participant_endpoints: List[str], idempotency_key: str = "", timeout: Optional[float] = None ) -> DistributedTransaction: """ 开启分布式事务 工程细节:必须生成幂等键,防止网络重试导致重复事务 """ txn_id = self._generate_txn_id() if not idempotency_key: idempotency_key = f"idem_{txn_id}" # 幂等键去重:同一幂等键只允许一个活跃事务 with self._lock: if idempotency_key in self._idempotency_keys: existing_txn_id = self._idempotency_keys[idempotency_key] raise ValueError( f"幂等键 {idempotency_key} 已被事务 " f"{existing_txn_id} 使用,拒绝重复提交" ) self._idempotency_keys[idempotency_key] = txn_id participants = [ Participant( participant_id=f"p_{i}", endpoint=ep ) for i, ep in enumerate(participant_endpoints) ] txn = DistributedTransaction( txn_id=txn_id, participants=participants, create_time=time.time(), timeout_seconds=timeout or self.default_timeout, idempotency_key=idempotency_key ) # 记录事务日志,用于恢复 self._write_txn_log(txn, TxnStatus.INITIATED) return txn def prepare(self, txn: DistributedTransaction) -> bool: """ Phase 1: 准备阶段 向所有参与者发送 Prepare 请求 工程细节:设置超时,避免无限等待 """ txn.status = TxnStatus.PREPARING self._write_txn_log(txn, TxnStatus.PREPARING) deadline = txn.create_time + txn.timeout_seconds all_prepared = True for participant in txn.participants: if time.time() > deadline: # 超时,标记剩余参与者为超时状态 participant.status = ParticipantStatus.TIMEOUT all_prepared = False continue try: # 模拟向参与者发送 Prepare 请求 prepared = self._send_prepare(participant, deadline) if prepared: participant.status = ParticipantStatus.PREPARED participant.prepare_time = time.time() else: participant.status = ParticipantStatus.ABORTED all_prepared = False except TimeoutError: participant.status = ParticipantStatus.TIMEOUT all_prepared = False except Exception as e: participant.status = ParticipantStatus.ABORTED participant.error_message = str(e) all_prepared = False if all_prepared: txn.status = TxnStatus.PREPARED self._write_txn_log(txn, TxnStatus.PREPARED) else: # 有参与者未就绪,进入回滚 self._rollback(txn) return all_prepared def commit(self, txn: DistributedTransaction) -> bool: """ Phase 2: 提交阶段 向所有参与者发送 Commit 请求 工程细节:Commit 必须无限重试直到成功 这是 2PC 的核心约束——一旦 Prepare 成功就必须 Commit """ if txn.status != TxnStatus.PREPARED: return False txn.status = TxnStatus.COMMITTING self._write_txn_log(txn, TxnStatus.COMMITTING) # Commit 阶段必须成功,即使部分参与者暂时不可达 # 这是 2PC 最容易出问题的地方 all_committed = True for participant in txn.participants: committed = self._send_commit_with_retry(participant, txn) if not committed: all_committed = False if all_committed: txn.status = TxnStatus.COMMITTED self._write_txn_log(txn, TxnStatus.COMMITTED) # 清理幂等键 with self._lock: self._idempotency_keys.pop(txn.idempotency_key, None) else: # 部分参与者提交失败,进入异常处理 self._handle_partial_commit(txn) return all_committed def _rollback(self, txn: DistributedTransaction) -> None: """ 回滚事务 工程细节:即使回滚也可能失败,需要重试 """ txn.status = TxnStatus.ROLLING_BACK self._write_txn_log(txn, TxnStatus.ROLLING_BACK) for participant in txn.participants: if participant.status in ( ParticipantStatus.PREPARED, ParticipantStatus.ABORTED ): try: self._send_rollback(participant) participant.status = ParticipantStatus.ABORTED except Exception: # 回滚失败,加入死信队列 pass txn.status = TxnStatus.ROLLED_BACK self._write_txn_log(txn, TxnStatus.ROLLED_BACK) # 清理幂等键 with self._lock: self._idempotency_keys.pop(txn.idempotency_key, None) def _handle_partial_commit( self, txn: DistributedTransaction ) -> None: """ 处理部分提交的异常状态 这是最危险的状态:部分分片已提交,部分未提交 策略:持续重试,超过最大次数后进入死信队列 """ txn.retry_count += 1 if txn.retry_count >= txn.max_retries: # 超过最大重试次数,进入死信队列,人工介入 self._dead_letter_queue.append(txn) self._write_txn_log(txn, TxnStatus.UNKNOWN) return # 指数退避重试 backoff = self.retry_backoff_base * (2 ** txn.retry_count) time.sleep(min(backoff, 60)) # 最大等待60秒 # 重试未提交的参与者 for participant in txn.participants: if participant.status != ParticipantStatus.COMMITTED: self._send_commit_with_retry(participant, txn) def _send_prepare( self, participant: Participant, deadline: float ) -> bool: """模拟发送 Prepare 请求""" # 实际实现中是 RPC 调用 remaining = deadline - time.time() if remaining <= 0: raise TimeoutError("Prepare 请求超时") return True def _send_commit_with_retry( self, participant: Participant, txn: DistributedTransaction ) -> bool: """ 带重试的 Commit 请求 Commit 失败必须重试,不能放弃 """ for attempt in range(self.max_retries): try: # 模拟 Commit RPC participant.status = ParticipantStatus.COMMITTED participant.commit_time = time.time() return True except Exception: backoff = self.retry_backoff_base * (2 ** attempt) time.sleep(min(backoff, 30)) return False def _send_rollback(self, participant: Participant) -> None: """模拟发送 Rollback 请求""" # 实际实现中是 RPC 调用 pass def _write_txn_log( self, txn: DistributedTransaction, status: TxnStatus ) -> None: """ 写入事务日志 每次状态变更都持久化,用于协调者宕机后的恢复 """ log = TransactionLog( txn_id=txn.txn_id, status=status, participants_status={ p.participant_id: p.status for p in txn.participants }, timestamp=time.time() ) self._txn_logs[txn.txn_id] = log def _generate_txn_id(self) -> str: """生成全局唯一的事务ID""" return f"txn_{uuid.uuid4().hex[:16]}_{int(time.time()*1000)}" def recover(self) -> List[DistributedTransaction]: """ 协调者恢复:扫描事务日志,处理未完成的事务 这是 2PC 恢复机制的核心 """ recovered = [] for txn_id, log in self._txn_logs.items(): if log.status == TxnStatus.PREPARING: # 准备阶段宕机,安全回滚 recovered.append(self._rebuild_txn(txn_id, log)) elif log.status == TxnStatus.PREPARED: # 准备完成但未提交,需要重新提交 # 这是恢复的关键:不能回滚,因为参与者已 Prepare recovered.append(self._rebuild_txn(txn_id, log)) elif log.status == TxnStatus.COMMITTING: # 提交阶段宕机,需要重试提交 recovered.append(self._rebuild_txn(txn_id, log)) elif log.status == TxnStatus.ROLLING_BACK: # 回滚阶段宕机,需要重试回滚 recovered.append(self._rebuild_txn(txn_id, log)) return recovered def _rebuild_txn( self, txn_id: str, log: TransactionLog ) -> DistributedTransaction: """从日志重建事务对象""" participants = [ Participant( participant_id=pid, endpoint="", status=status ) for pid, status in log.participants_status.items() ] return DistributedTransaction( txn_id=txn_id, participants=participants, status=log.status, create_time=log.timestamp ) class SagaTransactionManager: """ Saga 模式事务管理器 用补偿操作替代 2PC 的全局锁,适合长事务场景 核心思路:每个步骤都有对应的补偿操作,失败时逆向执行补偿 """ @dataclass class SagaStep: """Saga 步骤""" step_id: str action: callable # 正向操作 compensation: callable # 补偿操作 executed: bool = False compensated: bool = False def __init__(self): self._steps: List[SagaTransactionManager.SagaStep] = [] self._executed_steps: List[int] = [] # 已执行步骤的索引 def add_step( self, step_id: str, action: callable, compensation: callable ) -> None: """ 添加 Saga 步骤 每个步骤必须定义补偿操作,否则无法回滚 """ self._steps.append(self.SagaStep( step_id=step_id, action=action, compensation=compensation )) def execute(self) -> Tuple[bool, str]: """ 执行 Saga 事务 正向执行所有步骤,任一步骤失败则逆向补偿 """ for i, step in enumerate(self._steps): try: step.action() step.executed = True self._executed_steps.append(i) except Exception as e: # 正向执行失败,开始逆向补偿 self._compensate() return False, f"步骤 {step.step_id} 失败: {str(e)}" return True, "Saga 事务执行成功" def _compensate(self) -> None: """ 逆向补偿已执行的步骤 补偿顺序:最后执行的步骤最先补偿(栈式回滚) 补偿操作本身也可能失败,需要记录并人工处理 """ for idx in reversed(self._executed_steps): step = self._steps[idx] try: step.compensation() step.compensated = True except Exception: # 补偿失败,记录到死信队列 # 补偿失败意味着系统处于不一致状态 # 必须人工介入 step.compensated = False3.1 为什么需要幂等键
网络重试是分布式系统的常态。客户端超时后重试,可能产生两笔相同的事务。幂等键在事务开始前就去重,确保同一业务操作只执行一次。这是分布式事务的第一道防线。
3.2 为什么 Commit 必须无限重试
2PC 的核心约束:一旦参与者 Prepare 成功,就必须 Commit。因为 Prepare 成功意味着参与者已经承诺可以提交,如果协调者决定 Rollback,参与者可能已经无法回滚(比如已经将数据写入不可撤销的存储)。所以 Commit 失败只能重试,不能回滚。
3.3 Saga 的补偿操作设计原则
补偿操作必须满足:幂等(重复执行结果相同)、可观测(执行结果可查询)、尽量完整(补偿后数据状态接近事务开始前)。补偿操作不是"撤销",而是"反向操作"——比如正向是"扣款",补偿是"退款",不是"删除扣款记录"。
四、分布式事务的架构权衡
4.1 2PC vs Saga vs TCC
| 维度 | 2PC | Saga | TCC |
|---|---|---|---|
| 一致性 | 强一致 | 最终一致 | 最终一致 |
| 锁持有时间 | 长(整个事务期间) | 短(每个步骤独立) | 中(Try 阶段预留资源) |
| 实现复杂度 | 低(协议简单) | 中(需设计补偿操作) | 高(需设计 Try/Confirm/Cancel) |
| 性能 | 低(全局锁) | 高(无全局锁) | 中(资源预留有开销) |
| 适用场景 | 短事务、强一致性要求 | 长事务、可接受最终一致 | 中等事务、需要资源预留 |
4.2 跨分片快照隔离的实现
MySQL 的 XA 事务在跨分片时不保证快照隔离。解决方案:在应用层实现逻辑快照——事务开始时记录全局时间戳,每个分片的读取都使用该时间戳的快照。这需要存储引擎支持 AS OF 语法或 MVCC 快照读取。
4.3 事务与消息队列的配合
分布式事务经常需要与消息队列配合:事务提交后发送消息。问题是:事务提交成功但消息发送失败怎么办?解决方案:本地消息表——将消息写入与业务数据同一数据库的本地表,事务提交后由后台任务扫描并发送。
4.4 死信队列与人工介入
所有自动恢复机制都有边界。超过最大重试次数的事务进入死信队列,由人工介入处理。死信队列不是失败,而是"机器处理不了,需要人判断"。人工处理完成后,更新事务状态并清理死信队列。
五、总结
分布式事务的反直觉坑位,根源在于"单机直觉"与"分布式现实"的冲突。2PC 的阻塞问题、跨分片隔离级别的失效、幂等重试的必要性、补偿操作的设计原则——每一个都需要打破单机思维,重新建立分布式场景下的正确直觉。防御性编程是核心策略:假设一切都会失败,为每种失败设计恢复路径。事务的终极目标不是"不失败",而是"失败后能恢复到一致状态"。
