从 `asyncio.gather` 到 `TaskGroup`:Python 结构化并发、取消传播与异常聚合实战指南
从asyncio.gather到TaskGroup:Python 结构化并发、取消传播与异常聚合实战指南
Python 的魅力,常常来自一种朴素的优雅:它让初学者能很快写出可读代码,也让资深工程师能在 Web 后端、自动化、数据处理、AI 服务与高并发 I/O 场景里构建复杂系统。官方文档把asyncio定义为基于async/await编写并发代码的标准库,尤其适合 I/O 密集型和高层网络代码;它也是很多异步 Web、数据库连接、分布式任务框架的基础。(Python documentation)
但异步编程真正困难的地方,不是“如何同时启动多个任务”,而是:一个任务失败时,其他任务怎么办?取消请求如何传播?多个异常如何被完整保留?
从 Python 3.11 开始,asyncio.TaskGroup把这些问题推向了更清晰的答案。它代表的是一种更现代的并发思想:结构化并发。官方文档说明,TaskGroup是一个异步上下文管理器,可以在其中创建任务,并在退出上下文时等待所有任务完成;它在 Python 3.11 中加入。(Python documentation)
一、先理解问题:gather为什么不总是够用?
很多人第一次写异步并发,会这样做:
importasyncioasyncdeffetch_user():awaitasyncio.sleep(0.2)return{"id":1,"name":"Alice"}asyncdeffetch_orders():awaitasyncio.sleep(0.3)return["order-1","order-2"]asyncdefmain():user,orders=awaitasyncio.gather(fetch_user(),fetch_orders(),)print(user,orders)asyncio.run(main())这段代码简单、直观,适合很多“并发获取多个结果”的场景。
但问题出现在异常上。默认情况下,如果gather(..., return_exceptions=False)中某个 awaitable 抛出异常,这个异常会立刻传播给等待gather的调用方;而其他 awaitable 不会因此自动取消,会继续运行。官方文档也明确指出,TaskGroup相比gather提供了更强的安全保证:当某个任务或子任务抛出异常时,TaskGroup会取消剩余任务。(Python documentation)
这就是核心差异:
gather:更像“把几个任务放在一起等结果” TaskGroup:更像“这组任务属于同一个生命周期”在生产环境中,生命周期比结果更重要。比如你并发调用三个外部服务:库存、价格、优惠。如果价格服务失败,库存请求还在慢慢跑,优惠请求还在重试,整个请求链路就可能变得混乱、浪费资源,甚至产生副作用。
二、TaskGroup入门:并发任务的“作用域”
一个最小可运行示例:
importasyncioasyncdeffetch_profile(user_id:int):awaitasyncio.sleep(0.2)return{"user_id":user_id,"level":"VIP"}asyncdeffetch_points(user_id:int):awaitasyncio.sleep(0.1)return1280asyncdefmain():asyncwithasyncio.TaskGroup()astg:profile_task=tg.create_task(fetch_profile(1001))points_task=tg.create_task(fetch_points(1001))# 离开 async with 后,两个任务都已经结束print(profile_task.result())print(points_task.result())asyncio.run(main())TaskGroup的关键不是create_task,而是async with。它告诉读者和解释器:
这些任务属于同一个并发作用域。离开这个作用域之前,所有任务必须结束。
这和文件操作里的with open(...)很像。你不需要在每个分支里手动关闭文件,因为上下文管理器会兜底;你也不需要手动收尾每个异步任务,因为TaskGroup会在退出时等待它们。
三、取消传播:失败不是孤立事件
现在看一个更真实的例子:一个任务失败,其他任务会怎样?
importasyncioasyncdefslow_worker(name:str):try:print(f"{name}: start")awaitasyncio.sleep(5)print(f"{name}: done")finally:print(f"{name}: cleanup")asyncdefbroken_worker():print("broken_worker: start")awaitasyncio.sleep(0.5)raiseRuntimeError("remote service failed")asyncdefmain():try:asyncwithasyncio.TaskGroup()astg:tg.create_task(slow_worker("worker-A"))tg.create_task(slow_worker("worker-B"))tg.create_task(broken_worker())except*RuntimeErrorasgroup:print(f"caught RuntimeError group:{group}")asyncio.run(main())你会看到类似输出:
worker-A: start worker-B: start broken_worker: start worker-B: cleanup worker-A: cleanup caught RuntimeError group: unhandled errors in a TaskGroup ...当broken_worker抛出RuntimeError后,TaskGroup会取消组内尚未完成的任务。被取消的任务会在下一个await点收到asyncio.CancelledError。官方文档强调,协程应该用try/finally做清理;如果显式捕获CancelledError,通常应在清理后继续传播,否则结构化并发组件可能行为异常。(Python documentation)
这就是取消传播的本质:
子任务失败 ↓ TaskGroup 进入关闭流程 ↓ 取消其他未完成子任务 ↓ 等待它们清理完成 ↓ 把非取消异常聚合后抛出可以用一个流程图理解:
四、千万别吞掉CancelledError
初学者常犯这个错误:
asyncdefbad_worker():try:awaitasyncio.sleep(10)exceptException:print("something wrong")这段代码其实不会捕获CancelledError,因为asyncio.CancelledError直接继承自BaseException,不是普通Exception的子类。官方文档对此有明确说明。(Python documentation)
但更危险的是下面这种写法:
asyncdefworse_worker():try:awaitasyncio.sleep(10)exceptBaseException:print("swallowed everything")这会吞掉取消信号,让TaskGroup以为任务还可以继续,破坏结构化并发的语义。
正确写法是:
asyncdefgood_worker():try:awaitasyncio.sleep(10)exceptasyncio.CancelledError:print("received cancellation, cleaning...")raisefinally:print("release db connection or file handle")记住一句话:
取消不是普通异常,它是协程生命周期管理的一部分。
五、异常聚合:为什么需要ExceptionGroup?
并发世界里,多个任务可能几乎同时失败。过去的问题是:解释器一次通常只能向上传播一个异常,其他异常容易丢失或被隐藏。PEP 654 引入了ExceptionGroup和except*,用于同时传播和分类处理多个无关异常。(Python Enhancement Proposals (PEPs))
看一个例子:
importasyncioasyncdefcall_payment():awaitasyncio.sleep(0.1)raiseTimeoutError("payment timeout")asyncdefcall_inventory():awaitasyncio.sleep(0.1)raiseValueError("invalid inventory response")asyncdefcall_coupon():awaitasyncio.sleep(0.2)return{"coupon":"OK"}asyncdefmain():try:asyncwithasyncio.TaskGroup()astg:tg.create_task(call_payment())tg.create_task(call_inventory())tg.create_task(call_coupon())except*TimeoutErrorasgroup:print("handle timeout errors:")forexcingroup.exceptions:print(" -",exc)except*ValueErrorasgroup:print("handle value errors:")forexcingroup.exceptions:print(" -",exc)asyncio.run(main())这里的except*不是普通except。它会从异常组中“按类型拆分”异常:TimeoutError交给第一个分支处理,ValueError交给第二个分支处理。
这非常适合生产系统:
TimeoutError:可以重试 ValueError:数据格式错误,通常不可重试 PermissionError:权限配置问题,需要告警也就是说,异常聚合不是为了让报错更复杂,而是为了让并发失败更完整、更可诊断。
六、实战案例:并发查询商品页所需数据
假设我们要渲染一个商品详情页,需要同时查询:
- 商品基础信息;
- 库存;
- 促销信息;
- 推荐商品。
需求是:商品基础信息失败则整个页面失败;推荐商品失败可以降级为空;库存或促销失败要中断请求。
importasynciofromdataclassesimportdataclass@dataclassclassProductPage:product:dictstock:dictpromotion:dictrecommendations:listasyncdeffetch_product(product_id:int):awaitasyncio.sleep(0.1)return{"id":product_id,"name":"Python 实战课"}asyncdeffetch_stock(product_id:int):awaitasyncio.sleep(0.2)return{"available":True,"count":36}asyncdeffetch_promotion(product_id:int):awaitasyncio.sleep(0.15)return{"discount":"8.8折"}asyncdeffetch_recommendations(product_id:int):try:awaitasyncio.sleep(0.3)raiseTimeoutError("recommendation service timeout")exceptTimeoutError:# 可降级服务:内部消化异常return[]asyncdefbuild_product_page(product_id:int)->ProductPage:asyncwithasyncio.TaskGroup()astg:product_task=tg.create_task(fetch_product(product_id),name="fetch_product")stock_task=tg.create_task(fetch_stock(product_id),name="fetch_stock")promotion_task=tg.create_task(fetch_promotion(product_id),name="fetch_promotion")rec_task=tg.create_task(fetch_recommendations(product_id),name="fetch_recommendations")returnProductPage(product=product_task.result(),stock=stock_task.result(),promotion=promotion_task.result(),recommendations=rec_task.result(),)asyncdefmain():page=awaitbuild_product_page(42)print(page)asyncio.run(main())这个案例里的设计很重要:
可降级异常在子任务内部处理;不可降级异常交给TaskGroup统一取消和聚合。
这比在外层写一堆混乱的try/except更清晰,也更符合业务语义。
七、加上超时控制:别让慢任务拖垮系统
真实服务里,异步并发必须配合超时。可以这样写:
importasyncioasyncdefcall_remote_service():awaitasyncio.sleep(3)return"OK"asyncdefmain():try:asyncwithasyncio.timeout(1):asyncwithasyncio.TaskGroup()astg:task=tg.create_task(call_remote_service())print(task.result())exceptTimeoutError:print("request timeout, all tasks inside were cancelled")asyncio.run(main())asyncio.timeout()和TaskGroup一起使用时,能形成清晰的边界:
1 秒内完成:返回结果 超过 1 秒:取消作用域内所有任务这正是结构化并发的美感:任务不会“逃逸”到你看不见的地方继续运行。
八、TaskGroup与传统面向对象设计
如果把TaskGroup放进一个服务类中,可以形成更清晰的工程结构:
importasyncioclassProductPageService:asyncdefbuild(self,product_id:int)->dict:asyncwithasyncio.TaskGroup()astg:product=tg.create_task(self.fetch_product(product_id))stock=tg.create_task(self.fetch_stock(product_id))promotion=tg.create_task(self.fetch_promotion(product_id))return{"product":product.result(),"stock":stock.result(),"promotion":promotion.result(),}asyncdeffetch_product(self,product_id:int):awaitasyncio.sleep(0.1)return{"id":product_id}asyncdeffetch_stock(self,product_id:int):awaitasyncio.sleep(0.1)return{"count":10}asyncdeffetch_promotion(self,product_id:int):awaitasyncio.sleep(0.1)return{"tag":"hot"}简单 UML 示意:
这种写法让异步并发不是散落在业务代码里的技巧,而是服务对象内部清晰可维护的实现细节。
九、最佳实践清单
写TaskGroup代码时,我建议遵守这些规则:
| 实践 | 原因 |
|---|---|
用async with TaskGroup()表达任务生命周期 | 避免任务泄漏 |
不要吞掉CancelledError | 取消是结构化并发的基础 |
| 可降级异常在子任务内部处理 | 避免无意义取消整个任务组 |
不可降级异常交给TaskGroup | 自动取消兄弟任务 |
使用except*分类处理异常 | 保留多个并发异常 |
| 给任务命名 | 方便日志和排查 |
| 配合超时 | 防止慢服务拖垮请求 |
用try/finally清理资源 | 数据库连接、锁、文件句柄必须释放 |
一个更贴近生产的日志写法:
asyncdefworker(name:str):try:print({"event":"task_start","task":name})awaitasyncio.sleep(1)print({"event":"task_success","task":name})exceptasyncio.CancelledError:print({"event":"task_cancelled","task":name})raisefinally:print({"event":"task_cleanup","task":name})当系统出问题时,好的日志会告诉你:谁开始了、谁失败了、谁被取消了、谁完成了清理。
十、常见误区
第一个误区:把TaskGroup当成更漂亮的gather。
它们不只是语法不同,而是错误处理模型不同。TaskGroup更强调“同生共死”的任务组语义。
第二个误区:所有异常都在外层统一处理。
并不是。像推荐服务失败、埋点服务失败、非核心缓存失败,可能应该在子任务内部降级。否则一个非核心功能会拖垮整个请求。
第三个误区:取消后不清理资源。
异步任务可能持有数据库连接、文件、锁或网络连接。取消发生时,如果没有finally,系统就可能慢慢积累隐患。
第四个误区:忽略异常聚合。
并发失败往往不是单点失败。ExceptionGroup的价值,正在于它能让你看到多个错误的全貌。
十一、未来视角:为什么结构化并发越来越重要?
今天的 Python,早已不只是脚本语言。它在 FastAPI 后端服务、AI Agent、自动化平台、数据流水线、物联网网关中承担越来越多的并发任务。随着系统复杂度提升,我们需要的不只是“启动任务”,而是“管理任务的生命周期”。
TaskGroup、取消传播和异常聚合,正是 Python 向更可靠工程实践迈进的重要一步。
它让我们写出的异步代码更像一支有纪律的团队:
有人失败,队友不会失联;有人退出,资源会被清理;多个问题同时发生,也不会只留下一个模糊的错误。
总结
本文围绕 Python 异步编程中的三个关键能力展开:
TaskGroup让并发任务拥有清晰作用域;
取消传播保证任务失败时兄弟任务及时停止;
异常聚合让多个并发错误被完整保留并分类处理。
对于初学者,掌握它们能让你避开异步编程中最危险的坑。对于资深开发者,它们是构建高可靠 Python 服务的重要基础。
最后留两个问题给你:
你现在的异步代码中,是否存在“任务创建后没人管”的情况?
当多个并发任务同时失败时,你的系统能否完整记录并分类处理这些异常?
附录:推荐资料
官方资料建议优先阅读 Pythonasyncio文档、TaskGroup文档,以及 PEP 654 关于ExceptionGroup与except*的设计说明。asyncio文档适合建立整体模型,TaskGroup文档适合理解取消语义,PEP 654 则能帮助你理解为什么 Python 需要异常聚合。(Python documentation)
关键词建议自然布局:Python编程、Python教程、Python实战、Python异步编程、TaskGroup、取消传播、异常聚合、Python最佳实践。
