当前位置: 首页 > news >正文

go: Push Pull Pattern

项目结构:

/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:25 # User : geovindu # Product : GoLand # Project : godesginpattern # File : config.go */ package config import "time" // ZMQ 端口 go get github.com/zeromq/goczmq const ( PortRawMaterial = 5555 PortProcess = 5556 PortQuality = 5557 PortSale = 5558 ) // 超时毫秒 const ZmqTimeoutMs = 3000 // 消息分隔符 const ( MsgSep = "|" KvSep = ":" ) // 连接地址 const ( LocalTcpAddr = "tcp://localhost" BindAddr = "tcp://*" ) // 珠宝品类常量 var JewelryCategory = []string{ "钻石戒指", "黄金项链", "翡翠手镯", "铂金耳钉", "彩宝吊坠", } // 质检等级 var QualityGrade = []string{ "S级(收藏)", "A级(精品)", "B级(常规)", "C级(特价)", } // 休眠间隔 const ( SleepRaw = 2 * time.Second SleepProcess = 1 * time.Second SleepQuality = 500 * time.Millisecond SleepSale = 1 * time.Second IdleSleep = 100 * time.Millisecond // 无消息空转休眠 ) /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : logger.go */ package utils import ( "fmt" "log" "os" "time" ) func GetLogger(module string) *log.Logger { prefix := fmt.Sprintf("[%s] ", module) logger := log.New(os.Stdout, prefix, log.LstdFlags) return logger } // 格式化时间输出辅助 func NowStr() string { return time.Now().Format("2006-01-02 15:04:05") } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : message.go */ package model import ( "godesginpattern/pushpull/config" "strings" ) // BaseMessage 统一消息打包解包 type BaseMessage struct{} // Pack map 转消息字符串 // 首字母大写,跨包可访问 var MsgHelper = &BaseMessage{} func (b *BaseMessage) Pack(data map[string]string) string { var parts []string for k, v := range data { parts = append(parts, k+config.KvSep+v) } return strings.Join(parts, config.MsgSep) } func (b *BaseMessage) Unpack(raw string) map[string]string { res := make(map[string]string) items := strings.Split(raw, config.MsgSep) for _, item := range items { kv := strings.SplitN(item, config.KvSep, 2) if len(kv) == 2 { res[kv[0]] = kv[1] } } return res } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:25 # User : geovindu # Product : GoLand # Project : godesginpattern # File : socket_factory.go */ package core import ( "bufio" "fmt" "godesginpattern/pushpull/config" "godesginpattern/pushpull/utils" "io" "net" "sync" "time" ) var logger = utils.GetLogger("SocketFactory") type TcpPushSocket struct { listener net.Listener clients []net.Conn mu sync.Mutex } func NewTcpPushSocket(port int) *TcpPushSocket { listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { logger.Fatalf("创建Push监听失败: %v", err) } sock := &TcpPushSocket{listener: listener} go sock.acceptLoop() logger.Printf("Push 绑定成功: tcp://localhost:%d", port) return sock } func (s *TcpPushSocket) acceptLoop() { for { conn, err := s.listener.Accept() if err != nil { logger.Printf("Accept错误: %v", err) return } s.mu.Lock() s.clients = append(s.clients, conn) s.mu.Unlock() logger.Printf("新客户端连接: %s", conn.RemoteAddr()) } } func (s *TcpPushSocket) Send(msg string) (int, error) { s.mu.Lock() defer s.mu.Unlock() for _, conn := range s.clients { conn.SetWriteDeadline(time.Now().Add(time.Duration(config.ZmqTimeoutMs) * time.Millisecond)) if _, err := conn.Write([]byte(msg + "\n")); err != nil { logger.Printf("发送失败: %v", err) } } return len(msg), nil } func (s *TcpPushSocket) Close() { s.mu.Lock() defer s.mu.Unlock() for _, conn := range s.clients { conn.Close() } s.listener.Close() } type TcpPullSocket struct { conn net.Conn reader *bufio.Reader } func NewTcpPullSocket(port int) *TcpPullSocket { conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { logger.Fatalf("连接Push失败: %v", err) } logger.Printf("Pull 连接成功: tcp://localhost:%d", port) return &TcpPullSocket{ conn: conn, reader: bufio.NewReader(conn), } } var ErrConnectionClosed = fmt.Errorf("connection closed") func (s *TcpPullSocket) Recv(nonBlocking bool) (string, error) { if nonBlocking { s.conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) } else { s.conn.SetReadDeadline(time.Now().Add(time.Duration(config.ZmqTimeoutMs) * time.Millisecond)) } msg, err := s.reader.ReadString('\n') if err != nil { if nonBlocking && (err == bufio.ErrBufferFull || isTimeoutError(err)) { return "", fmt.Errorf("resource temporarily unavailable") } if err == io.EOF { return "", ErrConnectionClosed } return "", err } return msg[:len(msg)-1], nil } func (s *TcpPullSocket) Close() { s.conn.Close() } func isTimeoutError(err error) bool { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { return true } return false }
/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : rawmaterial.go */ package service import ( "godesginpattern/pushpull/config" "godesginpattern/pushpull/core" "godesginpattern/pushpull/model" "godesginpattern/pushpull/utils" "math/rand" "strconv" "time" ) var rawLog = utils.GetLogger("RawMaterialService") type RawMaterialService struct { pushSock *core.TcpPushSocket } func NewRawMaterialService() *RawMaterialService { return &RawMaterialService{ pushSock: core.NewTcpPushSocket(config.PortRawMaterial), } } func (r *RawMaterialService) RunProduce(total int) { rawLog.Println("原料采购服务启动") rand.Seed(time.Now().UnixNano()) for i := 1; i <= total; i++ { cat := config.JewelryCategory[rand.Intn(len(config.JewelryCategory))] msg := model.MsgHelper.Pack(map[string]string{ "type": "原料订单", "order_id": "RAW_" + strconv.Itoa(i), "category": cat, "status": "已采购待加工", }) _, _ = r.pushSock.Send(msg) rawLog.Printf("推送: %s", msg) time.Sleep(config.SleepRaw) } rawLog.Println("原料订单全部推送完成") time.Sleep(time.Second) r.pushSock.Close() } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : process.go */ package service import ( "godesginpattern/pushpull/config" "godesginpattern/pushpull/core" "godesginpattern/pushpull/model" "godesginpattern/pushpull/utils" "math/rand" "strings" "time" ) var procLog = utils.GetLogger("Process") type ProcessService struct { pullSock *core.TcpPullSocket pushSock *core.TcpPushSocket done chan struct{} } func NewProcessService() *ProcessService { return &ProcessService{ pullSock: core.NewTcpPullSocket(config.PortRawMaterial), pushSock: core.NewTcpPushSocket(config.PortProcess), done: make(chan struct{}), } } func (p *ProcessService) RunPipeline() { procLog.Println("加工车间启动") for { select { case <-p.done: procLog.Println("加工车间停止") return default: msg, err := p.pullSock.Recv(true) if err != nil { if err == core.ErrConnectionClosed { procLog.Println("加工车间连接关闭") return } if err.Error() == "resource temporarily unavailable" { time.Sleep(config.IdleSleep) continue } procLog.Printf("接收错误: %v", err) time.Sleep(config.IdleSleep) continue } data := model.MsgHelper.Unpack(msg) procLog.Printf("收到: %s", msg) time.Sleep(config.SleepProcess + time.Duration(rand.Intn(2000))*time.Millisecond) sendMsg := model.MsgHelper.Pack(map[string]string{ "type": "成品珠宝", "order_id": strings.Replace(data["order_id"], "RAW", "FIN", 1), "category": data["category"], "status": "已加工待质检", }) _, _ = p.pushSock.Send(sendMsg) procLog.Printf("加工完成: %s", sendMsg) } } } func (p *ProcessService) Stop() { close(p.done) p.pullSock.Close() p.pushSock.Close() } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : quality.go */ package service import ( "godesginpattern/pushpull/config" "godesginpattern/pushpull/core" "godesginpattern/pushpull/model" "godesginpattern/pushpull/utils" "math/rand" "time" ) var qualityLog = utils.GetLogger("Quality") type QualityService struct { pullSock *core.TcpPullSocket pushSock *core.TcpPushSocket done chan struct{} } func NewQualityService() *QualityService { return &QualityService{ pullSock: core.NewTcpPullSocket(config.PortProcess), pushSock: core.NewTcpPushSocket(config.PortQuality), done: make(chan struct{}), } } func (q *QualityService) RunPipeline() { qualityLog.Println("质检中心启动") for { select { case <-q.done: qualityLog.Println("质检中心停止") return default: msg, err := q.pullSock.Recv(true) if err != nil { if err == core.ErrConnectionClosed { qualityLog.Println("质检中心连接关闭") return } if err.Error() == "resource temporarily unavailable" { time.Sleep(config.IdleSleep) continue } qualityLog.Printf("接收错误: %v", err) time.Sleep(config.IdleSleep) continue } data := model.MsgHelper.Unpack(msg) qualityLog.Printf("收到: %s", msg) time.Sleep(config.SleepQuality + time.Duration(rand.Intn(1500))*time.Millisecond) sendMsg := model.MsgHelper.Pack(map[string]string{ "type": "质检成品", "order_id": data["order_id"], "category": data["category"], "grade": config.QualityGrade[rand.Intn(len(config.QualityGrade))], "status": "可销售", }) _, _ = q.pushSock.Send(sendMsg) qualityLog.Printf("质检完成: %s", sendMsg) } } } func (q *QualityService) Stop() { close(q.done) q.pullSock.Close() q.pushSock.Close() } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : sale.go */ package service import ( "godesginpattern/pushpull/config" "godesginpattern/pushpull/core" "godesginpattern/pushpull/model" "godesginpattern/pushpull/utils" "math/rand" "strconv" "strings" "time" ) var saleLog = utils.GetLogger("Sale") type SaleService struct { pullSock *core.TcpPullSocket pushSock *core.TcpPushSocket done chan struct{} } func NewSaleService() *SaleService { return &SaleService{ pullSock: core.NewTcpPullSocket(config.PortQuality), pushSock: core.NewTcpPushSocket(config.PortSale), done: make(chan struct{}), } } func (s *SaleService) RunPipeline() { saleLog.Println("销售部启动") for { select { case <-s.done: saleLog.Println("销售部停止") return default: msg, err := s.pullSock.Recv(true) if err != nil { if err == core.ErrConnectionClosed { saleLog.Println("销售部连接关闭") return } if err.Error() == "resource temporarily unavailable" { time.Sleep(config.IdleSleep) continue } saleLog.Printf("接收错误: %v", err) time.Sleep(config.IdleSleep) continue } data := model.MsgHelper.Unpack(msg) saleLog.Printf("收到: %s", msg) time.Sleep(config.SleepSale + time.Duration(rand.Intn(1500))*time.Millisecond) sendMsg := model.MsgHelper.Pack(map[string]string{ "type": "销售完成", "order_id": strings.Replace(data["order_id"], "FIN", "SALE", 1), "category": data["category"], "price": strconv.Itoa(rand.Intn(49000)+1000) + "元", "status": "已售出", }) _, _ = s.pushSock.Send(sendMsg) saleLog.Printf("销售完成: %s", sendMsg) } } } func (s *SaleService) Stop() { close(s.done) s.pullSock.Close() s.pushSock.Close() } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : aftersale.go */ package service import ( "godesginpattern/pushpull/config" "godesginpattern/pushpull/core" "godesginpattern/pushpull/model" "godesginpattern/pushpull/utils" "time" ) var afterLog = utils.GetLogger("AfterSale") type AfterSaleService struct { pullSock *core.TcpPullSocket done chan struct{} } func NewAfterSaleService() *AfterSaleService { return &AfterSaleService{ pullSock: core.NewTcpPullSocket(config.PortSale), done: make(chan struct{}), } } func (a *AfterSaleService) RunConsumer() { afterLog.Println("售后维保启动") for { select { case <-a.done: afterLog.Println("售后维保停止") return default: msg, err := a.pullSock.Recv(true) if err != nil { if err == core.ErrConnectionClosed { afterLog.Println("售后维保连接关闭") return } if err.Error() == "resource temporarily unavailable" { time.Sleep(config.IdleSleep) continue } afterLog.Printf("接收错误: %v", err) time.Sleep(config.IdleSleep) continue } data := model.MsgHelper.Unpack(msg) afterLog.Printf("收到单据: %s", msg) afterLog.Printf("✅ 维保生效 | 品类:%s | 订单:%s", data["category"], data["order_id"]) } } } func (a *AfterSaleService) Stop() { close(a.done) a.pullSock.Close() }

调用:

/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Push & Pull Pattern(推 - 拉)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 21:26 # User : geovindu # Product : GoLand # Project : godesginpattern # File : pushpullbll.go */ package bll import ( "godesginpattern/pushpull/service" "godesginpattern/pushpull/utils" "os" "os/signal" "syscall" "time" ) var ppLog = utils.GetLogger("pushpullbll") func PushPullMain() { sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) raw := service.NewRawMaterialService() proc := service.NewProcessService() quality := service.NewQualityService() sale := service.NewSaleService() after := service.NewAfterSaleService() go func() { <-sig ppLog.Println("退出中...") proc.Stop() quality.Stop() sale.Stop() after.Stop() os.Exit(0) }() go proc.RunPipeline() time.Sleep(200 * time.Millisecond) go quality.RunPipeline() time.Sleep(200 * time.Millisecond) go sale.RunPipeline() time.Sleep(200 * time.Millisecond) go after.RunConsumer() time.Sleep(200 * time.Millisecond) ppLog.Println("✅ 珠宝 Push-Pull 流水线启动完成") raw.RunProduce(5) time.Sleep(5 * time.Second) proc.Stop() quality.Stop() sale.Stop() after.Stop() ppLog.Println("✅ 珠宝 Push-Pull 流水线运行结束") }

输出:

http://www.jsqmd.com/news/1078368/

相关文章:

  • 从任务积压到文件队列:Prometheus业务指标监控与告警指南
  • 2026企业协作网盘推荐:5款企业文档协作平台对比与选型指南
  • 神经算子与GRU-STONe在航空辐射监测中的应用
  • DCU深度技术报告_下篇_性能复盘与研发经验总结
  • PDFSlideshow使用教程,PDF转幻灯片演示工具绿色版下载
  • llamafactory gradient_checkpointing 梯度检查点 通俗完整讲解
  • STM32WB55入门教程(二)
  • 简道云智能助手实测:工单派发→报工→质检→入库,全自动流转到底靠不靠谱?
  • 状态空间模型安全风险剖析:频谱攻击、后门植入与状态饱和的攻防实践
  • NannyML无标签模型监控:实现端到端MLOps性能闭环
  • Docker网络这5种模式,你真的都搞明白了吗?
  • 从CTF EasySQL题解析SQL注入攻防:核心原理与实战绕过技巧
  • 5分钟打造万能启动盘:Ventoy彻底告别重复格式化时代
  • HDFS javaAPI-windows的IDEA中java文件在linux中的hadoop平台运行
  • P89LPC92x1中断与I/O配置实战:从原理到避坑指南
  • 脉冲神经网络多级脉冲设计与能效优化
  • HTTPS 性能优化完全指南:从原理、硬件到架构的全链路调优实战
  • 手动构造链表和二叉树
  • SaaS和低代码厂商的智能体转型路径:两场范式级转型的路线图
  • 2026命理软件付费前怎么看?八字排盘App要看使用频率和可替代成本
  • oauth2授权码模式完整流转
  • DonkeyCar存储系统深度解析:SD卡选型、ext4优化与路径陷阱
  • JSON Schema验证实际应用场景案例
  • JMeter压力测试实战:AI音效生成服务性能调优全解析
  • OpenCloudOS Server 9 安装 Nginx 完整指南
  • MHmarkets:注重效率的使用者更在意的投教内容,这里做个标准对照
  • 项目上线了
  • 【题解】WebGoC绘图题目精选整合集
  • 【Java踩坑笔记】【基础语法篇】05_重写equals不重写hashCode会怎样?
  • 小白stm32入门教程学习记录:3-2 LED闪烁流水灯