当 asyncio.Lock 遇上多线程:一个看似简单却三次修错的并发 Bug
在开发 MiQi Desktop(一个多通道 AI Agent 桌面应用)时,我们遇到了一个经典的并发问题:两个对话同时进行,只有一个能收到回复。排查和修复这个 bug 的过程,暴露了 Python asyncio 在多线程场景下一个容易被忽视的陷阱。
现象
用户反馈:
“同时对话两个,只有一个有回答”
日志显示:
Failed to activate sandbox for desktop:1781234544206: <asyncio.locks.Lock object at 0x000002B54A5F7830 [unlocked, waiters:1]> is bound to a different event loop一个对话正常响应,另一个卡死不动。
架构背景
MiQi Desktop 的 Bridge 进程通过 stdin/stdout 与 Electron 前端通信。每当用户发送一条消息,Bridge 的处理流程是:
defhandle_chat_send(req_id,params):def_run_in_thread():asyncdef_run():agent=build_agent(session_key)result=awaitagent.process_direct(content)send_result(result)asyncio.run(_run())# 每次创建新的 event loop# 每条消息启一个新的 daemon threadt=threading.Thread(target=_run_in_thread,daemon=True)t.start()关键设计决策:
- 每条消息 = 一个新 thread + 一个新 event loop
- SandboxManager 是进程级单例,所有消息共享同一个实例
第一次修:延迟创建 Lock(失败)
最初SandboxManager.__init__在构造时就创建了asyncio.Lock():
classSandboxManager:def__init__(self):self._lock=asyncio.Lock()# 构造时创建我以为问题是"构造时没有 event loop",于是改成延迟创建:
classSandboxManager:def__init__(self):self._lock=None# 延迟asyncdefinitialize(self):ifself._lockisNone:self._lock=asyncio.Lock()# 第一次 await 时创建测试:单条消息发送 → 通过。
结果:用户再次报错。原因很明显——第一条消息把 Lock 绑定到了 Loop A,第二条消息从 Loop B 调用时还是报错。
教训:只测了串行场景,没测并发。
第二次修:检测 Loop 变化并重建 Lock(失败)
既然问题是"Lock 绑错了 Loop",那我每次检测当前 Loop 是否跟上次一样,不一样就重建:
asyncdefinitialize(self):current_loop=asyncio.get_running_loop()ifself._lockisNoneorself._lock_loopisnotcurrent_loop:self._lock=asyncio.Lock()# 重建self._lock_loop=current_loop测试:两个串行的asyncio.run()调用 → 通过。
结果:并发时直接挂死。
# 真实并发场景(挂死)defreq1():asyncio.run(...)# thread1, loop Adefreq2():asyncio.run(...)# thread2, loop Bt1.start();t2.start()# 同时跑t1.join();t2.join()# 永远等不到根因分析:
时间线: t=0 thread1: initialize() → 创建 lock_A,绑到 loop_A t=1 thread2: initialize() → 检测到 loop 不同 → 重建 lock_B,绑到 loop_B ↑ 此时 self._lock 从 lock_A 变成了 lock_B t=2 thread1: async with self._lock: ← 读到的是 lock_B(loop_B 的锁)! → "bound to a different event loop" 或死等thread2 的重建操作踩掉了thread1 正在使用的锁对象。这是一个经典的 TOCTOU(Time-of-check to time-of-use)竞态。
教训:
- 并发场景下,"检测+重建"不是原子操作
- 两个 thread 对同一个
self._lock字段的读写本身就需要同步 - 用 asyncio 的工具来修 asyncio 的跨线程问题,本质上是死局
第三次修:用 threading.Lock 替代 asyncio.Lock(成功)
回到本质:我们需要的是什么?
- 保护
_sandboxes字典的并发读写(微秒级操作) - 允许多个 thread 的 event loop 同时调用 manager 的方法
- 不阻塞 event loop 太久
threading.Lock完美满足这三个需求:
importthreadingclassSandboxManager:def__init__(self):self._lock=threading.Lock()# 无 loop 亲和性self._creating:set[str]=set()# 防重复创建asyncdefget_or_create(self,session_key):# 快路径:已存在,直接返回(微秒级锁)withself._lock:ifsession_keyinself._sandboxes:returnself._sandboxes[session_key]ifsession_keyinself._creating:returnNone# 另一个 thread 正在创建self._creating.add(session_key)# 慢路径:创建 sandbox(锁外执行,不阻塞其他 thread)sandbox=BwrapSandbox(session_key=session_key,...)try:awaitsandbox.start()# 可能耗时数秒withself._lock:self._sandboxes[session_key]=sandbox self._creating.discard(session_key)returnsandboxexceptException:withself._lock:self._creating.discard(session_key)returnNone设计要点
threading.Lock不绑定任何 event loop——哪个 thread 都能用- 临界区极短——只保护字典操作(读写
_sandboxes),微秒级 - 慢操作在锁外——
sandbox.start()是异步操作(启动 WSL 子进程),放在锁外执行 _creating集合防重复——两个 thread 同时对同一个 session 创建 sandbox 时,第二个直接返回 None
“在 async 代码里用 threading.Lock 不是大忌吗?”
是的,asyncio 社区通常不推荐在协程里使用threading.Lock,因为它会阻塞 event loop。但这里有几个关键前提:
- 临界区只有字典读写——耗时在微秒级,event loop 感知不到
- 每个 thread 有自己独立的 event loop——不存在"一个 thread 持锁,同 loop 上的其他 task 被阻塞"的问题
- 真正的慢操作(subprocess、网络)在锁外——锁内绝不
await
如果你的场景是"单线程 + 单 event loop + 多个协程并发",那asyncio.Lock是正确选择。但我们的场景是"多线程 + 多 event loop + 共享对象"——这本质上是一个线程安全问题,应该用线程安全的工具。
并发测试验证
importasyncio,threading,timefrommiqi.sandbox.managerimportSandboxManager# 共享单例(模拟 BridgeState._sandbox_manager)mgr=SandboxManager(workspace=Path('/tmp/test'))defreq1():asyncdefrun():awaitmgr.initialize()sandbox=awaitmgr.activate('session_A')assertsandboxisnotNoneasyncio.run(run())defreq2():time.sleep(0.05)asyncdefrun():awaitmgr.initialize()sandbox=awaitmgr.activate('session_B')assertsandboxisnotNoneasyncio.run(run())t1=threading.Thread(target=req1)t2=threading.Thread(target=req2)t1.start();t2.start()t1.join(timeout=30);t2.join(timeout=30)# 不再挂死,两个 session 都成功创建总结
| asyncio.Lock | threading.Lock | |
|---|---|---|
| 适用场景 | 单 loop 内多协程并发 | 多线程(含多 loop)并发 |
| Loop 亲和性 | 有(绑定到创建时的 loop) | 无 |
| 阻塞行为 | 挂起当前协程,不阻塞 loop | 阻塞当前线程 |
| 跨 thread 安全 | 不安全 | 安全 |
关键教训
asyncio.run()创建新 event loop。如果你的代码路径会被多次asyncio.run()调用(每次新 loop),asyncio 原语(Lock、Queue、Event、Semaphore)都不能跨调用共享。“测试通过"不等于"正确”。串行通过不代表并发安全。如果你的代码会被多线程调用,必须写并发测试。
asyncio 和 threading 不是互斥的。在"多线程 + 每线程独立 loop"的架构下,线程间的共享状态应该用
threading.Lock,loop 内的协程协作才用asyncio.Lock。把慢操作移到锁外。无论用哪种锁,临界区越短越好。"检查 dict → 创建对象 → 启动子进程 → 写回 dict"这种长临界区是并发问题的温床。正确做法:锁内只做 dict 操作,subprocess 启动放在锁外。
