12_异步编程
Python 异步编程(async/await)
一、同步 vs 异步
同步的问题
同步代码按顺序执行,遇到 I/O 阻塞时,整个线程干等着。
importtimedeftask_sync(name,seconds):print(f"同步任务{name}开始")time.sleep(seconds)# 模拟 I/O 阻塞print(f"同步任务{name}结束")# 顺序执行:总耗时 = 2 + 3 = 5 秒task_sync("A",2)task_sync("B",3)异步的解决
异步用await asyncio.sleep()代替time.sleep(),让出控制权给其他协程。
importasyncioasyncdeftask_async(name,seconds):print(f"异步任务{name}开始")awaitasyncio.sleep(seconds)# 非阻塞等待print(f"异步任务{name}结束")asyncdefmain():# 创建两个任务并发执行task_a=asyncio.create_task(task_async("A",2))task_b=asyncio.create_task(task_async("B",3))awaittask_aawaittask_b asyncio.run(main())# 异步总耗时 ≈ 3 秒(取最长),而非 5 秒核心区别:
| 同步 | 异步 | |
|---|---|---|
| 等待方式 | time.sleep()阻塞整个线程 | await asyncio.sleep()让出控制权 |
| 多个任务 | 顺序执行,总耗时 = 和 | 并发执行,总耗时 ≈ 最长 |
| 关键字 | 无 | async def/await |
Python 异步编程(async/await)
二、协程基础
2.1 async def — 定义协程函数
importasyncioasyncdefhello():print("hello")awaitasyncio.sleep(1)print("world")print(hello())# <coroutine object hello at ...> 不会立即执行调用协程函数返回协程对象,必须放入事件循环才能执行。
2.2 await — 等待可等待对象
asyncdefgreet():print("开始打招呼")awaithello()# 等待 hello() 执行完print("打招呼结束")importasyncio asyncio.run(greet())规则:await只能在async def函数内部使用。普通函数里不能写await。
2.3 协程的返回值
asyncdefadd(a,b):awaitasyncio.sleep(0.5)returna+basyncdefmain():result=awaitadd(3,5)print("相加结果:",result)# 8asyncio.run(main())Python 异步编程(async/await)
三、事件循环与运行方式
3.1 asyncio.run() — 推荐方式
import asyncio
pythonpython
async def main():
await asyncio.sleep(1)
print(“完成”)
asyncio.run(main()) # 创建事件循环,运行协程,关闭循环
### 3.2 手动管理事件循环(旧方式) 极少使用,但了解即可: ```python import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main()) loop.close() ``````python loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main()) loop.close()3.3 create_task — 并发执行
importasyncioasyncdefsay_after(delay,what):awaitasyncio.sleep(delay)print(what)asyncdefmain():task1=asyncio.create_task(say_after(1,"任务一"))task2=asyncio.create_task(say_after(1,"任务二"))awaittask1awaittask2# 两个任务并发执行,总耗时 ≈ 1 秒asyncio.run(main())asyncio.create_task()将协程包装成 Task 对象,立即调度执行,不等待。
Python 异步编程(async/await)
四、并发执行工具
4.1 asyncio.gather() — 并发等所有结果
importasyncio```pythonasyncdeftask(name,delay):awaitasyncio.sleep(delay)returnf"{name}的结果"asyncdefdemo():results=awaitasyncio.gather(task("A",2),task("B",1),task("C",3),)print(results)# ['A的结果', 'B的结果', 'C的结果'] 保持顺序asyncio.run(demo())适用场景:需要等所有任务完成,且关心返回值。
4.2 asyncio.wait() — 按条件等待
importasyncioasyncdeftask(name,delay):awaitasyncio.sleep(delay)print(f"任务{name}完成")asyncdefmain():tasks=[asyncio.create_task(task("X",2)),asyncio.create_task(task("Y",1)),]# 等任意一个完成就返回done,pending=awaitasyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED)print(f"完成:{len(done)}个任务")# 取消未完成的fortinpending:t.cancel()asyncio.run(main())适用场景:只需要第一个完成的结果,其余可以取消。
4.3 asyncio.as_completed() — 按完成顺序获取
importasyncioasyncdeftask(name,delay):awaitasyncio.sleep(delay)returnf"任务{name}"tasks=[asyncio.create_task(task("P",3)),asyncio.create_task(task("Q",1)),asyncio.create_task(task("R",2)),]forcoroinasyncio.as_completed(tasks):result=awaitcoroprint("完成一个:",result)# 输出顺序:Q → R → P(按完成快慢)适用场景:需要按完成顺序逐个处理结果。
三种方式对比
| 方式 | 顺序 | 等多久 | 场景 |
|---|---|---|---|
gather | 保持传入顺序 | 等全部 | 需要所有结果 |
wait | 不保证 | 等到条件满足 | 只需部分结果 |
as_completed | 按完成快慢 | 逐个获取 | 流式处理 |
五、异步上下文管理器
用async with管理异步资源(如网络连接)。
5.1 自定义异步上下文管理器
importasyncioclassAsyncResource:asyncdef__aenter__(self):print("异步进入资源")awaitasyncio.sleep(1)returnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):print("异步退出资源")awaitasyncio.sleep(0.5)asyncdefdo_work(self):print("工作中...")asyncdefuse_resource():asyncwithAsyncResource()asres:awaitres.do_work()asyncio.run(use_resource())和同步对比:
| 同步 | 异步 |
|---|---|
__enter__/__exit__ | __aenter__/__aexit__ |
with | async with |
5.2 http
实际应用中常用aiohttp:
importaiohttpasyncdeffetch_url(url):asyncwithaiohttp.ClientSession()assession:asyncwithsession.get(url)asresponse:returnawaitresponse.text()asyncio.run(fetch_url("https://www.baidu.com"))Python 异步编程(async/await)
六、异步迭代器与异步生成器
6.1 异步迭代器
手写__aiter__+__anext__:
importasyncioclassAsyncCounter:def__init__(self,start,end):self.start=start self.end=enddef__aiter__(self):returnselfasyncdef__anext__(self):ifself.start>=self.end:raiseStopAsyncIterationawaitasyncio.sleep(0.5)# 模拟异步操作value=self.start self.start+=1returnvalueasyncdefuse():asyncfornuminAsyncCounter(1,5):print(f"异步迭代得到:{num}")asyncio.run(use())和同步对比:
| 同步 | 异步 |
|---|---|
__iter__/__next__ | __aiter__/__anext__ |
StopIteration | StopAsyncIteration |
for | async for |
6.2 异步生成器
用async def+yield,比手写迭代器简洁得多:
importasyncioasyncdefasync_range(start,end):foriinrange(start,end):awaitasyncio.sleep(0.3)yieldiasyncdefuse():asyncforvalinasync_range(10,15):print("异步生成器:",val)asyncio.run(use())和同步生成器对比:
| 同步生成器 | 异步生成器 |
|---|---|
def | async def |
yield | yield(不变) |
for | async for |
七、异步队列(生产者消费者)
importasyncio,randomasyncdefproducer(queue,id):foriinrange(3):item=f"生产者{id}的产品{i}"awaitqueue.put(item)print(f"生产者:{item}")awaitasyncio.sleep(random.uniform(0.5,1.5))asyncdefconsumer(queue,id):whileTrue:item=awaitqueue.get()ifitemisNone:# 结束信号queue.task_done()breakprint(f"消费者{id}消费:{item}")awaitasyncio.sleep(random.uniform(0.2,1.0))queue.task_done()asyncdefmain():queue=asyncio.Queue(maxsize=5)producers=[asyncio.create_task(producer(queue,i))foriinrange(2)]consumers=[asyncio.create_task(consumer(queue,i))foriinrange(3)]awaitasyncio.gather(*producers)# 等生产者完成for_inconsumers:awaitqueue.put(None)# 发送结束信号awaitasyncio.gather(*consumers)# 等消费者处理完asyncio.run(main())关键 API:
| 操作 | 方法 |
|---|---|
| 入队 | await queue.put(item) |
| 出队 | item = await queue.get() |
| 标记完成 | queue.task_done() |
| 最大容量 | asyncio.Queue(maxsize=N) |
和同步 queue.Queue 对比:
| 同步 | 异步 |
|---|---|
queue.put(item)阻塞 | await queue.put(item)非阻塞 |
queue.get()阻塞 | await queue.get()非阻塞 |
八、异步 TCP 客户端(底层 I/O)
不用第三方库,用标准库asyncio.open_connection发 HTTP 请求:
asyncdeftcp_client(host,port,message):reader,writer=awaitasyncio.open_connection(host,port)writer.write(message.encode())awaitwriter.drain()# 刷新缓冲区data=awaitreader.read(1024)writer.close()awaitwriter.wait_closed()returndata.decode()# 发 HTTP GET 请求request="GET /get HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n"response=asyncio.run(tcp_client("httpbin.org",80,request))学习意义:理解底层writer(发数据)/reader(收数据)机制,P1 V3 的工具调用(Function Calling)本质上也是异步 HTTP 请求。
Python 异步编程(async/await)
九、异步 vs 多线程
| async/await | 多线程(threading) | |
|---|---|---|
| 并发模型 | 协程(单线程) | 多线程(OS 调度) |
| 切换成本 | 极低(用户态) | 较高(内核态) |
| 适合场景 | I/O 密集型 | I/O 密集型 |
| GIL 限制 | 不受影响 | 受影响(CPU 密集型慢) |
| 调试难度 | 较低 | 较高(竞态条件) |
结论:Python 里做 I/O 密集型并发,优先用 async/await。
