当前位置: 首页 > news >正文

多进程运行代码模板

import os
import json
import random
import glob
import multiprocessing as mp
from collections import defaultdict
from tqdm import tqdm# 配置变量:修改这些值来适应不同任务
MAX_BARS = 3  # 可见进度条的最大数量
RESUME_COUNTS_DIR = "/path/to/resume"  # 恢复计数器的目录路径def safe_print(msg: str):tqdm.write(msg)  # 与tqdm兼容的打印def process_item(item, proc_idx, state):"""在这里实现每个任务的工作逻辑。`item` 是任务项。`state` 可以保存每个进程的缓存、计数器等。"""# ... 执行实际工作 ...state["count"] += 1return f"done-{item}"def worker(proc_idx, tasks):touched_dirs = set()counters = defaultdict(int)resume_counts_path=os.path.join(RESUME_COUNTS_DIR, f"file_select_counts_p{proc_idx}.json")# 如果需要恢复,则加载每个进程的计数器if resume_counts_path and os.path.exists(resume_counts_path):try:with open(resume_counts_path, "r", encoding="utf-8") as f:counters.update(json.load(f))except Exception as e:safe_print(f"进程 {proc_idx} 加载计数器失败: {e}")show_bar = proc_idx < MAX_BARSiterator = tqdm(tasks,desc=f"进程 {proc_idx}",unit="task",position=proc_idx if show_bar else 0, # 仅前 N 个占用独立行disable=not show_bar, # 其他进程隐藏进度条leave=True, # 进度条在完成后会保留在终端中,而不是被清除掉ncols=100, # 设置进度条的总宽度为 100 个字符,确保在终端中显示得更整齐)for task_id, item in iterator:process_item(item, proc_idx, counters)if resume_counts_path:with open(resume_counts_path, "w", encoding="utf-8") as f:json.dump(dict(counters), f, indent=2, ensure_ascii=False)return list(touched_dirs)def distribute_tasks(items, num_procs):"""示例:随机抽取与轮询分配相结合的任务分配。根据需要替换为其他分片方式(轮询、块分割、工作窃取等)。"""total_cnt = ...buckets = [[] for _ in range(num_procs)]for i in range(total_cnt):item = random.choices(items, k=1)[0]proc = i % num_procsbuckets[proc].append((i, item))return bucketsdef merge_outputs(dir_list):"""后处理合并每个进程的分片;根据输出布局调整。"""passdef main(root_dir, num_procs):# 发现工作项items = [...]  # 填充你的项列表if not items:safe_print("未找到项。")returnnum_procs = max(1, num_procs)tasks_per_proc = distribute_tasks(items, num_procs)with mp.Pool(processes=num_procs) as pool:results = pool.starmap(worker,[(idx,tasks,)for idx, tasks in enumerate(tasks_per_proc)],)all_dirs = [d for sub in results for d in sub]merge_outputs(all_dirs)if __name__ == "__main__":import sysif len(sys.argv) != 3:safe_print("Usage: python script.py <directory> <num_processes>")sys.exit(1)root_dir = sys.argv[1]num_processes = int(sys.argv[2])main(root_dir, num_processes)

注意事项

  • 多进程代码比较重要的事情:
    • 有恢复功能:上面的代码实现了resume_counts_path
    • 每个进程之间产生的文件不冲突:上面的代码让每个进程并行运行,产生的文件都附带了进程号(比如file_select_counts_p{proc_idx}.json),最后可以通过一个函数合并
  • with mp.Pool(processes=num_procs) as pool:同时启动了num_procsworker函数
    • 每个worker函数处理一个(idx, tasks)元组
    • 应该保证tasks_per_proc的长度与num_procs相同
    • results是一个列表,长度为num_procs,每个元素是每个worker的返回结果
http://www.jsqmd.com/news/67044/

相关文章:

  • 2025具有规模的双卡压式碳钢消防管TOP5权威推荐,甄选靠
  • 2025年环保别墅瓷砖五大品牌排行榜:别墅瓷砖推荐供应商有哪
  • 深入解析:长沙理工《人工智能基础A》实验(上机)报告实验三 电商数据可视化/图像处理
  • 纸杯成型机制造商哪家靠谱?2025 超声波纸杯机、纸咖啡杯机推荐(小白易懂)
  • 2025年GEO 优化服务商怎么收费?:权威榜单精选揭秘
  • 2025企业贷款服务TOP5权威推荐:破解融资难题,甄选专业
  • 【原神UGC】即时弹窗-安装使用教程
  • TWS耳机电池哪家好?国内优质品牌推荐
  • 灰豚GEO系统,以技术硬实力孵化更多的GEO优化服务商
  • 2025年全屋定制哪家值得推荐?全屋定制哪家比较好?官方权威榜单解析TOP10品牌
  • 2025 爆款制杯机厂家推荐:纸咖啡杯机、纸杯机制造商实力及设备特色解析
  • 2025年选择 GEO 优化服务商的技巧:官方TOP10解析
  • 2025年GEO 优化服务商哪家专业度高?:独家深度测评指南
  • 收藏这篇就够了!2025年太古里烧菜火锅品牌综合实力排行。美食/社区火锅/烧菜火锅/特色美食/火锅烧菜火锅品牌口碑推荐
  • 2026年盘点专注子女抚养权纠纷的律师,北京地区推荐哪家
  • 全网公认低温奶十大品牌:奶源优质且口碑过硬
  • 2025年农药包装防水透气塞优质厂家权威推荐:汽车大灯防水透气塞/国产防水透气塞/汽车车灯防水透气塞源头厂家精选
  • 2025年国内智慧水务厂家综合实力排名与市场格局分析
  • 北语 12.6 五彩斑斓的梦想
  • 中国木门十大品牌排行榜:行业标杆与品质之选
  • 2026 海淀工程律师排行榜:靠谱机构口碑排名与专业解析
  • 2025 年 12 月钢结构工程厂家权威推荐榜:桥梁、厂房、冷链库房、场馆等全领域实力解析与匠心之选
  • 行业内符合欧标EI120防火卷帘门厂家排名前十哪家好
  • 2025年武汉西点烘焙培训学校排名TOP5,湖北新东方国际西
  • 珠三角聚合物锂电有哪些?五家实力企业及业务解析
  • 2025年实力强的304/316不锈钢水管品牌厂家排名:环压
  • 专业的ERP系统推荐榜:助力企业数字化转型实践
  • 2025年重庆结构加固施工单位推荐:结构加固改造公司哪家可靠
  • 学生奶十大品牌良心榜单:拒绝营销的真实好奶推荐
  • Python 语法通俗详解:从入门到上手写代码