当前位置: 首页 > news >正文

解密执行:Python并发与并行编程终极指南

🔎大家好,我是ZTLJQ,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流

📝个人主页-ZTLJQ的主页

🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝​📣系列果你对这个系列感兴趣的话

专栏 - ​​​​​​Python从零到企业级应用:短时间成为市场抢手的程序员

✔说明⇢本人讲解主要包括Python爬虫、JS逆向、Python的企业级应用

如果你对这个系列感兴趣的话,可以关注订阅哟👋

CPU密集型 vs I/O密集型

在开始之前,我们必须先理解一个核心问题:我们的程序瓶颈在哪里?

  • I/O密集型 (I/O-Bound): 程序大部分时间花在等待外部操作上,如:

    • 网络请求(爬虫、API调用)
    • 文件读写
    • 数据库查询
    • 用户输入/输出
      在这些场景中,CPU往往是空闲的,等待着数据的到来。
  • CPU密集型 (CPU-Bound): 程序大部分时间花在计算上,如:

    • 数学运算、科学计算
    • 图像/视频处理
    • 数据加密/解密
    • 复杂的循环和算法
      在这些场景中,CPU是满负荷运转的。

为什么这很重要?
因为不同的瓶颈需要不同的解决方案!

  • 对于I/O密集型任务,并发(Concurrency)是救星。
  • 对于CPU密集型任务,在Python中,并行(Parallelism)是唯一能真正提升性能的方法。

博客将带你厘清“并发”与“并行”的区别,并通过大量实际案例,展示如何在Python中正确选择和使用threadingmultiprocessingasyncio


第一部分:概念辨析 - 并发 vs 并行
1.1 并发 (Concurrency)
  • 定义: 并发是指同时处理多个任务的能力。它关注的是程序的结构和设计
  • 类比: 想象一个厨师(单个CPU核心)。他正在做三道菜。他不会做完一道再做下一道,而是:
    1. 把第一道菜的食材放进烤箱,设好定时器。
    2. 趁着烤箱工作的间隙,去切第二道菜的菜。
    3. 听到烤箱响了,暂停切菜,取出第一道菜。
    4. 继续准备第二道菜...
  • 关键: 厨师在同一时刻只在做一件事,但他通过巧妙地安排任务,让三道菜都在“进行中”,整体效率很高。
  • 在Python中:asyncio是实现高并发的典范。它在一个线程内,通过事件循环和协程的协作,实现了对成百上千个I/O任务的高效管理。
1.2 并行 (Parallelism)
  • 定义: 并行是指同时执行多个任务。它关注的是硬件资源的实际利用
  • 类比: 现在有三个厨师(多个CPU核心)。他们每人负责一道菜,同时开火,同时操作。三道菜是真正意义上“同时”完成的。
  • 关键: 需要多个物理执行单元(多核CPU)。
  • 在Python中:multiprocessing模块通过创建独立的子进程,每个进程运行在自己的Python解释器和内存空间中,从而绕过GIL,实现真正的并行计算。
1.3 核心障碍:全局解释器锁 (GIL)

这是理解Python并发/并行的关键!

  • 什么是GIL?CPython解释器为了保证内存管理的安全性,在同一时刻只允许一个线程执行Python字节码。
  • 影响:
    • 对于CPU密集型任务:即使你开了10个线程,也只有一个线程能在CPU上运行,其他9个都在等待GIL。因此,多线程无法加速纯Python的CPU计算。
    • 对于I/O密集型任务:当一个线程执行I/O操作(如time.sleep()或网络请求)时,它会释放GIL。这时,其他线程就有机会获取GIL并运行。所以,多线程在I/O场景下依然有效。

总结图示:

场景 / 工具threading(多线程)multiprocessing(多进程)asyncio(异步)
I/O密集型✅ 有效 (但不如asyncio高效)⚠️ 有效但开销大✅✅✅ 最佳选择 (高并发、低开销)
CPU密集型❌ 无效 (受GIL限制)✅✅✅ 最佳选择 (真正并行)❌ 无效 (单线程)
内存开销低 (共享内存)高 (独立内存空间)极低 (单线程)
启动/切换开销中等极低
编程复杂度中等 (需处理锁)高 (需处理进程间通信)中等 (需理解异步模型)

第二部分:实战案例 - 三大模块详解

我们将用同一个“计算斐波那契数列”的例子,分别用三种方式实现,并比较它们在CPU和I/O场景下的表现。

import time from typing import List # 一个耗时的CPU计算函数 def fibonacci(n: int) -> int: if n <= 1: return n return fibonacci(n-1) + fibonacci(n-2) # 一个模拟的I/O操作 def io_operation(duration: float) -> str: time.sleep(duration) # 模拟等待 return f"IO completed after {duration}s"
案例一:threading- 多线程

适用于I/O密集型任务。

import threading from concurrent.futures import ThreadPoolExecutor # ---- CPU密集型测试 ---- def test_cpu_with_threads(): print("=== 测试 threading (CPU密集型) ===") start_time = time.time() # 计算斐波那契数列 with ThreadPoolExecutor(max_workers=4) as executor: results = list(executor.map(fibonacci, [35, 35, 35, 35])) end_time = time.time() print(f"结果: {results}") print(f"耗时: {end_time - start_time:.2f} 秒") # 输出约 12秒 (3秒 * 4个任务串行),没有提速! # ---- I/O密集型测试 ---- def test_io_with_threads(): print("\n=== 测试 threading (I/O密集型) ===") start_time = time.time() # 执行多个I/O操作 with ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(io_operation, 1.0) for _ in range(4)] results = [future.result() for future in futures] # 获取结果 end_time = time.time() print(f"结果: {results}") print(f"耗时: {end_time - start_time:.2f} 秒") # 输出约 1秒,成功并发! # test_cpu_with_threads() # test_io_with_threads()

解析:

  • CPU测试: 由于GIL,四个耗时的fibonacci计算实际上是串行执行的,总时间接近单个任务的4倍。多线程对CPU任务无益
  • I/O测试: 当一个线程在sleep时,它释放了GIL,其他线程可以立即接管。四个1秒的I/O操作在约1秒内完成。多线程对I/O任务有效
案例二:multiprocessing- 多进程

适用于CPU密集型任务。

​ 1import multiprocessing 2from concurrent.futures import ProcessPoolExecutor 3 4# ---- CPU密集型测试 ---- 5def test_cpu_with_processes(): 6 print("=== 测试 multiprocessing (CPU密集型) ===") 7 start_time = time.time() 8 9 # 计算斐波那契数列 10 with ProcessPoolExecutor(max_workers=4) as executor: 11 results = list(executor.map(fibonacci, [35, 35, 35, 35])) 12 13 end_time = time.time() 14 print(f"结果: {results}") 15 print(f"耗时: {end_time - start_time:.2f} 秒") 16 # 输出约 3-4秒,实现了近4倍的加速! 17 18# ---- I/O密集型测试 ---- 19def test_io_with_processes(): 20 print("\n=== 测试 multiprocessing (I/O密集型) ===") 21 start_time = time.time() 22 23 with ProcessPoolExecutor(max_workers=4) as executor: 24 futures = [executor.submit(io_operation, 1.0) for _ in range(4)] 25 results = [future.result() for future in futures] 26 27 end_time = time.time() 28 print(f"结果: {results}") 29 print(f"耗时: {end_time - start_time:.2f} 秒") 30 # 输出约 1秒,也能并发,但进程开销更大。 31 32# test_cpu_with_processes() 33# test_io_with_processes() ​

解析:

  • CPU测试: 每个进程有自己的Python解释器和GIL,因此四个fibonacci计算可以在四个CPU核心上真正并行执行,性能大幅提升。
  • I/O测试: 进程同样能在I/O阻塞时释放CPU,所以也能并发。但由于创建进程的开销远大于线程,对于纯I/O任务,通常不是最优选。
案例三:asyncio- 异步I/O

专为I/O密集型任务设计,提供最高并发。

​ 1import asyncio 2import aiohttp 3 4# 将同步的io_operation包装成异步版本 5async def async_io_operation(duration: float) -> str: 6 await asyncio.sleep(duration) 7 return f"Async IO completed after {duration}s" 8 9# 一个耗时的异步计算(注意:这只是模拟,实际不应在async中放CPU任务) 10async def async_fibonacci(n: int) -> int: 11 # 如果是真正的CPU任务,应该用 run_in_executor 12 if n <= 1: 13 return n 14 # 递归本身是同步的,这里只是为了演示 15 result1 = await async_fibonacci(n-1) 16 result2 = await async_fibonacci(n-2) 17 return result1 + result2 18 19# ---- I/O密集型测试 ---- 20async def test_io_with_asyncio(): 21 print("=== 测试 asyncio (I/O密集型) ===") 22 start_time = time.time() 23 24 # 创建多个异步I/O任务 25 tasks = [async_io_operation(1.0) for _ in range(100)] # 并发100个! 26 results = await asyncio.gather(*tasks) 27 28 end_time = time.time() 29 print(f"结果数量: {len(results)}") 30 print(f"耗时: {end_time - start_time:.2f} 秒") 31 # 输出约 1秒,轻松实现高并发。 32 33# ---- 如何在asyncio中安全地执行CPU任务 ---- 34async def safe_cpu_with_asyncio(): 35 print("\n=== 在 asyncio 中执行CPU任务 ===") 36 start_time = time.time() 37 38 # 获取事件循环 39 loop = asyncio.get_running_loop() 40 41 # 使用线程池执行器来运行CPU密集型任务 42 with ThreadPoolExecutor() as pool: 43 # 提交任务到线程池 44 results = await asyncio.gather( 45 loop.run_in_executor(pool, fibonacci, 35), 46 loop.run_in_executor(pool, fibonacci, 35) 47 ) 48 49 end_time = time.time() 50 print(f"结果: {results}") 51 print(f"耗时: {end_time - start_time:.2f} 秒") 52 53# asyncio.run(test_io_with_asyncio()) 54# asyncio.run(safe_cpu_with_asyncio()) ​

解析:

  • I/O测试:asyncio展现了其真正的威力。它可以轻松地并发处理成百上千个I/O任务,而内存和CPU开销极小。
  • CPU任务: 在asyncio中直接运行fibonacci会阻塞整个事件循环。正确的做法是使用loop.run_in_executor,将CPU任务放到一个线程池中执行,避免阻塞。
第三部分:进程间通信 (IPC) 与线程同步

当使用多进程或多线程时,数据共享和同步至关重要。

3.1threading- 线程同步

线程共享内存,但需要防止竞态条件。

import threading # 共享资源 counter = 0 # 锁 lock = threading.Lock() def increment(): global counter for _ in range(100000): with lock: # 获取锁 counter += 1 # 安全地修改共享变量 # 创建两个线程 t1 = threading.Thread(target=increment) t2 = threading.Thread(target=increment) t1.start() t2.start() t1.join() t2.join() print(f"最终计数: {counter}") # 应该是 200000

解析:Lock确保了同一时刻只有一个线程能执行counter += 1,避免了数据竞争。

3.2multiprocessing- 进程间通信

进程有独立的内存空间,需要通过特殊机制通信。

import multiprocessing def worker(num, queue): """工作进程,将结果放入队列""" result = num * num queue.put(result) def producer(queue): """生产者进程,发送数据""" for i in range(5): queue.put(f"Data-{i}") def consumer(queue): """消费者进程,接收数据""" while True: item = queue.get() if item is None: # 收到结束信号 break print(f"消费: {item}") if __name__ == '__main__': # --- 使用 Queue --- print("=== 使用 Queue 通信 ===") q = multiprocessing.Queue() p1 = multiprocessing.Process(target=worker, args=(5, q)) p2 = multiprocessing.Process(target=worker, args=(10, q)) p1.start() p2.start() result1 = q.get() result2 = q.get() p1.join() p2.join() print(f"结果: {result1}, {result2}") # 25, 100 # --- 使用 Queue 实现 生产者-消费者 --- print("\n=== 生产者-消费者模式 ===") q = multiprocessing.Queue() producer_proc = multiprocessing.Process(target=producer, args=(q,)) consumer_proc = multiprocessing.Process(target=consumer, args=(q,)) consumer_proc.start() producer_proc.start() producer_proc.join() # 发送结束信号 q.put(None) consumer_proc.join()

解析:multiprocessing.Queue是一个线程和进程安全的队列,用于在进程间传递数据。

第四部分:最佳实践与选择指南
  1. 首要决策: 明确你的任务是I/O密集型还是CPU密集型
  2. I/O密集型:
    • 首选asyncio: 性能最高,资源占用最少。
    • 次选threading: 如果第三方库不支持异步,或者逻辑简单。
  3. CPU密集型:
    • 必须用multiprocessing: 这是绕过GIL的唯一途径。
  4. 混合型任务: 结合使用。例如,用asyncio作为主框架,用run_in_executor来处理其中的CPU任务。
  5. 避免过度设计: 对于简单的、短时间的任务,直接用同步代码可能更清晰。
  6. 工具链: 使用concurrent.futures模块可以让你更容易地在ThreadPoolExecutorProcessPoolExecutor之间切换。
结语

Python提供了丰富的工具来应对不同的并发需求。理解“并发”与“并行”的本质区别,以及GIL的影响,是做出正确技术选型的前提。

  • asyncio驾驭海量I/O,打造高性能Web服务和爬虫。
  • multiprocessing解锁多核算力,加速科学计算和数据处理。
  • threading处理简单的、需要共享状态的I/O任务。
http://www.jsqmd.com/news/511366/

相关文章:

  • Stable Yogi Leather-Dress-Collection开源模型实践:SD 1.5生态LoRA工程最佳范例
  • 京东E卡回收价格多少?2026年最新行情分享 - 抖抖收
  • CAN FD错误帧捕获率不足30%?你可能正在用错struct canfd_frame——权威解读Linux 6.1+内核CAN FD ABI变更及兼容性迁移清单
  • 能快速上手高项的方法
  • 2026年财富管理GEO优化公司深度分析:从技术适配到效果归因的选型逻辑 - 小白条111
  • Monocle 3实战:5分钟搞定单细胞聚类比较与差异基因分析(附完整R代码)
  • 水墨江南模型在网络安全领域的创新应用:生成式蜜罐与诱饵文档
  • 常用的单机运维操作命令
  • 手把手调通台达PLC与变频器的实战通讯
  • 阿里小云KWS模型与嵌入式Linux的深度优化实践
  • 30行代码实现“语言热切换“:用户说“我要中文“,系统秒变中文!
  • 2026年银行保险GEO优化服务商深度测评:从技术适配到效果落地的选型指南 - 小白条111
  • AIGlasses_for_navigation作品分享:12类典型城市道路场景分割效果合辑
  • AudioSeal效果可视化:嵌入前后频谱对比+检测置信度热力图展示
  • 【技术解析】卫星通信NTN 3GPP标准化演进路线与关键挑战
  • B端拓客号码核验行业发展研究:痛点、革新与未来方向氪迹科技法人股东号码智能筛选系统
  • 解构的艺术:Python元组拆包与模式匹配完全解析
  • 视频查重工具避坑指南:为什么90%的免费工具都检测不出画中画和贴图?
  • 3D打印效率提升全流程指南:从问题诊断到场景应用的开源切片软件实战
  • 用Turtlebot3+PyTorch实战多机器人避障:DDPG-LSTM算法移植心得与PER调参技巧
  • Pixel Dimension Fissioner保姆级教学:像素UI无障碍访问与键盘导航支持
  • Unity数字孪生插件PLOY3D:从GLTF到WebUI的全栈开发实战
  • Qwen3-Reranker-0.6B惊艳效果:短视频脚本与素材库语义匹配
  • Qwen2.5与MiniMax对比:中文理解能力部署实测分析
  • Chandra OCR效果展示:手写数学公式识别→LaTeX代码生成→Jupyter Notebook嵌入
  • SparkFun BMA400 Arduino库深度解析:超低功耗加速度计驱动实践
  • OpenCV本质矩阵实战:RANSAC和LMedS到底怎么选?我用代码测试给你看
  • 构建与转化:Python数据结构与推导式完全解析
  • 海外Apple App Store情感陪伴类App调查报告
  • GLM-4-9B-Chat-1M入门指南:Streamlit UI功能详解与Prompt工程建议