您现在的位置是:网站首页> 编程资料编程资料

Mango Cache缓存管理库TinyLFU源码解析_Golang_

2023-05-26 386人已围观

简介 Mango Cache缓存管理库TinyLFU源码解析_Golang_

介绍

据官方所述,mango Cache是对Guava Cache基于go的部分实现,同时mangoCache参考了Caffeine以及go-tinylfu.

支持以下缓存管理策略:

  • LRU
  • Segmented LRU(默认)
  • TinyLFU(实验性)

本文将从源码对其进行解析,重点将放在loadingCache(一种可以自定义如何载入新内存的cache)上面.

整体架构

mango Cache的主体功能由localCache结构体(local.go)实现,

type localCache struct { cache cache // 真正存放数据的地方 expireAfterAccess time.Duration //用户自定义的过期以及刷新时间 expireAfterWrite time.Duration refreshAfterWrite time.Duration policyName string //缓存管理策略 onInsertion Func //钩子函数 onRemoval Func loader LoaderFunc //自定义加载和重载函数 reloader Reloader stats StatsCounter //状态管理器 cap int //缓存容量 // 访问时的缓存管理策略 accessQueue policy // 写入时的缓存管理策略,只有在expireAfterWrite或refreshAfterWrite被设置以后才启用 writeQueue policy // 用于processEntries的事件队列 events chan entryEvent // 记录距离上一次写期间发生的读次数,用于判断是否进行清扫 readCount int32 // 用于关闭由localCache创建的协程 closing int32 closeWG sync.WaitGroup } 

真正存储数据的结构体是cache(policy.go),所有缓存数据都存储在其中

type cache struct { size int64 // 缓存大小 segs [segmentCount]sync.Map // 分片sync.map, map[key]*entry } 

entry是存放一个缓存数据的单元

type entry struct { hash uint64 //key的hash值 accessTime int64 writeTime int64 invalidated int32 //是否有效 loading int32 //是否处于载入中状态 key Key value atomic.Value // 存储的值 // 下面的属性只被缓存管理策略操作,所以不需要原子性的访问 accessList *list.Element //此entry目前所在的访问链表中的位置 writeList *list.Element //此entry目前所在的写入链表中的位置 listID uint8 //此entry目前所在的缓存段(SLRU) } 

localCache使用事件队列来管理并发读写事件,事件队列是一个chan,其中流转着entryEvent(即数据以及对应的缓存事件),localCache通过processEntries协程来处理各种读写事件.之后将详细讲解mango Cache是如何进行读写操作的.

localCache的write、access等操作底层是通过操作cache、accessQueue以及writeQueue从而实现的,而localCache还会负责清扫过期数据的工作.

前面提到了mango Cache支持诸如LRU、SLRU以及TinyLFU等缓存管理策略,其在localCache中即为accessQueue以及writeQueue,负责对缓存的淘汰准入等操作.

初始化流程

mango Cache提供了两种cache---普通Cache以及LoadingCache,这两者都是接口,而localCache实现了这两个接口,由于LoadingCache继承了普通Cache,因此本文只讲解LoadingCache.

func NewLoadingCache(loader LoaderFunc, options ...Option) LoadingCache { c := newLocalCache() c.loader = loader for _, opt := range options { opt(c) } c.init() return c } 

NewLoadingCache函数先初始化一个LoadingCache,然后根据用户传入的自定义载入函数和一些配置来初始化LoadingCache.配置包括注册插入或者删除时触发的钩子函数以及过期时间等等.然后调用localCache.init.

func (c *localCache) init() { c.accessQueue = newPolicy(c.policyName) c.accessQueue.init(&c.cache, c.cap) if c.expireAfterWrite > 0 || c.refreshAfterWrite > 0 { c.writeQueue = &recencyQueue{} } else { c.writeQueue = discardingQueue{} } c.writeQueue.init(&c.cache, c.cap) c.events = make(chan entryEvent, chanBufSize) c.closeWG.Add(1) go c.processEntries() } 

localCache.init会根据用户传入的缓存管理策略名字来初始化accessQueue然后根据是否有写过期和写刷新配置来决定是否初始化写入队列.接着创建事件队列并开启事件处理协程.到此为止,cache启动完成.

读流程

LoadingCache的Get操作可以通过key获取缓存值,其流程为:

先从主缓存中查询entry

若未查询到entry,则记录miss并且调用用户自定义的load方法加载缓存值并返回

若查询到entry,先检查是否过期

  • 若过期且没有设置loader则直接向事件处理协程发送eventDelete
  • 若过期但设置了loader,则异步更新entry值

若没有过期则更新访问时间并向事件处理协程发送eventAccess然后记录命中

最后返回entry

func (c *localCache) Get(k Key) (Value, error) { en := c.cache.get(k, sum(k)) //计算key的hash并查询该key if en == nil { c.stats.RecordMisses(1) return c.load(k) } // 检查entry是否需要更新 now := currentTime() if c.isExpired(en, now) { if c.loader == nil { //如果没有设置加载器则直接删除该entry c.sendEvent(eventDelete, en) } else { //对于loadingCache,我们不删除这个entry //而是把它暂时留在缓存中,所以用户依旧可以读取到旧的缓存值 c.setEntryAccessTime(en, now) c.refreshAsync(en) } c.stats.RecordMisses(1) } else { c.setEntryAccessTime(en, now) c.sendEvent(eventAccess, en) c.stats.RecordHits(1) } return en.getValue(), nil } 

需要注意一下这里的refreshAsync函数:

func (c *localCache) refreshAsync(en *entry) bool { if en.setLoading(true) { // 如果此entry没有在加载 if c.reloader == nil { go c.refresh(en) } else { c.reload(en) } return true } return false } 

如果没有用户设置的重载器,就异步执行refresh,refresh函数实际上就是对entry进行加载.

而如果有重载器那么就同步执行用户自定义的reload函数.

写流程

loadingCache的Put操作与Get操作类似,流程如下:

先去主缓存查询key是否存在,若查询到对应的entry,那么直接更新entry

若没有查询到对应的entry,说明其不存在,因此根据key,value初始化一个新entry

如果缓存容量足够,则让主缓存存储该entry,此时会再次检查主存中是否有该entry(解决并发问题)

  • 若cen不为空,说明主缓存中已经存在该entry,直接修改该entry即可
  • 若cen为空,说明主缓存中还不存在该entry,那么就会在主缓存中存储该entry

最后向事件处理协程发送eventWrite事件

func (c *localCache) Put(k Key, v Value) { h := sum(k) en := c.cache.get(k, h) now := currentTime() if en == nil { en = newEntry(k, v, h) c.setEntryWriteTime(en, now) c.setEntryAccessTime(en, now) // 直接将新value添加进缓存(在缓存容量足够的时候) if c.cap == 0 || c.cache.len() < c.cap { cen := c.cache.getOrSet(en) if cen != nil { cen.setValue(v) c.setEntryWriteTime(cen, now) en = cen } } } else { // 更新entry en.setValue(v) c.setEntryWriteTime(en, now) } c.sendEvent(eventWrite, en) } //当entry在缓存中存在,则返回该entry,否则存储该entry func (c *cache) getOrSet(v *entry) *entry { seg := c.segment(v.hash) en, ok := seg.LoadOrStore(v.key, v) if ok { return en.(*entry) } atomic.AddInt64(&c.size, 1) return nil } 

事件处理机制

主流程

mango Cache通过entryEvent chan以及processEntries协程来处理并发读写事务

缓存事件一共四个,分别为写入、访问、删除以及关闭.每个业务协程通过向localCache的events chan发送entryEvent通知事件处理协程,进而实现并发读写.

而processEntries协程内,会不断从events chan内取出entryEvent并执行对应的操作,在write、access以及delete操作后会执行清理工作(具体清扫工作由expireEntries函数执行)

type event uint8 const ( eventWrite event = iota eventAccess eventDelete eventClose ) type entryEvent struct { entry *entry event event } func (c *localCache) processEntries() { defer c.closeWG.Done() for e := range c.events { switch e.event { case eventWrite: //写入事务 c.write(e.entry) c.postWriteCleanup() //清理操作 case eventAccess: //访问事务 c.access(e.entry) c.postReadCleanup() //清理操作 case eventDelete: if e.entry == nil { //InvalidateAll函数中使用 c.removeAll() } else { c.remove(e.entry) //移除单个entry } c.postReadCleanup() //清理操作 case eventClose: if c.reloader != nil { // 停止所有refresh工作 c.reloader.Close() } c.removeAll() return } } } 

write

由于事件处理机制对于access、delete和write的操作类似,因此这里只讲解较为复杂的write操作:

首先通过调用底层访问队列以及写入队列的write方法

触发用户自定义的钩子函数

如果write方法返回值不为空,说明有entry被驱逐,

因此需要从写入队列将其删除,同时记录驱逐并触发用户自定义的钩子函数

func (c *localCache) write(en *entry) { ren := c.accessQueue.write(en) c.writeQueue.write(en) if c.onInsertion != nil { c.onInsertion(en.key, en.getValue()) } if ren != nil { //有entry被驱逐出了缓存 c.writeQueue.remove(ren) c.stats.RecordEviction() if c.onRemoval != nil { c.onRemoval(ren.key, ren.getValue()) } } } 

后面将详细讲述底层访问队列以及缓存管理是如何实现的.

清理工作

前面讲到过每次进行完write、access以及delete操作后会执行清理工作.具体地,write操作会触发postWriteCleanup而access和delete操作会触发postReadCleanup.

postReadCleanup会根据当前距离上一次写的read操作次数是否达到清理工作阈值来决定是否清理,这个阈值是64,也就是说每隔64次read操作就会触发一次清理工作

而postWriteCleanup将在每一次write操作之后触发

真正的清理工作由expireEntries函数完成,它一次清理工作最多只会清理16个entry避免了对事件处理的长时间阻塞.

func (c *localCache) postReadCleanup() { if atomic.AddInt32(&c.readCount, 1) > drainThreshold { atomic.StoreInt32(&c.readCount, 0) c.expireEntries() } } // 在添加完entry以后再执行 func (c *localCache) postWriteCleanup() { atomic.StoreInt32(&c.readCount, 0) c.expireEntries() } //清理工作函数 func (c *localCache) expireEntries() { remain := drainMax now := currentTime() if c.expireAfterAccess > 0 { expiry := now.Add(-c.expireAfterAccess).UnixNano() c.accessQueue.iterate(func(en *entry) bool { if remain == 0 || en.getAccessTime() >= expiry { // 找到了第一个没有过期的entry或者清理entry数足够了 return false } c.remove(en) c.stats.RecordEviction() remain-- return remain > 0 }) } if remain > 0 && c.expireAfterWrite > 0 { ... } if remain > 0 && c.loader != ni
                
                

-六神源码网