Python异步编程:Asyncio与FastAPI实战
Python异步编程:Asyncio与FastAPI实战
大家好,我是欧阳瑞(Rich Own)。今天想和大家聊聊Python异步编程。作为一个全栈开发者,我经常使用Python来构建后端服务。异步编程可以大大提高服务的并发处理能力,尤其是在处理大量IO操作时。
为什么需要异步编程?
在传统的同步编程中,程序会按顺序执行,遇到IO操作时会阻塞等待。而异步编程允许程序在等待IO操作时继续执行其他任务,从而提高整体效率。
| 场景 | 同步方式 | 异步方式 |
|---|---|---|
| 请求外部API | 等待响应期间什么都不做 | 可以处理其他请求 |
| 读写文件 | 等待IO完成 | 可以执行其他任务 |
| 数据库查询 | 阻塞等待结果 | 并行执行多个查询 |
Asyncio基础
什么是Asyncio?
Asyncio是Python 3.4引入的异步IO库,提供了协程、任务、事件循环等核心组件。
安装Python
# 确保使用Python 3.7+ python --version # 3.9.7+协程基础
import asyncio async def hello(): print("Hello") await asyncio.sleep(1) print("World") # 运行协程 asyncio.run(hello())await关键字
async def fetch_data(): print("开始获取数据") await asyncio.sleep(2) # 模拟IO操作 print("数据获取完成") return {"data": "hello"} async def main(): result = await fetch_data() print(result) asyncio.run(main())并发执行多个协程
async def task1(): await asyncio.sleep(1) return "Task 1 completed" async def task2(): await asyncio.sleep(2) return "Task 2 completed" async def task3(): await asyncio.sleep(0.5) return "Task 3 completed" async def main(): # 方式1:使用asyncio.gather results = await asyncio.gather(task1(), task2(), task3()) print(results) # ['Task 1 completed', 'Task 2 completed', 'Task 3 completed'] # 方式2:创建任务 t1 = asyncio.create_task(task1()) t2 = asyncio.create_task(task2()) await t1 await t2 asyncio.run(main())事件循环
# 获取当前事件循环 loop = asyncio.get_event_loop() # 创建任务 async def main(): await asyncio.sleep(1) print("Done") # 运行直到完成 loop.run_until_complete(main()) # 关闭循环 loop.close()FastAPI简介
什么是FastAPI?
FastAPI是一个现代、快速的Web框架,基于Python类型提示,自动生成OpenAPI文档。
安装FastAPI
pip install fastapi uvicorn创建第一个FastAPI应用
from fastapi import FastAPI app = FastAPI() @app.get("/") def read_root(): return {"message": "Hello World"} @app.get("/items/{item_id}") def read_item(item_id: int, q: str = None): return {"item_id": item_id, "q": q}运行服务
uvicorn main:app --reloadFastAPI异步支持
异步路径操作
from fastapi import FastAPI import asyncio app = FastAPI() @app.get("/") async def read_root(): await asyncio.sleep(1) # 模拟IO操作 return {"message": "Hello World"} @app.post("/items/") async def create_item(item: dict): # 异步处理数据 await process_item(item) return {"item": item}异步数据库操作
from fastapi import FastAPI from databases import Database app = FastAPI() database = Database("sqlite:///./test.db") @app.on_event("startup") async def startup(): await database.connect() @app.on_event("shutdown") async def shutdown(): await database.disconnect() @app.get("/users/") async def get_users(): query = "SELECT * FROM users" users = await database.fetch_all(query) return users异步HTTP请求
from fastapi import FastAPI import httpx app = FastAPI() @app.get("/fetch/") async def fetch_data(url: str): async with httpx.AsyncClient() as client: response = await client.get(url) return response.json()实战:构建异步Web服务
项目结构
async-service/ ├── main.py ├── requirements.txt └── app/ ├── __init__.py ├── routes/ │ ├── users.py │ └── items.py ├── models/ │ └── __init__.py └── services/ └── data_fetcher.py核心代码
# main.py from fastapi import FastAPI from app.routes import users, items app = FastAPI(title="Async Service") app.include_router(users.router, prefix="/users", tags=["users"]) app.include_router(items.router, prefix="/items", tags=["items"]) @app.get("/") async def root(): return {"message": "Welcome to the async service"}# app/routes/users.py from fastapi import APIRouter, HTTPException from app.services.data_fetcher import fetch_user_data router = APIRouter() @router.get("/{user_id}") async def get_user(user_id: int): try: user = await fetch_user_data(user_id) return user except Exception as e: raise HTTPException(status_code=404, detail="User not found") @router.get("/") async def get_users(limit: int = 10): users = await fetch_user_data(limit=limit) return users# app/services/data_fetcher.py import asyncio import httpx async def fetch_user_data(user_id: int = None, limit: int = 10): async with httpx.AsyncClient() as client: if user_id: response = await client.get(f"https://api.example.com/users/{user_id}") return response.json() else: response = await client.get(f"https://api.example.com/users?limit={limit}") return response.json() async def fetch_multiple_users(user_ids: list): async with httpx.AsyncClient() as client: tasks = [ client.get(f"https://api.example.com/users/{uid}") for uid in user_ids ] responses = await asyncio.gather(*tasks) return [r.json() for r in responses]异步任务队列
使用Celery进行异步任务
pip install celery redis# celery_config.py from celery import Celery app = Celery( 'tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0' ) @app.task def process_data(data): # 处理数据 result = heavy_processing(data) return result在FastAPI中调用Celery任务
from fastapi import FastAPI from celery_config import process_data app = FastAPI() @app.post("/process/") async def start_process(data: dict): task = process_data.delay(data) return {"task_id": task.id} @app.get("/result/{task_id}") async def get_result(task_id: str): result = process_data.AsyncResult(task_id) if result.ready(): return {"status": "completed", "result": result.get()} else: return {"status": "pending"}性能对比
同步 vs 异步
import asyncio import time import requests import httpx # 同步方式 def sync_fetch(urls): results = [] for url in urls: response = requests.get(url) results.append(response.json()) return results # 异步方式 async def async_fetch(urls): async with httpx.AsyncClient() as client: tasks = [client.get(url) for url in urls] responses = await asyncio.gather(*tasks) return [r.json() for r in responses] # 测试 urls = ["https://api.example.com/data"] * 10 # 同步 start = time.time() sync_fetch(urls) print(f"同步耗时: {time.time() - start:.2f}s") # 异步 start = time.time() asyncio.run(async_fetch(urls)) print(f"异步耗时: {time.time() - start:.2f}s")最佳实践
1. 避免阻塞调用
# 不好的做法:在异步函数中使用同步IO async def bad_example(): import requests response = requests.get("https://api.example.com") # 阻塞! return response.json() # 好的做法:使用异步HTTP客户端 async def good_example(): import httpx async with httpx.AsyncClient() as client: response = await client.get("https://api.example.com") # 非阻塞 return response.json()2. 合理使用锁
import asyncio lock = asyncio.Lock() async def critical_section(): async with lock: # 临界区代码 await do_something()3. 错误处理
async def safe_operation(): try: result = await risky_operation() return result except ValueError as e: print(f"值错误: {e}") return None except Exception as e: print(f"未知错误: {e}") raise4. 资源管理
async def use_resource(): resource = await acquire_resource() try: await resource.do_something() finally: await resource.release()总结
Python异步编程是构建高性能服务的利器。结合FastAPI,你可以轻松构建出高并发的Web服务。异步编程的关键在于理解协程、任务和事件循环的概念,以及如何正确地处理IO操作。
我的鬃狮蜥Hash对异步编程也有自己的理解——它总是在晒太阳的同时,还能留意周围的动静。这也许就是异步的精髓吧!
如果你有Python异步编程的问题,欢迎留言交流!我是欧阳瑞,极客之路,永无止境!
技术栈:Python · Asyncio · FastAPI · httpx · Celery
