当前位置: 首页 > news >正文

Python26_并发协程

Python26_并发协程

文章目录

  • Python26_并发协程
    • @[toc]
    • 第一章:基础概念
      • 1.1 什么是协程(Coroutine)?
      • 1.2 协程、线程、进程的区别
      • 1.3 核心关键字
    • 第二章:asyncio 核心机制
      • 2.1 事件循环(Event Loop)
      • 2.2 创建和运行任务
      • 2.3 并发执行多个任务
    • 第三章:异步 I/O 与常用操作
      • 3.1 模拟 I/O 操作
      • 3.2 异步上下文管理器
      • 3.3 异步迭代器
    • 第四章:异常处理与取消
      • 4.1 协程中的异常
      • 4.2 任务取消(Cancellation)
    • 第五章:高级主题
      • 5.1 在协程中运行同步代码
      • 5.2 有界信号量(Semaphore)控制并发数
      • 5.3 超时控制
    • 第六章:常见错误与调试
      • 6.1 常见错误清单
      • 6.2 调试技巧
    • 第七章:实战模式
      • 7.1 生产者-消费者模式(使用 Queue)
      • 7.2 Web 爬虫并发模板
    • 附录:速查表

第一章:基础概念

1.1 什么是协程(Coroutine)?

Q:协程是什么?与普通函数有什么区别?

A:协程是一种可以暂停执行并在稍后恢复的函数。与普通函数不同,协程使用async def定义,调用时不会立即执行,而是返回一个协程对象。

importasyncio# 普通函数defnormal_func():return"Hello"# 协程函数asyncdefcoro_func():return"Hello Async"# 区别演示print(normal_func())# 立即执行,输出: Helloprint(coro_func())# 不执行!输出: <coroutine object ...>print(asyncio.run(coro_func()))# 正确执行,输出: Hello Async

1.2 协程、线程、进程的区别

特性进程 (Process)线程 (Thread)协程 (Coroutine)
资源占用高(独立内存空间)中(共享进程内存)低(单线程内)
切换开销高(需要切换页表)中(需要切换栈)低(用户态切换)
数据共享需要 IPC共享内存(需锁)单线程安全
适用场景CPU 密集型I/O 密集型高并发 I/O
Python实现multiprocessingthreadingasyncio

Q:为什么协程适合高并发 I/O?

A:协程在单线程内通过事件循环调度,遇到 I/O 操作时主动让出控制权,不阻塞其他协程。相比线程,协程切换无需内核介入,内存占用极小(约 1KB),可轻松支持数万并发连接。


1.3 核心关键字

关键字作用
async def定义协程函数
await暂停协程,等待另一个协程/可等待对象完成
async for异步迭代
async with异步上下文管理器

第二章:asyncio 核心机制

2.1 事件循环(Event Loop)

Q:什么是事件循环?

A:事件循环是asyncio的核心,负责调度和执行协程。它维护一个任务队列,不断检查哪些协程可以继续执行。

importasyncioasyncdefsay_hello():print("Hello")awaitasyncio.sleep(1)# 模拟 I/O,挂起协程print("World")# 方式1:使用 asyncio.run()(推荐,Python 3.7+)asyncio.run(say_hello())# 方式2:手动管理事件循环(旧方式)loop=asyncio.get_event_loop()loop.run_until_complete(say_hello())loop.close()

2.2 创建和运行任务

Q:asyncio.create_task()await直接调用有什么区别?

A:

  • await coro():顺序执行,等待完成才继续
  • create_task():并发执行,立即创建后台任务
importasyncioimporttimeasyncdeftask(name,delay):print(f"任务{name}开始")awaitasyncio.sleep(delay)print(f"任务{name}完成")returnf"结果{name}"asyncdefmain():start=time.time()# 方式1:顺序执行(总耗时 3秒)# result1 = await task("A", 1)# result2 = await task("B", 2)# 方式2:并发执行(总耗时 2秒)task1=asyncio.create_task(task("A",1))task2=asyncio.create_task(task("B",2))result1=awaittask1 result2=awaittask2print(f"耗时:{time.time()-start:.2f}秒")print(f"结果:{result1},{result2}")asyncio.run(main())

2.3 并发执行多个任务

Q:如何并发执行多个协程并等待全部完成?

A:使用asyncio.gather()asyncio.wait()

importasyncioasyncdeffetch_data(url):awaitasyncio.sleep(1)# 模拟网络请求returnf"数据来自{url}"asyncdefmain():urls=["url1","url2","url3"]# 方式1:gather - 等待全部完成,返回结果列表results=awaitasyncio.gather(*[fetch_data(url)forurlinurls],return_exceptions=True# 捕获异常而不中断)print(results)# 方式2:wait - 更灵活的控制tasks=[asyncio.create_task(fetch_data(url))forurlinurls]done,pending=awaitasyncio.wait(tasks,return_when=asyncio.ALL_COMPLETED)fortaskindone:print(task.result())asyncio.run(main())

gathervswait对比:

特性gatherwait
返回值结果列表(保持顺序)完成/未完成的任务集
异常处理return_exceptions=True需手动检查
使用场景简单并发需要精细控制(如超时、部分完成)

第三章:异步 I/O 与常用操作

3.1 模拟 I/O 操作

Q:为什么time.sleep()不能在协程中使用?

A:time.sleep()阻塞整个线程,导致事件循环无法调度其他协程。必须使用asyncio.sleep()

importasyncioimporttimeasyncdefbad_example():print("开始")time.sleep(2)# ❌ 错误:阻塞整个事件循环!print("结束")asyncdefgood_example():print("开始")awaitasyncio.sleep(2)# ✅ 正确:挂起当前协程,让出控制权print("结束")

3.2 异步上下文管理器

Q:如何实现异步资源管理(如数据库连接)?

A:使用async with__aenter__/__aexit__

importasyncioclassAsyncDatabase:asyncdefconnect(self):awaitasyncio.sleep(0.1)print("数据库已连接")returnselfasyncdefquery(self,sql):awaitasyncio.sleep(0.5)returnf"查询结果:{sql}"asyncdefclose(self):awaitasyncio.sleep(0.1)print("数据库已关闭")asyncdef__aenter__(self):awaitself.connect()returnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):awaitself.close()asyncdefmain():asyncwithAsyncDatabase()asdb:result=awaitdb.query("SELECT * FROM users")print(result)asyncio.run(main())

3.3 异步迭代器

Q:如何处理异步数据流?

A:使用async for__aiter__/__anext__

importasyncioimportrandomclassAsyncDataStream:def__init__(self,count):self.count=count self.current=0def__aiter__(self):returnselfasyncdef__anext__(self):ifself.current>=self.count:raiseStopAsyncIterationawaitasyncio.sleep(0.1)# 模拟异步获取数据self.current+=1returnf"数据块-{self.current}"asyncdefmain():stream=AsyncDataStream(3)asyncfordatainstream:print(f"接收到:{data}")asyncio.run(main())

第四章:异常处理与取消

4.1 协程中的异常

Q:协程中的异常如何捕获?

A:使用常规try/except,或在gather中设置return_exceptions=True

importasyncioasyncdefrisky_task():awaitasyncio.sleep(1)raiseValueError("出错了!")asyncdefmain():# 方式1:直接捕获try:awaitrisky_task()exceptValueErrorase:print(f"捕获异常:{e}")# 方式2:gather 中处理results=awaitasyncio.gather(risky_task(),risky_task(),return_exceptions=True# 异常会作为结果返回,不抛出)forrinresults:ifisinstance(r,Exception):print(f"任务异常:{r}")else:print(f"任务结果:{r}")asyncio.run(main())

4.2 任务取消(Cancellation)

Q:如何取消正在运行的协程?

A:使用task.cancel(),协程内部需捕获CancelledError进行清理

importasyncioasyncdeflong_running_task():try:whileTrue:print("工作中...")awaitasyncio.sleep(1)exceptasyncio.CancelledError:print("收到取消信号,正在清理...")# 执行清理操作(关闭连接、保存状态等)raise# 必须重新抛出,让取消传播asyncdefmain():task=asyncio.create_task(long_running_task())awaitasyncio.sleep(3)task.cancel()# 发送取消请求try:awaittaskexceptasyncio.CancelledError:print("任务已取消")asyncio.run(main())

第五章:高级主题

5.1 在协程中运行同步代码

Q:如何在协程中调用阻塞的同步函数?

A:使用loop.run_in_executor()将同步代码放到线程池中执行

importasyncioimporttimeimportrequests# 同步 HTTP 库defsync_http_request(url):"""同步阻塞函数"""time.sleep(2)# 模拟耗时操作returnf"同步请求结果:{url}"asyncdefmain():loop=asyncio.get_event_loop()# 在线程池中执行同步函数,不阻塞事件循环result=awaitloop.run_in_executor(None,# 默认线程池sync_http_request,"https://example.com")print(result)# 批量处理多个同步任务urls=["url1","url2","url3"]tasks=[loop.run_in_executor(None,sync_http_request,url)forurlinurls]results=awaitasyncio.gather(*tasks)print(results)asyncio.run(main())

5.2 有界信号量(Semaphore)控制并发数

Q:如何限制同时运行的协程数量?

A:使用asyncio.Semaphore

importasyncioasyncdeflimited_task(semaphore,task_id):asyncwithsemaphore:# 获取信号量,超过限制会等待print(f"任务{task_id}开始")awaitasyncio.sleep(1)print(f"任务{task_id}完成")asyncdefmain():# 最多同时运行3个协程semaphore=asyncio.Semaphore(3)tasks=[limited_task(semaphore,i)foriinrange(10)]awaitasyncio.gather(*tasks)asyncio.run(main())

5.3 超时控制

Q:如何为协程设置超时?

A:使用asyncio.wait_for()asyncio.timeout()(Python 3.11+)

importasyncioasyncdefslow_task():awaitasyncio.sleep(10)return"完成"asyncdefmain():# 方式1:wait_for(所有版本)try:result=awaitasyncio.wait_for(slow_task(),timeout=2.0)exceptasyncio.TimeoutError:print("任务超时!")# 方式2:timeout 上下文管理器(Python 3.11+)try:asyncwithasyncio.timeout(2.0):result=awaitslow_task()exceptTimeoutError:print("任务超时!")asyncio.run(main())

第六章:常见错误与调试

6.1 常见错误清单

错误原因解决方案
RuntimeError: no running event loop直接调用协程函数使用asyncio.run()或获取事件循环
RuntimeError: cannot be called from a running event loop嵌套调用asyncio.run()使用awaitcreate_task
SyntaxError: 'await' outside async function在非 async 函数中使用 await将函数改为async def
RuntimeWarning: coroutine was never awaited创建了协程但未 await确保所有协程都被 await 或创建为任务
程序卡住无响应混用阻塞 I/O 和协程使用asyncio专用库或run_in_executor

6.2 调试技巧

importasyncioimportlogging# 开启调试模式logging.basicConfig(level=logging.DEBUG)asyncio.run(main(),debug=True)# 查看当前所有任务asyncdefdebug_tasks():tasks=asyncio.all_tasks()fortaskintasks:print(f"任务:{task.get_name()}, 状态:{'完成'iftask.done()else'运行中'}")

第七章:实战模式

7.1 生产者-消费者模式(使用 Queue)

importasyncioasyncdefproducer(queue,n):foriinrange(n):awaitasyncio.sleep(0.5)awaitqueue.put(f"产品-{i}")print(f"生产: 产品-{i}")awaitqueue.put(None)# 结束信号asyncdefconsumer(queue,name):whileTrue:item=awaitqueue.get()ifitemisNone:breakawaitasyncio.sleep(1)# 模拟处理print(f"消费者{name}处理:{item}")asyncdefmain():queue=asyncio.Queue(maxsize=5)# 有界队列防止内存爆炸producers=[asyncio.create_task(producer(queue,5))]consumers=[asyncio.create_task(consumer(queue,i))foriinrange(2)]awaitasyncio.gather(*producers)awaitqueue.join()# 等待队列处理完成forcinconsumers:c.cancel()asyncio.run(main())

7.2 Web 爬虫并发模板

importasyncioimportaiohttp# 需要: pip install aiohttpasyncdeffetch(session,url,semaphore):asyncwithsemaphore:try:asyncwithsession.get(url,timeout=10)asresponse:returnawaitresponse.text()exceptExceptionase:returnf"Error:{e}"asyncdefmain():urls=["https://example.com"]*100semaphore=asyncio.Semaphore(10)# 限制并发10asyncwithaiohttp.ClientSession()assession:tasks=[fetch(session,url,semaphore)forurlinurls]results=awaitasyncio.gather(*tasks,return_exceptions=True)print(f"成功获取{len(results)}个页面")asyncio.run(main())

附录:速查表

# 1. 定义协程asyncdefmy_coro():awaitasyncio.sleep(1)return"done"# 2. 运行协程asyncio.run(my_coro())# 入口点,只调用一次# 3. 并发执行tasks=[asyncio.create_task(my_coro())for_inrange(10)]results=awaitasyncio.gather(*tasks)# 4. 等待第一个完成done,pending=awaitasyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED)# 5. 超时控制result=awaitasyncio.wait_for(my_coro(),timeout=5.0)# 6. 限制并发sem=asyncio.Semaphore(5)asyncwithsem:awaitmy_coro()# 7. 后台任务task=asyncio.create_task(my_coro())# ... 其他代码 ...result=awaittask# 获取结果# 8. 取消任务task.cancel()try:awaittaskexceptasyncio.CancelledError:pass

http://www.jsqmd.com/news/650494/

相关文章:

  • 热议抖音外卖官方品牌服务商哪家好,口碑企业大盘点 - myqiye
  • 基于西门子200smart PLC与昆仑通态触摸屏的真空泵智能运行控制程序
  • 企业级大模型API聚合平台选型Checklist:从PoC到生产的架构考量
  • 如何看懂AIGC检测报告:各指标含义和达标判断方法解读 - 还在做实验的师兄
  • 5分钟快速上手:Windows平台最强C/C++编译器MinGW-w64完全指南
  • LLMWiki研究
  • 从零构建模拟电子系统:核心器件与电路设计实战指南
  • 2026有实力的抖音外卖官方品牌服务伙伴说说怎么收费 - 工业品网
  • Android Studio中文语言包:3分钟打造专属中文开发环境
  • Kali更新报错127.0.0.2?手把手教你清理磁盘空间+永久修复resolv.conf配置
  • 【GitHub开源项目专栏】Google ADK深度解析:多智能体开发的工程化实践
  • 告别库版本困惑:手把手教你区分并获取STM32 MotorControl Workbench 5.4.3的FULL与非FULL版本
  • 终极指南:如何用MetaShark插件完美解决Jellyfin中文影视元数据刮削难题
  • 有实力的抖音外卖官方合作权威品牌服务商探讨,选哪家 - 工业品牌热点
  • 2026年智能客服黑马推荐,值得关注系统与厂商深度测评 - 品牌2026
  • C语言中的变量
  • 【Java】2026 Java学习路线:语言根基(三)★ 核心
  • 如何在foobar2000中实现智能歌词同步?OpenLyrics插件深度解析
  • WSL2 网络困境突围:为 Antigravity 插件构建透明代理隧道
  • 本体驱动:AI操作系统的范式革命 - 资讯焦点
  • 别再为打印不全发愁了!手把手教你用print-js搞定Vue项目中的超长table打印
  • Unity 2D智能寻路终极指南:5分钟掌握NavMeshPlus核心技巧
  • 移动端响应优化
  • 实战指南:在Qt项目中集成Crashpad实现跨平台崩溃自动上报与分析
  • 2026年新疆新能源汽车漆面防护与轻改升级一站式方案深度横评 - 精选优质企业推荐榜
  • 如何用Java自动化工具告别i茅台手动抢购烦恼:完整指南
  • BetterGI视觉导航系统深度解析:从像素坐标到游戏世界的算法实现
  • Nano-Banana基础教程:如何将AI生成的爆炸图导入SolidWorks作参考
  • 基于N2N实现Windows跨地域局域网联机:从公网服务器搭建到游戏联机实战
  • 【笔面试算法学习专栏】合并K个升序链表:堆与分治的完美结合