-- 开启事务 START TRANSACTION; -- 设置隔离级别 SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; -- 执行操作 UPDATE accounts SET balance = balance - 100 WHERE id = 1; UPDATE accounts SET balance = balance + 100 WHERE id = 2; -- 提交事务 COMMIT; -- 回滚事务 ROLLBACK; -- 保存点 SAVEPOINT before_transfer; ROLLBACK TO SAVEPOINT before_transfer;
2.2 锁机制实现
-- 显式加锁 SELECT * FROM products WHERE id = 1 FOR UPDATE; -- 共享锁 SELECT * FROM products WHERE id = 1 LOCK IN SHARE MODE; -- 乐观锁 UPDATE products SET stock = stock - 1, version = version + 1 WHERE id = 1 AND version = 1; -- 悲观锁 SELECT * FROM orders WHERE status = 'pending' FOR UPDATE SKIP LOCKED;
2.3 并发控制工具
class TransactionManager: def __init__(self, connection): self.connection = connection def begin_transaction(self, isolation_level='REPEATABLE READ'): self.connection.execute(f"SET TRANSACTION ISOLATION LEVEL {isolation_level}") self.connection.execute("START TRANSACTION") def commit(self): try: self.connection.execute("COMMIT") return True except Exception as e: self.rollback() return False def rollback(self): self.connection.execute("ROLLBACK") def savepoint(self, name): self.connection.execute(f"SAVEPOINT {name}") def rollback_to_savepoint(self, name): self.connection.execute(f"ROLLBACK TO SAVEPOINT {name}") class OptimisticLockManager: def __init__(self, connection): self.connection = connection def update_with_version(self, table, id, updates): version = self._get_version(table, id) set_clause = ', '.join([f"{k} = {v}" for k, v in updates.items()]) set_clause += ', version = version + 1' sql = f"UPDATE {table} SET {set_clause} WHERE id = {id} AND version = {version}" cursor = self.connection.cursor() cursor.execute(sql) return cursor.rowcount > 0 def _get_version(self, table, id): sql = f"SELECT version FROM {table} WHERE id = {id}" cursor = self.connection.cursor() cursor.execute(sql) result = cursor.fetchone() return result[0] if result else 0