当前位置: 首页 > news >正文

go: Worker Pool Pattern

项目结构:

/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 17:56 # User : geovindu # Product : GoLand # Project : godesginpattern # File : logger.go */ package utils import ( "godesginpattern/workerpool/config" "io" "log" "os" ) var Logger *log.Logger func InitLogger() { // 打开日志文件 file, err := os.OpenFile(config.LogFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Fatal("日志文件创建失败:", err) } // 同时输出到:控制台 + 文件 multiWriter := io.MultiWriter(os.Stdout, file) // 初始化日志 Logger = log.New(multiWriter, "", config.LogFlag) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:12 # User : geovindu # Product : GoLand # Project : godesginpattern # File : retry.go */ package utils import ( "godesginpattern/workerpool/config" "time" ) // Retry 执行函数并自动重试 func Retry(task func() error) error { var err error for i := 0; i < config.MaxRetryTimes; i++ { err = task() if err == nil { return nil } Logger.Printf("任务失败,第 %d 次重试,错误: %v", i+1, err) time.Sleep(500 * time.Millisecond) } return err } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:12 # User : geovindu # Product : GoLand # Project : godesginpattern # File : monitor.go */ package utils import "sync/atomic" type Monitor struct { Running int64 Waiting int64 Finished int64 Failed int64 } func (m *Monitor) IncRunning() { atomic.AddInt64(&m.Running, 1) } func (m *Monitor) DecRunning() { atomic.AddInt64(&m.Running, -1) } func (m *Monitor) IncWaiting() { atomic.AddInt64(&m.Waiting, 1) } func (m *Monitor) DecWaiting() { atomic.AddInt64(&m.Waiting, -1) } func (m *Monitor) IncFinished() { atomic.AddInt64(&m.Finished, 1) } func (m *Monitor) IncFailed() { atomic.AddInt64(&m.Failed, 1) } func (m *Monitor) Get() (running, waiting, finished, failed int64) { return atomic.LoadInt64(&m.Running), atomic.LoadInt64(&m.Waiting), atomic.LoadInt64(&m.Finished), atomic.LoadInt64(&m.Failed) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 17:58 # User : geovindu # Product : GoLand # Project : godesginpattern # File : settings.go */ package config import "log" const ( WorkerCount = 3 QueueMaxSize = 100 MinTaskDelay = 0.3 MaxTaskDelay = 0.8 MaxRetryTimes = 3 LogFileName = "jewelry.log" DBPath = "jewelry.db" HTTPPort = ":8080" MonitorInterval = 2 ) const LogFlag = log.LstdFlags | log.Lmicroseconds /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:01 # User : geovindu # Product : GoLand # Project : godesginpattern # File : task.go */ package tasks import ( "godesginpattern/workerpool/config" "math/rand" "time" ) func RawMaterialCheck(orderID string) error { simulateDelay() return nil } func JewelryProcess(orderID string) error { simulateDelay() return nil } func FinishedGoodsCheck(orderID string) error { simulateDelay() return nil } func InventoryRecord(orderID string) error { simulateDelay() return nil } func OrderDelivery(orderID string) error { simulateDelay() return nil } var FullProcessTasks = []func(string) error{ RawMaterialCheck, JewelryProcess, FinishedGoodsCheck, InventoryRecord, OrderDelivery, } func simulateDelay() { rand.Seed(time.Now().UnixNano()) delay := config.MinTaskDelay + rand.Float64()*(config.MaxTaskDelay-config.MinTaskDelay) time.Sleep(time.Duration(delay*1000) * time.Millisecond) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:01 # User : geovindu # Product : GoLand # Project : godesginpattern # File : worker_pool.go */ package core import ( "godesginpattern/workerpool/config" "godesginpattern/workerpool/utils" "os" "os/signal" "sync" "syscall" "time" ) type Task struct { OrderID string Func func(string) error } type WorkerPool struct { workerCount int taskChan chan *Task wg sync.WaitGroup monitor *utils.Monitor quit chan os.Signal } func NewWorkerPool(workerCnt int, queueSize int) *WorkerPool { wp := &WorkerPool{ workerCount: workerCnt, taskChan: make(chan *Task, queueSize), monitor: &utils.Monitor{}, quit: make(chan os.Signal, 1), } signal.Notify(wp.quit, syscall.SIGINT, syscall.SIGTERM) return wp } func (wp *WorkerPool) worker(id int) { utils.Logger.Printf("Worker %d 已启动", id) for { select { case task, ok := <-wp.taskChan: if !ok { utils.Logger.Printf("Worker %d 安全退出", id) return } wp.monitor.DecWaiting() wp.monitor.IncRunning() utils.Logger.Printf("Worker %d 开始任务: %s", id, task.OrderID) err := utils.Retry(func() error { return task.Func(task.OrderID) }) if err != nil { utils.Logger.Printf("Worker %d 任务失败: %s, 错误: %v", id, task.OrderID, err) } else { utils.Logger.Printf("Worker %d 完成任务: %s", id, task.OrderID) } wp.monitor.DecRunning() wp.monitor.IncFinished() wp.wg.Done() case <-wp.quit: utils.Logger.Printf("Worker %d 收到关闭信号,退出", id) return } } } func (wp *WorkerPool) Start() { utils.Logger.Println("工作池启动") for i := 1; i <= wp.workerCount; i++ { go wp.worker(i) } go wp.monitorLoop() } func (wp *WorkerPool) Submit(task *Task) { wp.wg.Add(1) wp.monitor.IncWaiting() wp.taskChan <- task } func (wp *WorkerPool) monitorLoop() { ticker := time.NewTicker(time.Duration(config.MonitorInterval) * time.Second) defer ticker.Stop() for { select { case <-ticker.C: r, w, f, failed := wp.monitor.Get() utils.Logger.Printf("[监控] 运行:%d | 等待:%d | 完成:%d | 失败:%d", r, w, f, failed) // 运行、等待都为0,代表所有任务全部完成 if r == 0 && w == 0 { utils.Logger.Println("所有任务处理完成,自动停止监控输出") return } case <-wp.quit: utils.Logger.Println("监控循环收到关闭信号,停止监控输出") return } } } func (wp *WorkerPool) Wait() { // 阻塞等待 Ctrl+C / kill 信号 <-wp.quit utils.Logger.Println("优雅关闭中...") // 关闭任务通道,worker 不再接收新任务 close(wp.taskChan) // 等待正在执行的任务全部处理完毕 wp.wg.Wait() utils.Logger.Println("✅ 所有任务执行完毕,服务安全退出") }

调用:

/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:02 # User : geovindu # Product : GoLand # Project : godesginpattern # File : workerpoolbll.go */ package bll import ( "fmt" "godesginpattern/workerpool/config" "godesginpattern/workerpool/core" "godesginpattern/workerpool/tasks" "godesginpattern/workerpool/utils" ) func WorkerPoolMain() { utils.InitLogger() logger := utils.Logger logger.Println("================================================") logger.Println(" 珠宝企业级生产系统(Go 企业级 Worker Pool)") logger.Println("================================================") pool := core.NewWorkerPool(config.WorkerCount, config.QueueMaxSize) pool.Start() totalOrder := 10 logger.Printf("开始提交 %d 个珠宝订单\n", totalOrder) for i := 1; i <= totalOrder; i++ { orderID := fmt.Sprintf("订单-%03d", i) for _, fn := range tasks.FullProcessTasks { pool.Submit(&core.Task{ OrderID: orderID, Func: fn, }) } } logger.Println("✅ 所有订单已提交,按 Ctrl+C 优雅关闭") pool.Wait() }

输出:

http://www.jsqmd.com/news/1057141/

相关文章:

  • 基于MSC8101与MPC8260的DSP聚合网关:架构、性能与选型实战
  • GDB源码管理
  • 3分钟上手!B站会员购抢票神器:免费自动化购票终极指南
  • LLD压力测试实战:从设计验证到性能瓶颈定位
  • 彻底解决Selenium自动化测试中的ChromeDriver版本不匹配问题
  • qi dong wen dang
  • GLM-4.7-Flash量化部署实战:单卡RTX 4090稳定运行指南
  • SH9自指螺旋拓扑框架:基础物理与宇宙学疑难破解研究方案(世毫九实验室原创研究)
  • 别再瞎找了!2026年最值得用的专业降AIGC网站 - 降AI小能手
  • 安徽合肥猎头公司前十名名单及联系电话 - 榜单推荐
  • 如何在Windows 11上轻松安装Android应用?APK安装器完整解决方案
  • Rocky Linux 9 手动部署 Elasticsearch 生产级配置指南
  • Sunshine游戏串流终极指南:跨平台兼容性与零延迟实战技巧
  • 如何在《欧洲卡车模拟2》中实现智能车道保持:ETS2LA插件完全指南
  • Java面向对象程序设计——4~6次作业集总结
  • 告别物理显示器限制:Parsec虚拟显示驱动如何为游戏流媒体和远程办公带来自由?
  • HTML打包EXE 2.3.0更新详解(附最新版本下载地址-含免费内核)
  • 郑州猎头公司哪家好?郑州猎头公司推荐南方新华(电话19922876369) - 榜单推荐
  • 英雄联盟玩家的专业效率工具:League Akari 完整使用指南
  • 2026年官方详解:合肥理工学校招生简章 - hflgzz
  • OpenClaw+Claude 4.5 飞书AI工程化实战:权限、上下文与Skill编排
  • 终极智能分层工具:5分钟掌握LayerDivider插画自动分层技巧
  • 后门攻击系统性评估:从核心机理到跨领域实战检测框架
  • Windows触控板三指拖拽终极指南:5分钟解锁macOS级手势体验
  • 2026年合肥市哪所学校有综合高中班?——推荐合肥理工学校 (寿春实验班) - 教育为先
  • 2026年合肥理工学校多少分能上?招生电话是多少? - hflgzz
  • 终极指南:使用OCAT可视化工具轻松配置OpenCore黑苹果系统
  • 用 ChatGPT 5.5 辅助接口需求拆解:从一句话需求到 OpenAPI、Mock 和测试用例
  • AD软件的使用(2)
  • 基于Kinetis K53的血氧仪设计:从光电原理到嵌入式算法全解析