blob: 4094cfdba7fd5f33f15223a99058880d27962655 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package heartbeatcache
import (
// Defaults and fixed intervals used by the heartbeat cache.
const (
// defaultTTL is the TTL, in seconds, substituted whenever a computed or
// supplied ttl is <= 0.
defaultTTL = 30
// defaultCacheCapacity is the fallback for
// "registry.mongo.heartbeat.cacheCapacity"; it sizes the task channel buffer.
defaultCacheCapacity = 10000
// defaultWorkNum is the fallback for "registry.mongo.heartbeat.workerNum".
defaultWorkNum = 10
// defaultTimeout is the fallback for "registry.mongo.heartbeat.timeout";
// the value is interpreted as seconds when a task is enqueued.
defaultTimeout = 10
// instanceCheckerInternal is passed as the second argument to cache.New.
// NOTE(review): presumably the cache's cleanup interval — confirm against
// the cache package in use.
instanceCheckerInternal = 1 * time.Second
// ctxTimeout bounds the context handed to cleanInstance from the cache's
// eviction callback.
ctxTimeout = 5 * time.Second
// ErrHeartbeatTimeout is returned by AddHeartbeatTask when a heartbeat task
// could not be placed on the task channel before the configured timeout
// elapsed.
var ErrHeartbeatTimeout = errors.New("heartbeat task waiting for processing timeout. ")
// Package-level state shared by every caller of configuration().
var (
// once ensures the initialization inside configuration() runs a single time.
once sync.Once
// cfg is the single cacheConfig instance populated by configuration().
cfg cacheConfig
// cacheConfig bundles the heartbeat task queue, the heartbeat store and the
// tuning values read from configuration.
type cacheConfig struct {
// cacheChan queues heartbeat tasks from AddHeartbeatTask until a worker
// goroutine writes them into instanceHeartbeatStore.
cacheChan chan *instanceHeartbeatInfo
// instanceHeartbeatStore holds the most recent heartbeat info keyed by
// instance ID; its eviction callback invokes cleanInstance.
instanceHeartbeatStore *cache.Cache
// workerNum is the number of worker goroutines started by configuration().
workerNum int
// heartbeatTaskTimeout is how long, in seconds, AddHeartbeatTask waits for
// room on cacheChan.
heartbeatTaskTimeout int
func configuration() *cacheConfig {
once.Do(func() {
cfg.workerNum = runtime.NumCPU()
num := config.GetInt("registry.mongo.heartbeat.workerNum", defaultWorkNum)
if num != 0 {
cfg.workerNum = num
cfg.heartbeatTaskTimeout = config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
cfg.cacheChan = make(chan *instanceHeartbeatInfo, config.GetInt("registry.mongo.heartbeat.cacheCapacity", defaultCacheCapacity))
cfg.instanceHeartbeatStore = cache.New(0, instanceCheckerInternal)
cfg.instanceHeartbeatStore.OnEvicted(func(k string, v interface{}) {
instanceInfo, ok := v.(*instanceHeartbeatInfo)
if ok && instanceInfo != nil {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
err := cleanInstance(ctx, instanceInfo.serviceID, instanceInfo.instanceID)
if err != nil {
log.Error("failed to cleanInstance in mongodb.", err)
for i := 1; i <= cfg.workerNum; i++ {
gopool.Go(func(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Warn("heartbeat work protocol exit.")
case heartbeatInfo, ok := <-cfg.cacheChan:
if ok {
cfg.instanceHeartbeatStore.Set(heartbeatInfo.instanceID, heartbeatInfo, time.Duration(heartbeatInfo.ttl)*time.Second)
return &cfg
func (c *cacheConfig) AddHeartbeatTask(serviceID string, instanceID string, ttl int32) error {
// Unassigned setting default value is 30s
if ttl <= 0 {
ttl = defaultTTL
newInstance := &instanceHeartbeatInfo{
serviceID: serviceID,
instanceID: instanceID,
ttl: ttl,
lastRefresh: time.Now(),
select {
case c.cacheChan <- newInstance:
return nil
case <-time.After(time.Duration(c.heartbeatTaskTimeout) * time.Second):
log.Warn("the heartbeat's channel is full. ")
return ErrHeartbeatTimeout
func (c *cacheConfig) RemoveCacheInstance(instanceID string) {
func cleanInstance(ctx context.Context, serviceID string, instanceID string) error {
session, err := client.GetMongoClient().StartSession(ctx)
if err != nil {
return err
if err = session.StartTransaction(); err != nil {
return err
defer session.EndSession(ctx)
filter := util.NewFilter(util.InstanceServiceID(serviceID), util.InstanceInstanceID(instanceID))
result, err := client.GetMongoClient().FindOne(ctx, model.CollectionInstance, filter)
if err != nil {
log.Error("failed to query instance: %v", err)
return err
var ins model.Instance
err = result.Decode(&ins)
if err != nil {
log.Error("decode instance failed: %v", err)
return err
ttl := ins.Instance.HealthCheck.Interval * (ins.Instance.HealthCheck.Times + 1)
if ttl <= 0 {
ttl = defaultTTL
if isOutDate(ins.RefreshTime, ttl) {
return nil
err = removeDBInstance(ctx, ins.Instance.ServiceId, ins.Instance.InstanceId)
if err != nil {
log.Error("fail to remote instance in db: %v", err)
errAbort := session.AbortTransaction(ctx)
if errAbort != nil {
return errAbort
return err
err = session.CommitTransaction(ctx)
return err
func removeDBInstance(ctx context.Context, serviceID string, instanceID string) error {
filter := util.NewFilter(util.InstanceServiceID(serviceID), util.InstanceInstanceID(instanceID))
res, err := client.GetMongoClient().DeleteOne(ctx, model.CollectionInstance, filter)
if err != nil {
log.Error("failed to clean instance", err)
return err
log.Info(fmt.Sprintf("delete from mongodb:%+v", res))
return nil
func findInstance(ctx context.Context, serviceID string, instanceID string) (*model.Instance, error) {
filter := util.NewFilter(util.InstanceServiceID(serviceID), util.InstanceInstanceID(instanceID))
result, err := client.GetMongoClient().FindOne(ctx, model.CollectionInstance, filter)
if err != nil {
return nil, err
var ins model.Instance
err = result.Decode(&ins)
if err != nil {
log.Error("decode instance failed: ", err)
return nil, err
return &ins, nil
func updateInstance(ctx context.Context, serviceID string, instanceID string) error {
filter := util.NewFilter(util.InstanceServiceID(serviceID), util.InstanceInstanceID(instanceID))
update := bson.M{
"$set": bson.M{model.ColumnRefreshTime: time.Now()},
result, err := client.GetMongoClient().FindOneAndUpdate(ctx, model.CollectionInstance, filter, update)
if err != nil {
log.Error("failed to update refresh time of instance: ", err)
return err
return result.Err()
// isOutDate reports whether the time elapsed since refreshTime is at most ttl
// seconds.
//
// Despite the name, a true result means the refresh time is still within the
// TTL window; callers in this file rely on that polarity.
func isOutDate(refreshTime time.Time, ttl int32) bool {
	window := time.Duration(ttl) * time.Second
	elapsed := time.Since(refreshTime)
	return elapsed <= window
}