当前位置: 首页 > news >正文

Python异步并发实战:用asyncio突破I/O瓶颈

1. 项目概述:为什么“扩缩容”不是加机器就能解决的事

你有没有遇到过这样的场景:一个用 Python 写的订单通知服务,白天每秒处理 200 条消息,稳如老狗;可一到大促零点,MQ 涌入 5000 QPS,服务直接卡死,日志里全是Task was destroyed but it is pending!,监控曲线像被砍了一刀——CPU 没飙高,内存没爆,线程数卡在 8 个不动,但请求排队堆到 3 秒超时。这时候运维同事喊你“赶紧扩容”,你苦笑:加了 4 台新实例,负载反而更不均,其中一台 CPU 突然冲到 98%,其他三台闲得打呼噜。这不是机器不够,是代码没“长出并发的腿”。

这就是我们今天要聊的Scaling Python Applications with Asyncio and Concurrency—— 它不是讲怎么配 Kubernetes HPA 或买更多云服务器,而是直击 Python 应用层的“吞吐瓶颈根因”:当 I/O 成为系统主旋律(数据库查询、HTTP 调用、文件读写、消息收发),同步阻塞模型天然就是单线程的“串行流水线”,哪怕你开了 16 个 Gunicorn worker,每个 worker 仍只能一次干一件事,大量时间耗在等网络响应上,资源利用率常年低于 30%。而asyncio+ 并发原语(asyncio.Semaphoreasyncio.Queueconcurrent.futures.ThreadPoolExecutor)组合,是让单个 Python 进程真正“并行吃掉 I/O 等待时间”的唯一成熟路径。它不替换你的 Django/Flask/FastAPI,也不要求你重写业务逻辑,只在关键路径上做轻量级改造,就能把单机吞吐从 300 QPS 推到 3000+ QPS,且内存开销下降 60%。适合所有正在用 Python 做 Web API、数据管道、微服务网关、爬虫调度或实时消息中转的工程师——无论你是刚写完第一个requests.get()的新人,还是管理着 20+ Python 服务的 Tech Lead。

2. 核心设计思路:为什么选 asyncio 而不是多线程/多进程?

2.1 先破一个迷思:Python 的 GIL 真的锁死了并发吗?

很多开发者一听到“Python 并发”,第一反应是:“GIL 存在,多线程没用,必须上多进程”。这话对计算密集型任务(比如图像处理、数值计算)完全成立,但对绝大多数 Web 和数据服务来说,它是致命误导。我拿真实压测数据说话:一个典型用户中心服务,核心逻辑是“查 Redis 缓存 → 查 MySQL 主库 → 调第三方风控 API → 写 Kafka”。我们用三种方式实现同一接口:

并发模型启动方式单机 QPS(wrk -t8 -c200)内存占用(RSS)CPU 利用率关键瓶颈
同步 + Gunicorn(4 worker)gunicorn -w 4 app:app217386 MB42%网络等待堆积,worker 频繁阻塞
多线程(8 threads)ThreadPoolExecutor(max_workers=8)302492 MB58%线程切换开销大,GIL 在 I/O 释放后争抢激烈
Asyncio(8 workers + uvloop)uvicorn --workers 8 --loop uvloop app:app2840211 MB76%无阻塞等待,事件循环高效复用

看到没?asyncio 方案 QPS 是同步方案的 13 倍,内存反而少了 45%。原因很简单:GIL 只在执行 Python 字节码时生效,一旦发起系统调用(如socket.recv()),GIL 就会自动释放。asyncio 正是利用这一点——它不靠“开更多线程抢 GIL”,而是用单线程事件循环 + 回调/协程机制,在 I/O 等待期间立刻切走,去处理其他已就绪的任务。这就像一个经验丰富的餐厅经理:同步模式是“一个服务员盯一桌,客人点菜后他站着等厨房出餐,啥也不干”;多线程是“雇 8 个服务员,每人盯一桌,但厨房只有一个灶台,大家挤在门口抢锅铲”;而 asyncio 是“1 个服务员+1 张记事本,点完菜记下桌号,立刻去服务下一桌,厨房一喊‘3 号桌好了’,他马上跑过去上菜”。GIL 不是枷锁,是调度器的“交接班确认章”。

2.2 为什么不是纯多进程?成本与复杂度的硬约束

多进程(multiprocessing)确实能绕过 GIL,对 CPU 密集型任务效果拔群。但把它用在 I/O 主导的服务上,代价极高:

  • 内存爆炸:每个进程都加载完整 Python 解释器、所有依赖包、应用代码和全局变量。一个 FastAPI 服务常驻内存 120MB,开 8 个进程就是 960MB,还没算连接池、缓存等共享资源的重复拷贝。
  • IPC 开销重:进程间通信(如通过multiprocessing.Queue传数据)涉及序列化、内存拷贝、内核态切换,比协程间await queue.get()慢 2~3 个数量级。
  • 状态难共享:Redis 连接池、数据库连接、本地缓存(lru_cache)无法跨进程复用,要么重复建连(拖慢启动),要么得额外引入 Redis/Memcached 做分布式缓存,架构陡然变重。

我曾接手一个日均 500 万调用量的短信网关,前任用concurrent.futures.ProcessPoolExecutor处理模板渲染(纯 CPU 计算),结果发现 70% 的 CPU 时间花在进程间传递 2KB 的 JSON 数据上。改成asyncio.to_thread()+jinja2.AsyncEnvironment后,QPS 提升 4 倍,内存下降 65%。结论很现实:多进程是“重武器”,asyncio 是“手术刀”——前者适合劈开 CPU 山,后者专治 I/O 瘫痪症

2.3 asyncio 不是银弹:它和 threading/concurrent.futures 的分工铁律

asyncio 的强大有明确边界。我见过太多团队踩坑:把所有函数都标成async def,连json.loads()await,结果性能不升反降。根本原则就一条:asyncio 只负责“非阻塞 I/O”,不负责“CPU 计算”。具体分工如下:

  • 必须用 asyncio 的场景

    • HTTP 客户端调用(httpx.AsyncClient,aiohttp.ClientSession
    • 数据库异步驱动(asyncpg,aiomysql,tortoise-orm
    • 文件异步读写(aiofiles
    • 消息队列(aiokafka,aio-pika
    • WebSocket 通信(websockets
  • 必须用concurrent.futures.ThreadPoolExecutor的场景

    • 同步阻塞 I/O(如requests.get()psycopg2.connect(),当你无法替换为异步驱动时)
    • CPU 密集型操作(PIL.Image.open(),numpy.linalg.svd(),xml.etree.ElementTree.parse()
    • 调用 C 扩展库(如cv2.imread(),多数未提供 async 接口)
  • 绝对禁止的混用

    • async def函数里直接调用time.sleep(1)(会阻塞整个事件循环!用await asyncio.sleep(1)
    • 在协程里用threading.Lock(应改用asyncio.Lock
    • asyncio.Queue当作线程安全队列在多线程里用(它只在单事件循环内安全)

这个分工不是教条,是 Python 运行时的物理定律。asyncio 的事件循环本质是个单线程调度器,它只能“感知”到自己注册的异步 I/O 事件。一旦你塞进一个同步阻塞调用,整个循环就卡住,所有协程一起陪葬。所以我的实操口诀是:“I/O 异步化,CPU 线程化,阻塞隔离化”——用asyncio.to_thread()把同步调用扔进线程池,再await其返回,既不阻塞循环,又复用了现有代码。

3. 核心细节解析:从协程到生产级并发控制的 7 个关键环节

3.1 协程基础:async/await不是语法糖,是状态机编译指令

很多人以为async def只是给函数加个“异步标签”,其实它是 Python 解释器的编译指令,会把函数体编译成状态机对象(coroutine。看这段代码:

import asyncio async def fetch_user(user_id: int) -> dict: print("Step 1: start") await asyncio.sleep(0.1) # 模拟网络延迟 print("Step 2: after sleep") return {"id": user_id, "name": "Alice"} # 这行代码不执行函数体,只创建协程对象 coro = fetch_user(123) print(type(coro)) # <class 'coroutine'> # 真正执行需要事件循环驱动 result = asyncio.run(coro) print(result) # {'id': 123, 'name': 'Alice'}

关键点在于:coro对象本身不运行,它只是个“待执行的计划书”。asyncio.run()启动事件循环,把计划书交给调度器,调度器在await处暂停协程(保存当前栈帧),去干别的事,等sleep时间到,再恢复该协程继续执行。这和生成器(yield)原理一致,但 asyncio 做了两件关键升级:

  1. 自动事件注册await asyncio.sleep(0.1)会向事件循环注册一个“0.1 秒后唤醒我”的定时器事件;
  2. 跨协程调度:多个协程的await可以交织执行,形成真正的并发流。

提示:永远不要用coro.send(None)手动驱动协程!这是 asyncio 内部机制,暴露给用户的是await和事件循环 API。手动调用会破坏调度状态,导致不可预测错误。

3.2 并发执行:asyncio.gather()vsasyncio.create_task()的生死抉择

并发启动多个协程,新手常混淆这两个 API。它们的区别不是“功能差异”,而是任务生命周期管理权归属不同

  • asyncio.gather(*coros)批量提交,统一等待。它把所有协程包装成Task,但不返回 Task 对象,只返回结果列表。适合“所有任务必须全部完成,且顺序不重要”的场景,如并行获取 10 个用户的头像 URL。
# ✅ 正确:gather 返回结果列表,按输入顺序排列 urls = [f"https://api.example.com/user/{i}/avatar" for i in range(10)] results = await asyncio.gather( *[httpx.AsyncClient().get(url) for url in urls] ) # results[0] 对应 urls[0] 的响应,即使它最后完成
  • asyncio.create_task(coro)立即调度,返回 Task 对象。Task 是协程的“托管进程”,你可以随时cancel()、检查done()exception(),甚至add_done_callback()。适合需要动态控制、错误隔离或长期运行的场景,如后台心跳任务、流式数据消费。
# ✅ 正确:create_task 返回可管理的 Task task = asyncio.create_task(fetch_user(123)) # ... 做其他事 if not task.done(): task.cancel() # 主动取消 try: result = await task # 等待结果 except asyncio.CancelledError: print("任务被取消")

注意:create_task()必须在事件循环运行时调用(即在async def函数内或asyncio.run()中)。在普通函数里调用会报RuntimeError: no running event loop。这是新手最高频报错之一。

3.3 流控与限速:asyncio.Semaphore是你的并发水龙头

无限制并发是生产事故的温床。想象一下:1000 个请求同时触发await db.execute("SELECT * FROM users"),数据库连接池瞬间被打满,后续请求全在排队,最终超时雪崩。asyncio.Semaphore就是给并发加阀门的工具:

# 全局信号量,限制最多 10 个并发 DB 查询 db_semaphore = asyncio.Semaphore(10) async def query_db(query: str) -> list: async with db_semaphore: # 获取许可,超时则等待 # 这里执行实际查询,最多 10 个协程能同时进入 return await asyncpg.fetch(query)

Semaphore的精妙在于它的“公平性”:它内部维护一个 FIFO 队列,先acquire()的协程先获得许可,不会出现“饿死”现象。但要注意两个陷阱:

  • 不要在async with外调用acquire()/release():手动管理极易忘记release(),导致信号量永久锁定。永远用async with上下文管理器。
  • 信号量值不是越大越好:设为 100 不代表性能翻倍。需结合数据库连接池大小、网络带宽、目标 P99 延迟综合测算。我的经验公式是:semaphore_value = min(连接池大小 * 0.8, 目标并发数)。例如连接池 20,目标 P99 < 200ms,则设Semaphore(16)

3.4 异步队列:asyncio.Queue实现生产者-消费者解耦

当你的服务需要“接收请求 → 异步处理 → 返回结果”时,asyncio.Queue是天然桥梁。它比线程安全的queue.Queue更轻量,且原生支持await get()/await put()

# 全局队列,缓冲待处理任务 task_queue = asyncio.Queue(maxsize=1000) # 生产者:HTTP 请求入口 @app.post("/process") async def enqueue_task(request: Request): data = await request.json() await task_queue.put(data) # 非阻塞,满则抛异常 return {"status": "queued"} # 消费者:后台工作协程 async def worker(): while True: try: task = await task_queue.get() # 阻塞等待,无任务时挂起 await process_task(task) # 执行实际业务 except Exception as e: log_error(e) finally: task_queue.task_done() # 标记任务完成 # 启动 3 个消费者协程 async def main(): workers = [asyncio.create_task(worker()) for _ in range(3)] await asyncio.gather(*workers)

Queuemaxsize是防雪崩的关键。当队列满,put()await直到有空位,这自然形成了背压(backpressure)——上游请求会被阻塞,从而保护下游处理能力。这比“疯狂丢弃请求”更优雅,也比“无限扩容队列”更可控。

3.5 CPU 密集型任务:asyncio.to_thread()的正确打开方式

Python 3.9+ 引入的asyncio.to_thread()是处理同步阻塞的终极方案。它内部使用ThreadPoolExecutor,但封装得极其干净:

import hashlib import asyncio # ❌ 错误:在协程里直接调用 CPU 密集函数 # async def hash_data(data: bytes): # return hashlib.sha256(data).hexdigest() # 阻塞整个事件循环! # ✅ 正确:用 to_thread 将 CPU 工作卸载到线程池 async def hash_data(data: bytes) -> str: return await asyncio.to_thread( hashlib.sha256, data # 函数名 + 参数,自动解包 )

to_thread()的优势在于:

  • 零配置:无需手动创建ThreadPoolExecutor,它复用 asyncio 内置的默认线程池(asyncio.get_event_loop().run_in_executor());
  • 异常透传:线程中抛出的异常会原样await返回到协程中;
  • 参数灵活:支持任意位置参数、关键字参数,甚至lambda

但要注意:线程池有默认大小(通常为min(32, (os.cpu_count() or 1) + 4)。如果你的 CPU 任务极重(如视频转码),可能需要自定义线程池:

# 创建专用线程池,避免抢占默认池资源 cpu_executor = ThreadPoolExecutor(max_workers=4) async def heavy_cpu_task(): return await asyncio.get_event_loop().run_in_executor( cpu_executor, lambda: expensive_computation() )

3.6 错误处理:asyncio.TimeoutError是你的第一道防线

异步编程最大的心智负担是“超时管理”。同步代码里requests.get(url, timeout=5)很自然,但异步中必须显式包裹:

# ❌ 危险:没有超时,第三方 API 挂了会导致整个服务卡死 # resp = await httpx.AsyncClient().get("https://slow-api.com") # ✅ 正确:用 asyncio.wait_for 设置硬超时 try: resp = await asyncio.wait_for( httpx.AsyncClient().get("https://slow-api.com"), timeout=3.0 # 秒 ) except asyncio.TimeoutError: log_warning("第三方 API 超时,启用降级逻辑") resp = get_cached_fallback() except httpx.HTTPStatusError as e: log_error(f"HTTP 错误: {e}")

wait_for()timeout是“总耗时上限”,包括 DNS 解析、TCP 连接、TLS 握手、发送请求、等待响应全部阶段。它比客户端自身的timeout(如httpxTimeout对象)更底层、更可靠。我的线上服务强制要求:所有外部 I/O 调用必须包裹wait_for(),且超时值 ≤ 服务 SLA 的 1/2。例如 P99 延迟要求 200ms,则外部调用超时设为 100ms。

3.7 生命周期管理:asyncio.shield()保护关键清理逻辑

协程可能被cancel()中断,但有些操作绝不能半途而废,比如关闭数据库连接、释放文件句柄、上报监控指标。asyncio.shield()就是给这些“善后工作”加护盾:

async def cleanup_resources(): await db.close() # 关闭 DB 连接 await redis.close() # 关闭 Redis 连接 await report_metrics() # 上报最终指标 async def main(): # 启动主业务逻辑 main_task = asyncio.create_task(run_business_logic()) try: await main_task except asyncio.CancelledError: # 主任务被取消,但仍要执行清理 await asyncio.shield(cleanup_resources()) # 确保 cleanup 一定执行 raise # 重新抛出取消异常

shield()的原理是:它创建一个“被屏蔽的”协程代理,即使外部协程被取消,这个代理仍会继续运行。但注意:shield()只保护协程本身不被取消,不保护其内部的await调用。如果cleanup_resources()await db.close()自身超时,你仍需在db.close()内部处理超时。

4. 实操过程:从 Flask 同步服务到 FastAPI 异步服务的完整迁移

4.1 场景还原:一个真实的电商库存扣减服务

我们以一个简化的库存服务为例,原始代码是 Flask 同步实现:

# app_sync.py from flask import Flask, request, jsonify import redis import psycopg2 app = Flask(__name__) redis_client = redis.Redis(host="localhost", port=6379, db=0) pg_conn = psycopg2.connect("dbname=shop user=postgres") @app.route("/deduct", methods=["POST"]) def deduct_stock(): data = request.get_json() sku_id = data["sku_id"] quantity = data["quantity"] # 1. 检查 Redis 缓存 cache_key = f"stock:{sku_id}" cached = redis_client.get(cache_key) if cached and int(cached) < quantity: return jsonify({"error": "insufficient stock"}), 400 # 2. 扣减数据库 with pg_conn.cursor() as cur: cur.execute( "UPDATE inventory SET stock = stock - %s WHERE sku_id = %s AND stock >= %s", (quantity, sku_id, quantity) ) if cur.rowcount == 0: return jsonify({"error": "insufficient stock"}), 400 # 3. 更新缓存 redis_client.setex(cache_key, 300, str(int(cached or 0) - quantity)) return jsonify({"success": True})

这个服务在压测中 QPS 卡在 180,P99 延迟 1200ms。问题根源很明显:每个请求都同步阻塞在 Redis 和 PostgreSQL 上,连接复用率低,I/O 等待白白浪费 CPU。

4.2 第一步:基础设施异步化——替换同步驱动

先不动业务逻辑,只升级底层依赖。Redis 改用aioredis,PostgreSQL 改用asyncpg

pip install aioredis asyncpg fastapi uvicorn
# app_async.py import asyncio import aioredis import asyncpg from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel app = FastAPI() # 全局异步连接池 redis_pool = None pg_pool = None @app.on_event("startup") async def startup(): global redis_pool, pg_pool # 创建 Redis 连接池 redis_pool = await aioredis.from_url( "redis://localhost:6379/0", max_connections=20 ) # 创建 PostgreSQL 连接池 pg_pool = await asyncpg.create_pool( "postgresql://postgres@localhost:5432/shop", min_size=10, max_size=20 ) @app.on_event("shutdown") async def shutdown(): if redis_pool: await redis_pool.close() if pg_pool: await pg_pool.close()

这里的关键是连接池(Pool)而非单连接(Connection)aioredis.from_url()asyncpg.create_pool()创建的是可复用的连接池,避免了每次请求都新建连接的开销。min_sizemax_size需根据并发量调整,我的经验值是:min_size = 期望最小并发数max_size = min(50, 期望峰值并发数 * 1.2)

4.3 第二步:业务逻辑协程化——逐层async/await

将原同步函数改造成协程,注意所有 I/O 调用都要await

class DeductRequest(BaseModel): sku_id: str quantity: int @app.post("/deduct") async def deduct_stock(request: DeductRequest): sku_id = request.sku_id quantity = request.quantity # 1. 检查 Redis 缓存(await) cache_key = f"stock:{sku_id}" cached = await redis_pool.get(cache_key) if cached and int(cached) < quantity: raise HTTPException(status_code=400, detail="insufficient stock") # 2. 扣减数据库(await) async with pg_pool.acquire() as conn: # 使用事务确保原子性 async with conn.transaction(): row = await conn.fetchrow( "SELECT stock FROM inventory WHERE sku_id = $1 FOR UPDATE", sku_id ) if not row or row["stock"] < quantity: raise HTTPException(status_code=400, detail="insufficient stock") await conn.execute( "UPDATE inventory SET stock = stock - $1 WHERE sku_id = $2", quantity, sku_id ) # 3. 更新缓存(await) new_stock = (int(cached) if cached else 0) - quantity await redis_pool.setex(cache_key, 300, str(new_stock)) return {"success": True}

变化点解析:

  • redis_pool.get()await redis_pool.get()
  • pg_pool.acquire()async with pg_pool.acquire()(自动归还连接)
  • conn.fetchrow()await conn.fetchrow()
  • conn.execute()await conn.execute()

所有数据库操作都包裹在async with conn.transaction():中,确保扣减的原子性。FOR UPDATE是关键,它会在读取时加行锁,防止并发扣减超卖。

4.4 第三步:引入并发控制——添加信号量与超时

在高并发下,数据库连接池和 Redis 连接池仍是瓶颈。我们加入Semaphore限流,并为所有外部调用加wait_for

# 全局信号量 db_semaphore = asyncio.Semaphore(15) # 匹配 pg_pool.max_size=20 的 75% redis_semaphore = asyncio.Semaphore(20) # 匹配 redis_pool.max_connections=20 @app.post("/deduct") async def deduct_stock(request: DeductRequest): sku_id = request.sku_id quantity = request.quantity # 1. Redis 检查(带超时和信号量) cache_key = f"stock:{sku_id}" try: async with redis_semaphore: cached = await asyncio.wait_for( redis_pool.get(cache_key), timeout=0.5 ) except asyncio.TimeoutError: raise HTTPException(status_code=503, detail="redis timeout") if cached and int(cached) < quantity: raise HTTPException(status_code=400, detail="insufficient stock") # 2. 数据库扣减(带超时和信号量) try: async with db_semaphore: async with pg_pool.acquire() as conn: async with conn.transaction(): try: row = await asyncio.wait_for( conn.fetchrow( "SELECT stock FROM inventory WHERE sku_id = $1 FOR UPDATE", sku_id ), timeout=1.0 ) except asyncio.TimeoutError: raise HTTPException(status_code=503, detail="db select timeout") if not row or row["stock"] < quantity: raise HTTPException(status_code=400, detail="insufficient stock") try: await asyncio.wait_for( conn.execute( "UPDATE inventory SET stock = stock - $1 WHERE sku_id = $2", quantity, sku_id ), timeout=1.0 ) except asyncio.TimeoutError: raise HTTPException(status_code=503, detail="db update timeout") except asyncio.TimeoutError: raise HTTPException(status_code=503, detail="db timeout") # 3. 更新缓存 new_stock = (int(cached) if cached else 0) - quantity await redis_pool.setex(cache_key, 300, str(new_stock)) return {"success": True}

现在服务有了完整的熔断和限流能力。redis_semaphoredb_semaphore分别控制对两个资源的并发访问,wait_for确保单次操作不拖垮整体。

4.5 第四步:部署与压测——验证效果

使用uvicorn启动(它原生支持 asyncio):

# 启动命令,指定 workers 数和事件循环 uvicorn app_async:app --workers 4 --loop uvloop --host 0.0.0.0:8000

--workers 4启动 4 个进程,每个进程一个事件循环;--loop uvloop替换默认asyncio事件循环为更快的uvloop(基于 libuv,性能提升 2~3 倍)。

压测对比(wrk -t8 -c200 -d30s http://localhost:8000/deduct):

指标同步 Flask异步 FastAPI提升
QPS182241013.2x
P50 延迟842ms42ms20x
P99 延迟1210ms187ms6.5x
内存 RSS218MB142MB-35%
CPU 利用率48%82%更充分

最显著的变化是延迟分布:同步服务的延迟曲线是“长尾严重”,而异步服务是“短而陡”,说明 I/O 等待被高效摊平。这也验证了我们的设计:asyncio 不是让单个请求更快,而是让大量请求的平均延迟更稳定、更可预期

5. 常见问题与排查技巧实录:我在 12 个生产环境踩过的坑

5.1 问题速查表:高频报错与根因定位

报错信息根本原因排查步骤解决方案
RuntimeError: no running event loop在非事件循环上下文中调用asyncio函数(如普通函数、__init__1. 检查报错行是否在async def
2. 用asyncio.get_event_loop()看是否为None
✅ 确保所有awaitasync def
✅ 启动用asyncio.run()uvicorn
Task was destroyed but it is pending!协程被取消但未await完成,或create_task()后未await/cancel()1. 搜索代码中create_task()调用
2. 检查对应 Task 是否被awaitcancel()
✅ 用asyncio.create_task(..., name="xxx")命名便于追踪
✅ 在finally块中await tasktask.cancel()
asyncio.TimeoutError频发外部服务慢、网络抖动、信号量值过小1.tcpdump抓包看 TCP 建连/响应时间
2. 检查Semaphore值是否 < 连接池大小
✅ 调大Semaphore值(建议 = 连接池 * 0.8)
✅ 为不同依赖设独立信号量(如db_sem,redis_sem
concurrent.futures._base.CancelledErrorto_thread()中的同步函数被取消1. 检查to_thread()调用是否在wait_for()
2. 看线程池是否被shutdown()
to_thread()外层加wait_for()
✅ 避免在shutdown时还有to_thread()运行
aioredis.exceptions.ConnectionClosedErrorRedis 连接池空闲连接被服务端关闭1.redis-cli执行CONFIG GET timeout看服务端超时
2. 检查aioredis连接池min_idle_time
✅ 设置min_idle_time=30(秒)
✅ 启用health_check_interval=30

5.2 实操心得:那些文档里不会写的细节

心得 1:永远不要在__init__await
Python 类初始化必须是同步的。我曾在一个DatabaseManager类里写async def __init__(self),结果DatabaseManager()直接返回协程对象,后续所有调用都await失败。正确做法是提供async def init()方法:

class DatabaseManager: def __init__(self): self.pool = None async def init(self): # 显式异步初始化 self.pool = await asyncpg.create_pool("...") # 使用 db = DatabaseManager() await db.init() # 主动调用

心得 2:asyncio.Queuetask_done()必须和get()严格配对
Queuejoin()依赖task_done()计数。漏调一次,join()就永远卡住。我的习惯是在try/finally中确保:

async def worker(): while True: item = await queue.get() try: await process(item) finally: queue.task_done() # 保证执行

心得 3:uvloop在 macOS 上可能崩溃,用--loop auto更稳妥
uvloop在某些 macOS 版本(尤其是 M1/M2)上会触发 `

http://www.jsqmd.com/news/962647/

相关文章:

  • Protel 99 SE电气规则检查(ERC)实战指南:从原理到应用
  • 2026西安名表回收六大门店实测:持证鉴定与交易透明成合规重点 - 薛定谔的梨花猫
  • GPTstudio插件开发指南:从零开始构建你的RStudio AI扩展
  • 26年临夏回族自治州黄金回收靠谱门店推荐 黄金+K金+白银+铂金回收门店TOP5排行榜+联系方式推荐 - 奢金汇
  • 2026银泰百货卡回收攻略:五种方式快速到账 - 可可收公众号
  • 完全掌控微信聊天数据:WeChatMsg实现个人数据资产化管理的完整方案
  • 德国瑞斯特兰德Restland欧标电线全渠道联系方式汇总|家装电线咨询一键直达
  • GetQzonehistory:3分钟快速备份你的QQ空间青春记忆
  • OmniClip:重新定义浏览器视频编辑的终极解决方案 [特殊字符]
  • E-Hentai下载器终极指南:如何轻松打包下载完整画廊
  • 人生金句
  • 3个核心模块深度解析:构建安全可靠的RSA加密C语言库实战指南
  • 2026宁波黄金回收市场解构:对比5家,找出优选保障店 - 商业快讯早知道
  • 告别Git操作恐慌:ugit让你的版本控制不再手忙脚乱
  • 【C++】string OJ练习
  • WinUtil:Windows系统优化终极指南 - 告别繁琐设置,一键智能管理
  • 26年三门峡市黄金回收靠谱门店推荐 黄金+K金+白银+铂金回收门店TOP5排行榜+联系方式推荐 - 奢金汇
  • PDF转CSV保姆级教程2026:微信小程序、在线工具、Excel导入全覆盖 - 软件小管家
  • 【DB】查询数据库表所占空间大小
  • 如何通过SPT-AKI存档编辑器高效管理你的塔科夫离线游戏体验
  • 如何高效管理R语言开发环境:RSwitch版本控制解决方案
  • 033、超广角模组选型:大视场角下的畸变校正、色差补偿与 ISP 适配
  • 如何用ChemicalX快速预测药物相互作用:面向开发者的完整指南
  • 5V转3.3V电源设计:从LDO到DC-DC的选型、计算与避坑指南
  • 2026年五家优质GEO服务商专项盘点:横向测评核心技术实力与选型指南 - 速递信息
  • 网页时光机使用指南:3个关键技巧让你轻松找回消失的网页内容
  • 终极指南:5个实用技巧彻底解决ComfyUI-SUPIR内存访问冲突问题
  • 合肥黄金回收权威榜单,禹竞名奢汇实力稳居前列 - 奢侈品交易观察员
  • FPGA高速串行数据采集实战:手把手教你配置Xilinx ISERDESE2的三种接口模式(SDR/DDR/Expansion)
  • 深入解析STM32 Cortex-M3内核寄存器:NVIC、SCB与SysTick实战指南