blob: 57027ec0b1cd6542fb527ffe2131a0e1abb1f3f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sd
import (
"context"
"fmt"
"sync"
"time"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/go-chassis/cari/discovery"
"github.com/go-chassis/foundation/gopool"
"github.com/go-chassis/foundation/timeutil"
)
type deferItem struct {
ReplayAfter int32 // in seconds
event kvstore.Event
}
type InstanceEventDeferHandler struct {
Percent float64
cache kvstore.CacheReader
once sync.Once
enabled bool
items map[string]*deferItem
evts chan []kvstore.Event
replayCh chan kvstore.Event
resetCh chan struct{}
}
func (iedh *InstanceEventDeferHandler) OnCondition(cache kvstore.CacheReader, evts []kvstore.Event) bool {
if iedh.Percent <= 0 {
return false
}
iedh.once.Do(func() {
iedh.cache = cache
iedh.items = make(map[string]*deferItem)
iedh.evts = make(chan []kvstore.Event, eventBlockSize)
iedh.replayCh = make(chan kvstore.Event, eventBlockSize)
iedh.resetCh = make(chan struct{})
gopool.Go(iedh.check)
})
iedh.evts <- evts
return true
}
func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt kvstore.Event) {
if evt.KV == nil {
log.Error(fmt.Sprintf("defer or replayEvent a %s nil KV", evt.Type), nil)
return
}
kv := evt.KV
key := util.BytesToStringWithNoCopy(kv.Key)
_, ok := iedh.items[key]
switch evt.Type {
case discovery.EVT_CREATE, discovery.EVT_UPDATE:
if ok {
log.Info(fmt.Sprintf("recovered key %s events", key))
// return nil // no need to publish event to subscribers?
}
iedh.replayEvent(evt)
case discovery.EVT_DELETE:
if ok {
return
}
instance := kv.Value.(*discovery.MicroServiceInstance)
if instance == nil {
log.Error(fmt.Sprintf("defer or replayEvent a %s nil Value, KV is %v", evt.Type, kv), nil)
return
}
ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)
if ttl <= 0 || ttl > selfPreservationMaxTTL {
ttl = selfPreservationMaxTTL
}
iedh.items[key] = &deferItem{
ReplayAfter: ttl,
event: evt,
}
}
}
func (iedh *InstanceEventDeferHandler) HandleChan() <-chan kvstore.Event {
return iedh.replayCh
}
func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
defer log.Recover()
t, n := time.NewTimer(deferCheckWindow), false
defer t.Stop()
for {
select {
case <-ctx.Done():
log.Error("self preservation routine dead", nil)
return
case evts := <-iedh.evts:
for _, evt := range evts {
iedh.recoverOrDefer(evt)
}
del := len(iedh.items)
if del == 0 {
continue
}
if iedh.enabled {
continue
}
total := iedh.cache.GetAll(nil)
if total > selfPreservationInitCount && float64(del) >= float64(total)*iedh.Percent {
iedh.enabled = true
log.Warn(fmt.Sprintf("self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
del, total, iedh.Percent*100))
}
if !n {
timeutil.ResetTimer(t, deferCheckWindow)
n = true
}
case <-t.C:
n = false
t.Reset(deferCheckWindow)
if !iedh.enabled {
for _, item := range iedh.items {
iedh.replayEvent(item.event)
}
continue
}
iedh.ReplayEvents()
case <-iedh.resetCh:
iedh.ReplayEvents()
iedh.enabled = false
timeutil.ResetTimer(t, deferCheckWindow)
}
}
}
func (iedh *InstanceEventDeferHandler) ReplayEvents() {
interval := int32(deferCheckWindow / time.Second)
for key, item := range iedh.items {
item.ReplayAfter -= interval
if item.ReplayAfter > 0 {
continue
}
log.Warn(fmt.Sprintf("replay delete event, remove key: %s", key))
iedh.replayEvent(item.event)
}
if len(iedh.items) == 0 {
iedh.enabled = false
log.Warn("self preservation stopped")
}
}
func (iedh *InstanceEventDeferHandler) replayEvent(evt kvstore.Event) {
key := util.BytesToStringWithNoCopy(evt.KV.Key)
delete(iedh.items, key)
iedh.replayCh <- evt
}
func (iedh *InstanceEventDeferHandler) Reset() bool {
if iedh.enabled || len(iedh.items) != 0 {
log.Warn("self preservation is reset")
iedh.resetCh <- struct{}{}
return true
}
return false
}
func NewInstanceEventDeferHandler() *InstanceEventDeferHandler {
return &InstanceEventDeferHandler{Percent: selfPreservationPercentage}
}