当前位置: 首页 > news >正文

Cache缓存项目学习3

Group设计
type Group struct { name string getter Getter mainCache *Cache peers PeerPicker loader *singleflight.Group expiration time.Duration // 缓存过期时间,0表示永不过期 closed int32 // 原子变量,标记组是否已关闭 stats groupStats // 统计信息 }

Getter

作为当缓存中拿不到数据后与数据库的连接接口。

type Getter interface { Get(ctx context.Context, key string) ([]byte, error) }
Cache

是对存储的封装,通过store接口提供可插拔底层缓存设计。支持LRU2、LRU等缓存接口。

Options提供缓存配置。其余是辅助记录信息。

PeerPicker

提供peer选择器接口。

type ClientPicker struct { selfAddr string svcName string mu sync.RWMutex consHash *consistenthash.Map clients map[string]*Client etcdCli *clientv3.Client ctx context.Context cancel context.CancelFunc }

通过一致性哈希来选择节点。从clients中获取grpc客户端并与之通信。

Loader

负责单飞机制,一批请求只允许放行一个,其他请求共享结果。

// Do 针对相同的key,保证多次调用Do(),都只会调用一次fn func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { // Check if there is already an ongoing call for this key if existing, ok := g.m.Load(key); ok { c := existing.(*call) c.wg.Wait() // Wait for the existing request to finish return c.val, c.err // Return the result from the ongoing call } // If no ongoing request, create a new one c := &call{} c.wg.Add(1) g.m.Store(key, c) // Store the call in the map // Execute the function and set the result c.val, c.err = fn() c.wg.Done() // Mark the request as done // After the request is done, clean up the map g.m.Delete(key) return c.val, c.err }

各模块详细介绍

Getter
type Getter interface { Get(ctx context.Context, key string) ([]byte, error) } // GetterFunc 函数类型实现 Getter 接口 type GetterFunc func(ctx context.Context, key string) ([]byte, error) // Get 实现 Getter 接口 func (f GetterFunc) Get(ctx context.Context, key string) ([]byte, error) { return f(ctx, key) }

Getter使用策略模式,定义了使用接口,并使用GetterFunc类型自动实现了Get接口,让传参既可以是简单方法也可以是结构体。

Getter是作为数据库接口,当加载数据时发现缓存没有数据时对远程数据库进行数据加载。

func (g *Group) loadData(ctx context.Context, key string) (value ByteView, err error) { // 尝试从远程节点获取 ........ // 从数据源加载 bytes, err := g.getter.Get(ctx, key) if err != nil { return ByteView{}, fmt.Errorf("failed to get data: %w", err) } return ByteView{b: cloneBytes(bytes)}, nil }
Cache

Cache是对本地缓存存储实例的封装。

type Cache struct { mu sync.RWMutex store store.Store // 底层存储实现 opts CacheOptions // 缓存配置选项 hits int64 // 缓存命中次数 misses int64 // 缓存未命中次数 initialized int32 // 原子变量,标记缓存是否已初始化 closed int32 // 原子变量,标记缓存是否已关闭 }

Store接口是底层缓存实例的封装,opts是与store相关的配置信息。

store
//根据缓存类型和具体配置进行实例化 c.store = store.NewStore(c.opts.CacheType, storeOpts) func NewStore(cacheType CacheType, opts Options) Store { switch cacheType { case LRU2: return newLRU2Cache(opts) case LRU: return newLRUCache(opts) default: return newLRUCache(opts) } } //store 接口声明 type Store interface { Get(key string) (Value, bool) Set(key string, value Value) error SetWithExpiration(key string, value Value, expiration time.Duration) error Delete(key string) bool Clear() Len() int Close() }
store接口实例
type lruCache struct { mu sync.RWMutex list *list.List // 双向链表,用于维护 LRU 顺序 items map[string]*list.Element // 键到链表节点的映射 expires map[string]time.Time // 过期时间映射 maxBytes int64 // 最大允许字节数 usedBytes int64 // 当前使用的字节数 onEvicted func(key string, value Value) cleanupInterval time.Duration cleanupTicker *time.Ticker closeCh chan struct{} // 用于优雅关闭清理协程 }

cleanupInterval:清理时间间隔,超过则启动清理

cleanupTicker:根据cleanupInterval生成的定时器,定时执行清理任务

func newLRUCache{ c.cleanupTicker = time.NewTicker(c.cleanupInterval) go c.cleanupLoop() } func (c *lruCache) cleanupLoop() { for { select { case <-c.cleanupTicker.C: c.mu.Lock() c.evict() //清理 c.mu.Unlock() case <-c.closeCh: return } } }

list:维护元素顺序,便于淘汰时进行针对性删除

items:映射键与元素

expires:映射键与过期时间

func (c *lruCache) Get(key string) (Value, bool) { c.mu.RLock() elem, ok := c.items[key] if !ok { c.mu.RUnlock() return nil, false } // 检查是否过期 if expTime, hasExp := c.expires[key]; hasExp && time.Now().After(expTime) { c.mu.RUnlock() // 异步删除过期项,避免在读锁内操作 go c.Delete(key) return nil, false } // 获取值并释放读锁 entry := elem.Value.(*lruEntry) value := entry.value c.mu.RUnlock() // 更新 LRU 位置需要写锁 c.mu.Lock() // 再次检查元素是否仍然存在(可能在获取写锁期间被其他协程删除) if _, ok := c.items[key]; ok { c.list.MoveToBack(elem) } c.mu.Unlock() return value, true } func (c *lruCache) SetWithExpiration(key string, value Value, expiration time.Duration) error { if value == nil { c.Delete(key) return nil } c.mu.Lock() defer c.mu.Unlock() // 计算过期时间 var expTime time.Time if expiration > 0 { expTime = time.Now().Add(expiration) c.expires[key] = expTime } else { delete(c.expires, key) } // 如果键已存在,更新值 if elem, ok := c.items[key]; ok { oldEntry := elem.Value.(*lruEntry) c.usedBytes += int64(value.Len() - oldEntry.value.Len()) oldEntry.value = value c.list.MoveToBack(elem) return nil } // 添加新项 entry := &lruEntry{key: key, value: value} elem := c.list.PushBack(entry) c.items[key] = elem c.usedBytes += int64(len(key) + value.Len()) // 检查是否需要淘汰旧项 c.evict() return nil }
PeerPicker
// PeerPicker 定义了peer选择器的接口 type PeerPicker interface { PickPeer(key string) (peer Peer, ok bool, self bool) Close() error } type ClientPicker struct { selfAddr string svcName string mu sync.RWMutex consHash *consistenthash.Map clients map[string]*Client etcdCli *clientv3.Client ctx context.Context cancel context.CancelFunc }

PeerPicker核心通过consHash一致性哈希来解决问题。

一致性Hash并不知道Key在哪里,也不存储具体的K-V数据,只负责计算。

对于输入的Key,它输出应该去哪个节点寻找。纯本地计算,没有网络通信。

具体一致性哈希算法详解,见下一节。

流程:

loader

单飞设计,防止击穿。

的实现使用 sync.Map ,对相同 Key 的并发请求只执行一次加载,有效避免缓存雪崩。

// 代表正在进行或已结束的请求 type call struct { wg sync.WaitGroup val interface{} err error } // Group manages all kinds of calls type Group struct { m sync.Map // 使用sync.Map来优化并发性能 } // Do 针对相同的key,保证多次调用Do(),都只会调用一次fn func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { // Check if there is already an ongoing call for this key if existing, ok := g.m.Load(key); ok { c := existing.(*call) c.wg.Wait() // Wait for the existing request to finish return c.val, c.err // Return the result from the ongoing call } // If no ongoing request, create a new one c := &call{} c.wg.Add(1) g.m.Store(key, c) // Store the call in the map // Execute the function and set the result c.val, c.err = fn() c.wg.Done() // Mark the request as done // After the request is done, clean up the map g.m.Delete(key) return c.val, c.err }
http://www.jsqmd.com/news/779206/

相关文章:

  • eMule设置IP绑定
  • 基于Git与API自动化的多平台内容分发系统设计与实践
  • 仿生机器人手ExoHand:气动驱动与触觉反馈的工程实践
  • 从资源收藏到实战应用:构建个人提示工程知识体系的系统指南
  • 大厂逼员工用AI:是提效神器,还是裁员前的形式主义套路?
  • 从2E服务写入超长DID说起:一个案例拆解Autosar UDS诊断中‘非主流’的帧交互流程
  • neon源码分析(5)计算层使用slru的一些问题
  • 吴恩达老师课程《AI Prompting for Everyone》
  • 如何通过图解了解 Kubernetes 内部的架构?
  • 桌面应用Docker化:跨平台部署与图形界面容器化实践
  • 2026届最火的五大AI辅助论文平台实测分析
  • 精英的边界:从货币本质到社会进步——关于内卷与正和博弈的底层思考
  • 山西GEO公司怎么选?看这5点避坑指南
  • VS Code实时协作绘图扩展开发:从Monorepo架构到CRDT同步实战
  • 2026 南通黄金回收机构实测:市区+县域全覆盖,变现渠道清晰 - GrowthUME
  • 从零构建自动化静态博客:Hexo + GitHub Pages 全栈实践指南
  • 2025届必备的十大降AI率网站实际效果
  • 降解塑料原料检测进入绿色数字化阶段,IACheck用AI报告审核强化环保合规闭环能力
  • 基于MCP协议的Web自动化:wappmcp项目详解与AI助手集成实践
  • Claude AI与OpenClaw结合:打造能执行系统操作的智能副驾驶
  • 家居建材行业做GEO服务的第三方,哪家靠谱 - GrowthUME
  • 双黄蛋工厂如何甄选?深度解析高邮湖生态与德媛鑫双黄蛋生产基地 - GrowthUME
  • 影刀RPA如何实现店群自动化:带你构建TEMU与拼多多的“弹性并发矩阵”,打破规模化魔咒
  • 90dB回音消除+30dB降噪,不写一行代码:这个模块把我看傻了
  • 告别疲惫与僵硬:五大按摩椅品牌深度测评,助你选对私人按摩师 - GrowthUME
  • 灰度发布的策略
  • Hive JDBC vs MySQL JDBC:**“服务端推完就跑,客户端慢慢吃”**详解
  • PTA L1-039 古风排版:用C语言二维数组模拟竖排文字,保姆级图解教程
  • 2026高档旅行箱实测|5款国标认证款,静音耐造适配商务家庭全场景 - GrowthUME
  • uni-app怎么做类似于淘宝的物流单号自动识别 uni-app正则匹配逻辑实现【实战】.txt