/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcd

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
	"github.com/apache/servicecomb-service-center/datasource/sdcommon"
	"github.com/apache/servicecomb-service-center/pkg/goutil"
	"github.com/apache/servicecomb-service-center/pkg/log"
	"github.com/apache/servicecomb-service-center/pkg/util"
	"github.com/apache/servicecomb-service-center/server/config"
	rmodel "github.com/go-chassis/cari/discovery"
	"github.com/go-chassis/foundation/backoff"
	"github.com/go-chassis/foundation/gopool"
	"github.com/go-chassis/foundation/timeutil"
	"github.com/little-cui/etcdadpt"
)
// KvCacher implements kvstore.Cacher.
// KvCacher manages the etcd cache.
// To keep the cache up to date, KvCacher watches etcd events and
// periodically pulls the full data set from etcd.
// When the cached data changes, KvCacher creates events and notifies its
// subscribers.
// Use Cfg to configure its behavior.
type KvCacher struct {
Cfg *kvstore.Options
reListCount int
ready chan struct{}
lw sdcommon.ListWatch
mux sync.Mutex
once sync.Once
cache kvstore.Cache
goroutine *gopool.Pool
}
func (c *KvCacher) Config() *kvstore.Options {
return c.Cfg
}
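// needList reports whether a full list from etcd is required: either the
// local revision is 0 (initial build, or a previous watch failed and reset
// it), or DefaultForceListInterval watch rounds have passed since the last
// forced list.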
func (c *KvCacher) needList() bool {
rev := c.getRevision()
if rev == 0 {
c.reListCount = 0
return true
}
c.reListCount++
if c.reListCount < sdcommon.DefaultForceListInterval {
return false
}
c.reListCount = 0
return true
}
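// doList pulls the full data set under Cfg.Key from etcd. If the cache is
// marked dirty it is rebuilt silently; otherwise the result is diffed
// against the cache and the resulting events are pushed to subscribers.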
func (c *KvCacher) doList(cfg sdcommon.ListWatchConfig) error {
resp, err := c.lw.List(cfg)
if err != nil {
return err
}
rev := c.getRevision()
resources := resp.Resources
start := time.Now()
defer log.DebugOrWarn(start, fmt.Sprintf("finish to cache key %s, %d items, rev: %d",
c.Cfg.Key, len(resources), rev))
	// just reset the cacher if the cache is marked dirty
if c.cache.Dirty() {
c.reset(rev, resources)
log.Warn(fmt.Sprintf("cache[%s] is reset!", c.cache.Name()))
return nil
}
	// calculate and return the diff between the cache and etcd
evts := c.filter(rev, resources)
	// if there is no change between the List() result and the cache, stop the self-preservation
if ec, kc := len(evts), len(resources); c.Cfg.DeferHandler != nil && ec == 0 && kc != 0 &&
c.Cfg.DeferHandler.Reset() {
log.Warn(fmt.Sprintf("most of the protected data(%d/%d) are recovered",
kc, c.cache.GetAll(nil)))
}
// notify the subscribers
c.sync(evts)
return nil
}
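// reset resets the defer handler, clears the cache and rebuilds it from the
// listed resources without notifying subscribers.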
func (c *KvCacher) reset(rev int64, kvs []*sdcommon.Resource) {
if c.Cfg.DeferHandler != nil {
c.Cfg.DeferHandler.Reset()
}
	// clearing the cache before rebuilding it is safe because the watch
	// operation has stopped; note that until the rebuild finishes, all API
	// requests will go directly to etcd.
	c.cache.Clear()
	// do not notify subscribers while the cacher is in dirty status;
	// otherwise too many events would be pushed downstream.
c.buildCache(c.filter(rev, kvs))
}
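// doWatch opens the watch event bus and blocks handling its events until
// the watch stops or fails.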
func (c *KvCacher) doWatch(cfg sdcommon.ListWatchConfig) error {
if eventBus := c.lw.EventBus(cfg); eventBus != nil {
return c.handleEventBus(eventBus)
}
	return errors.New("handle a nil watcher")
}
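// ListAndWatch performs one list-and-watch round: it lists etcd when
// needed, marks the cacher ready, then blocks in the watch until it ends.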
func (c *KvCacher) ListAndWatch(ctx context.Context) error {
c.mux.Lock()
defer c.mux.Unlock()
	defer log.Recover() // ensure ListAndWatch never panics
cfg := sdcommon.ListWatchConfig{
Timeout: c.Cfg.Timeout,
Context: ctx,
}
	// the scenarios that require listing etcd:
	// 1. Initial: the cache is being built, the lister's revision is 0.
	// 2. Runtime: an error occurred in the previous watch operation, so the lister's revision was reset to 0.
	// 3. Runtime: the watch operation timed out more than DefaultForceListInterval times.
if c.needList() {
if err := c.doList(cfg); err != nil && (!c.IsReady() || c.getRevision() == 0) {
			return err // the caller will retry listing etcd
}
		// keep going to the next step when:
		// 1. doList returned OK; or
		// 2. the etcd client has some traps, like the max response body
		//    limitation (4MB), that make doList always return an error, so
		//    call doWatch to compensate if the cacher is already ready.
}
util.SafeCloseChan(c.ready)
return c.doWatch(cfg)
}
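// handleEventBus converts each watched resource into a kvstore.Event:
// a PUT with Version == 1 is a create, any other PUT is an update, and a
// delete falls back to the cached value when the watch response carries no
// value (embedded mode).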
func (c *KvCacher) handleEventBus(eventBus *sdcommon.EventBus) error {
defer eventBus.Stop()
for resp := range eventBus.ResourceEventBus() {
if resp == nil {
return errors.New("handle watcher error")
}
start := time.Now()
rev := resp.Revision
evts := make([]kvstore.Event, 0, len(resp.Resources))
for _, resource := range resp.Resources {
evt := kvstore.NewEvent(rmodel.EVT_CREATE, nil, resource.ModRevision)
switch {
case resp.Action == sdcommon.ActionPUT && resource.Version == 1:
evt.Type, evt.KV = rmodel.EVT_CREATE, c.doParse(resource)
case resp.Action == sdcommon.ActionPUT:
evt.Type, evt.KV = rmodel.EVT_UPDATE, c.doParse(resource)
case resp.Action == sdcommon.ActionDelete:
evt.Type = rmodel.EVT_DELETE
if resource.Value == nil {
					// this happens in embedded mode; get the value from the
					// cache instead of unmarshalling
evt.KV = c.cache.Get(resource.Key)
} else {
evt.KV = c.doParse(resource)
}
default:
log.Error(fmt.Sprintf("unknown KeyValue %v", resource), nil)
continue
}
if evt.KV == nil {
log.Error(fmt.Sprintf("failed to parse KeyValue %v", resource), nil)
continue
}
evts = append(evts, evt)
}
c.sync(evts)
log.DebugOrWarn(start, fmt.Sprintf("finish to handle %d events, prefix: %s, rev: %d",
len(evts), c.Cfg.Key, rev))
}
return nil
}
func (c *KvCacher) needDeferHandle(evts []kvstore.Event) bool {
if c.Cfg.DeferHandler == nil || !c.IsReady() {
return false
}
return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
}
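// refresh runs the list-and-watch loop until ctx is cancelled, waiting
// MinWaitInterval between rounds and backing off after consecutive
// failures.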
func (c *KvCacher) refresh(ctx context.Context) {
log.Debug(fmt.Sprintf("start to list and watch %s", c.Cfg))
retries := 0
timer := time.NewTimer(sdcommon.MinWaitInterval)
defer timer.Stop()
for {
nextPeriod := sdcommon.MinWaitInterval
if err := c.ListAndWatch(ctx); err != nil {
retries++
nextPeriod = backoff.GetBackoff().Delay(retries)
} else {
retries = 0
}
select {
case <-ctx.Done():
log.Debug(fmt.Sprintf("stop to list and watch %s", c.Cfg))
return
case <-timer.C:
timer.Reset(nextPeriod)
}
}
}
// the caller must keep evts valid while sync runs
func (c *KvCacher) sync(evts []kvstore.Event) {
if len(evts) == 0 {
return
}
if c.needDeferHandle(evts) {
return
}
c.onEvents(evts)
}
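// filter diffs the listed resources against the cache and returns the
// resulting events. Deletions and creations/updates are computed by two
// goroutines that stream fixed-size event blocks through eventsCh.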
func (c *KvCacher) filter(rev int64, items []*sdcommon.Resource) []kvstore.Event {
nc := len(items)
newStore := make(map[string]*sdcommon.Resource, nc)
for _, kv := range items {
newStore[kv.Key] = kv
}
filterStopCh := make(chan struct{})
eventsCh := make(chan [sdcommon.EventBlockSize]kvstore.Event, 2)
go c.filterDelete(newStore, rev, eventsCh, filterStopCh)
go c.filterCreateOrUpdate(newStore, rev, eventsCh, filterStopCh)
evts := make([]kvstore.Event, 0, nc)
for block := range eventsCh {
for _, e := range block {
if e.KV == nil {
break
}
evts = append(evts, e)
}
}
return evts
}
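// filterDelete emits a DELETE event for every cached key that is absent
// from newStore.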
func (c *KvCacher) filterDelete(newStore map[string]*sdcommon.Resource,
rev int64, eventsCh chan [sdcommon.EventBlockSize]kvstore.Event, filterStopCh chan struct{}) {
var block [sdcommon.EventBlockSize]kvstore.Event
i := 0
c.cache.ForEach(func(k string, v *kvstore.KeyValue) (next bool) {
next = true
_, ok := newStore[k]
if ok {
return
}
if i >= sdcommon.EventBlockSize {
eventsCh <- block
block = [sdcommon.EventBlockSize]kvstore.Event{}
i = 0
}
block[i] = kvstore.NewEvent(rmodel.EVT_DELETE, v, rev)
i++
return
})
if i > 0 {
eventsCh <- block
}
close(filterStopCh)
}
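// filterCreateOrUpdate emits a CREATE event for every key in newStore that
// is not yet cached, and an UPDATE event for every key whose revisions
// differ from the cached value; it waits for filterDelete to finish before
// closing eventsCh.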
func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*sdcommon.Resource,
rev int64, eventsCh chan [sdcommon.EventBlockSize]kvstore.Event, filterStopCh chan struct{}) {
var block [sdcommon.EventBlockSize]kvstore.Event
i := 0
for k, v := range newStore {
ov := c.cache.Get(k)
if ov == nil {
if i >= sdcommon.EventBlockSize {
eventsCh <- block
block = [sdcommon.EventBlockSize]kvstore.Event{}
i = 0
}
if kv := c.doParse(v); kv != nil {
block[i] = kvstore.NewEvent(rmodel.EVT_CREATE, kv, rev)
i++
}
continue
}
if ov.CreateRevision == v.CreateRevision && ov.ModRevision == v.ModRevision {
continue
}
if i >= sdcommon.EventBlockSize {
eventsCh <- block
block = [sdcommon.EventBlockSize]kvstore.Event{}
i = 0
}
if kv := c.doParse(v); kv != nil {
block[i] = kvstore.NewEvent(rmodel.EVT_UPDATE, kv, rev)
i++
}
}
if i > 0 {
eventsCh <- block
}
<-filterStopCh
close(eventsCh)
}
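// deferHandle replays deferred (self-preserved) events until ctx is
// cancelled.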
func (c *KvCacher) deferHandle(ctx context.Context) {
if c.Cfg.DeferHandler == nil {
return
}
for {
select {
case <-ctx.Done():
return
default:
c.handleDeferEvents(ctx)
}
}
}
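// handleDeferEvents batches the events replayed by the defer handler: a
// batch is flushed either when it reaches EventBlockSize or when no new
// event arrives within 300ms.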
func (c *KvCacher) handleDeferEvents(ctx context.Context) {
defer log.Recover()
var (
evts = make([]kvstore.Event, sdcommon.EventBlockSize)
i int
)
interval := 300 * time.Millisecond
timer := time.NewTimer(interval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
if !ok {
log.Error("replay channel is closed", nil)
return
}
if i >= sdcommon.EventBlockSize {
c.onEvents(evts[:i])
evts = make([]kvstore.Event, sdcommon.EventBlockSize)
i = 0
}
evts[i] = evt
i++
timeutil.ResetTimer(timer, interval)
case <-timer.C:
timer.Reset(interval)
if i == 0 {
continue
}
c.onEvents(evts[:i])
evts = make([]kvstore.Event, sdcommon.EventBlockSize)
i = 0
}
}
}
func (c *KvCacher) onEvents(evts []kvstore.Event) {
c.buildCache(evts)
c.notify(evts)
}
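// buildCache applies the events to the cache and normalizes event types
// along the way: during the initial build every create/update becomes
// EVT_INIT, and a create/update is corrected to match whether the key is
// already cached.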
func (c *KvCacher) buildCache(evts []kvstore.Event) {
init := !c.IsReady()
for i, evt := range evts {
key := util.BytesToStringWithNoCopy(evt.KV.Key)
prevKv := c.cache.Get(key)
ok := prevKv != nil
switch evt.Type {
case rmodel.EVT_CREATE, rmodel.EVT_UPDATE:
switch {
case init:
evt.Type = rmodel.EVT_INIT
case !ok && evt.Type != rmodel.EVT_CREATE:
log.Warn(fmt.Sprintf("unexpected %s event! it should be %s key %s",
evt.Type, rmodel.EVT_CREATE, key))
evt.Type = rmodel.EVT_CREATE
case ok && evt.Type != rmodel.EVT_UPDATE:
log.Warn(fmt.Sprintf("unexpected %s event! it should be %s key %s",
evt.Type, rmodel.EVT_UPDATE, key))
evt.Type = rmodel.EVT_UPDATE
}
c.cache.Put(key, evt.KV)
evts[i] = evt
case rmodel.EVT_DELETE:
if !ok {
log.Warn(fmt.Sprintf("unexpected %s event! key %s does not cache",
evt.Type, key))
} else {
evt.KV = prevKv
c.cache.Remove(key)
}
evts[i] = evt
}
}
kvstore.ReportProcessEventCompleted(c.Cfg.Key, evts)
}
func (c *KvCacher) notify(evts []kvstore.Event) {
if c.Cfg.OnEvent == nil {
return
}
defer log.Recover()
for _, evt := range evts {
c.Cfg.OnEvent(evt)
}
kvstore.ReportDispatchEventCompleted(c.Cfg.Key, evts)
}
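// doParse converts a raw etcd resource into a kvstore.KeyValue using the
// configured parser; it returns nil if parsing fails.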
func (c *KvCacher) doParse(src *sdcommon.Resource) (kv *kvstore.KeyValue) {
kv = kvstore.NewKeyValue()
if err := ParseResourceToEtcdKeyValue(kv, src, c.Cfg.Parser); err != nil {
log.Error(fmt.Sprintf("parse %s value failed", src.Key), err)
return nil
}
return
}
func (c *KvCacher) Cache() kvstore.CacheReader {
return c.cache
}
func (c *KvCacher) Run() {
c.once.Do(func() {
c.goroutine.Do(c.refresh)
c.goroutine.Do(c.deferHandle)
c.goroutine.Do(c.reportMetrics)
})
}
func (c *KvCacher) Stop() {
c.goroutine.Close(true)
util.SafeCloseChan(c.ready)
}
func (c *KvCacher) Ready() <-chan struct{} {
return c.ready
}
func (c *KvCacher) IsReady() bool {
select {
case <-c.ready:
return true
default:
return false
}
}
func (c *KvCacher) reportMetrics(ctx context.Context) {
if !config.GetServer().EnablePProf {
return
}
timer := time.NewTimer(sdcommon.DefaultMetricsInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
ReportCacheSize(c.cache.Name(), "raw", c.cache.Size())
timer.Reset(sdcommon.DefaultMetricsInterval)
}
}
}
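// NewKvCacher creates a KvCacher that lists and watches the etcd prefix
// cfg.Key through the default etcdadpt client.
//
// A minimal usage sketch (assuming cfg and cache were built elsewhere):
//
//	cacher := NewKvCacher(cfg, cache)
//	cacher.Run()        // start the refresh, defer-handling and metrics loops
//	<-cacher.Ready()    // wait until the first list has completed
//	defer cacher.Stop()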
func NewKvCacher(cfg *kvstore.Options, cache kvstore.Cache) *KvCacher {
return &KvCacher{
Cfg: cfg,
cache: cache,
ready: make(chan struct{}),
lw: &innerListWatch{
Client: etcdadpt.Instance(),
Prefix: cfg.Key,
},
goroutine: goutil.New(),
}
}
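// getRevision reads the current revision from the underlying ListWatch via
// reflection (looking up the method named by revMethodName), so a ListWatch
// without that method simply reports revision 0.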
func (c *KvCacher) getRevision() int64 {
lw := reflect.ValueOf(c.lw)
revMethod := lw.MethodByName(revMethodName)
if !revMethod.IsValid() {
log.Warn("get revision failed")
return 0
}
revs := revMethod.Call(make([]reflect.Value, 0))
rev := revs[0].Int()
return rev
}