当前位置: 首页 > news >正文

Python23_asyncio并发

Python23_asyncio并发

文章目录

  • Python23_asyncio并发
    • @[toc]
  • 第一章:核心概念
      • 1.1 什么是 asyncio?
      • 1.2 核心术语对照表
    • 第二章:基础语法与API
      • 2.1 定义和运行协程
      • 2.2 并发执行多个任务
      • 2.3 超时与取消
    • 第三章:常见问题与解答(FAQ)
      • Q1: `async` 和 `await` 到底是什么?
      • Q2: 为什么我的协程没有并发执行?
      • Q3: `asyncio.run()` 是什么?能调用多次吗?
      • Q4: 如何在协程中调用同步(阻塞)代码?
      • Q5: `asyncio.gather()` vs `asyncio.wait()` 区别?
      • Q6: 如何正确处理异常?
      • Q7: 什么是事件循环?如何获取?
      • Q8: 如何实现生产者-消费者模式?
      • Q9: `async for` 和异步迭代器是什么?
      • Q10: 如何调试 asyncio 程序?
    • 第四章:最佳实践
      • ✅ 应该做的
      • ❌ 不应该做的
    • 第五章:速查表
      • 常用 API 速查
      • 并发模式选择
    • 第六章:学习路径建议

第一章:核心概念

1.1 什么是 asyncio?

asyncio是 Python 3.4+ 引入的标准库,用于编写并发代码,使用async/await语法。

importasyncioasyncdefmain():print("Hello")awaitasyncio.sleep(1)print("World")asyncio.run(main())

关键特性:

  • 单线程并发:利用事件循环(Event Loop)实现非阻塞 I/O
  • 协程(Coroutine):使用async def定义的函数
  • 非抢占式:协程主动让出控制权(await),而非被强制切换

1.2 核心术语对照表

术语说明类比
Coroutine协程,使用async def定义的函数特殊的生成器函数
Task被事件循环调度的协程封装线程中的线程对象
Event Loop事件循环,调度执行协程心脏,驱动整个系统
Future异步操作最终结果占位符期货/期票
Awaitable可被await的对象可等待的

第二章:基础语法与API

2.1 定义和运行协程

importasyncio# 定义协程asyncdefsay_hello(name):print(f"Hello,{name}!")awaitasyncio.sleep(1)# 模拟异步操作print(f"Goodbye,{name}!")# 运行协程的三种方式# 方式1:asyncio.run() - 推荐,Python 3.7+asyncio.run(say_hello("Alice"))# 方式2:在已有事件循环中运行asyncdefmain():awaitsay_hello("Bob")asyncio.run(main())# 方式3:创建Task(并发执行)asyncdefmain():task1=asyncio.create_task(say_hello("Alice"))task2=asyncio.create_task(say_hello("Bob"))awaittask1awaittask2 asyncio.run(main())

2.2 并发执行多个任务

importasyncioasyncdeffetch_data(url,delay):print(f"开始获取{url}")awaitasyncio.sleep(delay)print(f"完成获取{url}")returnf"Data from{url}"asyncdefmain():# 方式1:asyncio.gather - 等待所有完成results=awaitasyncio.gather(fetch_data("url1",2),fetch_data("url2",1),fetch_data("url3",3))print(f"所有结果:{results}")# 方式2:asyncio.wait - 更灵活的控制tasks=[asyncio.create_task(fetch_data("url1",2)),asyncio.create_task(fetch_data("url2",1)),]done,pending=awaitasyncio.wait(tasks,timeout=3)print(f"已完成:{len(done)}, 待处理:{len(pending)}")# 方式3:asyncio.as_completed - 按完成顺序处理tasks=[fetch_data(f"url{i}",i)foriinrange(1,4)]forcoroinasyncio.as_completed(tasks):result=awaitcoroprint(f"先完成的:{result}")asyncio.run(main())

2.3 超时与取消

importasyncioasyncdeflong_running_task():try:awaitasyncio.sleep(10)return"Completed"exceptasyncio.CancelledError:print("任务被取消")raise# 必须重新抛出asyncdefmain():# 设置超时try:result=awaitasyncio.wait_for(long_running_task(),timeout=2.0)exceptasyncio.TimeoutError:print("操作超时!")# 手动取消任务task=asyncio.create_task(long_running_task())awaitasyncio.sleep(1)task.cancel()try:awaittaskexceptasyncio.CancelledError:print("成功取消任务")asyncio.run(main())

第三章:常见问题与解答(FAQ)

Q1:asyncawait到底是什么?

A:

  • async def:定义一个协程函数,调用它返回协程对象,不会立即执行
  • await:暂停当前协程,等待另一个协程/任务完成,期间事件循环可以运行其他任务
asyncdeffoo():return42# 调用协程函数得到的是协程对象,不是结果!coro=foo()# 这里不会执行 printprint(type(coro))# <class 'coroutine'># 必须 await 才能获取结果result=awaitfoo()# 42

Q2: 为什么我的协程没有并发执行?

A:常见错误是顺序await,没有创建 Task:

# ❌ 错误:顺序执行,总耗时 3 秒asyncdefbad():awaitasyncio.sleep(1)# 等待1秒awaitasyncio.sleep(1)# 再等待1秒awaitasyncio.sleep(1)# 再等待1秒# ✅ 正确:并发执行,总耗时 1 秒asyncdefgood():awaitasyncio.gather(asyncio.sleep(1),asyncio.sleep(1),asyncio.sleep(1))# ✅ 或者使用 create_taskasyncdefgood2():task1=asyncio.create_task(asyncio.sleep(1))task2=asyncio.create_task(asyncio.sleep(1))awaittask1awaittask2

Q3:asyncio.run()是什么?能调用多次吗?

A:

  • asyncio.run(coro)是 Python 3.7+ 的简便入口函数
  • 它会创建新的事件循环,运行协程,关闭循环
  • 不能嵌套调用,也不能在已有事件循环中调用
# ❌ 错误:嵌套调用asyncdefmain():asyncio.run(other_coro())# RuntimeError!# ✅ 正确:在 main 中使用 awaitasyncdefmain():awaitother_coro()asyncio.run(main())

Q4: 如何在协程中调用同步(阻塞)代码?

A:使用asyncio.to_thread()(Python 3.9+) 或loop.run_in_executor()

importasyncioimporttimedefblocking_io():"""模拟阻塞操作(如文件读写、CPU计算)"""time.sleep(2)# 阻塞!return"Done"asyncdefmain():# 方式1:Python 3.9+result=awaitasyncio.to_thread(blocking_io)# 方式2:通用方法loop=asyncio.get_running_loop()result=awaitloop.run_in_executor(None,blocking_io)print(result)asyncio.run(main())

Q5:asyncio.gather()vsasyncio.wait()区别?

特性gather()wait()
返回值直接返回所有结果列表返回(done_set, pending_set)
异常处理默认第一个异常就返回可配置return_when
使用场景简单并发,需要所有结果需要超时控制、部分结果
# gather: 简单获取所有结果results=awaitasyncio.gather(task1,task2,task3)# wait: 更灵活的控制done,pending=awaitasyncio.wait([task1,task2,task3],timeout=5.0,return_when=asyncio.FIRST_COMPLETED# 任意一个完成就返回)

Q6: 如何正确处理异常?

importasyncioasyncdefmay_fail():raiseValueError("出错了!")asyncdefmain():# 方式1:try-excepttry:awaitmay_fail()exceptValueErrorase:print(f"捕获错误:{e}")# 方式2:gather 的异常处理results=awaitasyncio.gather(may_fail(),asyncio.sleep(1),return_exceptions=True# 将异常作为结果返回,不抛出)# results = [ValueError("出错了!"), None]# 方式3:任务异常处理task=asyncio.create_task(may_fail())try:awaittaskexceptValueError:print("任务失败")print(f"异常详情:{task.exception()}")asyncio.run(main())

Q7: 什么是事件循环?如何获取?

importasyncioasyncdefmain():# 获取当前运行的事件循环loop=asyncio.get_running_loop()# 获取/设置事件循环(低级API,一般不直接使用)# loop = asyncio.get_event_loop() # 可能已废弃# asyncio.set_event_loop(loop)print(f"当前循环:{loop}")asyncio.run(main())

注意:现代 asyncio 推荐使用高层 API(asyncio.run(),asyncio.create_task()),避免直接操作事件循环。


Q8: 如何实现生产者-消费者模式?

importasyncioasyncdefproducer(queue:asyncio.Queue,n:int):foriinrange(n):awaitasyncio.sleep(0.5)# 模拟生产时间awaitqueue.put(i)print(f"生产:{i}")awaitqueue.put(None)# 发送结束信号asyncdefconsumer(queue:asyncio.Queue,name:str):whileTrue:item=awaitqueue.get()ifitemisNone:# 结束信号queue.task_done()breakawaitasyncio.sleep(1)# 模拟消费时间print(f"消费者{name}处理:{item}")queue.task_done()asyncdefmain():queue=asyncio.Queue(maxsize=5)# 限制队列大小producers=[asyncio.create_task(producer(queue,5))]consumers=[asyncio.create_task(consumer(queue,f"C{i}"))foriinrange(2)]awaitasyncio.gather(*producers)awaitqueue.join()# 等待所有项目被处理awaitasyncio.gather(*consumers)asyncio.run(main())

Q9:async for和异步迭代器是什么?

importasyncioclassAsyncRange:"""异步迭代器示例"""def__init__(self,n):self.n=n self.i=0def__aiter__(self):returnselfasyncdef__anext__(self):ifself.i>=self.n:raiseStopAsyncIterationawaitasyncio.sleep(0.1)# 模拟异步操作val=self.i self.i+=1returnvalasyncdefmain():# 使用 async forasyncforiinAsyncRange(5):print(i)# 异步推导式result=[iasyncforiinAsyncRange(5)ifi%2==0]print(result)# [0, 2, 4]asyncio.run(main())

Q10: 如何调试 asyncio 程序?

importasyncioimportlogging# 启用调试模式logging.basicConfig(level=logging.DEBUG)asyncdefmain():# 检测阻塞:设置慢回调警告阈值loop=asyncio.get_running_loop()loop.slow_callback_duration=0.1# 超过100ms警告# 或者环境变量:PYTHONASYNCIODEBUG=1awaitasyncio.sleep(1)# 运行方式:PYTHONASYNCIODEBUG=1 python script.pyasyncio.run(main(),debug=True)

调试技巧:

  1. 使用asyncio.run(debug=True)
  2. 设置环境变量PYTHONASYNCIODEBUG=1
  3. 使用aiomonitor库进行运行时监控
  4. 使用pytest-asyncio进行异步测试

第四章:最佳实践

✅ 应该做的

# 1. 始终使用 asyncio.run() 作为入口asyncdefmain():...if__name__=="__main__":asyncio.run(main())# 2. 及时创建 Task 实现并发task=asyncio.create_task(coro())# 立即调度awaittask# 等待完成# 3. 使用 timeout 防止无限等待awaitasyncio.wait_for(coro(),timeout=5.0)# 4. 使用 contextlib.asynccontextmanager 管理资源fromcontextlibimportasynccontextmanager@asynccontextmanagerasyncdefmanaged_resource():awaitconnect()try:yieldresourcefinally:awaitdisconnect()# 5. 使用 asyncio.gather 收集结果results=awaitasyncio.gather(*tasks,return_exceptions=True)

❌ 不应该做的

# 1. 不要在协程中使用 time.sleep()awaitasyncio.sleep(1)# ✅time.sleep(1)# ❌ 阻塞整个事件循环!# 2. 不要直接调用协程函数而不 awaitcoro=my_async_func()# 只是创建协程对象,不会执行!# 3. 不要忽略 CancelledErrortry:awaittaskexceptasyncio.CancelledError:raise# ✅ 必须重新抛出或正确处理# 4. 不要创建过多并发任务(内存爆炸)# 使用 asyncio.Semaphore 限制并发数semaphore=asyncio.Semaphore(10)asyncdeflimited_task():asyncwithsemaphore:awaitactual_work()

第五章:速查表

常用 API 速查

操作代码
运行协程asyncio.run(main())
创建任务asyncio.create_task(coro())
等待多个await asyncio.gather(*coros)
超时控制await asyncio.wait_for(coro, timeout)
睡眠await asyncio.sleep(delay)
当前时间asyncio.get_event_loop().time()
队列asyncio.Queue(maxsize)
asyncio.Lock()
信号量asyncio.Semaphore(n)
事件asyncio.Event()
条件变量asyncio.Condition()

并发模式选择

场景推荐方案
多个独立任务asyncio.gather()
需要超时/部分结果asyncio.wait()
按完成顺序处理asyncio.as_completed()
生产者-消费者asyncio.Queue
限制并发数asyncio.Semaphore
共享资源保护asyncio.Lock

第六章:学习路径建议

第一阶段:基础概念 └── 理解协程、事件循环、Task 的关系 └── 掌握 async/await 语法 第二阶段:并发控制 └── gather/wait/as_completed 的使用 └── 超时和取消机制 第三阶段:同步互斥 └── Queue、Lock、Semaphore、Event 第四阶段:实战整合 └── 异步 HTTP (aiohttp) └── 异步数据库 (aiomysql/aiopg) └── 异步 Web 框架 (FastAPI/Sanic)

📌提示:asyncio 学习曲线较陡,建议从简单示例开始,逐步理解"事件循环驱动协程切换"的核心机制。遇到问题时,首先检查是否有阻塞操作阻塞了事件循环。

http://www.jsqmd.com/news/651045/

相关文章:

  • CustomTkinter终极指南:快速打造现代化Python桌面应用的完整解决方案
  • Cursor Pro激活终极指南:如何免费解锁AI代码编辑器的完整功能
  • 告别黑屏!用STM32CubeIDE一步步搞定ILI9488驱动并点亮LVGUI
  • Waydroid技术解析:如何在Linux系统上实现原生级Android应用运行体验
  • 如何利用Stylus选择器插值:动态生成复杂选择器的终极指南
  • Z-Image-Turbo-rinaiqiao-huiyewunv企业落地:动漫衍生品设计团队AI灵感激发工作流
  • 如何选择一款真正适合你的离线思维导图工具?
  • 终极解决方案:Unlock Music音乐解密工具完全指南
  • 【STM32】STM32F407主从定时器联动:实现高精度相移互补PWM的工程实践
  • 如何选择专业的厂房暖通中央空调工程公司?这家企业在生物医药行业表现出色 - 品牌2026
  • 兔抗RBM9抗体亲和纯化,高特异性识别,多实验场景适配
  • 终极指南:如何使用Robo 3T轻松解决MongoDB数据验证规则冲突与集合约束合并
  • 别再只会用SPI了!MFRC522模块的UART/I2C接口切换与Windows下快速上手调试指南
  • 终极构建指南:MSBuild、Wix与NuGet在usbipd-win项目的完美融合
  • STM32 FSMC时序配置实战:从手册解读到SRAM驱动
  • CLIP-GmP-ViT-L-14快速部署:Docker镜像构建与NVIDIA GPU加速配置
  • 2026年两轮电动车换电加盟深度横评:伏特兽成本模式破局指南 - 精选优质企业推荐榜
  • 暗黑2存档编辑器终极指南:d2s-editor从零到精通完整教程
  • 完全弹性碰撞公式的物理意义与工程应用解析
  • Java开发者收藏:AI大模型转型指南,工程思维助你升级技能树!
  • 恒压供水全套图纸程序 西门子s7-200smart西门子触摸屏。 1、一对一变频(一台变频带一...
  • Mac Mouse Fix终极指南:让你的第三方鼠标在macOS上比触控板更好用
  • 3步彻底解决Cursor设备限制:机器ID重置技术深度解析
  • Excel 模拟运算表:从基础到实战的假设分析指南
  • 终极AASM状态机教程:如何快速构建智能Ruby状态管理系统
  • Tsuru平台API限流策略:保护服务稳定性的完整指南
  • VT2710板卡PSI5配置避坑指南:电流、时隙、电压参数怎么设才不翻车?
  • 三个 AI Agent 工具的额度监控,三种完全不同的数据源
  • IFC 转 SOLIDWORKS 实战指南:从建筑模型到机械设计的无缝衔接
  • 收藏!2026年AI人才争夺战白皮书:大模型成春招焦点,高薪岗位抢先看!