当前位置: 首页 > news >正文

python进程和线程(二、主要讲解进程)

Python 多进程(multiprocessing)是绕过 GIL(全局解释器锁)、实现真正并行计算的核心方案,专门解决CPU 密集型任务(如大量计算、数据处理)的效率问题。
 

一、为什么需要多进程?

  1. GIL 限制:CPython 的 GIL 让同一时刻只有一个线程执行 Python 字节码,多线程无法利用多核 CPU。
  2. 多进程优势
    • 每个进程有独立的 Python 解释器和内存空间,真正并行,充分利用多核 CPU
    • 进程间互不干扰,稳定性更高
     
  3. 适用场景
     
    ✅ CPU 密集型任务(科学计算、视频编码、数据分析)
     
    ❌ 不适合 IO 密集型(文件 / 网络请求,用多线程 / 协程更轻量)

二、创建进程的主要方式

Python 标准库multiprocessing是多进程编程的核心,提供完整的进程创建、通信、同步、管理功能。

1. 基础:创建进程

方式 1:Process 类(最常用)

import multiprocessing
import time# 定义任务函数
def task(name):print(f"进程 {name} 开始执行")time.sleep(2)  # 模拟任务print(f"进程 {name} 执行完成")if __name__ == '__main__':# 1. 创建进程对象p1 = multiprocessing.Process(target=task, args=("进程1",))p2 = multiprocessing.Process(target=task, args=("进程2",))# 2. 启动进程p1.start()p2.start()# 3. 等待进程执行完毕(主进程阻塞)p1.join()p2.join()print("所有进程执行完毕")

image

这是最直接和灵活的方式,适用于创建少量、自定义逻辑的进程。

方式 2:继承 Process 类

适合复杂逻辑,封装进程功能:
from multiprocessing import Processclass MyProcess(Process):def __init__(self, name):super().__init__()self.name = name# 进程执行的核心逻辑def run(self):print(f"子进程 {self.name} 运行中")if __name__ == '__main__':p = MyProcess("测试进程")p.start()p.join()

image

2. 使用进程池 (Pool)进行创建

与多线程(线程池)类似,手动创建大量进程会浪费资源,进程池可以复用进程、控制最大并发数,是生产环境首选。

  • pool.map(func, iterable): 将可迭代对象中的每个元素分配给一个进程处理,并按顺序返回结果。

  2.1. 基础用法(with版本)

from multiprocessing import Pool
import timedef task(n):time.sleep(1)return n * nif __name__ == '__main__':# 创建进程池,默认使用CPU核心数with Pool(processes=4) as pool:# 批量提交任务results = pool.map(task, [1, 2, 3, 4, 5])print("结果:", results)   # 结果的类型是列表

image

  2.2. 基础用法(非with版本)

如果不使用 with 语句(上下文管理器)来管理 multiprocessing.Pool,你需要手动控制进程池的生命周期
这意味着你必须显式地调用 close() 和 join() 方法,否则可能会导致子进程变成“僵尸进程”或者主程序在任务完成前就意外退出。
以下是标准的写法模板和关键注意事项:

🛠️ 标准写法模板

核心原则是:先关闭(不再接受新任务),再等待(确保任务执行完毕)。
from multiprocessing import Pool
import timedef worker(n):time.sleep(1)return n * nif __name__ == '__main__':pool = Nonetry:# 1. 创建进程池pool = Pool(processes=4)# 2. 提交任务 (例如 map 或 apply_async)results = pool.map(worker, range(10))# 3. 获取结果 (如果是 apply_async,记得调用 .get())print(results)except Exception as e:print(f"发生错误: {e}")finally:# 4. 资源清理 (至关重要!)if pool is not None:pool.close()  # 告诉Pool不再接收新任务pool.join()  # 阻塞主进程,等待所有工作进程退出print("进程池已安全关闭")

image

  2.3. 使用 concurrent.futures.ProcessPoolExecutor

  这是一个更高级、更现代的接口,用法与 ThreadPoolExecutor 高度一致,使得在进程和进程之间切换变得非常简单。
from concurrent.futures import ProcessPoolExecutor
import time# CPU 密集型任务:模拟计算
def heavy_calculation(n):total = 0for i in range(n):total += ireturn totalif __name__ == '__main__':  # Windows/macOS 必须加这个入口判断start = time.time()# 创建进程池,max_workers=进程数(默认=CPU核心数)with ProcessPoolExecutor(max_workers=4) as executor:# 提交任务,返回结果列表results = executor.map(heavy_calculation, [10**7, 10**7, 10**7, 10**7])print(f"计算结果:{list(results)}")print(f"耗时:{time.time() - start:.2f}s")

image

concurrent.futures.ProcessPoolExecutor常用方法

executor.map()(批量执行,按顺序返回结果)

最常用,适合任务参数相同、需要按顺序拿结果的场景。
# 函数 + 可迭代参数列表 → 自动分配给进程池执行
results = executor.map(func, [arg1, arg2, arg3])
executor.submit()(单个提交,灵活控制)
 
适合需要单独获取每个任务状态 / 结果的场景:
future = executor.submit(heavy_calculation, 10**7)
print(future.result())  # 获取任务返回值
as_completed()(按完成顺序获取结果)
 
谁先跑完就先拿谁的结果,不等待全部完成:
from concurrent.futures import ProcessPoolExecutor, as_completedif __name__ == '__main__':with ProcessPoolExecutor(4) as executor:futures = [executor.submit(heavy_calculation, 10**7) for _ in range(4)]for future in as_completed(futures):print(f"任务完成,结果:{future.result()}")

3. 异步非阻塞(推荐)

apply_async 是 Python multiprocessing.Pool 类中的一个核心方法,用于异步地将单个任务提交到进程池中执行。
它的最大特点是非阻塞:调用后主进程不会等待该任务完成,而是立即继续执行后续代码,从而实现并行处理,这对于提升 CPU 密集型任务的效率至关重要。

⚙️ 语法与参数

pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
  • func: 子进程中要执行的函数。
  • args: 传递给 func 的位置参数,需要一个元组(即使只有一个参数)。
  • kwds: 传递给 func 的关键字参数,需要一个字典。
  • callback: (可选) 一个回调函数,当任务成功执行后会自动调用,任务的返回值会作为参数传给它。
  • error_callback: (可选) 一个错误回调函数,当任务执行出错时会自动调用,异常对象会作为参数传给它。
该方法会立即返回一个 AsyncResult 对象,你可以使用它来获取任务的最终结果。
from multiprocessing import Pooldef task(x):return x**2if __name__ == '__main__':pool = Pool(4)# 异步提交,不阻塞主进程# 返回一个AsyncResult对象result = pool.apply_async(task, args=(5,))# 获取结果print(result.get())pool.close()pool.join()

image

下面再列举一个“异步非阻塞”代码案例:

import multiprocessing
import time# 模拟一个耗时的计算任务
def square_number(n):time.sleep(1)  # 模拟耗时操作return n * n# 任务成功的回调函数
def on_success(result):print(f"回调函数收到结果: {result}")# 任务失败的回调函数
def on_error(error):print(f"任务出错: {error}")if __name__ == '__main__':# 创建一个包含4个工作进程的进程池with multiprocessing.Pool(processes=4) as pool:results = []# 异步提交多个任务for i in range(5):async_result = pool.apply_async(square_number,args=(i,),callback=on_success,error_callback=on_error)results.append(async_result)print(f"主进程:任务 {i} 已提交")print("主进程:所有任务已提交,正在等待...")# 关闭进程池,不再接受新任务pool.close()# 等待所有工作进程完成pool.join()# 通过 AsyncResult 对象获取每个任务的结果# .get() 方法是阻塞的,直到结果就绪final_results = [res.get() for res in results]print(f"最终结果: {final_results}")

image

以上输出结果中,高亮的部分是在执行时瞬间输出的。

我们再来写一个“同步阻塞”的代码案例,做为对比:

import multiprocessing
import time# 模拟一个耗时的计算任务 (保持不变)
def square_number(n):time.sleep(5)  # 模拟耗时操作return n * nif __name__ == '__main__':# 创建一个包含4个工作进程的进程池with multiprocessing.Pool(processes=4) as pool:final_results = []print("主进程:开始同步提交任务...")start_time = time.time()  # 记录开始时间# --- 核心改动区域 ---for i in range(5):# 【阻塞】这里会卡住!主进程必须等待当前任务完成才能继续下一次循环result = pool.apply(square_number, args=(i,))# 只有当上面的任务做完,拿到了 result,才会执行到这里final_results.append(result)print(f"主进程:任务 {i} 已完成,结果为 {result}")# --- 核心改动区域 ---end_time = time.time()print("-" * 30)print(f"主进程:所有任务已串行完成")print(f"最终结果列表: {final_results}")print(f"总耗时: {end_time - start_time:.2f} 秒")

 image

以上代码的输出情况为:第一行输出“主进程:开始同步提交任务...”,5s等待后,输出“主进程:任务 0 已完成,结果为 0”,又5s等待后输出......,以此类推。

 

http://www.jsqmd.com/news/727333/

相关文章:

  • Sakana!石蒜模拟器物理引擎优化:惯性、衰减与粘性参数的数学原理与调优技巧
  • 别再乱写HLSL了!Unity URP Shader中Core.hlsl的正确打开方式
  • TensorRT 10.0深度学习推理优化与部署实战
  • 通过用量看板观测不同模型调用成本实现精细化预算管理
  • 大模型的短期记忆和长期记忆系统:做 RAG、Agent、知识库前的必修课
  • 物联网开发工具链容器化实践:基于Docker Compose的一站式部署方案
  • 对比直接使用原厂 API 体验 Taotoken 在多模型切换上的便捷性
  • 2026年AI代写泛滥,实测5款论文降AI神器:将AI率从80%拉至15% - 降AI实验室
  • 广东地区650T液态模锻设备厂商排行与选型指南 - 奔跑123
  • Vantage:基于MCP协议构建个人AI记忆中枢,打通AI工具信息孤岛
  • C语言类的基本语法详解
  • ARIMA模型保存与部署实战指南
  • 终极指南:如何用Obsidian Style Settings插件轻松自定义笔记外观
  • 别再只盯着模型结构了!用Python和PyTorch给你的模型推理加上TTA(测试时增强),轻松涨点几个百分点
  • 别乱用滤波!Zygo MetroPro软件里这9种滤波算法,到底该怎么选?(附实战对比图)
  • 终极小说下载解决方案:novel-downloader 深度解析与完全指南
  • 别再只用GO/KEGG了!用R语言做GSEA分析,轻松看懂通路是激活还是抑制
  • 4月30日成都地区包钢产热轧H型钢(1998-Q355B;100-1000mm)批发价格 - 四川盛世钢联营销中心
  • Fast-GitHub技术深度解析:如何实现10倍速的GitHub访问优化
  • Windows热键冲突终极排查指南:快速定位占用快捷键的幕后黑手
  • 终极Android滑动布局解决方案:ConsecutiveScrollerLayout让复杂界面丝滑如流
  • Her自定义请求:5种方式扩展你的API调用
  • 避坑指南:UE动画蓝图状态机变量设置与外部调用的那些事儿
  • 2026年深度改写模式和普通模式效果对比:降AI力度与文本保留度横评
  • 网盘直链下载助手终极指南:八大网盘一键获取真实下载链接,告别限速烦恼
  • 4月30日成都地区重钢产无缝钢管(8163-20#;外径38-114mm)批发价格 - 四川盛世钢联营销中心
  • 抖音直播数据采集终极指南:高效应对匿名用户与隐私保护挑战
  • 罗兰艺境“1+11”GEO技术落地,赋能上海制造隐形冠军 - 罗兰艺境GEO
  • Prusa-Firmware配置与定制化:打造专属3D打印体验
  • 保姆级教程:手把手教你用ADB Dumpsys命令深度分析Android应用状态(附查找秘籍)