当前位置: 首页 > news >正文

Python异步编程高级模式:asyncio事件循环与并发控制

Python异步编程高级模式:asyncio事件循环与并发控制

前言

Python异步编程是现代高性能应用开发的关键技术。asyncio作为Python的内置异步框架,提供了强大的事件循环和并发控制能力。本文将深入探讨Python异步编程的高级模式,帮助你构建高效的并发应用。

1. asyncio核心概念

1.1 事件循环机制

事件循环是asyncio的核心,负责调度和执行协程:

# 事件循环基本原理importasyncioasyncdefexample_coroutine():"""示例协程"""print("协程开始")awaitasyncio.sleep(1)# 模拟IO操作print("协程结束")return"结果"# 获取事件循环loop=asyncio.get_event_loop()# 运行协程result=loop.run_until_complete(example_coroutine())print(f"结果:{result}")# 关闭循环loop.close()

1.2 协程与任务

协程是asyncio的基本执行单元,任务是对协程的封装:

importasyncioasyncdeffetch_data(url:str,delay:float)->str:"""模拟数据获取"""print(f"开始获取{url}")awaitasyncio.sleep(delay)# 模拟网络延迟print(f"完成获取{url}")returnf"来自{url}的数据"asyncdefmain():# 创建多个协程coros=[fetch_data("https://api1.example.com",2),fetch_data("https://api2.example.com",1),fetch_data("https://api3.example.com",3),]# 方法1:顺序执行(不推荐)# for coro in coros:# result = await coro# 方法2:并发执行(推荐)tasks=[asyncio.create_task(coro)forcoroincoros]results=awaitasyncio.gather(*tasks)print(f"所有结果:{results}")# 运行asyncio.run(main())

2. 高级并发模式

2.1 信号量控制并发数

importasynciofromtypingimportList,AnyclassConcurrencyLimiter:"""并发限制器"""def__init__(self,max_concurrent:int):self.semaphore=asyncio.Semaphore(max_concurrent)self.active_count=0self.max_concurrent=max_concurrentasyncdefexecute(self,coro):"""执行受限的协程"""asyncwithself.semaphore:self.active_count+=1try:result=awaitcororeturnresultfinally:self.active_count-=1asyncdefexecute_many(self,coros:List)->List[Any]:"""批量执行受限的协程"""tasks=[self.execute(coro)forcoroincoros]returnawaitasyncio.gather(*tasks)# 使用示例asyncdeflimited_fetch(url:str,limiter:ConcurrencyLimiter)->str:"""受限的数据获取"""print(f"等待获取{url}(当前活跃:{limiter.active_count})")returnawaitlimiter.execute(fetch_data(url,1))asyncdefmain():# 限制最多3个并发limiter=ConcurrencyLimiter(max_concurrent=3)urls=[f"https://api{i}.example.com"foriinrange(10)]coros=[limited_fetch(url,limiter)forurlinurls]results=awaitlimiter.execute_many(coros)print(f"获取完成:{len(results)}个结果")asyncio.run(main())

2.2 线程池与进程池集成

importasynciofromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutorimporttimeclassHybridExecutor:"""混合执行器:结合线程池和进程池"""def__init__(self,thread_workers:int=4,process_workers:int=2):self.thread_executor=ThreadPoolExecutor(max_workers=thread_workers)self.process_executor=ProcessPoolExecutor(max_workers=process_workers)asyncdefrun_in_thread(self,func,*args):"""在线程池中运行"""loop=asyncio.get_event_loop()returnawaitloop.run_in_executor(self.thread_executor,func,*args)asyncdefrun_in_process(self,func,*args):"""在进程池中运行"""loop=asyncio.get_event_loop()returnawaitloop.run_in_executor(self.process_executor,func,*args)defshutdown(self):"""关闭执行器"""self.thread_executor.shutdown()self.process_executor.shutdown()# CPU密集型任务defcpu_intensive_task(n:int)->int:"""CPU密集型计算"""total=0foriinrange(n):total+=i*ireturntotal# IO密集型任务defio_intensive_task(duration:float)->str:"""IO密集型任务"""time.sleep(duration)returnf"IO任务完成,耗时{duration}秒"asyncdefmain():executor=HybridExecutor(thread_workers=4,process_workers=2)try:# 混合执行不同类型的任务tasks=[]# CPU密集型任务使用进程池foriinrange(3):task=executor.run_in_process(cpu_intensive_task,1000000)tasks.append(task)# IO密集型任务使用线程池foriinrange(5):task=executor.run_in_thread(io_intensive_task,0.5)tasks.append(task)results=awaitasyncio.gather(*tasks)print(f"所有任务完成:{results}")finally:executor.shutdown()asyncio.run(main())

2.3 异步上下文管理器

importasynciofromtypingimportOptionalclassAsyncDatabaseConnection:"""异步数据库连接"""def__init__(self,connection_string:str):self.connection_string=connection_string self.connection:Optional[object]=Noneself.is_connected=Falseasyncdef__aenter__(self):"""进入异步上下文"""print(f"连接到数据库:{self.connection_string}")awaitasyncio.sleep(0.1)# 模拟连接时间self.connection=f"Connection({self.connection_string})"self.is_connected=Truereturnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):"""退出异步上下文"""ifself.connection:print("关闭数据库连接")awaitasyncio.sleep(0.05)# 模拟关闭时间self.connection=Noneself.is_connected=False# 返回False表示不抑制异常returnFalseasyncdefexecute
http://www.jsqmd.com/news/916293/

相关文章:

  • 保姆级教程:彻底清理Win11更新缓存并解除外设,一次搞定0xc1900101更新错误
  • 从零自制简易直流电机:深入理解电磁原理与动手实践
  • Redis 数据类型命令详解
  • 抖音短视频无水印下载技术解析:从网页解析到桌面应用的完整实现方案
  • ChatGPT如何解答奇葩谜题:从原理到实践的全方位解析
  • 手把手教你:在戴尔R730XD上为Windows Server 2019配置NIC组合与Hyper-V
  • CPAL脚本避坑指南:TestcaseFail和TestCaseSkipped用不对,小心你的测试结果全乱套
  • QMCDecode:QQ音乐加密格式转换方案实现指南
  • AMD Ryzen SMU调试工具实战指南:深度优化CPU性能的5个核心场景
  • 硬核盘点!2026AI论文写作工具大盘点(覆盖 99% 毕业论文需求)
  • 基于ESP32-C3与太阳能供电的物联网植物监测系统全解析
  • OpenClaw代码注释自动生成与优化:适配企业规范,告别手动写注释
  • 3步完成CPU单核稳定性测试:CoreCycler终极指南
  • COM3D2.MaidFiddler:免费实时角色编辑器终极指南 [特殊字符]
  • WechatDecrypt微信消息解密完整指南:三步解锁你的聊天记录
  • 基于TL494的300W开关电源设计:从原理到调试全解析
  • 量子计算硬件基准测试:原理、指标与实践指南
  • Unity3D坦克大战实战:手把手教你用UGUI和刚体组件实现敌人AI与血条系统
  • 商务送礼海参指南:送礼有面子又不踩雷
  • 用导电材料与微控制器打造地面互动版西蒙游戏:从电路原理到Scratch编程实践
  • KMS智能激活脚本:3分钟永久激活Windows与Office的终极指南
  • AI心智得分实战指南:如何用搜极星掌握品牌AI话语权
  • C语言数组10秒搞懂!从原理到代码,新手一看就会
  • Claude NPV分析私密白皮书首次流出:含17个行业基准折现率数据库+政策变动弹性系数表
  • 机器人舵机供电方案:多路可调电源设计与避坑指南
  • MoE 训练为什么一降路由温度就开始前期更稳却后期专家固化:从 Router Temperature 到 Entropy Floor 的工程实战
  • 南昌黄金上门回收平台推荐2026 - 黄金回收
  • 猫抓Cat-Catch技术架构解析与实战指南:浏览器资源嗅探的现代解决方案
  • 论文查重真的有那么可怕吗?用书匠策AI免费查重,三分钟搞懂全流程
  • 从技术布道到行业偶像:解析山姆·奥特曼的AI领导力与OpenAI崛起