当前位置: 首页 > news >正文

Python异步IO:asyncio深度解析

Python异步IO:asyncio深度解析

引言

异步编程是现代后端开发的核心技能,它能够显著提升应用程序的性能和并发处理能力。作为一名从Python转向Rust的后端开发者,我在实践中总结了异步编程的最佳实践。本文将深入探讨Python的asyncio库,帮助你掌握异步编程的核心技术。

一、异步编程基础概念

1.1 同步与异步

特性同步异步
执行方式阻塞等待非阻塞继续
资源利用
编程复杂度简单较高
适用场景CPU密集型IO密集型

1.2 协程概念

协程是一种轻量级的并发编程方式,允许在单个线程中实现多个任务的并发执行。

1.3 asyncio核心组件

  • Event Loop:事件循环,负责调度协程
  • Coroutine:协程,可暂停的函数
  • Task:任务,封装协程的对象
  • Future:未来对象,表示异步操作的结果

二、asyncio入门

2.1 基本协程定义

import asyncio async def hello(): print("Hello") await asyncio.sleep(1) print("World") async def main(): await hello() asyncio.run(main())

2.2 协程调度

async def task1(): for i in range(3): print(f"Task 1: {i}") await asyncio.sleep(0.5) async def task2(): for i in range(3): print(f"Task 2: {i}") await asyncio.sleep(0.7) async def main(): await asyncio.gather(task1(), task2()) asyncio.run(main())

2.3 创建Task

async def coro(): await asyncio.sleep(1) return 42 async def main(): task = asyncio.create_task(coro()) result = await task print(f"Result: {result}") asyncio.run(main())

三、并发模式

3.1 并行执行多个任务

async def fetch_data(url): print(f"Fetching {url}") await asyncio.sleep(1) return f"Data from {url}" async def main(): urls = ["https://example.com", "https://google.com", "https://github.com"] tasks = [asyncio.create_task(fetch_data(url)) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(result) asyncio.run(main())

3.2 带超时的任务

async def long_running_task(): await asyncio.sleep(5) return "Done" async def main(): try: result = await asyncio.wait_for(long_running_task(), timeout=2) print(result) except asyncio.TimeoutError: print("Task timed out") asyncio.run(main())

3.3 任务取消

async def task(): try: while True: print("Running...") await asyncio.sleep(0.5) except asyncio.CancelledError: print("Task cancelled") raise async def main(): t = asyncio.create_task(task()) await asyncio.sleep(2) t.cancel() try: await t except asyncio.CancelledError: print("Main caught cancelled error") asyncio.run(main())

四、异步IO操作

4.1 文件操作

async def read_file(filepath): async with asyncio.timeout(10): with open(filepath, 'r') as f: return f.read() async def write_file(filepath, content): async with asyncio.timeout(10): with open(filepath, 'w') as f: f.write(content)

4.2 网络请求

import aiohttp async def fetch(session, url): async with session.get(url) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: html = await fetch(session, 'https://example.com') print(html[:100]) asyncio.run(main())

4.3 数据库操作

import asyncpg async def query_database(): conn = await asyncpg.connect(user='user', password='pass', database='db', host='localhost') values = await conn.fetch('SELECT * FROM users WHERE id = $1', 1) await conn.close() return values asyncio.run(query_database())

五、同步代码与异步代码互操作

5.1 同步转异步

import time def blocking_task(): time.sleep(2) return "Blocking result" async def async_wrapper(): loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, blocking_task) return result async def main(): result = await async_wrapper() print(result) asyncio.run(main())

5.2 异步转同步

async def async_task(): await asyncio.sleep(1) return "Async result" def sync_wrapper(): return asyncio.run(async_task()) result = sync_wrapper() print(result)

5.3 使用线程池

async def cpu_bound_task(data): loop = asyncio.get_event_loop() def process_data(data): # CPU密集型操作 return sum(data) result = await loop.run_in_executor(None, process_data, data) return result

六、异步上下文管理器

6.1 自定义异步上下文管理器

class AsyncResource: def __init__(self, name): self.name = name async def __aenter__(self): print(f"Entering {self.name}") await asyncio.sleep(0.5) return self async def __aexit__(self, exc_type, exc_val, exc_tb): print(f"Exiting {self.name}") await asyncio.sleep(0.5) async def main(): async with AsyncResource("database") as resource: print(f"Using {resource.name}") asyncio.run(main())

6.2 异步迭代器

class AsyncDataStream: def __init__(self, max_items): self.max_items = max_items self.current = 0 def __aiter__(self): return self async def __anext__(self): if self.current >= self.max_items: raise StopAsyncIteration await asyncio.sleep(0.3) self.current += 1 return self.current async def main(): async for item in AsyncDataStream(5): print(f"Item: {item}") asyncio.run(main())

七、异步编程模式

7.1 Producer-Consumer模式

async def producer(queue): for i in range(5): await asyncio.sleep(0.5) await queue.put(i) print(f"Produced: {i}") async def consumer(queue): while True: item = await queue.get() print(f"Consumed: {item}") queue.task_done() async def main(): queue = asyncio.Queue(maxsize=2) producer_task = asyncio.create_task(producer(queue)) consumer_task = asyncio.create_task(consumer(queue)) await producer_task await queue.join() consumer_task.cancel() await consumer_task asyncio.run(main())

7.2 异步锁

async def update_counter(lock, counter): async with lock: current = counter['value'] await asyncio.sleep(0.1) counter['value'] = current + 1 async def main(): lock = asyncio.Lock() counter = {'value': 0} tasks = [update_counter(lock, counter) for _ in range(10)] await asyncio.gather(*tasks) print(f"Final counter: {counter['value']}") asyncio.run(main())

7.3 信号量控制并发

async def fetch_url(semaphore, url): async with semaphore: print(f"Fetching {url}") await asyncio.sleep(1) return f"Done: {url}" async def main(): semaphore = asyncio.Semaphore(3) urls = [f"https://example.com/{i}" for i in range(10)] tasks = [fetch_url(semaphore, url) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(result) asyncio.run(main())

八、实战案例:异步Web爬虫

import asyncio import aiohttp from bs4 import BeautifulSoup async def fetch_page(session, url): try: async with session.get(url, timeout=10) as response: if response.status == 200: return await response.text() return None except Exception as e: print(f"Error fetching {url}: {e}") return None async def parse_page(html): if not html: return [] soup = BeautifulSoup(html, 'html.parser') links = [] for a_tag in soup.find_all('a', href=True): href = a_tag['href'] if href.startswith('http'): links.append(href) return links async def crawl(url, max_depth=3, visited=None, semaphore=None): if visited is None: visited = set() if semaphore is None: semaphore = asyncio.Semaphore(5) if url in visited or max_depth <= 0: return [] visited.add(url) print(f"Crawling: {url} (depth: {max_depth})") async with semaphore: html = await fetch_page(session, url) links = await parse_page(html) tasks = [] for link in links[:10]: if link not in visited: tasks.append(crawl(link, max_depth - 1, visited, semaphore)) results = await asyncio.gather(*tasks) all_links = [url] for result in results: all_links.extend(result) return all_links async def main(): global session async with aiohttp.ClientSession() as session: result = await crawl("https://example.com", max_depth=2) print(f"Total links crawled: {len(result)}") if __name__ == "__main__": asyncio.run(main())

总结

异步编程是构建高性能后端系统的关键技术。通过本文的学习,你应该掌握了以下核心要点:

  1. 异步基础:同步与异步的区别、协程概念
  2. asyncio核心:Event Loop、Coroutine、Task、Future
  3. 并发模式:并行执行、超时控制、任务取消
  4. 异步IO:文件操作、网络请求、数据库操作
  5. 同步异步互操作:同步转异步、异步转同步、线程池
  6. 异步上下文管理器:异步with、异步迭代器
  7. 异步模式:Producer-Consumer、异步锁、信号量
  8. 实战案例:异步Web爬虫

作为从Python转向Rust的后端开发者,掌握异步编程对于构建高并发系统至关重要。后续文章将深入探讨Rust中的异步运行时Tokio。

http://www.jsqmd.com/news/906335/

相关文章:

  • 成都收账公司实测评测:成都正规收账公司有哪些/成都调查公司/成都调查公司电话/成都靠谱寻人寻车寻物公司/靠谱的调查公司/选择指南 - 优质品牌商家
  • 别再被MOS管炸了!手把手教你设计栅极驱动电路(附TVS管和电阻选型)
  • Dotween动画控制避坑指南:从播放、暂停到倒放,这些细节新手容易忽略
  • 2026年成都锦城学院深度解析:民办高校招生竞争加剧下的品牌突围与质量保障 - 品牌推荐
  • 045、视频慢动作生成卡顿?RIFE/DAIN 插帧模型选型与 GPU 推理加速方案
  • 从‘像素级’到‘结构感知’:手把手教你用NumPy实现SSIM算法,彻底搞懂它为什么比MSE/PSNR更合理
  • 成本控制必修课:如何在代码中精确计算并限制 LLM 的 Token 消耗?
  • Rust分布式追踪:构建可观测的微服务系统
  • 2026年锦城学院深度解析:民办高校选校场景信息不对称与择校迷茫 - 品牌推荐
  • 2026年锦城学院深度解析:民办高校选择中信息不对称与信任焦虑 - 品牌推荐
  • 别再只用TeamViewer了!用WOL+远程桌面,打造你的24小时待命个人云电脑
  • 啤酒厂建设工程技术要点与主流厂家选型参考:现代化啤酒厂建设、精酿啤酒投资、精酿啤酒设备、自酿啤酒设备、鲜啤酿酒设备选择指南 - 优质品牌商家
  • LaserGRBL:5个步骤掌握免费激光雕刻控制软件的终极指南
  • 别再只看Accuracy了!Gemini报告证实:每降低1%推理延迟=年均减碳2.8吨(附实测换算表)
  • 零基础3步打造专业AI翻唱:AICoverGen完全指南
  • ShaderGraph从入门到放弃?新手最容易踩的5个坑及避坑指南(基于Unity 2021.3)
  • 2026年锦城学院深度解析:民办高校招生竞争中的差异化定位与生源质量瓶颈 - 品牌推荐
  • 从裸机到RTOS:你的Cortex-M3代码在FreeRTOS下到底经历了什么?
  • 2026年工业清洗筐品牌推荐:如何选择适配的清洗解决方案供应商 - 2026年企业资讯
  • 无代码组态,快速搭建:云平台云组态降低物联网应用门槛
  • DeepSeek云服务部署全链路解析:从零搭建高可用AI推理平台的7个关键决策点
  • 开源爬虫工具 Crawl4AI 实战:为你的测试知识库抓取干净的网页数据
  • 2026年成都锦城学院深度解析:民办高校择校场景信息不对称与就业质量焦虑 - 品牌推荐
  • 别只盯着local-lvm!PVE存储空间规划与local目录扩容实战(含SSD分区策略)
  • Redis--基础知识点--32--redis底层存储结构
  • 2026年专利向量数据库服务品牌综合实力排行:专利向量数据库服务/专利质押融资估值数据/企业专利数据库购买/全球商标数据集商用/选择指南 - 优质品牌商家
  • 破局2026:长沙白酒茶叶营销策划团队如何定义新消费时代的品牌增长 - 2026年企业资讯
  • 2026年西南欧松板厂家选型全维度技术判定指南:兴宏盛板材/四川板材厂家/实木颗粒板厂家/家居板材/家居环保板材/选择指南 - 优质品牌商家
  • CVPR 2019 GWCNet实战:用PyTorch复现组相关立体匹配网络(附KITTI数据集训练技巧)
  • LinkSwift:九大网盘直链下载助手终极指南,免费解锁高速下载新体验