当前位置: 首页 > news >正文

python: Worker Pool Pattern

项目经构:

Worker Pool(工作池)是控制并发的经典模式:
固定数量的「工人」(线程 / 进程),不随任务数量无限创建
工人从「任务队列」中领取任务执行
核心价值:限制并发数、优化资源、稳定可控、优雅关闭

# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:20 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py """ 全局配置文件:所有可变参数统一管理 符合企业级:配置与代码分离 """ import os # 工作池配置 WORKER_COUNT = 3 # 固定生产线数量(可动态修改) TASK_QUEUE_MAXSIZE = 100 # 任务队列最大长度 # 业务模拟配置 SIMULATE_TASK_DELAY = True # 是否模拟任务耗时 RANDOM_DELAY_RANGE = (0.5, 2.5) # 模拟IO延迟 # 日志配置 LOG_LEVEL = "INFO" LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:20 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : exceptions.py """ 统一异常体系:职责单一 专门处理工作池 + 业务异常 """ class WorkerPoolException(Exception): """ 工作池基础异常 """ pass class TaskExecutionException(WorkerPoolException): """ 任务执行失败异常 """ pass class TaskNotFoundException(WorkerPoolException): """ 任务不存在异常 """ pass # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:21 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py """ 日志工具:全局唯一日志实例 职责单一:只做日志输出 """ import logging from WorkerPoolPattern.config.settings import LOG_LEVEL, LOG_FORMAT def get_logger(name: str = "jewelry_production") -> logging.Logger: logger = logging.getLogger(name) logger.setLevel(LOG_LEVEL) if not logger.handlers: handler = logging.StreamHandler() # Windows 必加:解决中文/编码报错 handler.stream.reconfigure(encoding='utf-8') formatter = logging.Formatter(LOG_FORMAT) handler.setFormatter(formatter) logger.addHandler(handler) return logger # 全局可用的日志对象 logger = get_logger() # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:23 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : worker.py """ 核心工作池:通用、可扩展、线程安全 职责单一:只管理工人 + 任务队列 不耦合任何珠宝业务,可直接复用于其他项目 """ import queue import threading from typing import Callable, Any from WorkerPoolPattern.utils.logger import logger from WorkerPoolPattern.core.exceptions import TaskExecutionException class Worker(threading.Thread): """ 单个工作者:无限从队列取任务执行 """ def __init__(self, task_queue: queue.Queue, worker_id: int): super().__init__(daemon=True) self.queue = task_queue self.worker_id = worker_id self.name = f"Worker-{worker_id}" def run(self): """ :return: """ logger.info(f"工人【{self.worker_id}】已启动,等待任务...") while True: try: task_func, task_id, args, kwargs = self.queue.get() logger.info(f"工人【{self.worker_id}】领取任务【{task_id}】") try: task_func(*args, **kwargs) logger.info(f"工人【{self.worker_id}】完成任务【{task_id}】") except Exception as e: logger.error(f"任务【{task_id}】执行失败:{str(e)}") raise TaskExecutionException(f"任务执行异常:{e}") from e finally: self.queue.task_done() except Exception as e: logger.error(f"工人【{self.worker_id}】异常:{str(e)}") class WorkerPool: """ 工作池管理器:职责单一,只负责启动/管理/等待工人 """ def __init__(self, worker_count: int, queue_maxsize: int = 0): self.worker_count = worker_count self.task_queue = queue.Queue(maxsize=queue_maxsize) self.workers: list[Worker] = [] def start(self): """ 启动所有工作者 :return: """ logger.info(f"启动工作池,共 {self.worker_count} 个工作者") for i in range(1, self.worker_count + 1): worker = Worker(self.task_queue, i) worker.start() self.workers.append(worker) def submit(self, task_id: str, task_func: Callable, *args, **kwargs) -> None: """ 提交任务到队列(通用接口) :param task_id: :param task_func: :param args: :param kwargs: :return: """ self.task_queue.put((task_func, task_id, args, kwargs)) def wait_completion(self): """ 等待所有任务执行完成(优雅关闭) :return: """ self.task_queue.join() logger.info("所有任务已完成,工作池优雅关闭")
# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:25 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : check_tasks.py """原料质检 / 成品质检""" import time import random from WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGE from WorkerPoolPattern.utils.logger import logger def raw_material_check(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 原料质检:钻石4C、真伪、纯度检测") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) def finished_goods_check(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 成品质检:工艺、成色、尺寸、镶口精度全检") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:27 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : process_tasks.py """首饰加工、设计、镶嵌""" import time import random from WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGE from WorkerPoolPattern.utils.logger import logger def jewelry_process(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 首饰加工:设计 → 执模 → 镶嵌 → 抛光 → 电金") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:28 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logistics_tasks.py """库存录入、物流发货""" import time import random from WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGE from WorkerPoolPattern.utils.logger import logger def inventory_record(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 库存录入:生成唯一防伪码、ERP入库、标签打印") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) def order_delivery(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 订单发货:物流下单、保价、短信通知") time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:29 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : production_service.py """ 珠宝生产业务服务 只负责:接收订单 → 编排任务 → 提交到工作池 """ from WorkerPoolPattern.tasks import JEWELRY_FULL_PROCESS from WorkerPoolPattern.core.worker import WorkerPool from WorkerPoolPattern.utils.logger import logger class JewelryProductionService(object): """ 业务编排层(职责单一) """ def __init__(self, pool: WorkerPool): self.pool = pool # 依赖注入:解耦 def create_order_task(self, order_id: str): """ 为单个订单创建全流程任务链 :param order_id: :return: """ logger.info(f"📦 开始创建订单【{order_id}】的全流程任务") for task_name, task_func in JEWELRY_FULL_PROCESS: task_id = f"{order_id}-{task_name}" self.pool.submit(task_id, task_func, order_id) def batch_create_orders(self, order_count: int): """ 批量创建订单任务 :param order_count: :return: """ logger.info(f"📋 开始批量创建 {order_count} 个珠宝生产订单") for i in range(1, order_count + 1): order_id = f"珠宝订单-{i:03d}" self.create_order_task(order_id)

调用:

# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:31 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : WorkerPoolBll.py """ 项目主入口 职责单一:只负责启动 → 执行业务 → 等待结束 """ from WorkerPoolPattern.config.settings import WORKER_COUNT, TASK_QUEUE_MAXSIZE from WorkerPoolPattern.core.worker import WorkerPool from WorkerPoolPattern.service.production_service import JewelryProductionService from WorkerPoolPattern.utils.logger import logger class WorkerPoolBll(object): """ """ def demo(self): """ :return: """ logger.info("=" * 60) logger.info("珠宝企业级生产系统启动(Worker Pool 模式)") logger.info("=" * 60) pool = WorkerPool( worker_count=WORKER_COUNT, queue_maxsize=TASK_QUEUE_MAXSIZE ) pool.start() production_service = JewelryProductionService(pool) production_service.batch_create_orders(order_count=10) pool.wait_completion() logger.info("珠宝生产系统全部执行完成")

输出:

http://www.jsqmd.com/news/1075422/

相关文章:

  • 2026 年易柯森特解析:工程监理与造价咨询服务的不同点
  • Agent Runtime 范式革命:Session 作为事件日志的工程实践
  • DonkeyCar端到端自动驾驶实战:行为克隆与树莓派部署
  • Java两种创建线程方式:继承Thread vs 实现Runnable 区别详解
  • 国产大模型实战指南:从合同审查到多模态票据分析
  • AI应用方向:AI智能客服与对话AI
  • 5分钟完成FF14国际服中文汉化:开源工具完全指南
  • 傻子可懂的 Harness Engineering 入门教程 + 项目实战,一次搞懂 AI 编程工程化!
  • [Android MVVM 架构笔记] 基于 Kotlin 类委托与系统级安全扩展的全局 Loading 方案
  • 3D医学影像AI建模实战指南:小样本、高鲁棒、可部署的四类可靠路径
  • 6月26日16点直播丨CANNBot支持生成单指令多线程算子
  • OneNote迁移终极指南:三步实现95%格式保留的无损转换
  • 在 Android Kotlin 开发中,Kotlin 无法识别 Lombok 生成的 getter
  • TscanCode静态代码分析解决方案:提升C++/C/Lua代码质量的技术实践
  • 用Google ADK构建行政事务导航智能体:税务与社保场景落地实践
  • AI 建议“更新数据库后删除缓存”,为什么仍可能造成长期脏数据
  • 网络数据包分类与策略执行:FMan硬件加速配置详解
  • WinCC Advanced数据导出行列转换
  • Vue 3 后台管理系统前端骨架小案例1.0版本
  • 大模型知识蒸馏实战:从软标签对齐到认知迁移
  • LangChain作业四---Memory 综合实战:构建具备短期 + 长期记忆的聊天机器人
  • ANTM股票可视化:Plotly交互+Mplfinance专业K线实战
  • LG Ultrafine 亮度调节工具:解决Windows下显示器亮度控制的智能方案
  • FIFA 23 Live Editor终极指南:打造你的完美足球世界
  • 5大核心功能深度解析:G-Helper如何让华硕笔记本性能飙升
  • 深度解析猫抓浏览器扩展:从M3U8嗅探到加密流处理的10个核心技术
  • 负责任AI工程落地:六个可编码的实践维度
  • 10104黄大年茶思屋榜文101期 第4题 大模型上下文窗口高效无损扩容技术
  • 零基础学AI人工智能:10.3 ANN人工神经网络
  • iOS安全测试框架Needle:自动化漏洞挖掘与移动应用安全评估实战指南