| /* |
| Copyright 2012 Google Inc. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| // Package groupcache provides a data loading mechanism with caching |
| // and de-duplication that works across a set of peer processes. |
| // |
| // Each data Get first consults its local cache, otherwise delegates |
| // to the requested key's canonical owner, which then checks its cache |
| // or finally gets the data. In the common case, many concurrent |
| // cache misses across a set of peers for the same key result in just |
| // one cache fill. |
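| // |
| // As a rough usage sketch (the "thumbnail" group, the generateThumbnail |
| // helper, and the ctx value are illustrative, not part of this package): |
| // |
| //    var thumbNails = groupcache.NewGroup("thumbnail", 64<<20, groupcache.GetterFunc( |
| //        func(ctx groupcache.Context, key string, dest groupcache.Sink) error { |
| //            dest.SetBytes(generateThumbnail(key)) |
| //            return nil |
| //        })) |
| // |
| //    var data []byte |
| //    err := thumbNails.Get(ctx, "big-file.jpg", groupcache.AllocatingByteSliceSink(&data)) |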
| package groupcache |
| |
| import ( |
| "errors" |
| "math/rand" |
| "strconv" |
| "sync" |
| "sync/atomic" |
| |
| pb "github.com/golang/groupcache/groupcachepb" |
| "github.com/golang/groupcache/lru" |
| "github.com/golang/groupcache/singleflight" |
| ) |
| |
| // A Getter loads data for a key. |
| type Getter interface { |
| // Get returns the value identified by key, populating dest. |
| // |
| // The returned data must be unversioned. That is, key must |
| // uniquely describe the loaded data, without an implicit |
| // current time, and without relying on cache expiration |
| // mechanisms. |
| Get(ctx Context, key string, dest Sink) error |
| } |
| |
| // A GetterFunc implements Getter with a function. |
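| // |
| // As a minimal sketch (the returned value is illustrative): |
| // |
| //    getter := groupcache.GetterFunc(func(ctx groupcache.Context, key string, dest groupcache.Sink) error { |
| //        return dest.SetString("value-for-" + key) |
| //    }) |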
| type GetterFunc func(ctx Context, key string, dest Sink) error |
| |
| func (f GetterFunc) Get(ctx Context, key string, dest Sink) error { |
| return f(ctx, key, dest) |
| } |
| |
| var ( |
| mu sync.RWMutex |
| groups = make(map[string]*Group) |
| |
| initPeerServerOnce sync.Once |
| initPeerServer func() |
| ) |
| |
| // GetGroup returns the named group previously created with NewGroup, or |
| // nil if there's no such group. |
| func GetGroup(name string) *Group { |
| mu.RLock() |
| g := groups[name] |
| mu.RUnlock() |
| return g |
| } |
| |
| // NewGroup creates a coordinated group-aware Getter from a Getter. |
| // |
| // The returned Getter tries (but does not guarantee) to run only one |
| // Get call at once for a given key across an entire set of peer |
| // processes. Concurrent callers both in the local process and in |
| // other processes receive copies of the answer once the original Get |
| // completes. |
| // |
| // The group name must be unique for each getter. |
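| // |
| // Groups are registered globally: a second NewGroup call with the same name |
| // panics, and GetGroup looks up a previously created group. A sketch (the |
| // getter and cache size are hypothetical): |
| // |
| //    g := groupcache.NewGroup("users", 8<<20, getter) |
| //    same := groupcache.GetGroup("users") // same == g; nil if never created |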
| func NewGroup(name string, cacheBytes int64, getter Getter) *Group { |
| return newGroup(name, cacheBytes, getter, nil) |
| } |
| |
| // If peers is nil, the peer picker is initialized lazily, via a sync.Once, on the group's first Get. |
| func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group { |
| if getter == nil { |
| panic("nil Getter") |
| } |
| mu.Lock() |
| defer mu.Unlock() |
| initPeerServerOnce.Do(callInitPeerServer) |
| if _, dup := groups[name]; dup { |
| panic("duplicate registration of group " + name) |
| } |
| g := &Group{ |
| name: name, |
| getter: getter, |
| peers: peers, |
| cacheBytes: cacheBytes, |
| loadGroup: &singleflight.Group{}, |
| } |
| if fn := newGroupHook; fn != nil { |
| fn(g) |
| } |
| groups[name] = g |
| return g |
| } |
| |
| // newGroupHook, if non-nil, is called right after a new group is created. |
| var newGroupHook func(*Group) |
| |
| // RegisterNewGroupHook registers a hook that is run each time |
| // a group is created. |
| func RegisterNewGroupHook(fn func(*Group)) { |
| if newGroupHook != nil { |
| panic("RegisterNewGroupHook called more than once") |
| } |
| newGroupHook = fn |
| } |
| |
| // RegisterServerStart registers a hook that is run when the first |
| // group is created. |
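| // |
| // A sketch of one possible hook (the HTTP details and peerHandler are |
| // hypothetical; this package does not start a server for you): |
| // |
| //    groupcache.RegisterServerStart(func() { |
| //        go http.ListenAndServe(":8080", peerHandler) |
| //    }) |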
| func RegisterServerStart(fn func()) { |
| if initPeerServer != nil { |
| panic("RegisterServerStart called more than once") |
| } |
| initPeerServer = fn |
| } |
| |
| func callInitPeerServer() { |
| if initPeerServer != nil { |
| initPeerServer() |
| } |
| } |
| |
| // A Group is a cache namespace and its associated data, loaded and spread |
| // over a group of one or more machines. |
| type Group struct { |
| name string |
| getter Getter |
| peersOnce sync.Once |
| peers PeerPicker |
| cacheBytes int64 // limit for sum of mainCache and hotCache size |
| |
| // mainCache is a cache of the keys for which this process |
| // (amongst its peers) is authoritative. That is, this cache |
| // contains keys which consistent hash on to this process's |
| // peer number. |
| mainCache cache |
| |
| // hotCache contains keys/values for which this peer is not |
| // authoritative (otherwise they would be in mainCache), but |
| // are popular enough to warrant mirroring in this process to |
| // avoid going over the network to fetch from a peer. Having |
| // a hotCache avoids network hotspotting, where a peer's |
| // network card could become the bottleneck on a popular key. |
| // This cache is used sparingly to maximize the total number |
| // of key/value pairs that can be stored globally. |
| hotCache cache |
| |
| // loadGroup ensures that each key is only fetched once |
| // (either locally or remotely), regardless of the number of |
| // concurrent callers. |
| loadGroup flightGroup |
| |
| _ int32 // force Stats to be 8-byte aligned on 32-bit platforms |
| |
| // Stats are statistics on the group. |
| Stats Stats |
| } |
| |
| // flightGroup is defined as an interface which singleflight.Group |
| // satisfies. We define this so that we may test with an alternate |
| // implementation. |
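| // |
| // For instance, a test could substitute a stub that never de-duplicates |
| // (a hypothetical sketch, not used by this package): |
| // |
| //    type noDedupGroup struct{} |
| // |
| //    func (noDedupGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) { |
| //        return fn() |
| //    } |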
| type flightGroup interface { |
| // Do executes and returns the results of the given function, making |
| // sure that only one execution is in-flight for a given key at a time. |
| Do(key string, fn func() (interface{}, error)) (interface{}, error) |
| } |
| |
| // Stats are per-group statistics. |
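| // |
| // For example, an overall cache hit rate can be derived from the counters |
| // (a sketch; "g" is some *Group): |
| // |
| //    if gets := g.Stats.Gets.Get(); gets > 0 { |
| //        hitRate := float64(g.Stats.CacheHits.Get()) / float64(gets) |
| //        fmt.Printf("hit rate: %.2f\n", hitRate) |
| //    } |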
| type Stats struct { |
| Gets AtomicInt // any Get request, including from peers |
| CacheHits AtomicInt // either the main or hot cache had the key |
| PeerLoads AtomicInt // either remote load or remote cache hit (not an error) |
| PeerErrors AtomicInt |
| Loads AtomicInt // (gets - cacheHits) |
| LoadsDeduped AtomicInt // after singleflight |
| LocalLoads AtomicInt // total good local loads |
| LocalLoadErrs AtomicInt // total bad local loads |
| ServerRequests AtomicInt // gets that came over the network from peers |
| } |
| |
| // Name returns the name of the group. |
| func (g *Group) Name() string { |
| return g.name |
| } |
| |
| func (g *Group) initPeers() { |
| if g.peers == nil { |
| g.peers = getPeers(g.name) |
| } |
| } |
| |
| // Get loads the value for key into dest, first consulting this process's |
| // caches, then delegating to the key's owner (a peer or this process). |
| func (g *Group) Get(ctx Context, key string, dest Sink) error { |
| g.peersOnce.Do(g.initPeers) |
| g.Stats.Gets.Add(1) |
| if dest == nil { |
| return errors.New("groupcache: nil dest Sink") |
| } |
| value, cacheHit := g.lookupCache(key) |
| |
| if cacheHit { |
| g.Stats.CacheHits.Add(1) |
| return setSinkView(dest, value) |
| } |
| |
| // Optimization to avoid double unmarshalling or copying: keep |
| // track of whether the dest was already populated. One caller |
| // (if local) will set this; the losers will not. The common |
| // case will likely be one caller. |
| destPopulated := false |
| value, destPopulated, err := g.load(ctx, key, dest) |
| if err != nil { |
| return err |
| } |
| if destPopulated { |
| return nil |
| } |
| return setSinkView(dest, value) |
| } |
| |
| // load loads key either by invoking the getter locally or by sending it to another machine. |
| func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { |
| g.Stats.Loads.Add(1) |
| viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { |
| // Check the cache again because singleflight can only dedup calls |
| // that overlap concurrently. It's possible for 2 concurrent |
| // requests to miss the cache, resulting in 2 load() calls. An |
| // unfortunate goroutine scheduling would result in this callback |
| // being run twice, serially. If we don't check the cache again, |
| // cache.nbytes would be incremented below even though there will |
| // be only one entry for this key. |
| // |
| // Consider the following serialized event ordering for two |
| // goroutines in which this callback gets called twice for the |
| // same key: |
| // 1: Get("key") |
| // 2: Get("key") |
| // 1: lookupCache("key") |
| // 2: lookupCache("key") |
| // 1: load("key") |
| // 2: load("key") |
| // 1: loadGroup.Do("key", fn) |
| // 1: fn() |
| // 2: loadGroup.Do("key", fn) |
| // 2: fn() |
| if value, cacheHit := g.lookupCache(key); cacheHit { |
| g.Stats.CacheHits.Add(1) |
| return value, nil |
| } |
| g.Stats.LoadsDeduped.Add(1) |
| var value ByteView |
| var err error |
| if peer, ok := g.peers.PickPeer(key); ok { |
| value, err = g.getFromPeer(ctx, peer, key) |
| if err == nil { |
| g.Stats.PeerLoads.Add(1) |
| return value, nil |
| } |
| g.Stats.PeerErrors.Add(1) |
| // TODO(bradfitz): log the peer's error? keep |
| // log of the past few for /groupcachez? It's |
| // probably boring (normal task movement), so not |
| // worth logging I imagine. |
| } |
| value, err = g.getLocally(ctx, key, dest) |
| if err != nil { |
| g.Stats.LocalLoadErrs.Add(1) |
| return nil, err |
| } |
| g.Stats.LocalLoads.Add(1) |
| destPopulated = true // only one caller of load gets this return value |
| g.populateCache(key, value, &g.mainCache) |
| return value, nil |
| }) |
| if err == nil { |
| value = viewi.(ByteView) |
| } |
| return |
| } |
| |
| func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) { |
| err := g.getter.Get(ctx, key, dest) |
| if err != nil { |
| return ByteView{}, err |
| } |
| return dest.view() |
| } |
| |
| func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) { |
| req := &pb.GetRequest{ |
| Group: &g.name, |
| Key: &key, |
| } |
| res := &pb.GetResponse{} |
| err := peer.Get(ctx, req, res) |
| if err != nil { |
| return ByteView{}, err |
| } |
| value := ByteView{b: res.Value} |
| // TODO(bradfitz): use res.MinuteQps or something smart to |
| // conditionally populate hotCache. For now just do it some |
| // percentage of the time. |
| if rand.Intn(10) == 0 { |
| g.populateCache(key, value, &g.hotCache) |
| } |
| return value, nil |
| } |
| |
| func (g *Group) lookupCache(key string) (value ByteView, ok bool) { |
| if g.cacheBytes <= 0 { |
| return |
| } |
| value, ok = g.mainCache.get(key) |
| if ok { |
| return |
| } |
| value, ok = g.hotCache.get(key) |
| return |
| } |
| |
| func (g *Group) populateCache(key string, value ByteView, cache *cache) { |
| if g.cacheBytes <= 0 { |
| return |
| } |
| cache.add(key, value) |
| |
| // Evict items from cache(s) if necessary. |
| for { |
| mainBytes := g.mainCache.bytes() |
| hotBytes := g.hotCache.bytes() |
| if mainBytes+hotBytes <= g.cacheBytes { |
| return |
| } |
| |
| // TODO(bradfitz): this is good-enough-for-now logic. |
| // It should be something based on measurements and/or |
| // respecting the costs of different resources. |
| victim := &g.mainCache |
| if hotBytes > mainBytes/8 { |
| victim = &g.hotCache |
| } |
| victim.removeOldest() |
| } |
| } |
| |
| // CacheType represents a type of cache. |
| type CacheType int |
| |
| const ( |
| // The MainCache is the cache for items that this peer is the |
| // owner of. |
| MainCache CacheType = iota + 1 |
| |
| // The HotCache is the cache for items that seem popular |
| // enough to replicate to this node, even though it's not the |
| // owner. |
| HotCache |
| ) |
| |
| // CacheStats returns stats about the provided cache within the group. |
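| // |
| // For example (a sketch): |
| // |
| //    main := g.CacheStats(groupcache.MainCache) |
| //    hot := g.CacheStats(groupcache.HotCache) |
| //    log.Printf("main: %d bytes, %d items; hot: %d bytes, %d items", |
| //        main.Bytes, main.Items, hot.Bytes, hot.Items) |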
| func (g *Group) CacheStats(which CacheType) CacheStats { |
| switch which { |
| case MainCache: |
| return g.mainCache.stats() |
| case HotCache: |
| return g.hotCache.stats() |
| default: |
| return CacheStats{} |
| } |
| } |
| |
| // cache is a wrapper around an *lru.Cache that adds synchronization, |
| // makes values always be ByteView, and counts the size of all keys and |
| // values. |
| type cache struct { |
| mu sync.RWMutex |
| nbytes int64 // of all keys and values |
| lru *lru.Cache |
| nhit, nget int64 |
| nevict int64 // number of evictions |
| } |
| |
| func (c *cache) stats() CacheStats { |
| c.mu.RLock() |
| defer c.mu.RUnlock() |
| return CacheStats{ |
| Bytes: c.nbytes, |
| Items: c.itemsLocked(), |
| Gets: c.nget, |
| Hits: c.nhit, |
| Evictions: c.nevict, |
| } |
| } |
| |
| func (c *cache) add(key string, value ByteView) { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| if c.lru == nil { |
| c.lru = &lru.Cache{ |
| OnEvicted: func(key lru.Key, value interface{}) { |
| val := value.(ByteView) |
| c.nbytes -= int64(len(key.(string))) + int64(val.Len()) |
| c.nevict++ |
| }, |
| } |
| } |
| c.lru.Add(key, value) |
| c.nbytes += int64(len(key)) + int64(value.Len()) |
| } |
| |
| func (c *cache) get(key string) (value ByteView, ok bool) { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| c.nget++ |
| if c.lru == nil { |
| return |
| } |
| vi, ok := c.lru.Get(key) |
| if !ok { |
| return |
| } |
| c.nhit++ |
| return vi.(ByteView), true |
| } |
| |
| func (c *cache) removeOldest() { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| if c.lru != nil { |
| c.lru.RemoveOldest() |
| } |
| } |
| |
| func (c *cache) bytes() int64 { |
| c.mu.RLock() |
| defer c.mu.RUnlock() |
| return c.nbytes |
| } |
| |
| func (c *cache) items() int64 { |
| c.mu.RLock() |
| defer c.mu.RUnlock() |
| return c.itemsLocked() |
| } |
| |
| // itemsLocked reports the number of items in the cache; callers must hold c.mu. |
| func (c *cache) itemsLocked() int64 { |
| if c.lru == nil { |
| return 0 |
| } |
| return int64(c.lru.Len()) |
| } |
| |
| // An AtomicInt is an int64 to be accessed atomically. |
| type AtomicInt int64 |
| |
| // Add atomically adds n to i. |
| func (i *AtomicInt) Add(n int64) { |
| atomic.AddInt64((*int64)(i), n) |
| } |
| |
| // Get atomically gets the value of i. |
| func (i *AtomicInt) Get() int64 { |
| return atomic.LoadInt64((*int64)(i)) |
| } |
| |
| func (i *AtomicInt) String() string { |
| return strconv.FormatInt(i.Get(), 10) |
| } |
| |
| // CacheStats are returned by stats accessors on Group. |
| type CacheStats struct { |
| Bytes int64 |
| Items int64 |
| Gets int64 |
| Hits int64 |
| Evictions int64 |
| } |