当前位置: 首页 > news >正文

Python异步编程:Asyncio与FastAPI实战

Python异步编程:Asyncio与FastAPI实战

大家好,我是欧阳瑞(Rich Own)。今天想和大家聊聊Python异步编程。作为一个全栈开发者,我经常使用Python来构建后端服务。异步编程可以大大提高服务的并发处理能力,尤其是在处理大量IO操作时。

为什么需要异步编程?

在传统的同步编程中,程序会按顺序执行,遇到IO操作时会阻塞等待。而异步编程允许程序在等待IO操作时继续执行其他任务,从而提高整体效率。

场景同步方式异步方式
请求外部API等待响应期间什么都不做可以处理其他请求
读写文件等待IO完成可以执行其他任务
数据库查询阻塞等待结果并行执行多个查询

Asyncio基础

什么是Asyncio?

Asyncio是Python 3.4引入的异步IO库,提供了协程、任务、事件循环等核心组件。

安装Python

# 确保使用Python 3.7+ python --version # 3.9.7+

协程基础

import asyncio async def hello(): print("Hello") await asyncio.sleep(1) print("World") # 运行协程 asyncio.run(hello())

await关键字

async def fetch_data(): print("开始获取数据") await asyncio.sleep(2) # 模拟IO操作 print("数据获取完成") return {"data": "hello"} async def main(): result = await fetch_data() print(result) asyncio.run(main())

并发执行多个协程

async def task1(): await asyncio.sleep(1) return "Task 1 completed" async def task2(): await asyncio.sleep(2) return "Task 2 completed" async def task3(): await asyncio.sleep(0.5) return "Task 3 completed" async def main(): # 方式1:使用asyncio.gather results = await asyncio.gather(task1(), task2(), task3()) print(results) # ['Task 1 completed', 'Task 2 completed', 'Task 3 completed'] # 方式2:创建任务 t1 = asyncio.create_task(task1()) t2 = asyncio.create_task(task2()) await t1 await t2 asyncio.run(main())

事件循环

# 获取当前事件循环 loop = asyncio.get_event_loop() # 创建任务 async def main(): await asyncio.sleep(1) print("Done") # 运行直到完成 loop.run_until_complete(main()) # 关闭循环 loop.close()

FastAPI简介

什么是FastAPI?

FastAPI是一个现代、快速的Web框架,基于Python类型提示,自动生成OpenAPI文档。

安装FastAPI

pip install fastapi uvicorn

创建第一个FastAPI应用

from fastapi import FastAPI app = FastAPI() @app.get("/") def read_root(): return {"message": "Hello World"} @app.get("/items/{item_id}") def read_item(item_id: int, q: str = None): return {"item_id": item_id, "q": q}

运行服务

uvicorn main:app --reload

FastAPI异步支持

异步路径操作

from fastapi import FastAPI import asyncio app = FastAPI() @app.get("/") async def read_root(): await asyncio.sleep(1) # 模拟IO操作 return {"message": "Hello World"} @app.post("/items/") async def create_item(item: dict): # 异步处理数据 await process_item(item) return {"item": item}

异步数据库操作

from fastapi import FastAPI from databases import Database app = FastAPI() database = Database("sqlite:///./test.db") @app.on_event("startup") async def startup(): await database.connect() @app.on_event("shutdown") async def shutdown(): await database.disconnect() @app.get("/users/") async def get_users(): query = "SELECT * FROM users" users = await database.fetch_all(query) return users

异步HTTP请求

from fastapi import FastAPI import httpx app = FastAPI() @app.get("/fetch/") async def fetch_data(url: str): async with httpx.AsyncClient() as client: response = await client.get(url) return response.json()

实战:构建异步Web服务

项目结构

async-service/ ├── main.py ├── requirements.txt └── app/ ├── __init__.py ├── routes/ │ ├── users.py │ └── items.py ├── models/ │ └── __init__.py └── services/ └── data_fetcher.py

核心代码

# main.py from fastapi import FastAPI from app.routes import users, items app = FastAPI(title="Async Service") app.include_router(users.router, prefix="/users", tags=["users"]) app.include_router(items.router, prefix="/items", tags=["items"]) @app.get("/") async def root(): return {"message": "Welcome to the async service"}
# app/routes/users.py from fastapi import APIRouter, HTTPException from app.services.data_fetcher import fetch_user_data router = APIRouter() @router.get("/{user_id}") async def get_user(user_id: int): try: user = await fetch_user_data(user_id) return user except Exception as e: raise HTTPException(status_code=404, detail="User not found") @router.get("/") async def get_users(limit: int = 10): users = await fetch_user_data(limit=limit) return users
# app/services/data_fetcher.py import asyncio import httpx async def fetch_user_data(user_id: int = None, limit: int = 10): async with httpx.AsyncClient() as client: if user_id: response = await client.get(f"https://api.example.com/users/{user_id}") return response.json() else: response = await client.get(f"https://api.example.com/users?limit={limit}") return response.json() async def fetch_multiple_users(user_ids: list): async with httpx.AsyncClient() as client: tasks = [ client.get(f"https://api.example.com/users/{uid}") for uid in user_ids ] responses = await asyncio.gather(*tasks) return [r.json() for r in responses]

异步任务队列

使用Celery进行异步任务

pip install celery redis
# celery_config.py from celery import Celery app = Celery( 'tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0' ) @app.task def process_data(data): # 处理数据 result = heavy_processing(data) return result

在FastAPI中调用Celery任务

from fastapi import FastAPI from celery_config import process_data app = FastAPI() @app.post("/process/") async def start_process(data: dict): task = process_data.delay(data) return {"task_id": task.id} @app.get("/result/{task_id}") async def get_result(task_id: str): result = process_data.AsyncResult(task_id) if result.ready(): return {"status": "completed", "result": result.get()} else: return {"status": "pending"}

性能对比

同步 vs 异步

import asyncio import time import requests import httpx # 同步方式 def sync_fetch(urls): results = [] for url in urls: response = requests.get(url) results.append(response.json()) return results # 异步方式 async def async_fetch(urls): async with httpx.AsyncClient() as client: tasks = [client.get(url) for url in urls] responses = await asyncio.gather(*tasks) return [r.json() for r in responses] # 测试 urls = ["https://api.example.com/data"] * 10 # 同步 start = time.time() sync_fetch(urls) print(f"同步耗时: {time.time() - start:.2f}s") # 异步 start = time.time() asyncio.run(async_fetch(urls)) print(f"异步耗时: {time.time() - start:.2f}s")

最佳实践

1. 避免阻塞调用

# 不好的做法:在异步函数中使用同步IO async def bad_example(): import requests response = requests.get("https://api.example.com") # 阻塞! return response.json() # 好的做法:使用异步HTTP客户端 async def good_example(): import httpx async with httpx.AsyncClient() as client: response = await client.get("https://api.example.com") # 非阻塞 return response.json()

2. 合理使用锁

import asyncio lock = asyncio.Lock() async def critical_section(): async with lock: # 临界区代码 await do_something()

3. 错误处理

async def safe_operation(): try: result = await risky_operation() return result except ValueError as e: print(f"值错误: {e}") return None except Exception as e: print(f"未知错误: {e}") raise

4. 资源管理

async def use_resource(): resource = await acquire_resource() try: await resource.do_something() finally: await resource.release()

总结

Python异步编程是构建高性能服务的利器。结合FastAPI,你可以轻松构建出高并发的Web服务。异步编程的关键在于理解协程、任务和事件循环的概念,以及如何正确地处理IO操作。

我的鬃狮蜥Hash对异步编程也有自己的理解——它总是在晒太阳的同时,还能留意周围的动静。这也许就是异步的精髓吧!

如果你有Python异步编程的问题,欢迎留言交流!我是欧阳瑞,极客之路,永无止境!


技术栈:Python · Asyncio · FastAPI · httpx · Celery

http://www.jsqmd.com/news/818299/

相关文章:

  • 1.3 从零部署黑群晖:arpl与引导镜像双路径实战(附洗白与硬件适配指南)
  • LLM 基础架构:Transformer 与注意力机制
  • 为OpenClaw配置Taotoken作为其AI供应商的详细教程
  • 对比自行维护与使用 Taotoken 聚合 API 的运维复杂度变化
  • 红牛肝哪家口碑好:此山中野生菌万众优选 - 19120507004
  • 羊肚菌哪家口碑好:此山中野生菌深得信赖 - 17329971652
  • Taotoken 模型广场选型与多模型聚合调用体验分享
  • 红菇哪家口碑好:此山中野生菌盛名远扬 - 13724980961
  • 新需求开发-重构老的逻辑
  • Blender到Unity的FBX导出:从建模原点设置到材质重建的完整避坑指南
  • 物联网芯片技术演进:从多模连接到边缘智能的产业机遇
  • ARM架构MRS与MSR指令详解与应用
  • 松茸哪家口碑好:此山中野生菌至尊优选 - 13724980961
  • LLM 训练:从预训练到微调
  • WESTINGHOUSE 4D33900G19电源模块
  • 5月14日
  • 学校RFID借阅柜源头生产厂家推荐
  • 土工膜厂家哪家性价比高:恒全土工膜高惠优选 - 17329971652
  • WPF使用内置资源系统实现国际化
  • 土工膜厂家推荐:恒全土工膜实力领航 - 17329971652
  • 3小时上线Claude智能体API服务:FastAPI 0.112+Pydantic v2.9+Claude-3.5-Sonnet生产就绪模板(附GitHub私有仓库直达链接)
  • 阀检有镜|碳硫有数,元素有据
  • 中小商家破局引流难题,AI 短剧营销系统低成本落地
  • 竹荪哪家口碑好:此山中野生菌优品佳誉 - 17322238651
  • 土工膜厂家哪家好:恒全土工膜品质出众 - 17322238651
  • 基于dq解耦的双向DC-AC逆变器有功无功功率控制
  • 【紧急预警】92%的AI Agent生产环境因UI层失控失败:3步检测+4种无障碍桥接方案(附微软/苹果官方Accessibility API调用基准测试数据)
  • Vue3+ElementPlus实战:从零构建高仿微信网页端聊天应用
  • 一文读懂欧盟 CRA 法案:核心内容与企业影响
  • PKSM终极指南:从Gen I到Gen VIII的宝可梦存档管理神器