Python HTTP客户端实战:从urllib到异步请求
引言
HTTP客户端是后端开发中不可或缺的工具,用于与外部API进行通信。Python提供了多种HTTP客户端方案,从标准库的urllib到第三方库如requests、httpx等。
本文将深入探讨Python中的HTTP客户端技术,包括同步和异步方案,并分享生产环境中的最佳实践。
一、标准库HTTP客户端
1.1 urllib.request基础用法
```python
import urllib.request
import urllib.parse

# 发送GET请求
url = 'https://api.example.com/data'
response = urllib.request.urlopen(url)
data = response.read().decode('utf-8')
print(data)

# 发送POST请求
url = 'https://api.example.com/submit'
data = urllib.parse.urlencode({'key': 'value'}).encode('utf-8')
response = urllib.request.urlopen(url, data=data)
print(response.status)
```

1.2 自定义请求头和代理
```python
import urllib.request

url = 'https://api.example.com/data'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Authorization': 'Bearer token123'
}
req = urllib.request.Request(url, headers=headers)

# 设置代理
proxy_handler = urllib.request.ProxyHandler({'http': 'http://proxy.example.com:8080'})
opener = urllib.request.build_opener(proxy_handler)
response = opener.open(req)
```

二、requests库详解
2.1 基本用法
```python
import requests

# GET请求
response = requests.get('https://api.example.com/data', params={'page': 1, 'limit': 10})
print(response.json())

# POST请求
data = {'username': 'user', 'password': 'pass'}
response = requests.post('https://api.example.com/login', json=data)

# 响应处理
print(response.status_code)
print(response.headers)
print(response.cookies)
```

2.2 请求会话和连接池
```python
import requests

# 创建会话对象,自动处理cookies和连接复用
session = requests.Session()
session.headers.update({'Authorization': 'Bearer token'})

# 多次请求复用连接
response1 = session.get('https://api.example.com/data1')
response2 = session.get('https://api.example.com/data2')

# 自定义适配器配置
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount('https://', adapter)
```

2.3 文件上传和下载
```python
import requests

# 文件上传
with open('file.txt', 'rb') as f:
    response = requests.post(
        'https://api.example.com/upload',
        files={'file': ('file.txt', f, 'text/plain')}
    )

# 大文件流式下载
url = 'https://example.com/large_file.zip'
response = requests.get(url, stream=True)
with open('large_file.zip', 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
```

三、异步HTTP客户端
3.1 aiohttp基础
```python
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch(session, 'https://api.example.com/data1'),
            fetch(session, 'https://api.example.com/data2'),
            fetch(session, 'https://api.example.com/data3')
        ]
        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())
```

3.2 httpx异步客户端
```python
import httpx
import asyncio

async def main():
    async with httpx.AsyncClient() as client:
        # 并发请求
        responses = await asyncio.gather(
            client.get('https://api.example.com/data1'),
            client.get('https://api.example.com/data2')
        )
        for response in responses:
            print(response.json())

asyncio.run(main())
```

四、HTTP客户端对比
4.1 性能对比实验
```python
import requests
import httpx
import time

def test_requests_sync():
    start = time.time()
    for _ in range(10):
        requests.get('https://httpbin.org/get')
    return time.time() - start

def test_httpx_sync():
    start = time.time()
    client = httpx.Client()
    for _ in range(10):
        client.get('https://httpbin.org/get')
    return time.time() - start

print(f"requests sync: {test_requests_sync():.3f}s")
print(f"httpx sync: {test_httpx_sync():.3f}s")
```

4.2 方案选择指南
| 特性 | urllib | requests | httpx | aiohttp |
|---|---|---|---|---|
| 易用性 | 低 | 高 | 高 | 中等 |
| 异步支持 | 否 | 否 | 是 | 是 |
| 连接池 | 基础 | 优秀 | 优秀 | 优秀 |
| 文件上传 | 复杂 | 简单 | 简单 | 简单 |
| 代理支持 | 支持 | 支持 | 支持 | 支持 |
五、生产环境最佳实践
5.1 请求超时配置
```python
import requests

# 设置超时时间
try:
    response = requests.get(
        'https://api.example.com/data',
        timeout=5  # 5秒超时
    )
except requests.exceptions.Timeout:
    print("请求超时")
except requests.exceptions.RequestException as e:
    print(f"请求失败: {e}")
```

5.2 错误重试机制
```python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def fetch_data(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()

try:
    data = fetch_data('https://api.example.com/data')
except Exception as e:
    print(f"重试后仍失败: {e}")
```

5.3 请求日志记录
```python
import logging
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_request(func):
    def wrapper(*args, **kwargs):
        url = args[0] if args else kwargs.get('url')
        logger.info(f"请求: {url}")
        try:
            response = func(*args, **kwargs)
            logger.info(f"响应: {response.status_code}")
            return response
        except Exception as e:
            logger.error(f"请求失败: {e}")
            raise
    return wrapper

@log_request
def safe_request(url):
    return requests.get(url)
```

六、高级HTTP客户端技术
6.1 自定义认证处理器
```python
from requests.auth import AuthBase

class BearerAuth(AuthBase):
    def __init__(self, token):
        self.token = token

    def __call__(self, r):
        r.headers['Authorization'] = f'Bearer {self.token}'
        return r

# 使用自定义认证
response = requests.get(
    'https://api.example.com/data',
    auth=BearerAuth('my_token')
)
```

6.2 请求限流
```python
import time
from collections import deque

class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window
        self.request_times = deque()

    def wait(self):
        now = time.time()
        # 移除时间窗口外的请求记录
        while self.request_times and now - self.request_times[0] > self.time_window:
            self.request_times.popleft()
        # 如果达到限制,等待
        if len(self.request_times) >= self.max_requests:
            wait_time = self.time_window - (now - self.request_times[0])
            time.sleep(max(0, wait_time))
        self.request_times.append(time.time())

# 使用限流
limiter = RateLimiter(max_requests=100, time_window=60)
for _ in range(150):
    limiter.wait()
    # 发送请求
```

七、总结
选择合适的HTTP客户端需要考虑:
- 同步vs异步:高并发场景选择异步客户端
- 功能需求:文件上传、代理、认证等
- 性能要求:连接池、重试机制
- 生态系统:与现有代码的兼容性
在实际项目中,推荐:
- 简单场景使用requests
- 异步场景使用httpx或aiohttp
- 生产环境添加超时、重试、限流等机制
思考:在你的项目中,HTTP客户端的最大挑战是什么?欢迎分享!
