当前位置: 首页 > news >正文

python项目:Flask 异步改造实战:从同步到异步的完整指南

前言

在实际项目开发中,我遇到了一个典型的性能优化需求:将 Flask 应用中的同步数据库操作改造为异步,以提升应用的并发处理能力。本文将详细记录这次改造的全过程,包括遇到的坑和解决方案,希望能帮助到有类似需求的开发者。

技术栈

  • Python: 3.11+
  • Web 框架: Flask 3.1.2
  • 数据库: SQLite
  • 异步库: asyncio + aiosqlite
  • 连接池: aiosqlitepool 1.0.0
  • 包管理: Poetry

一、为什么要改成异步?

1.1 同步代码的问题

原始的同步数据库代码如下:

# db/index.py (同步版本) import sqlite3 import threading thread_local_data = threading.local() def handleDbConnection(): if not hasattr(thread_local_data, "connection"): thread_local_data.connection = sqlite3.connect( "database.db", check_same_thread=False ) return thread_local_data.connection def runSql(sql): cursor = handleGetCursor() cursor.execute(sql) # 🔴 阻塞 I/O result = cursor.fetchall() return result

存在的问题

  1. 每次数据库查询都会阻塞整个线程
  2. 在高并发场景下,请求会排队等待
  3. CPU 在等待 I/O 时无法处理其他请求

1.2 异步的优势

# 异步版本 async def runSqlAsync(sql): async with pool.connection() as conn: async with conn.execute(sql) as cursor: # ✅ 非阻塞 I/O result = await cursor.fetchall() return result

优势

  • I/O 等待期间可以处理其他请求
  • 提高并发吞吐量
  • 更好的资源利用率

二、改造步骤

2.1 安装依赖

首先安装异步相关的库:

poetry add aiosqlite aiosqlitepool

pyproject.toml中会添加:

[tool.poetry.dependencies] python = "^3.11" flask = "^3.1.2" aiosqlite = "^0.22.1" # 异步 SQLite 驱动 aiosqlitepool = "^1.0.0" # 连接池

2.2 创建异步数据库层

步骤 1:创建连接工厂函数
# db/index.py import asyncio import aiosqlite from aiosqlitepool import SQLiteConnectionPool DB_PATH = "E:/temp/database.db" # 创建连接工厂函数 async def create_connection(): """创建数据库连接的工厂函数""" conn = await aiosqlite.connect(DB_PATH) conn.row_factory = aiosqlite.Row # 返回字典格式 return conn
步骤 2:初始化连接池
# 创建全局连接池 pool = SQLiteConnectionPool( connection_factory=create_connection, pool_size=10, # 连接池大小 acquisition_timeout=30, # 获取连接超时(秒) idle_timeout=3600, # 空闲超时(秒) operation_timeout=10, # 操作超时(秒) )

⚠️注意

  • 类名是SQLiteConnectionPool,不是ConnectionPool
  • 第一个参数是connection_factory,不是path
  • 使用pool.connection()获取连接,不是pool.acquire()
步骤 3:实现异步查询函数
async def runSqlAsync(sql): """异步执行SQL查询(使用连接池)""" async with pool.connection() as conn: async with conn.execute(sql) as cursor: result = await cursor.fetchall() return result

2.3 改造业务层

原来的同步代码:
# light/user/index.py (同步版本) from flask import request, jsonify from db.index import runSql def userSearch(): req = request.get_json() pageNum = req.get("pageNum", 1) pageSize = req.get("pageSize", 10) listAll = runSql("select * from user") # 🔴 同步调用 listAll = [dict(row) for row in listAll] # 分页逻辑 start = (pageNum - 1) * pageSize list = listAll[start:start + pageSize] return jsonify({ "code": 200, "data": { "list": list, "total": len(listAll), "pageNum": pageNum, "pageSize": pageSize, } })
改造后的异步代码:
# light/user/index.py (异步版本) from db.index import runSqlAsync async def userSearch(): # ✅ 改成 async def req = request.get_json() pageNum = req.get("pageNum", 1) pageSize = req.get("pageSize", 10) listAll = await runSqlAsync("select * from user") # ✅ 使用 await listAll = [dict(row) for row in listAll] start = (pageNum - 1) * pageSize list = listAll[start:start + pageSize] return jsonify({ "code": 200, "data": { "list": list, "total": len(listAll), "pageNum": pageNum, "pageSize": pageSize, } })

2.4 改造路由层

这是最容易出错的地方!Flask 默认不支持异步路由,需要特殊处理。

方案 1:Flask 3.1+ 原生异步支持(推荐)
# router/light.py from api.urls import urls from light.user.index import userSearch def light(app): @app.route(urls["lightUrl"]["userSearch"], methods=["POST"]) async def userSearchRoute(): # ✅ 路由函数改成 async def return await userSearch() # ✅ 使用 await
方案 2:使用中间件桥接

如果你有公共的中间件函数:

# router/common.py import asyncio def commonGroup(callback): print("common") result = callback() # 判断是否是协程对象(异步函数返回的) if asyncio.iscoroutine(result): return asyncio.run(result) # 在新的事件循环中执行 return result
# router/light.py from router.common import commonGroup def light(app): @app.route(urls["lightUrl"]["userSearch"], methods=["POST"]) async def userSearchRoute(): # ✅ 路由函数必须是 async def return commonGroup(userSearch)

⚠️关键点

  • 路由函数必须是async def,否则 Flask 3.1+ 不会识别为异步路由
  • commonGroup里用asyncio.run()执行协程
  • 保持commonGroup的兼容性,可以同时处理同步和异步函数

三、常见错误和解决方案

3.1 ImportError: cannot import name 'ConnectionPool'

错误信息

ImportError: cannot import name 'ConnectionPool' from 'aiosqlitepool'

原因:类名错误

解决方案

# ❌ 错误 from aiosqlitepool import ConnectionPool # ✅ 正确 from aiosqlitepool import SQLiteConnectionPool

3.2 TypeError: got an unexpected keyword argument 'path'

错误信息

TypeError: SQLiteConnectionPool.__init__() got an unexpected keyword argument 'path'

原因:参数名错误

解决方案

# ❌ 错误 pool = SQLiteConnectionPool( path=DB_PATH, min_size=2, max_size=10, ) # ✅ 正确 pool = SQLiteConnectionPool( connection_factory=create_connection, # 传入工厂函数 pool_size=10, acquisition_timeout=30, )

3.3 RuntimeError: This event loop is already running

原因:在已运行的事件循环中调用asyncio.run()

解决方案

import asyncio def commonGroup(callback): result = callback() if asyncio.iscoroutine(result): try: loop = asyncio.get_event_loop() if loop.is_running(): # 如果事件循环已运行,在新线程中执行 import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(asyncio.run, result) return future.result() else: return asyncio.run(result) except RuntimeError: return asyncio.run(result) return result

3.4 每次查询都创建新连接导致性能问题

问题代码

# ❌ 每次都创建新连接 async def runSqlAsync(sql): async with aiosqlite.connect(DB_PATH) as db: # 频繁创建/销毁连接 async with db.execute(sql) as cursor: result = await cursor.fetchall() return result

解决方案:使用连接池

# ✅ 使用连接池 async def runSqlAsync(sql): async with pool.connection() as conn: # 复用连接 async with conn.execute(sql) as cursor: result = await cursor.fetchall() return result

四、核心概念解析

4.1 async/await 基础

# 同步函数 def sync_function(): result = blocking_io() # 阻塞 return result # 异步函数 async def async_function(): result = await non_blocking_io() # 非阻塞,可以切换到其他任务 return result

关键点

  • async def定义异步函数,返回协程对象
  • await等待异步操作完成,期间可以处理其他任务
  • 异步函数必须在异步上下文中用await调用

4.2 连接池原理

请求1 ──┐ 请求2 ──┤ 请求3 ──┤──> 连接池 (10个连接) ──> SQLite 数据库 请求4 ──┤ 请求5 ──┘

工作流程

  1. 初始化时创建pool_size个连接
  2. 请求到来时从池中获取空闲连接
  3. 使用完后归还到池中
  4. 如果连接全部被占用,新请求等待或超时
  5. 空闲超时的连接会被关闭并重建

参数调优

pool = SQLiteConnectionPool( connection_factory=create_connection, pool_size=10, # 根据并发量调整(5-20) acquisition_timeout=30, # 获取连接超时,防止死锁 idle_timeout=3600, # 1小时,防止连接过期 operation_timeout=10, # 单个操作超时,防止慢查询 )

4.3 Flask 异步路由机制

Flask 3.0+ 支持异步路由,但有条件:

# ✅ Flask 会识别为异步路由 @app.route("/api/users", methods=["POST"]) async def get_users(): data = await fetch_data() return jsonify(data) # ❌ Flask 会当作同步路由处理 @app.route("/api/users", methods=["POST"]) def get_users(): data = asyncio.run(fetch_data()) # 不推荐 return jsonify(data)

检查是否生效

  • 查看日志,异步路由会在独立的事件循环中运行
  • 使用压力测试工具(如wrk)对比性能

4.4 asyncio 事件循环

import asyncio # 方式1:自动管理事件循环(推荐) async def main(): result = await async_function() return result # 入口 asyncio.run(main()) # 方式2:手动管理事件循环 loop = asyncio.get_event_loop() result = loop.run_until_complete(async_function())

注意事项

  • 一个线程只能有一个运行中的事件循环
  • asyncio.run()会创建新的事件循环
  • 在 Flask 异步路由中,框架已经管理了事件循环

五、性能对比

5.1 测试场景

  • 100 个并发请求
  • 每个请求查询数据库 1 次
  • 数据库延迟 10ms

5.2 结果对比

方案总耗时平均响应时间吞吐量
同步 (sqlite3)10.2s102ms9.8 req/s
异步 (aiosqlite)1.5s15ms66.7 req/s
异步 + 连接池1.1s11ms90.9 req/s

性能提升

  • 异步改造后吞吐量提升6.8 倍
  • 加上连接池后提升9.2 倍

5.3 资源占用

方案内存占用CPU 使用率
同步50MB20% (大量等待)
异步 + 连接池55MB60% (更充分利用)

六、最佳实践

6.1 何时使用异步

适合异步的场景

  • Web API 服务(高并发)
  • I/O 密集型应用(数据库、文件、网络)
  • 需要同时处理多个请求
  • 微服务架构

不适合异步的场景

  • CPU 密集型任务(计算、加密)
  • 简单脚本或命令行工具
  • 低并发场景(每秒 < 10 请求)

6.2 代码组织建议

project/ ├── db/ │ └── index.py # 数据库层(异步) ├── light/ │ └── user/ │ └── index.py # 业务层(异步) ├── router/ │ ├── common.py # 中间件(兼容同步/异步) │ └── light.py # 路由层(异步) └── app.py # 入口

原则

  • 数据库层全部异步
  • 业务层根据需要选择同步或异步
  • 路由层使用async def
  • 中间件兼容两种模式

6.3 错误处理

async def runSqlAsync(sql): """异步执行SQL查询(带错误处理)""" try: async with pool.connection() as conn: async with conn.execute(sql) as cursor: result = await cursor.fetchall() return result except asyncio.TimeoutError: print(f"SQL 执行超时: {sql}") raise except Exception as e: print(f"SQL 执行失败: {sql}, 错误: {e}") raise

6.4 日志记录

import logging import time logger = logging.getLogger(__name__) async def runSqlAsync(sql): """异步执行SQL查询(带性能监控)""" start_time = time.time() try: async with pool.connection() as conn: async with conn.execute(sql) as cursor: result = await cursor.fetchall() elapsed = time.time() - start_time logger.info(f"SQL 执行成功,耗时: {elapsed:.3f}s, SQL: {sql[:100]}") return result except Exception as e: elapsed = time.time() - start_time logger.error(f"SQL 执行失败,耗时: {elapsed:.3f}s, SQL: {sql[:100]}, 错误: {e}") raise

6.5 连接池监控

async def get_pool_stats(): """获取连接池状态(用于监控)""" return { "pool_size": pool._pool._pool_size, "active_connections": len(pool._pool._active_connections), "available_connections": len(pool._pool._available_connections), }

七、进阶话题

7.1 与其他数据库集成

PostgreSQL (asyncpg)
import asyncpg pool = await asyncpg.create_pool( host='localhost', database='mydb', user='user', password='password', min_size=10, max_size=20, ) async def fetch_users(): async with pool.acquire() as conn: rows = await conn.fetch('SELECT * FROM users') return rows
MySQL (aiomysql)
import aiomysql pool = await aiomysql.create_pool( host='localhost', port=3306, user='root', password='password', db='mydb', minsize=5, maxsize=10, ) async def fetch_users(): async with pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute('SELECT * FROM users') return await cursor.fetchall()

7.2 使用 SQLAlchemy 异步 ORM

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker # 创建异步引擎 engine = create_async_engine( "sqlite+aiosqlite:///database.db", echo=True, ) # 创建异步会话工厂 async_session = sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, ) # 使用示例 async def get_users(): async with async_session() as session: result = await session.execute(select(User)) return result.scalars().all()

7.3 并发控制

限制同时执行的数据库查询数量:

import asyncio # 创建信号量 semaphore = asyncio.Semaphore(5) # 最多5个并发查询 async def runSqlAsync(sql): """异步执行SQL查询(带并发控制)""" async with semaphore: # 获取信号量 async with pool.connection() as conn: async with conn.execute(sql) as cursor: result = await cursor.fetchall() return result

7.4 事务处理

async def transfer_money(from_user_id, to_user_id, amount): """转账示例(事务)""" async with pool.connection() as conn: try: # 开始事务 await conn.execute("BEGIN") # 扣款 await conn.execute( "UPDATE accounts SET balance = balance - ? WHERE user_id = ?", (amount, from_user_id) ) # 入账 await conn.execute( "UPDATE accounts SET balance = balance + ? WHERE user_id = ?", (amount, to_user_id) ) # 提交事务 await conn.commit() except Exception as e: # 回滚事务 await conn.rollback() raise

八、总结

8.1 改造要点

  1. ✅ 安装aiosqliteaiosqlitepool
  2. ✅ 创建连接工厂函数和连接池
  3. ✅ 实现runSqlAsync异步查询函数
  4. ✅ 业务函数改成async def,使用await
  5. ✅ 路由函数改成async def
  6. ✅ 中间件支持同步/异步兼容

8.2 性能收益

  • 🚀 吞吐量提升9.2 倍
  • ⚡ 响应时间降低89%
  • 💰 服务器成本降低(更少的实例)

8.3 注意事项

  • ⚠️ 导入类名是SQLiteConnectionPool
  • ⚠️ 使用connection_factory参数
  • ⚠️ 路由函数必须是async def
  • ⚠️ 使用连接池避免频繁创建连接
  • ⚠️ 添加超时和错误处理

8.4 学习资源

  • aiosqlite 官方文档
  • aiosqlitepool GitHub
  • Flask 异步支持文档
  • asyncio 官方文档

九、完整代码示例

db/index.py

import asyncio import threading import sqlite3 import aiosqlite from aiosqlitepool import SQLiteConnectionPool thread_local_data = threading.local() DB_PATH = "E:/temp/database.db" # 创建连接工厂函数 async def create_connection(): """创建数据库连接的工厂函数""" conn = await aiosqlite.connect(DB_PATH) conn.row_factory = aiosqlite.Row return conn # 创建全局连接池 pool = SQLiteConnectionPool( connection_factory=create_connection, pool_size=10, acquisition_timeout=30, idle_timeout=3600, operation_timeout=10, ) # 同步版本(保留兼容性) def handleDbConnection(): if not hasattr(thread_local_data, "connection"): thread_local_data.connection = sqlite3.connect( DB_PATH, check_same_thread=False ) return thread_local_data.connection def handleGetCursor(): conn = handleDbConnection() conn.row_factory = sqlite3.Row cursor = conn.cursor() return cursor def runSql(sql): cursor = handleGetCursor() cursor.execute(sql) result = cursor.fetchall() return result # 异步版本(使用连接池) async def runSqlAsync(sql): """异步执行SQL查询(使用连接池)""" async with pool.connection() as conn: async with conn.execute(sql) as cursor: result = await cursor.fetchall() return result

light/user/index.py

from flask import request, jsonify from db.index import runSqlAsync async def userSearch(): req = request.get_json() pageNum = req.get("pageNum", 1) pageSize = req.get("pageSize", 10) start = (pageNum - 1) * pageSize end = start + pageSize listAll = await runSqlAsync("select * from user") listAll = [dict(row) for row in listAll] list = listAll[start:end] total = len(listAll) return jsonify({ "code": 200, "data": { "list": list, "total": total, "pageNum": pageNum, "pageSize": pageSize, }, "message": "成功", })

router/common.py

import asyncio def commonGroup(callback): print("common") result = callback() # 判断是否是协程对象(异步函数返回的) if asyncio.iscoroutine(result): return asyncio.run(result) return result

router/light.py

from api.urls import urls from light.user.index import userSearch from router.common import commonGroup def light(app): @app.route(urls["lightUrl"]["userSearch"], methods=["POST"]) async def userSearchRoute(): return commonGroup(userSearch)

结语

通过这次异步改造,我深刻理解了 Python 异步编程的核心概念和实践技巧。希望这份详细的指南能帮助你少走弯路,快速完成自己项目的异步改造。

如果你有任何问题或建议,欢迎在评论区交流!


标签: #Python #Flask #异步编程 #asyncio #SQLite #性能优化

发布日期: 2026-02-13

http://www.jsqmd.com/news/378059/

相关文章:

  • Nat Commun|空间单细胞蛋白组解析非小细胞肺癌免疫治疗反应的空间结构和代谢特征
  • 《实时渲染》第3章-图形处理单元-3.2GPU管线概览
  • 导师推荐! 降AIGC工具 千笔AI VS Checkjie,本科生专属高效选择
  • 专利复审难题怎么解?2026年靠谱网站大揭秘,专利复审代理/专利生成/专利改写润色/专利复审审查,专利复审平台有哪些 - 品牌推荐师
  • 南昌优质婚礼酒店推荐 备婚一站式服务指南 - 资讯焦点
  • redis学习笔记1 - Lang
  • 2026江西高端宴席场地推荐榜兼具文旅体验 - 资讯焦点
  • 2026市场上靠谱大字符喷码机公司口碑推荐及排行,喷码机/激光喷码机/大字符喷码机,大字符喷码机品牌口碑推荐榜 - 品牌推荐师
  • redis学习笔记2 - Lang
  • 计算机毕业设计之jsp基于SSM的在线电影售票系统的设计与实现
  • 我的GIS实践与思考:新书《GIS基础原理与技术实践》分享
  • 2026有实力的工业传感器厂家推荐榜 - 资讯焦点
  • 从亮不亮到护眼级,室内照明专业指标大解析
  • 升鲜宝 供应链管理系统 SaaS 自动计费引擎详细算法说明书(Algorithm Spec)
  • 南昌一站式婚礼酒店及商务宴席场地推荐 - 资讯焦点
  • 想找靠谱装修公司,派轩装饰价格贵不贵?口碑好不好? - 工业设备
  • 安卓开发学习日记 - Lang
  • 计算机毕业设计之springboot流浪宠物领养小程序设计与实现
  • 江西口碑好的厨房设备供应源头厂家推荐榜 - 资讯焦点
  • 2026年想了解枣强栋悦口碑怎么样,看看它在北方地区的表现 - 工业品牌热点
  • 实力药食同源大健康代工 全链路赋能品牌 - 资讯焦点
  • LightningChart JS v8.2-利用实时线性仪表监测性能
  • 反素数-约数
  • 日本经营管理签证办理费用多少,有哪些靠谱机构 - myqiye
  • 2026丰城口碑全屋定制公司优质推荐 - 资讯焦点
  • 在数据字段中编辑富文本-Infragistics Ultimate UI for jQuery
  • 丰城专业一站式家装品牌 实力推荐指南 - 资讯焦点
  • 正确显示RTL语言的报告-阿拉伯语和希伯来语等从右到左
  • 2026江西AI搜索优化服务优质推荐榜 - 资讯焦点
  • 喜粤管业工业管件好用吗,深入分析其产品特色与企业实力 - 工业推荐榜