当前位置: 首页 > news >正文

singleflight

singleflight包是 Go 官方扩展库golang.org/x/sync中的一个工具,主要用于抑制重复的函数调用。其经典场景是防止缓存击穿:当多个并发请求同时需要同一个资源(例如同一个 key 的数据库查询)时,只让第一个请求真正执行,其余请求则等待并共享第一个请求的结果。

下面结合你给出的源码,从结构、核心流程、panic/goexit 处理等几个方面进行详细讲解。


1. 核心数据结构

call(在途或已完成的调用)

go

type call struct { wg sync.WaitGroup val any err error dups int chans []chan<- Result }
  • wg:用于等待原始调用完成。重复调用者会Wait在该 WaitGroup 上,原始调用完成时Done

  • val/err:存放fn返回的结果和错误。

  • dups:记录有多少个重复调用共享了这次结果(包括自己不算,只有后来者才增加dups)。

  • chans:仅用于DoChan方法,存放所有等待结果的 channel。

Group(调用组)

go

type Group struct { mu sync.Mutex m map[string]*call }
  • 通过key来区分不同任务。相同 key 的调用会合并。

  • 使用互斥锁保护 map,map 惰性初始化。

Result

go

type Result struct { Val any Err error Shared bool }

Shared表示该结果是否被多个调用者共享(即有重复调用发生)。


2. 具体方法详解

Do方法

go

func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool)

流程:

  1. 加锁,检查 map 是否初始化

  2. 如果该 key 已有call(说明有请求正在执行)

    • c.dups++记录重复次数。

    • 解锁,然后c.wg.Wait()等待原始调用完成。

    • 等待结束后检查错误类型:

      • 若是*panicError(说明原始函数发生了 panic),直接panic(e)将 panic 传播给当前等待者,保持与原始调用一致的行为。

      • 若是errGoexit(说明原始函数调用了runtime.Goexit),则调用runtime.Goexit()退出当前 goroutine。

      • 否则正常返回c.val, c.err, trueshared = true)。

  3. 如果没有call

    • 新建callwg.Add(1),放入 map,解锁。

    • 调用g.doCall(c, key, fn)实际执行用户函数。

    • 返回结果,并根据c.dups > 0决定shared值。

DoChan方法

go

func (g *Group) DoChan(key string, fn func() (any, error)) <-chan Result

Do类似,但不阻塞等待结果,而是返回一个容量为 1 的 channel。
流程:

  1. 创建ch := make(chan Result, 1)

  2. 加锁后,若发现已有call,则将ch追加到c.chans切片中,dups++,然后直接返回ch

  3. 若无call,新建call,把ch作为chans的初始成员,启动一个新 goroutine 执行g.doCall,然后返回ch

调用者可从 channel 中收结果,但当发生 panic 或Goexit时,channel 不会收到任何值,也不会被关闭。这是故意的设计,文档明确说明“返回的 channel 不会被关闭”。


3.doCall—— 调用执行核心

doCall负责实际调用fn,并处理返回、panic 和runtime.Goexit三种情况。

难点在于区分普通 panic 和runtime.Goexit
runtime.Goexit会终止当前 goroutine,但会先执行所有 defer。在 defer 中调用recover会返回nil,因此无法直接用recover的返回值来区分。标准库的做法是利用“双重 defer”配合标志位。

双重 defer 结构
func (g *Group) doCall(c *call, key string, fn func() (any, error)) { normalReturn := false recovered := false // 外层 defer:最终清理和结果分发 defer func() { // 如果既不是正常返回,也没有 recover 到 panic,那就是 Goexit if !normalReturn && !recovered { c.err = errGoexit } g.mu.Lock() c.wg.Done() if g.m[key] == c { delete(g.m, key) // 清理 map } g.mu.Unlock() // 根据 err 类型决定行为 if e, ok := c.err.(*panicError); ok { // 原始函数发生了 panic if len(c.chans) > 0 { go panic(e) // 在新 goroutine 中 panic,确保无法被恢复 select {} // 阻塞当前 goroutine,使其出现在 crash dump 中 } else { panic(e) // 直接 panic } } else if c.err == errGoexit { // 已是 Goexit 流程,无需再调用 } else { // 正常情况,向所有 DoChan 的等待者发送结果 for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }() // 内层匿名函数:调用 fn 并 recover func() { defer func() { if !normalReturn { // 发生了 panic,recover 捕获 if r := recover(); r != nil { c.err = newPanicError(r) } } }() c.val, c.err = fn() normalReturn = true }() // 如果内层函数没有正常返回(normalReturn 仍为 false), // 且没有 recover(即 recover 返回 nil,意味着 runtime.Goexit), // 则 recovered 保持 false,外层 defer 能据此判定为 Goexit。 if !normalReturn { recovered = true } }

执行逻辑解析:

  1. 内层匿名函数func()执行fn()

    • fn正常返回 →normalReturn = true

    • fn中发生 panic → 内层 defer 的recover()捕获到值,c.err = newPanicError(r)normalReturn仍为 false。此时recover返回非 nil,所以外层的!recovered仍成立?等一下:内层 defer 是在if !normalReturn里调用的recover(),一旦 recover 到值,c.err被设置。然后内层函数结束,此时控制流回到外层if !normalReturn { recovered = true }。所以若发生 panic,recovered被置为 true。

    • fn调用了runtime.Goexit→ 该 goroutine 停止执行fn后面的代码,normalReturn保持 false,然后执行所有 defer。内层 defer 中的recover()返回 nil(因 Goexit 不算 panic),因此c.err保持 nil。接着recovered也保持 false(因为if !normalReturn { recovered = true }这句还没执行 goroutine 就退出了?不对,runtime.Goexit会执行所有 defer 然后退出,所以在外层 defer 执行前,控制流不会到达if !normalReturn { recovered = true }。因此外层 defer 中看到!normalReturn && !recovered成立,将c.err设为errGoexit

  2. 外层 defer 保证无论何时都会跑,它在最后根据c.err的类型进行处理:

    • panicError:重新把 panic 抛出去,但为了避免被调用者的recover捕获后无法继续,代码做了分支处理。有chans时(即有人用DoChan等待),启动新 goroutine 去 panic,自身则永远阻塞在select {},这样当前 goroutine 会留在所有 goroutine 转储中,便于排查。

    • errGoexit:啥也不做,外层 defer 结束后,goroutine 将正式退出(Goexit 流程继续)。

    • 正常:向所有DoChanchannel 发送Result


4. 辅助类型和错误处理

panicError
type panicError struct { value any stack []byte }
  • 将 panic 的值和发生时的调用栈包装在一起。

  • 实现了Error接口,输出格式为"panic value\n\n堆栈"

  • Unwrap方法允许通过errors.Is/As提取原始错误(如果valueerror接口)。

newPanicError
  • 调用debug.Stack()获取堆栈。

  • 裁剪掉第一行(形如goroutine 1 [running]:),因为这行信息在后续传播时可能已过时。

  • 返回*panicError

errGoexit
  • 一个哨兵错误,表示用户函数中调用了runtime.Goexit


5.Forget方法

func (g *Group) Forget(key string)

简单地从 map 中删除指定 key。这样后续相同 key 的Do调用就会发起新的实际请求,而不是等待可能还在执行的旧请求。常用在异步场景下,想让某个 key 在未来重新触发加载。


6. 使用上的注意事项

  • 结果共享的正确性shared只表示是否有多个 goroutine 获取到了结果,不代表结果本身可以不加锁并发读写。返回值val若为指针或引用类型,调用方需要自行同步。

  • DoChan 的 panic 泄漏:如代码注释所述,当fn发生 panic 且使用DoChan时,等待结果的 channel 将永远收不到数据且不关闭,这会导致 goroutine 泄漏。因此使用DoChan时,最好配合select和超时,或者确保fn不会 panic。

  • Goexit 的传播:通过将errGoexit传递给等待者,再由等待者调用runtime.Goexit()来维持原始语义,这是一个很精妙的设计。

  • 双重 defer 技巧:这段代码是 Go 中区分 panic 和runtime.Goexit的经典范例,值得细读。

官方的singleflight库虽然核心的去重机制很优秀,但也存在一些设计上的局限和边界情况问题;而社区涌现的许多替代库正是为了解决这些痛点而生的,各自有不同的侧重点。

官方库(singleflight)的设计局限与问题

  1. 缺乏原生超时/取消机制:官方库的方法不支持传入context.Context,这是一个关键限制。如果飞行中的请求阻塞(如DB卡死),所有等待者将无限期挂起,这比缓存击穿本身风险更高。通常需要使用方自己封装一层来解决。

  2. interface{}导致类型不安全:官方库使用interface{}传递和返回结果,这意味着放弃了编译时类型检查,必须在运行时进行类型断言,容易引入 panic。同时,这也会带来额外的内存分配和 GC 压力。

  3. 无法处理常态化的批量请求:如果你有常态化的批量查询需求(例如,一批 ID 获取用户信息),等待每个 ID 的独立请求合并效率很低。而官方库不支持在回调中传递多个 Key,一次性批量获取它们的结果。

  4. Forget方法相关的并发 Bug:当异步的ForgetdoCall的清理逻辑并发执行时,Forget删除 key 后,doCall执行完毕会再次删除,这可能会误删其他协程刚刚设置的新 key。官方修复方案是引入一个forgotten布尔值标记,此修复已合入 master 分支。

  5. 关于DoChan方法中go panic(e)的冗余性:源码中处理 panic 的方式曾引发社区对代码冗余的讨论。维护者的意图是保留当前 goroutine 的现场以便调试,但其副作用是让 panic 永远无法被上层恢复,代价是程序直接崩溃。

社区替代库的核心特点

这些库大多都支持泛型,并且绝大多数也支持context.Context。在此基础上,它们各自的突出特点和对比如下:

替代库最突出的差异化特点性能/设计亮点
github.com/samber/go-singleflightx支持批量请求 (Batching)支持在回调中传入多个 Key 批量获取结果;提供分片组 (Sharded Groups),减少锁竞争;支持Nullable Result
github.com/oy3o/singleflight极致零内存分配 (Zero Allocation)通过sync.Pool等手段,在缓存未命中场景下也能实现0 内存分配,对 GC 极度友好。基准测试显示延迟约为 140ns,优于官方的 190ns。
github.com/brunomvsouza/singleflight追求100% API兼容风格保守,力求与原版100%兼容。版本号跟随上游,可作为泛型版的直接替代品。
github.com/marwan-at-work/singleflight原生 Context 支持设计上引入了context.Context,当 Context 被取消(如HTTP请求断开)时,等待者可以提前退出,避免无效等待。

通用的避坑指南与最佳实践

  1. 配置超时与熔断:必须为"飞行中"的函数调用设置超时,或引入熔断机制,防止底层服务故障导致所有请求堆积。

  2. 精确设计 Key:确保 Key 能唯一代表需要去重的业务操作。避免在 Key 中包含无关的动态参数(如时间戳),否则去重会失效。

  3. 保持 Group 为单例singleflight.Group实例必须在整个服务生命周期内全局唯一(通常定义为包级变量),才能保证去重效果。

  4. 警惕高频 Key 的性能开销:在高并发下,如果 Key 数量巨大且大部分都不可重复,sync.Mutex的锁竞争反而会带来微小的性能损耗(约5-10μs),虽然不大,但在极端性能敏感场景值得注意。

  5. 正确使用指针类型:当fn返回的是指针(如*User)时,多个 goroutine 会共享该指针。务必确保后续对该指针的读取操作是并发安全的,避免数据竞争。

下面是golang官方库代码

package singleflight // import "golang.org/x/sync/singleflight" import ( "bytes" "errors" "fmt" "runtime" "runtime/debug" "sync" ) // errGoexit indicates the runtime.Goexit was called in // the user given function. var errGoexit = errors.New("runtime.Goexit was called") // A panicError is an arbitrary value recovered from a panic // with the stack trace during the execution of given function. type panicError struct { value any stack []byte } // Error implements error interface. func (p *panicError) Error() string { return fmt.Sprintf("%v\n\n%s", p.value, p.stack) } func (p *panicError) Unwrap() error { err, ok := p.value.(error) if !ok { return nil } return err } func newPanicError(v any) error { stack := debug.Stack() // The first line of the stack trace is of the form "goroutine N [status]:" // but by the time the panic reaches Do the goroutine may no longer exist // and its status will have changed. Trim out the misleading line. if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { stack = stack[line+1:] } return &panicError{value: v, stack: stack} } // call is an in-flight or completed singleflight.Do call type call struct { wg sync.WaitGroup // These fields are written once before the WaitGroup is done // and are only read after the WaitGroup is done. val any err error // These fields are read and written with the singleflight // mutex held before the WaitGroup is done, and are read but // not written after the WaitGroup is done. dups int chans []chan<- Result } // Group represents a class of work and forms a namespace in // which units of work can be executed with duplicate suppression. type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized } // Result holds the results of Do, so they can be passed // on a channel. type Result struct { Val any Err error Shared bool } // Do executes and returns the results of the given function, making // sure that only one execution is in-flight for a given key at a // time. If a duplicate comes in, the duplicate caller waits for the // original to complete and receives the same results. // The return value shared indicates whether v was given to multiple callers. func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() if e, ok := c.err.(*panicError); ok { panic(e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 } // DoChan is like Do but returns a channel that will receive the // results when they are ready. // // The returned channel will not be closed. func (g *Group) DoChan(key string, fn func() (any, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock() go g.doCall(c, key, fn) return ch } // doCall handles the single call for a key. func (g *Group) doCall(c *call, key string, fn func() (any, error)) { normalReturn := false recovered := false // use double-defer to distinguish panic from runtime.Goexit, // more details see https://golang.org/cl/134395 defer func() { // the given function invoked runtime.Goexit if !normalReturn && !recovered { c.err = errGoexit } g.mu.Lock() defer g.mu.Unlock() c.wg.Done() if g.m[key] == c { delete(g.m, key) } if e, ok := c.err.(*panicError); ok { // In order to prevent the waiting channels from being blocked forever, // needs to ensure that this panic cannot be recovered. if len(c.chans) > 0 { go panic(e) select {} // Keep this goroutine around so that it will appear in the crash dump. } else { panic(e) } } else if c.err == errGoexit { // Already in the process of goexit, no need to call again } else { // Normal return for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }() func() { defer func() { if !normalReturn { // Ideally, we would wait to take a stack trace until we've determined // whether this is a panic or a runtime.Goexit. // // Unfortunately, the only way we can distinguish the two is to see // whether the recover stopped the goroutine from terminating, and by // the time we know that, the part of the stack trace relevant to the // panic has been discarded. if r := recover(); r != nil { c.err = newPanicError(r) } } }() c.val, c.err = fn() normalReturn = true }() if !normalReturn { recovered = true } } // Forget tells the singleflight to forget about a key. Future calls // to Do for this key will call the function rather than waiting for // an earlier call to complete. func (g *Group) Forget(key string) { g.mu.Lock() delete(g.m, key) g.mu.Unlock() }
http://www.jsqmd.com/news/773619/

相关文章:

  • AI模型平台选型革命:国产新秀模力方舟如何打破大厂垄断格局
  • 汽车CAN总线实时系统设计与响应时间分析
  • 终极指南:5分钟快速上手Open-Lyrics,让AI为你的音频自动生成精准字幕
  • 洛谷P1074 [NOIP 2009 提高组] 靶形数独题解
  • Fernflower:Java字节码智能反编译的艺术与实践
  • 如何用FUnIE-GAN打破水下视觉迷雾?3分钟掌握实时图像增强核心技术
  • 零基础如何做车载嵌入式开发?学好C++至关重要
  • 【DAY 1.数据结构之反转链表1.牛客网BM1】
  • 多智能体协作框架:AI驱动的软件开发团队自动化实践
  • OpenCore Legacy Patcher:突破苹果硬件限制的系统兼容性架构解析
  • Gemini3.1Pro一键生成高效教研方案
  • 氢燃料微型燃气轮机增程系统建模及控制策略【附代码】
  • 开源中国的国产化突围:构建安全可控的智能研发生态体系
  • 分布式搜索引擎:Elasticsearch 从入门到实战
  • 高通全新骁龙芯片将大幅减少中端安卓手机卡顿现象
  • LTC3783 LED驱动控制器设计与效率优化详解
  • 嵌入式开发新利器:轻量级芯片包管理器vpm实战指南
  • BepInEx完整指南:5分钟掌握Unity游戏插件框架的安装与配置
  • PatreonDownloader终极指南:轻松备份Patreon付费内容的完整解决方案
  • 交互式学习平台Vibe-Learn:架构设计与实战搭建指南
  • 三维计算几何基础
  • 从DS18B20到BMI088:聊聊那些年我用过的传感器,以及如何为你的项目选型
  • 金融智能体开发实战:基于eforest-agent-skills构建领域专家Agent
  • Python科研绘图实践【13】——线性回归拟合图附代码
  • taotoken 的按 token 计费模式让实验性项目成本可控
  • STM32H7实战:用MPU给你的关键外设(如FMC)加把锁,防止程序跑飞误操作
  • 基于向量数据库与语义搜索的智能代码片段管理实践
  • AI工具搭建自动化视频生成LoHa
  • 基于异步IO与模块化设计的Python数据抓取框架Catclaw实战指南
  • 利用MCP协议与mcp-conf工具,为AI编程助手构建深度项目感知能力