Python并发编程实战:ThreadPoolExecutor深度解析
引言
在Python后端开发中,并发编程是提高程序性能的关键技术。作为一名从Rust转向Python的后端开发者,我深刻体会到线程池在处理IO密集型任务时的重要性。concurrent.futures.ThreadPoolExecutor提供了简洁的线程池接口,使得并发编程变得更加容易。
线程池核心概念
什么是线程池
线程池是一种管理线程的技术,具有以下特点:
- 复用线程:减少线程创建和销毁的开销
- 控制并发:限制同时运行的线程数量
- 提高效率:减少上下文切换次数
- 任务队列:管理待执行的任务
架构设计
┌─────────────────────────────────────────────────────────────┐
│                    ThreadPoolExecutor                       │
│  ┌─────────────────────────────────────────────────────┐    │
│  │     任务队列  →  工作线程池  →  结果收集            │    │
│  │        ↓             ↓             ↓                │    │
│  │     提交任务      执行任务      返回结果            │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

基础用法
安装依赖
线程池是Python标准库的一部分,无需额外安装。
基本使用
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    """Run one demo task: log start/end around a simulated 1-second wait."""
    print(f"Task {name} starting")
    # Simulate a blocking I/O operation (import hoisted to module level per PEP 8)
    time.sleep(1)
    print(f"Task {name} completed")
    return f"Result {name}"


with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit 5 tasks; only 3 run concurrently, the rest wait in the queue
    futures = [executor.submit(task, i) for i in range(5)]
    for future in futures:
        # result() blocks until that particular future has finished
        result = future.result()
        print(result)

map方法
from concurrent.futures import ThreadPoolExecutor


def process_item(item):
    """Double a single value; stands in for real per-item work."""
    return item * 2


with ThreadPoolExecutor(max_workers=4) as executor:
    items = [1, 2, 3, 4, 5]
    # map() preserves input order in its results
    results = list(executor.map(process_item, items))
    print(results)  # [2, 4, 6, 8, 10]

高级特性实战
带超时的任务执行
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_task():
    """Sleep 5 seconds so the 2-second timeout below is guaranteed to fire."""
    time.sleep(5)
    return "Done"


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        # Wait at most 2 seconds for the result
        result = future.result(timeout=2)
        print(result)
    except TimeoutError:
        print("Task timed out")

任务取消
from concurrent.futures import ThreadPoolExecutor
import time


def long_running_task():
    """Simulate a 10-second job so cancellation is attempted after it starts."""
    time.sleep(10)
    return "Completed"


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_running_task)
    time.sleep(1)
    # cancel() succeeds only while the task is still queued;
    # here it has already started, so the else branch is expected
    if future.cancel():
        print("Task cancelled")
    else:
        print("Task already running, cannot cancel")

异常处理
from concurrent.futures import ThreadPoolExecutor


def task_with_exception():
    """Raise immediately; demonstrates that worker exceptions are captured."""
    raise ValueError("Something went wrong")


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(task_with_exception)
    try:
        # An exception raised in the worker is re-raised here by result()
        result = future.result()
    except Exception as e:
        print(f"Caught exception: {e}")

实际业务场景
场景一:批量下载文件
import requests
from concurrent.futures import ThreadPoolExecutor


def download_file(url, save_path):
    """Fetch *url* and write the response body to *save_path*; return the path."""
    response = requests.get(url)
    with open(save_path, 'wb') as f:
        f.write(response.content)
    return save_path


urls = [
    'https://example.com/file1.jpg',
    'https://example.com/file2.jpg',
    'https://example.com/file3.jpg',
    'https://example.com/file4.jpg',
]

with ThreadPoolExecutor(max_workers=4) as executor:
    # One download per worker, up to 4 in flight at once
    futures = [
        executor.submit(download_file, url, f"file{i}.jpg")
        for i, url in enumerate(urls)
    ]
    for future in futures:
        result = future.result()
        print(f"Downloaded: {result}")

场景二:API批量请求
import requests
from concurrent.futures import ThreadPoolExecutor


def fetch_api(endpoint):
    """GET one API endpoint and return the decoded JSON payload."""
    url = f"https://api.example.com{endpoint}"
    response = requests.get(url)
    return response.json()


endpoints = ['/users', '/posts', '/comments', '/products']

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() keeps results aligned with the endpoints list
    results = list(executor.map(fetch_api, endpoints))
    for endpoint, result in zip(endpoints, results):
        print(f"{endpoint}: {len(result)} items")

场景三:图片处理
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
import os


def resize_image(input_path, output_path, size):
    """Resize one image to *size*, save it, and return the output path."""
    with Image.open(input_path) as img:
        img = img.resize(size)
        img.save(output_path)
        return output_path


image_paths = ['image1.jpg', 'image2.jpg', 'image3.jpg']
output_dir = 'resized/'

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = []
    for img_path in image_paths:
        output_path = os.path.join(output_dir, img_path)
        futures.append(
            executor.submit(resize_image, img_path, output_path, (800, 600))
        )
    for future in futures:
        print(f"Resized: {future.result()}")

性能优化
线程数量调优
import os
from concurrent.futures import ThreadPoolExecutor

# os.cpu_count() can return None when the count is undeterminable;
# fall back to 1 so the multiplication below never raises TypeError.
cpu_count = os.cpu_count() or 1
print(f"CPU count: {cpu_count}")

# IO-bound tasks: 2-4x the CPU core count is a reasonable thread budget
with ThreadPoolExecutor(max_workers=cpu_count * 4) as executor:
    # run IO-bound tasks here
    pass

任务优先级
from concurrent.futures import ThreadPoolExecutor
from queue import PriorityQueue


class PriorityTask:
    """Wrap a callable with a priority so PriorityQueue can order tasks."""

    def __init__(self, priority, func, *args):
        self.priority = priority
        self.func = func
        self.args = args

    def __lt__(self, other):
        # Lower number == higher priority: PriorityQueue pops the smallest first
        return self.priority < other.priority


priority_queue = PriorityQueue()
# NOTE(review): `task` is not defined in this snippet — it presumably refers
# to the task() function from an earlier example; confirm before running standalone.
priority_queue.put(PriorityTask(1, task, "low"))
priority_queue.put(PriorityTask(0, task, "high"))

结果回调
from concurrent.futures import ThreadPoolExecutor


def task(name):
    """Produce a result string for *name*."""
    return f"Result from {name}"


def handle_result(future):
    """Completion callback; receives the finished Future itself."""
    result = future.result()
    print(f"Callback received: {result}")


with ThreadPoolExecutor(max_workers=2) as executor:
    for name in ("Task1", "Task2"):
        future = executor.submit(task, name)
        # Fires when the future completes (immediately if it is already done)
        future.add_done_callback(handle_result)

总结
ThreadPoolExecutor为Python后端开发者提供了简洁的并发编程接口。通过线程池,可以高效处理IO密集型任务,提高程序性能。从Rust开发者的角度来看,Python的线程池虽然在性能上不如Rust的并发模型,但在开发效率和易用性方面具有优势。
在实际项目中,建议根据任务类型合理设置线程数量,并注意处理任务异常和超时情况。
