blob: 70715e499d80e353d2c9f8fd5bdbc38d95c9a0d2 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package cache exports functionality for efficiently caching and mapping
// `RangeRequest`s to corresponding `RangeResponse`s.
package cache
import (
"errors"
"sync"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/adt"
"github.com/golang/groupcache/lru"
)
var (
DefaultMaxEntries = 2048
ErrCompacted = rpctypes.ErrGRPCCompacted
)
type Cache interface {
Add(req *pb.RangeRequest, resp *pb.RangeResponse)
Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
Compact(revision int64)
Invalidate(key []byte, endkey []byte)
Size() int
Close()
}
// keyFunc returns the key of a request, which is used to look up its caching response in the cache.
func keyFunc(req *pb.RangeRequest) string {
// TODO: use marshalTo to reduce allocation
b, err := req.Marshal()
if err != nil {
panic(err)
}
return string(b)
}
func NewCache(maxCacheEntries int) Cache {
return &cache{
lru: lru.New(maxCacheEntries),
compactedRev: -1,
}
}
func (c *cache) Close() {}
// cache implements Cache
type cache struct {
mu sync.RWMutex
lru *lru.Cache
// a reverse index for cache invalidation
cachedRanges adt.IntervalTree
compactedRev int64
}
// Add adds the response of a request to the cache if its revision is larger than the compacted revision of the cache.
func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
key := keyFunc(req)
c.mu.Lock()
defer c.mu.Unlock()
if req.Revision > c.compactedRev {
c.lru.Add(key, resp)
}
// we do not need to invalidate a request with a revision specified.
// so we do not need to add it into the reverse index.
if req.Revision != 0 {
return
}
var (
iv *adt.IntervalValue
ivl adt.Interval
)
if len(req.RangeEnd) != 0 {
ivl = adt.NewStringAffineInterval(string(req.Key), string(req.RangeEnd))
} else {
ivl = adt.NewStringAffinePoint(string(req.Key))
}
iv = c.cachedRanges.Find(ivl)
if iv == nil {
c.cachedRanges.Insert(ivl, []string{key})
} else {
iv.Val = append(iv.Val.([]string), key)
}
}
// Get looks up the caching response for a given request.
// Get is also responsible for lazy eviction when accessing compacted entries.
func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) {
key := keyFunc(req)
c.mu.Lock()
defer c.mu.Unlock()
if req.Revision > 0 && req.Revision < c.compactedRev {
c.lru.Remove(key)
return nil, ErrCompacted
}
if resp, ok := c.lru.Get(key); ok {
return resp.(*pb.RangeResponse), nil
}
return nil, errors.New("not exist")
}
// Invalidate invalidates the cache entries that intersecting with the given range from key to endkey.
func (c *cache) Invalidate(key, endkey []byte) {
c.mu.Lock()
defer c.mu.Unlock()
var (
ivs []*adt.IntervalValue
ivl adt.Interval
)
if len(endkey) == 0 {
ivl = adt.NewStringAffinePoint(string(key))
} else {
ivl = adt.NewStringAffineInterval(string(key), string(endkey))
}
ivs = c.cachedRanges.Stab(ivl)
for _, iv := range ivs {
keys := iv.Val.([]string)
for _, key := range keys {
c.lru.Remove(key)
}
}
// delete after removing all keys since it is destructive to 'ivs'
c.cachedRanges.Delete(ivl)
}
// Compact invalidate all caching response before the given rev.
// Replace with the invalidation is lazy. The actual removal happens when the entries is accessed.
func (c *cache) Compact(revision int64) {
c.mu.Lock()
defer c.mu.Unlock()
if revision > c.compactedRev {
c.compactedRev = revision
}
}
func (c *cache) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()
return c.lru.Len()
}