线程安全 ≠ 协程安全:当全局缓存同时遇上线程池和 async,优秀 Python 工程师该如何设计?
线程安全 ≠ 协程安全:当全局缓存同时遇上线程池和 async,优秀 Python 工程师该如何设计?
Python 让很多人第一次感受到编程的温柔:语法简洁,生态丰富,既能写 Web 服务,也能做数据分析、自动化脚本、机器学习和 AI 应用。但越是“简单好用”的语言,越容易在工程深水区暴露细节。
比如今天这个问题:
一个全局缓存,既会被线程池中的同步任务访问,又会被
asyncio异步上下文访问。
线程安全和协程安全有什么区别?threading.Lock和asyncio.Lock能混用吗?
这是一个非常典型的 Python 实战问题。它不只是锁的选择问题,而是并发模型、资源边界和工程责任的问题。
一、先给结论:不能简单混用
threading.Lock和asyncio.Lock不能当成同一种锁混用。
更准确地说:
threading.Lock 保护的是线程之间的共享资源。 asyncio.Lock 保护的是同一个事件循环内协程之间的共享资源。它们面对的调度模型不同:
线程:由操作系统抢占式调度,可能在任意时刻被切换。 协程:由事件循环协作式调度,通常在 await 处让出执行权。因此,在线程池和异步上下文同时访问全局缓存时,最危险的做法是:
# 错误倾向:以为用了 asyncio.Lock 就能保护线程池里的访问cache={}async_lock=asyncio.Lock()或者:
# 错误倾向:在 async 函数中直接用 threading.Lock 包住 awaitthread_lock=threading.Lock()asyncdefbad():withthread_lock:awaitsome_io()前者保护不了线程池里的同步代码,后者可能阻塞事件循环,甚至造成死锁和吞吐下降。
二、线程安全:面对“抢占式切换”的安全
线程安全关注的是:多个线程同时访问同一份可变数据时,程序是否仍然正确。
例如一个全局缓存:
cache={}defget_user(user_id):ifuser_idnotincache:cache[user_id]=load_user_from_db(user_id)returncache[user_id]这段代码看似简单,但在多线程下有竞态条件。
两个线程可能同时发现user_id not in cache,然后同时去数据库加载,再同时写入缓存。轻则重复请求,重则写入脏数据。
正确做法之一是使用threading.Lock:
importthreading cache={}cache_lock=threading.Lock()defget_user(user_id):withcache_lock:ifuser_idnotincache:cache[user_id]=load_user_from_db(user_id)returncache[user_id]但这里还有一个性能问题:如果load_user_from_db很慢,那么锁会被长时间持有,其他线程都被挡住。
更好的做法是区分“检查缓存”和“加载数据”的边界:
importthreading cache={}cache_lock=threading.Lock()defget_user(user_id):withcache_lock:user=cache.get(user_id)ifuserisnotNone:returnuser user=load_user_from_db(user_id)withcache_lock:existing=cache.get(user_id)ifexistingisnotNone:returnexisting cache[user_id]=userreturnuser这不是完美的单飞请求合并,但比长时间持锁更健康。它体现了一个重要原则:
锁应该尽量保护共享状态,而不是保护整个业务流程。
三、协程安全:面对“await 让出执行权”的安全
协程安全关注的是:多个协程在同一个事件循环中交替运行时,共享状态是否仍然正确。
很多初学者以为:
async 是单线程,所以不会有并发问题。
这也是误区。
看这个例子:
counter=0asyncdefincrease():globalcounter value=counterawaitasyncio.sleep(0)counter=value+1如果同时运行多个协程:
importasyncio counter=0asyncdefincrease():globalcounter value=counterawaitasyncio.sleep(0)counter=value+1asyncdefmain():awaitasyncio.gather(*(increase()for_inrange(1000)))print(counter)asyncio.run(main())你可能期待输出1000,但结果可能远小于 1000。
原因是:协程虽然不是被操作系统随时打断,但它会在await处主动让出控制权。多个协程可能读取到同一个旧值,然后覆盖彼此的更新。
此时应该使用asyncio.Lock:
importasyncio counter=0lock=asyncio.Lock()asyncdefincrease():globalcounterasyncwithlock:value=counterawaitasyncio.sleep(0)counter=value+1不过这里也有一个实践提醒:不要在锁内做太多耗时 I/O。虽然asyncio.Lock不会阻塞整个线程,但它会让其他等待同一把锁的协程排队。
四、线程安全和协程安全的核心差异
可以用一张表概括。
| 对比项 | 线程安全 | 协程安全 |
|---|---|---|
| 调度方式 | 操作系统抢占式调度 | 事件循环协作式调度 |
| 切换时机 | 理论上可能发生在很多地方 | 通常发生在await处 |
| 常用锁 | threading.Lock、RLock | asyncio.Lock |
| 是否阻塞线程 | 会阻塞当前线程 | 不阻塞事件循环线程,但会挂起协程 |
| 保护范围 | 多线程共享数据 | 同一事件循环内的协程共享数据 |
| 是否跨线程有效 | 是 | 否,不应用于线程池同步代码 |
| 是否可在 async 中随便用 | 不建议 | 是 async 场景首选 |
一句话:
threading.Lock 是给线程看的。 asyncio.Lock 是给协程看的。它们不是替代关系,而是服务于不同并发世界的工具。
五、错误示例:在 async 中直接使用 threading.Lock
假设你写了这样的代码:
importthreadingimportasyncio cache={}lock=threading.Lock()asyncdefget_value(key):withlock:ifkeyincache:returncache[key]value=awaitfetch_from_remote(key)cache[key]=valuereturnvalue这段代码有两个大问题。
第一,threading.Lock的获取是阻塞式的。如果另一个线程持有这把锁,当前事件循环线程会被卡住,整个 async 服务都可能变慢。
第二,更严重的是,你在持有threading.Lock的时候执行了await。这意味着当前协程让出了执行权,但锁还没释放。其他线程或代码如果也在等待这把锁,就可能形成非常难排查的阻塞链。
正确原则是:
不要在 threading.Lock 保护区内 await。 不要用阻塞锁包裹异步 I/O。六、错误示例:用 asyncio.Lock 保护线程池共享数据
再看另一个错误方向:
importasynciofromconcurrent.futuresimportThreadPoolExecutor cache={}lock=asyncio.Lock()defworker(key):# 线程池里的同步函数无法 async with lockifkeynotincache:cache[key]=compute(key)returncache[key]asyncio.Lock必须通过async with和await使用,它属于事件循环机制。线程池里的同步函数无法直接等待它。
即使你想强行绕,也会让代码变得脆弱、复杂,而且容易出现事件循环绑定、跨线程调度和死锁问题。
正确原则是:
asyncio.Lock 不能保护线程池里的普通同步函数。 线程池访问共享数据时,需要线程锁或更清晰的资源所有权模型。七、真实场景:全局缓存同时被线程池和 async 访问
假设我们有一个图片处理服务。
API 层是异步的:
@app.get("/image/{image_id}")asyncdefget_image(image_id:str):...但图像处理是 CPU 密集型,会丢到线程池或进程池中:
result=awaitloop.run_in_executor(pool,process_image,image_id)同时,API 层和 worker 都要访问一个全局缓存:
cache={"image:001":b"..."}这时如果随便在 async 代码里用asyncio.Lock,线程池不受保护;如果全局都用threading.Lock,async API 又可能被阻塞。
怎么办?
推荐三种方案。
八、方案一:统一用线程安全缓存,async 层通过to_thread访问
如果缓存必须被线程池和 async 层共同访问,可以把缓存设计成一个纯同步、线程安全的组件。
importthreadingimporttimefromtypingimportAnyclassThreadSafeTTLCache:def__init__(self,ttl_seconds:int=300):self._data:dict[str,tuple[float,Any]]={}self._ttl=ttl_seconds self._lock=threading.RLock()defget(self,key:str)->Any|None:now=time.time()withself._lock:item=self._data.get(key)ifitemisNone:returnNoneexpire_at,value=itemifexpire_at<now:delself._data[key]returnNonereturnvaluedefset(self,key:str,value:Any)->None:expire_at=time.time()+self._ttlwithself._lock:self._data[key]=(expire_at,value)defdelete(self,key:str)->None:withself._lock:self._data.pop(key,None)线程池中可以直接使用:
cache=ThreadSafeTTLCache()defprocess_image(image_id:str):cached=cache.get(image_id)ifcachedisnotNone:returncached result=heavy_compute(image_id)cache.set(image_id,result)returnresult异步上下文中不要直接调用可能阻塞的锁竞争逻辑,可以通过asyncio.to_thread转到线程中执行:
importasyncioasyncdefasync_get_cache(key:str):returnawaitasyncio.to_thread(cache.get,key)asyncdefasync_set_cache(key:str,value):awaitasyncio.to_thread(cache.set,key,value)API 层:
asyncdefget_image_result(image_id:str):cached=awaitasync_get_cache(image_id)ifcachedisnotNone:returncached loop=asyncio.get_running_loop()result=awaitloop.run_in_executor(None,process_image,image_id)returnresult这个方案的好处是:共享状态只有一种保护规则:线程锁。
缺点是:async 访问缓存时有线程切换开销。但对于复杂服务来说,这个开销通常比锁模型混乱更可控。
九、方案二:缓存只属于事件循环,线程通过安全通道访问
另一种思路是:明确缓存归属权。
比如规定:
缓存只允许在 async 事件循环中被直接访问。 线程池不能直接读写缓存。 线程池要访问缓存,必须通过事件循环提交请求。可以使用asyncio.run_coroutine_threadsafe:
importasynciofromtypingimportAnyclassAsyncCache:def__init__(self):self._data:dict[str,Any]={}self._lock=asyncio.Lock()asyncdefget(self,key:str):asyncwithself._lock:returnself._data.get(key)asyncdefset(self,key:str,value:Any):asyncwithself._lock:self._data[key]=value在线程中访问时:
defworker_get_cache(loop,cache:AsyncCache,key:str):future=asyncio.run_coroutine_threadsafe(cache.get(key),loop)returnfuture.result()这个方案更适合缓存强依赖 async 生命周期的场景。
但它也有风险:线程会等待事件循环返回结果。如果事件循环又在等待线程池结果,就可能形成循环等待。因此必须谨慎设计调用方向。
十、方案三:把缓存独立成服务或 Actor
在更复杂的系统里,我更推荐把共享状态变成一个“独立所有者”。
示意图:
Async API 层 | | 请求缓存 v Cache Manager / Actor ^ | 请求缓存 Thread Pool Worker缓存不再是一个到处可访问的全局 dict,而是由一个专门组件管理。其他模块通过方法、队列、RPC 或消息传递访问它。
这种思想叫:
不要共享内存来通信。 而是通过通信来共享状态。在 Python 项目中,可以用:
本地队列 专门的缓存管理线程 Redis Memcached 数据库缓存表 消息队列如果是多进程部署,进程内全局缓存本来就不可靠,因为每个进程都有自己的内存副本。此时 Redis 这类外部缓存往往更合适。
十一、最佳实践:锁要短,边界要清晰
无论使用线程锁还是协程锁,都要遵守几个原则。
1. 不要在锁里做慢操作
不推荐:
withlock:value=slow_network_call()cache[key]=value推荐:
value=slow_network_call()withlock:cache[key]=value对于 async 也是一样。
不推荐:
asyncwithlock:value=awaitfetch_data()cache[key]=value推荐:
value=awaitfetch_data()asyncwithlock:cache[key]=value当然,这可能带来重复加载问题。如果要避免重复加载,需要引入更精细的 per-key lock 或 singleflight 模式。
2. 锁保护的是不变量
比如缓存的不变量可能是:
_data 中每个 key 对应的 value 必须没有过期。 _hits 和 _misses 的统计必须和访问行为一致。 某个 key 的构建过程不能重复执行。锁不是为了“看起来安全”,而是为了保护这些不变量不被并发破坏。
3. 避免全局大锁
全局一把锁最简单,但可能限制吞吐。
可以使用 per-key lock:
importthreadingfromcollectionsimportdefaultdict cache={}locks=defaultdict(threading.Lock)defget_or_compute(key):withlocks[key]:ifkeyincache:returncache[key]value=compute(key)cache[key]=valuereturnvalue这样不同 key 之间可以并发执行。
但要注意:defaultdict(threading.Lock)自身在极端并发下也可能需要保护,生产环境可以用更严谨的锁管理器。
4. async 中使用 per-key lock
异步版本:
importasynciofromcollectionsimportdefaultdict cache={}locks=defaultdict(asyncio.Lock)asyncdefget_or_fetch(key):asyncwithlocks[key]:ifkeyincache:returncache[key]value=awaitfetch_value(key)cache[key]=valuereturnvalue这适用于纯 async 场景。
但如果线程池也要访问同一个cache,这个方案仍然不够,因为asyncio.Lock不保护线程。
十二、threading.Lock与asyncio.Lock到底能不能混用?
答案要分层说。
不能把它们当成同一把锁混用
不可以这样理解:
我 async 代码用 asyncio.Lock。 线程代码用 threading.Lock。 它们锁的是同一个 cache,所以应该安全。这是错误的。
如果两个锁不是同一套互斥机制,就可能出现:
协程拿到了 asyncio.Lock。 线程拿到了 threading.Lock。 二者同时修改同一个 cache。这等于没有真正互斥。
可以在系统中同时存在,但必须边界清晰
一个服务里当然可以同时使用两种锁:
asyncio.Lock 保护 async 内部状态。 threading.Lock 保护线程共享状态。但同一份共享数据,最好只归属于一种并发模型。
如果一份数据必须跨线程和协程共享,通常优先选择:
用 threading.Lock 作为底层保护; async 侧通过 to_thread 或专用适配层访问; 或者彻底改为消息传递 / 外部缓存。十三、一个推荐模板:混合场景下的缓存访问层
下面是一个更接近生产代码的模板。
importasyncioimportthreadingimporttimefromtypingimportCallable,TypeVar,Generic T=TypeVar("T")classSafeCache(Generic[T]):def__init__(self,ttl_seconds:int=300):self._ttl=ttl_seconds self._data:dict[str,tuple[float,T]]={}self._lock=threading.RLock()defget(self,key:str)->T|None:now=time.time()withself._lock:item=self._data.get(key)ifitemisNone:returnNoneexpire_at,value=itemifexpire_at<now:self._data.pop(key,None)returnNonereturnvaluedefset(self,key:str,value:T)->None:expire_at=time.time()+self._ttlwithself._lock:self._data[key]=(expire_at,value)defget_or_compute(self,key:str,factory:Callable[[],T])->T:value=self.get(key)ifvalueisnotNone:returnvalue new_value=factory()self.set(key,new_value)returnnew_valueclassAsyncCacheAdapter(Generic[T]):def__init__(self,cache:SafeCache[T]):self._cache=cacheasyncdefget(self,key:str)->T|None:returnawaitasyncio.to_thread(self._cache.get,key)asyncdefset(self,key:str,value:T)->None:awaitasyncio.to_thread(self._cache.set,key,value)asyncdefget_or_compute(self,key:str,factory:Callable[[],T])->T:returnawaitasyncio.to_thread(self._cache.get_or_compute,key,factory)使用方式:
cache=SafeCache[bytes](ttl_seconds=600)async_cache=AsyncCacheAdapter(cache)defsync_worker(image_id:str)->bytes:returncache.get_or_compute(image_id,lambda:process_image_sync(image_id))asyncdefasync_handler(image_id:str)->bytes:cached=awaitasync_cache.get(image_id)ifcachedisnotNone:returncached loop=asyncio.get_running_loop()returnawaitloop.run_in_executor(None,sync_worker,image_id)这个设计的关键是:
真正的数据结构只由 SafeCache 管。 SafeCache 只使用 threading.RLock。 async 层不直接碰锁,只通过适配器访问。这比“到处加锁”更清晰。
十四、调试与测试:不要相信感觉,要制造竞争
并发 bug 最可怕的地方是:它们不一定每次复现。
你可以写压力测试:
fromconcurrent.futuresimportThreadPoolExecutorimportasyncioimportrandom cache=SafeCache[int]()defsync_task(i:int):key=f"k-{random.randint(1,10)}"cache.set(key,i)returncache.get(key)asyncdefasync_task(i:int):key=f"k-{random.randint(1,10)}"awaitasyncio.to_thread(cache.set,key,i)returnawaitasyncio.to_thread(cache.get,key)asyncdefmain():loop=asyncio.get_running_loop()withThreadPoolExecutor(max_workers=8)aspool:thread_jobs=[loop.run_in_executor(pool,sync_task,i)foriinrange(1000)]async_jobs=[async_task(i)foriinrange(1000)]results=awaitasyncio.gather(*thread_jobs,*async_jobs)print(len(results))asyncio.run(main())同时观察:
是否出现 KeyError 是否出现 RuntimeError 是否有死锁 是否事件循环卡顿 CPU 是否异常升高 P95 / P99 延迟是否恶化优秀工程师不会只说“我觉得这样安全”,而是会用测试和监控证明它安全。
十五、给初学者的记忆口诀
可以记住这几句话:
线程锁管线程,协程锁管协程。 asyncio.Lock 不保护线程池。 threading.Lock 可能阻塞事件循环。 不要在 threading.Lock 里 await。 同一份共享状态,最好只有一个所有者。 不确定时,先画出数据流,再决定锁。这几句话比死背 API 更重要。
十六、前沿视角:未来 Python 并发会更重要
Python 生态正在持续向高性能和多场景并发演进。FastAPI、asyncio、AnyIO、Trio、Pandas、NumPy、PyTorch、任务队列、GPU 计算、边缘设备和 AI 服务,都让开发者越来越频繁地面对混合并发模型。
但趋势越热,越要回到基本功。
真正的 Python 最佳实践不是“所有项目都 async”,也不是“所有地方都加锁”,而是:
理解瓶颈。 明确资源归属。 减少共享状态。 缩小锁范围。 用测试验证假设。在复杂系统中,技术判断本身就是一种工程温度。你写下的每一把锁,都是未来维护者要理解的一段承诺。
十七、总结:优秀工程师要能把“安全边界”讲清楚
回到最初的问题:
线程安全与协程安全有什么差异?
threading.Lock与asyncio.Lock能混用吗?
最终答案是:
线程安全解决多线程抢占式并发下的共享数据问题。 协程安全解决同一事件循环中多个协程在 await 切换下的共享数据问题。 threading.Lock 和 asyncio.Lock 可以在同一系统中同时存在,但不能随意混用来保护同一份共享状态。对于“全局缓存同时被线程池和异步上下文访问”这个场景,我更推荐:
方案一:用线程安全缓存作为底层唯一真相,async 层通过 to_thread 访问。 方案二:缓存归 async 事件循环所有,线程通过安全通道提交请求。 方案三:把缓存外置为 Redis / Memcached / 独立缓存服务。不要让一个全局 dict 成为系统里最脆弱、最隐蔽、最难排查的地方。
Python 编程的高级感,不在于你用了多少新语法,而在于你能不能在复杂场景中写出清晰、可靠、可维护的代码。
互动问题
你在项目中是否遇到过“明明加了锁,还是出现并发 bug”的情况?
你更倾向于在混合并发场景中使用本地缓存,还是直接引入 Redis 这类外部缓存?
欢迎在评论区分享你的经验。很多时候,工程能力就是在这些真实问题里一点点长出来的。
