Python异步性能调优实战
Python异步性能调优实战:连接池、背压与结构化并发
asyncio不只是把函数加个async关键字。真正的异步优化在于连接池管理、背压控制和结构化并发——这三板斧用好了,性能能翻3-10倍。
前言
很多Python开发者对asyncio的理解停留在"把def换成async def,把调用加个await"。结果写出来的代码虽然"异步"了,但性能甚至不如同步版本。
我曾经接手过一个项目,前任开发者用asyncio写了一个爬虫,号称"高并发"。结果一测速,每秒只能爬20个页面——跟同步版本没什么区别。问题出在哪?连接池没配好、没有背压控制、用法全是反模式。
这篇文章会从实际性能问题出发,深入讲解asyncio的三个核心优化方向:连接池、背压、结构化并发。每个点都有真实代码和性能对比。
一、asyncio基础回顾与常见误区
1.1 异步不是万能药
先说结论:asyncio适合IO密集型任务,不适合CPU密集型任务。
# ❌ 错误:CPU密集任务用asyncioasyncdefcpu_heavy_task():result=0foriinrange(10_000_000):result+=i# 这是CPU计算,不会让出控制权!returnresult# 这样写只会阻塞事件循环,其他协程全部等待asyncdefmain():# 两个任务看起来是"并发"的,实际上是串行的r1=awaitcpu_heavy_task()r2=awaitcpu_heavy_task()# ✅ 正确:CPU密集任务用ProcessPoolExecutorimportasynciofromconcurrent.futuresimportProcessPoolExecutordefcpu_heavy_sync(n:int)->int:result=0foriinrange(n):result+=ireturnresultasyncdefmain():loop=asyncio.get_event_loop()withProcessPoolExecutor()aspool:# 真正的并行计算r1=loop.run_in_executor(pool,cpu_heavy_sync,10_000_000)r2=loop.run_in_executor(pool,cpu_heavy_sync,10_000_000)results=awaitasyncio.gather(r1,r2)1.2 常见反模式
反模式1:在异步函数中调用同步阻塞代码
importtimeimportrequests# 同步HTTP库!# ❌ 错误:用了async但内部是同步阻塞的asyncdeffetch_url(url:str)->str:response=requests.get(url)# 这会阻塞整个事件循环!returnresponse.text# 后果:所有其他协程都被阻塞,变成串行执行asyncdefmain():urls=["https://api.example.com/1","https://api.example.com/2"]results=awaitasyncio.gather(*[fetch_url(u)foruinurls])# 实际执行时间 = sum(每个请求的时间),不是 max# ✅ 正确:使用异步HTTP库importaiohttpasyncdeffetch_url_async(url:str)->str:asyncwithaiohttp.ClientSession()assession:asyncwithsession.get(url)asresponse:returnawaitresponse.text()# 执行时间 = max(每个请求的时间),真正的并发反模式2:每次请求创建新连接
# ❌ 错误:每次请求都创建新的sessionasyncdeffetch_all(urls:list[str])->list[str]:results=[]forurlinurls:asyncwithaiohttp.ClientSession()assession:# 每次都创建新连接!asyncwithsession.get(url)asresp:results.append(awaitresp.text())returnresults# 问题:创建TCP连接有开销(DNS解析、TCP握手、TLS握手)# 100个URL = 100次完整连接建立# ✅ 正确:复用sessionasyncdeffetch_all(urls:list[str])->list[str]:asyncwithaiohttp.ClientSession()assession:# 一个session复用所有请求tasks=[fetch_one(session,url)forurlinurls]returnawaitasyncio.gather(*tasks)asyncdeffetch_one(session:aiohttp.ClientSession,url:str)->str:asyncwithsession.get(url)asresp:returnawaitresp.text()反模式3:无限制的并发
# ❌ 错误:一次性创建10000个协程asyncdeffetch_10000_urls(urls:list[str]):asyncwithaiohttp.ClientSession()assession:tasks=[fetch_one(session,url)forurlinurls]returnawaitasyncio.gather(*tasks)# 10000个并发请求!# 问题:# 1. 内存爆炸(每个协程占用内存)# 2. 文件描述符耗尽# 3. 目标服务器被压垮# 4. 操作系统连接数限制# ✅ 正确:使用信号量控制并发数asyncdeffetch_with_limit(urls:list[str],max_concurrent:int=50):semaphore=asyncio.Semaphore(max_concurrent)asyncwithaiohttp.ClientSession()assession:asyncdeflimited_fetch(url):asyncwithsemaphore:returnawaitfetch_one(session,url)tasks=[limited_fetch(url)forurlinurls]returnawaitasyncio.gather(*tasks)二、结构化并发:TaskGroup vs asyncio.gather
2.1 asyncio.gather 的问题
asyncio.gather是最常用的并发工具,但它有一些隐含问题:
# gather的问题:一个任务异常,其他任务的行为不确定asyncdefmain():asyncdefok():awaitasyncio.sleep(1)return"ok"asyncdeffail():awaitasyncio.sleep(0.5)raiseValueError("boom")# 默认行为:一个失败,其他被取消results=awaitasyncio.gather(ok(),fail())# 但cancel不是立即的,可能有竞态条件# 更严重的问题:任务泄漏asyncdefleaky():task=asyncio.create_task(some_long_running_coro())# 如果这里抛异常,task永远不会被等待或取消# 它会在后台默默运行,消耗资源2.2 TaskGroup:Python 3.11+ 的结构化并发
importasyncio# ✅ TaskGroup:更安全的并发管理asyncdefstructured_concurrent():asyncwithasyncio.TaskGroup()astg:task1=tg.create_task(fetch_data("url1"))task2