当前位置: 首页 > news >正文

Golang基于Redis的高性能发布订阅(PubSub)系统设计与实现

引言

在分布式系统中,发布订阅(Pub/Sub)是最常见的异步通信模型之一。
本项目在core/pkg/pubsub中基于 Redis 实现了一套高性能、可配置、支持批量发送的 PubSub 组件,用于在多节点之间高效传递事件消息。

参考代码:
https://github.com/openskeye/go-vss/blob/main/core/pkg/pubsub


一、背景与需求

在实际业务中,我们需要满足以下场景:

  • 多节点消息广播:IM 消息、通知、事件推送等,需要跨节点分发。
  • 高吞吐写入:单节点每秒可能产生大量消息,如果直接一条一发,会给 Redis 和网络带来较大压力。
  • 可控的延迟与批量:需要在「实时性」和「吞吐」之间做折中,通过配置控制。
  • 可观测与可靠:出现异常能够发告警邮件,避免静默失败。

设计目标

  • 批量发送:同一频道的多条消息聚合后一次性 Publish,减少 Redis QPS 与网络开销。
  • 可配置节流策略:支持按消息数量与时间窗口两种维度做发送节流。
  • 健壮的并发与关闭机制:在高并发下不会 panic,不会出现数据竞争。
  • 简单易用的 API:对业务方暴露的只有SendSubscribe两个核心入口。

二、整体架构与核心组件

2.1 关键结构体

typeConfstruct{Email tps.YamlEmail// 消息列表最大容量MaxMessageCount,// 心跳检测清空数据周期HeartbeatInterval,// 没有消息进入是最后一次发送时间间隔SendIntervalint// 当前节点域名Hoststring}
// 内部状态typepsstruct{Ctx context.Context conf*Conf closeOnce sync.Once MessagechanredisPublishMessageChanType// 生产者写入的消息Messages sync.Map// channel -> []stringSendTimestamps sync.Map// channel -> int64(lastSend)PublishMessageschan*redisMessageChanType// 待发布到 Redis 的批量消息ExitSignalchanerrorIsClosedboolclosedint32}// 对外暴露的 Redis 客户端包装typeRedisClientstruct{*ps isClusterboolclient*redis.Client clusterClient*redis.ClusterClient}

2.2 系统时序图:消息发布全流程

RedisClient.SubscribeRedispublishProc(批量发布)heartbeatProc(心跳flush)queueProc(缓冲队列)RedisClient.Send业务方(Producer)RedisClient.SubscribeRedispublishProc(批量发布)heartbeatProc(心跳flush)queueProc(缓冲队列)RedisClient.Send业务方(Producer)消息发布流程alt[队列满 或 超过SendInterval]loop[批量发布]消息订阅流程Send(channel, message)写入 Message chan按channel累积消息\n(channel ->> []string)写入 PublishMessages定时遍历Messages\n将剩余消息写入 PublishMessages过滤空消息、组装[]stringPublish(channel, json(messages))Subscribe(channel)推送 json([]string)反序列化为 []string通过worker pool并发执行completion

三、核心流程解析

3.1 消息生产:Send 与内部队列

业务方只需要调用一个简单的接口:

// 推送消息func(r*RedisClient)Send(channelstring,message[]byte){ifr.isClosed(){return}r.Message<-redisPublishMessageChanType{channel,string(message)}}

特性:

  • 非阻塞场景可控Message通道有缓冲容量(5000),足以应对普通峰值。
  • 统一入口:所有发布请求都汇聚到queueProc,集中做批量与节流控制。

3.2 队列聚合与按 channel 维度的节流

queueProc是整个组件的“心脏”,负责:

  • 将同一channel的消息聚合为一个[]string
  • 根据配置决定何时触发一次批量发送。
  • 维护每个 channel 的最近发送时间。

核心逻辑简化如下(伪代码):

for{select{case<-r.Ctx.Done():r.close()r.sendEmail("redis publish 消息队列异常结束",...)returncaseval:=<-r.Message:ifr.isClosed()||val.channel==""{continue}now:=nowMilli()// 取出当前 channel 的消息列表msgs:=loadOrInitMessages(val.channel)// 读取上次发送时间lastSend:=loadOrInitLastSend(val.channel,now)// 满足任一条件则触发批量发送iflen(msgs)>=conf.MaxMessageCount||now-lastSend>=int64(conf.SendInterval){PublishMessages<-{channel:val.channel,messages:msgs}updateLastSend(val.channel,now)clearMessages(val.channel)}// 追加当前消息appendMessage(val.channel,val.message)caseerr:=<-r.ExitSignal:r.sendEmail("redis publish 消息队列异常退出",...,err.Error())r.close()return}}

关键点:

  • 按 channel 维度独立节流MessagesSendTimestamps都是按 channel 分片管理,不同业务频道互不干扰。
  • 双条件触发
    • 数量阈值:MaxMessageCount
    • 时间阈值:SendInterval毫秒
  • 配置驱动:所有阈值都由Conf控制,支持按场景调参。

3.3 心跳 flush:避免残留

仅依靠SendInterval可能会出现一种情况:

  • 某个 channel 短时间内只收到少量消息,数量未达阈值,但长时间也没有新的消息进入。

为了解决这个数据残留问题,引入了heartbeatProc

func(r*RedisClient)heartbeatProc(){ticker:=time.NewTicker(time.Millisecond*time.Duration(r.conf.HeartbeatInterval))deferticker.Stop()for{select{case<-r.Ctx.Done():returncase<-ticker.C:ifr.isClosed(){return}now:=nowMilli()r.Messages.Range(func(key,value any)bool{ifr.isClosed(){returnfalse}channel,ok:=key.(string)msgs,ok2:=value.(redisMessages)if!ok||!ok2||len(msgs)==0{returntrue}r.PublishMessages<-&redisMessageChanType{channel:channel,messages:msgs}r.SendTimestamps.Store(channel,now)r.Messages.Store(channel,redisMessages(nil))returntrue})}}}

作用:

  • 定期扫描所有 channel,主动 flush 剩余消息。
  • 避免“低频” channel 的消息长时间滞留在内存中。

3.4 批量发布:publishProc

publishProcPublishMessages中的批量消息真正写入 Redis:

for{select{case<-r.Ctx.Done():returncasedata:=<-r.PublishMessages:ifr.isClosed()||data==nil||len(data.messages)==0{continue}// 过滤空消息,组装最终批量消息msgs:=filterEmpty(data.messages)iflen(msgs)==0{continue}payload,err:=JSONMarshal(msgs)iferr!=nil{LogError("redis publish["+data.channel+"] 消息序列化失败")continue}if_,err:=r.publish(data.channel,payload).Result();err!=nil{ifr.isClosed(){return}r.ExitSignal<-errreturn}}}

特点:

  • 统一 JSON 批量格式:订阅端一次性拿到[]string,减少流量与解析开销。
  • 错误上报:发布失败会通过ExitSignal通知queueProc,并最终触发邮件告警。

四、订阅端设计:高并发安全消费

4.1 基础订阅流程

订阅端接口:

func(r*RedisClient)Subscribe(channelstring,completionfunc(messages RedisPublishMessageType)){ps:=r.subscribe(channel)deferfunc(){_=ps.Close()}()const(workerCount=10bufferSize=100)msgCh:=make(chanRedisPublishMessageType,bufferSize)varwg sync.WaitGroup// 固定 worker 数并发消费fori:=0;i<workerCount;i++{wg.Add(1)gofunc(){deferwg.Done()foritem:=rangemsgCh{completion(item)}}()}deferfunc(){close(msgCh)wg.Wait()}()foritem:=rangeps.Channel(){ifitem.Payload==""{continue}varlist RedisPublishMessageTypeiferr:=functions.JSONUnmarshal([]byte(item.Payload),&list);err!=nil{functions.LogError("消息解析失败, err: %s",err)continue}// 统一走 worker pool,避免每条消息起一个 goroutinemsgCh<-list}}

设计要点:

  • 固定 worker 数workerCount控制并发度,防止高 QPS 时疯狂起 goroutine。
  • buffered channel 缓冲bufferSize提供背压缓冲区,在短暂突发时不上来就阻塞。
  • 统一退出机制defer close(msgCh)+wg.Wait()确保所有消息处理完毕再返回,避免 goroutine 泄漏。

4.2 订阅时序图

completion回调Worker PoolRedisClient.SubscribeRediscompletion回调Worker PoolRedisClient.SubscribeRedisloop[并发消费]推送 payload(json: []string)JSONUnmarshal(payload)msgCh <- []stringcompletion([]string)

五、并发安全与优雅关闭

5.1 防止重复 close 与数据竞争

ps内部使用:

  • closeOnce sync.Once:保证close()至多执行一次。
  • closed int32+atomic:提供isClosed()/markClosed()两个方法,并发安全判断状态。
func(r*RedisClient)isClosed()bool{returnatomic.LoadInt32(&r.closed)==1}func(r*RedisClient)markClosed(){r.IsClosed=trueatomic.StoreInt32(&r.closed,1)}func(r*RedisClient)close(){r.closeOnce.Do(func(){r.markClosed()close(r.Message)close(r.PublishMessages)close(r.ExitSignal)})}

效果:

  • 即使多个 goroutine 同时触发关闭逻辑,也不会出现close of closed channel的 panic。
  • 所有发送和消费逻辑都会优先调用isClosed()判断是否需要提前退出。

5.2 异常告警机制

当:

  • Ctx.Done()导致队列异常结束,或
  • publishProc发布失败触发ExitSignal

都会调用sendEmail发送邮件告警,包含:

  • 节点信息(conf.Host
  • 邮件配置(conf.Email
  • 简要错误描述

这保证了发布订阅链路出问题时,不会静默失效


六、配置参数与调优建议

6.1 关键配置

配置项含义典型建议值影响维度
MaxMessageCount单个 channel 批量最大条数100 ~ 5000吞吐量 / 延迟 / 内存
SendInterval未达数量阈值时的最大发送间隔(ms)50 ~ 1000实时性 / 批量程度
HeartbeatInterval心跳强制 flush 周期(ms)500 ~ 5000尾部消息滞留时间
Email告警邮件配置视环境而定故障可观测性
Host当前节点标识,用于日志/路由节点域名/IP运维排查

6.2 调优建议

  • 实时性优先
    • SendInterval调小(如 50~100ms),MaxMessageCount适度降低;
    • HeartbeatInterval可以略大(如 1000ms)。
  • 吞吐优先
    • 提高MaxMessageCount,适当放大SendInterval
    • 结合 Redis 集群能力合理评估单 channel 流量。
  • 内存敏感场景
    • 限制MaxMessageCount,避免单个 channel 堆积过多消息;
    • HeartbeatInterval不宜过大,避免残留过久。

七、总结

这个基于 Redis 的 PubSub 组件通过:

  • 按 channel 维度的批量聚合与节流策略
  • 心跳驱动的尾部 flush 机制
  • 固定 worker pool 的订阅消费模型
  • 并发安全的关闭与异常告警机制

在保证高吞吐的同时,也兼顾了实时性、可靠性与可维护性

在需要跨节点消息广播、事件推送、高频通知的场景下,这套设计可以作为一个通用的基础通信组件,进一步和业务协议封装后即可在多项目间复用。

http://www.jsqmd.com/news/578006/

相关文章:

  • Fish Speech 1.5优化指南:调整参数让语音更自然、更逼真
  • 实战驱动:基于快马平台生成集成openclaw的ubuntu自动化测试项目实例
  • Megatron-LM源码解析:Tensor与Sequence并行训练中的通信优化策略
  • 效率提升:用快马生成脚本自动化你的zotero文献整理与格式化工作
  • 保姆级教程:手把手教你用VCSA 8.0.3接管Windows AD域,实现统一登录
  • 用ESP32-WROOM-32和xiaozhi开源项目,5分钟搞定一个智能温湿度监测站(附Home Assistant联动配置)
  • 跨平台运行Android应用:APK Installer实现Windows系统无缝集成与性能优化指南
  • 4/2
  • 别再手动算脉冲了!用STM32CubeMX的编码器模式,5分钟搞定电机测速(附F103C8T6配置)
  • 3种简单方法实现Windows与Linux双系统文件无缝共享的终极方案
  • FPGA开发板吃灰?用Quartus II和你的旧板子复活一个硬件乘法器(4位乘数/拨码开关输入/LED显示)
  • 灵感不等待:无需安装IDEA,在快马平台快速构建微服务原型
  • 第五章 认知声纳波形设计的强化学习求解
  • 避坑指南:鸿蒙AVPlayer开发音乐App时,你可能会遇到的5个典型问题及解决方案
  • 提升效率:基于快马生成openclaw标准化Docker部署配置,一键完成环境搭建
  • CDN 海外访问不稳定?全球节点与 BGP 线路优化方案
  • 从GRACE gfc到可用数据:一个MATLAB脚本搞定CSR/GFZ/JPL三大机构数据预处理
  • AI辅助开发新体验:让快马智能模型帮你重构与优化日记应用代码
  • 保姆级避坑指南:在Ubuntu 22.04上为LAMMPS配置Kokkos+MPI+GPU(CUDA 12.4实测)
  • BellSoft Liberica JDK:为何成为JetBrains开发工具的首选运行时
  • Golang并发安全泛型集合(Set)设计与实现
  • 保姆级教程:在GD32F103上用Keil MDK5和FreeRTOS 202411.00创建你的第一个多任务LED闪烁项目
  • 从CVE-2018-15473看协议安全:一个数据包畸形引发的OpenSSH‘侧信道’故事
  • 基于联合概率数据关联滤波器(JPDA)的Matlab代码:实时绘制目标与杂波的动态跟踪与RMS...
  • LVGL缓冲区机制深度解析:从源码看性能优化与场景适配
  • 新手避坑指南:Verilog批量例化模块时容易忽略的3个细节(含波形调试演示)
  • 3大场景攻克视频监控难题:WVP-GB28181-Pro开源解决方案实战指南
  • 别再用requests库硬爬了!Python新手必看的robots.txt检查与BeautifulSoup实战避坑指南
  • 遥感小白看过来!无需编程5分钟搞定Landsat8数据下载(2023最新版)
  • 突破模拟器限制的APK直装方案:Windows系统的Android应用无缝运行技术