Clear no-instance services automatically (#577)
diff --git a/scripts/prepare_env_ut.sh b/scripts/prepare_env_ut.sh
index 2868ee1..c2fd19a 100755
--- a/scripts/prepare_env_ut.sh
+++ b/scripts/prepare_env_ut.sh
@@ -14,5 +14,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-cp -r etc/conf server/plugin/infra/tls/buildin
-echo "mode: atomic" > coverage.txt
+CURRENT_PATH=$(cd $(dirname $0);pwd)
+ROOT_PATH=$(dirname $CURRENT_PATH)
+
+mkdir -p $ROOT_PATH/server/plugin/infra/tls/buildin
+cp -r $ROOT_PATH/etc/conf $ROOT_PATH/server/plugin/infra/tls/buildin
+echo "mode: atomic" > $ROOT_PATH/coverage.txt
diff --git a/scripts/ut.sh b/scripts/ut.sh
index 849784b..4b19dbc 100755
--- a/scripts/ut.sh
+++ b/scripts/ut.sh
@@ -16,14 +16,18 @@
# limitations under the License.
set -e
+CURRENT_PATH=$(cd $(dirname $0);pwd)
+ROOT_PATH=$(dirname $CURRENT_PATH)
+
export COVERAGE_PATH=$(pwd)
cd $1
-for d in $(go list ./... | grep -v vendor); do
- cd $GOPATH/src/$d
+for d in $(go list -f '{{.Dir}}' ./... | grep -v vendor); do
+ cd $d
if [ $(ls | grep _test.go | wc -l) -gt 0 ]; then
- go test -cover -covermode atomic -coverprofile coverage.out
+ go test -cover -covermode atomic -coverprofile coverage.out
if [ -f coverage.out ]; then
- sed '1d;$d' coverage.out >> $GOPATH/src/github.com/apache/servicecomb-service-center/coverage.txt
+ sed '1d;$d' coverage.out >> $ROOT_PATH/coverage.txt
fi
fi
done
+
diff --git a/server/core/config.go b/server/core/config.go
index ab2f891..ae6ac84 100644
--- a/server/core/config.go
+++ b/server/core/config.go
@@ -17,18 +17,29 @@
package core
import (
+ "os"
+ "runtime"
+ "time"
+
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/plugin"
"github.com/apache/servicecomb-service-center/pkg/util"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
"github.com/apache/servicecomb-service-center/version"
"github.com/astaxie/beego"
- "os"
- "runtime"
)
const (
- INIT_VERSION = "0"
+ InitVersion = "0"
+
+ defaultServiceClearInterval = 12 * time.Hour //0.5 day
+ defaultServiceTTL = 24 * time.Hour //1 day
+
+ minServiceClearInterval = 30 * time.Second
+ minServiceTTL = 30 * time.Second
+
+ maxServiceClearInterval = 24 * time.Hour //1 day
+ maxServiceTTL = 24 * 365 * time.Hour //1 year
)
var ServerInfo = pb.NewServerInformation()
@@ -54,8 +65,19 @@
if maxLogBackupCount < 0 || maxLogBackupCount > 100 {
maxLogBackupCount = 50
}
+
+ serviceClearInterval, err := time.ParseDuration(os.Getenv("SERVICE_CLEAR_INTERVAL"))
+ if err != nil || serviceClearInterval < minServiceClearInterval || serviceClearInterval > maxServiceClearInterval {
+ serviceClearInterval = defaultServiceClearInterval
+ }
+
+ serviceTTL, err := time.ParseDuration(os.Getenv("SERVICE_TTL"))
+ if err != nil || serviceTTL < minServiceTTL || serviceTTL > maxServiceTTL {
+ serviceTTL = defaultServiceTTL
+ }
+
return pb.ServerInformation{
- Version: INIT_VERSION,
+ Version: InitVersion,
Config: pb.ServerConfig{
MaxHeaderBytes: int64(beego.AppConfig.DefaultInt("max_header_bytes", 16384)),
MaxBodyBytes: beego.AppConfig.DefaultInt64("max_body_bytes", 2097152),
@@ -92,6 +114,10 @@
EnablePProf: beego.AppConfig.DefaultInt("enable_pprof", 0) != 0,
EnableCache: beego.AppConfig.DefaultInt("enable_cache", 1) != 0,
SelfRegister: beego.AppConfig.DefaultInt("self_register", 1) != 0,
+
+ ServiceClearEnabled: os.Getenv("SERVICE_CLEAR_ENABLED") == "true",
+ ServiceClearInterval: serviceClearInterval,
+ ServiceTTL: serviceTTL,
},
}
}
diff --git a/server/core/proto/types.go b/server/core/proto/types.go
index 36de9a4..a157ebf 100644
--- a/server/core/proto/types.go
+++ b/server/core/proto/types.go
@@ -16,6 +16,8 @@
package proto
import (
+ "time"
+
"github.com/apache/servicecomb-service-center/pkg/util"
)
@@ -61,6 +63,12 @@
Plugins util.JSONObject `json:"plugins"`
SelfRegister bool `json:"selfRegister"`
+
+ //clear no-instance services
+ ServiceClearEnabled bool `json:"serviceClearEnabled"`
+ ServiceClearInterval time.Duration `json:"serviceClearInterval"`
+ //if a service's existence time reaches this value, it can be cleared
+ ServiceTTL time.Duration `json:"serviceTTL"`
}
type ServerInformation struct {
diff --git a/server/mux/mux.go b/server/mux/mux.go
index 8f3a8bf..c2c844e 100644
--- a/server/mux/mux.go
+++ b/server/mux/mux.go
@@ -33,8 +33,9 @@
}
const (
- GLOBAL_LOCK MuxType = "/cse-sr/lock/global"
- DEP_QUEUE_LOCK MuxType = "/cse-sr/lock/dep-queue"
+ GlobalLock MuxType = "/cse-sr/lock/global"
+ DepQueueLock MuxType = "/cse-sr/lock/dep-queue"
+ ServiceClearLock MuxType = "/cse-sr/lock/service-clear"
)
func Lock(t MuxType) (*etcdsync.DLock, error) {
diff --git a/server/server.go b/server/server.go
index 72ea40b..8f04b66 100644
--- a/server/server.go
+++ b/server/server.go
@@ -19,6 +19,9 @@
import _ "github.com/apache/servicecomb-service-center/server/service/event"
import (
"fmt"
+ "os"
+ "time"
+
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
nf "github.com/apache/servicecomb-service-center/pkg/notify"
@@ -28,11 +31,10 @@
"github.com/apache/servicecomb-service-center/server/notify"
"github.com/apache/servicecomb-service-center/server/plugin"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
+ "github.com/apache/servicecomb-service-center/server/task"
"github.com/apache/servicecomb-service-center/version"
"github.com/astaxie/beego"
"golang.org/x/net/context"
- "os"
- "time"
)
const buildin = "buildin"
@@ -80,7 +82,7 @@
}
func (s *ServiceCenterServer) loadOrUpgradeServerVersion() {
- lock, err := mux.Lock(mux.GLOBAL_LOCK)
+ lock, err := mux.Lock(mux.GlobalLock)
if err != nil {
log.Errorf(err, "wait for server ready failed")
os.Exit(1)
@@ -114,8 +116,8 @@
case <-ctx.Done():
return
case <-time.After(interval):
- lock, err := mux.Try(mux.GLOBAL_LOCK)
- if lock == nil {
+ lock, err := mux.Try(mux.GlobalLock)
+ if err != nil {
log.Errorf(err, "can not compact backend by this service center instance now")
continue
}
@@ -128,6 +130,38 @@
})
}
+// clear services who have no instance
+func (s *ServiceCenterServer) clearNoInstanceServices() {
+ if !core.ServerInfo.Config.ServiceClearEnabled {
+ return
+ }
+ log.Infof("service clear enabled, interval: %s, service TTL: %s",
+ core.ServerInfo.Config.ServiceClearInterval,
+ core.ServerInfo.Config.ServiceTTL)
+
+ s.goroutine.Do(func(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(core.ServerInfo.Config.ServiceClearInterval):
+ lock, err := mux.Try(mux.ServiceClearLock)
+ if err != nil {
+ log.Errorf(err, "can not clear no instance services by this service center instance now")
+ continue
+ }
+ err = task.ClearNoInstanceServices(core.ServerInfo.Config.ServiceTTL)
+ lock.Unlock()
+ if err != nil {
+ log.Errorf(err, "no-instance services cleanup failed")
+ continue
+ }
+ log.Info("no-instance services cleanup succeed")
+ }
+ }
+ })
+}
+
func (s *ServiceCenterServer) initialize() {
s.cacheService = backend.Store()
s.apiService = GetAPIServer()
@@ -151,9 +185,11 @@
s.cacheService.Run()
<-s.cacheService.Ready()
- // compact backend automatically
if buildin != beego.AppConfig.DefaultString("registry_plugin", buildin) {
+ // compact backend automatically
s.compactBackendService()
+ // clean no-instance services automatically
+ s.clearNoInstanceServices()
}
// api service
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index 2af407b..5e25e12 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -63,9 +63,9 @@
}
func (h *DependencyEventHandler) tryWithBackoff(success func() error, backoff func(), retries int) (error, int) {
- lock, err := mux.Try(mux.DEP_QUEUE_LOCK)
+ lock, err := mux.Try(mux.DepQueueLock)
if err != nil {
- log.Errorf(err, "try to lock %s failed", mux.DEP_QUEUE_LOCK)
+ log.Errorf(err, "try to lock %s failed", mux.DepQueueLock)
return err, h.backoff(backoff, retries)
}
diff --git a/server/service/util/microservice_util.go b/server/service/util/microservice_util.go
index 43ed799..1972395 100644
--- a/server/service/util/microservice_util.go
+++ b/server/service/util/microservice_util.go
@@ -18,6 +18,8 @@
import (
"encoding/json"
+ "strings"
+
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
apt "github.com/apache/servicecomb-service-center/server/core"
@@ -27,6 +29,7 @@
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/quota"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
"golang.org/x/net/context"
)
@@ -72,6 +75,43 @@
return resp.Kvs, err
}
+//GetAllServicesAcrossDomainProject get services of all domains, projects
+//the map's key is domainProject
+func GetAllServicesAcrossDomainProject(ctx context.Context) (map[string][]*pb.MicroService, error) {
+ key := apt.GetServiceRootKey("")
+ opts := append(FromContext(ctx),
+ registry.WithStrKey(key),
+ registry.WithPrefix())
+ serviceResp, err := backend.Store().Service().Search(ctx, opts...)
+ if err != nil {
+ return nil, err
+ }
+
+ services := make(map[string][]*pb.MicroService)
+ if len(serviceResp.Kvs) == 0 {
+ return services, nil
+ }
+
+ for _, value := range serviceResp.Kvs {
+ prefix := util.BytesToStringWithNoCopy(value.Key)
+ parts := strings.Split(prefix, apt.SPLIT)
+ if len(parts) != 7 {
+ continue
+ }
+ domainProject := parts[4] + apt.SPLIT + parts[5]
+ microService, ok := value.Value.(*pb.MicroService)
+ if !ok {
+ log.Error("backend data is not type *pb.MicroService", nil)
+ continue
+ }
+ if _, ok := services[domainProject]; !ok {
+ services[domainProject] = make([]*pb.MicroService, 0)
+ }
+ services[domainProject] = append(services[domainProject], microService)
+ }
+ return services, nil
+}
+
func GetServicesByDomainProject(ctx context.Context, domainProject string) ([]*pb.MicroService, error) {
kvs, err := getServicesRawData(ctx, domainProject)
if err != nil {
diff --git a/server/task/clear_service.go b/server/task/clear_service.go
new file mode 100644
index 0000000..8c2daff
--- /dev/null
+++ b/server/task/clear_service.go
@@ -0,0 +1,107 @@
+package task
+
+import (
+ "context"
+ "errors"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ apt "github.com/apache/servicecomb-service-center/server/core"
+ pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
+)
+
+// ClearNoInstanceService clears services which have no instance
+func ClearNoInstanceServices(serviceTTL time.Duration) error {
+ services, err := serviceUtil.GetAllServicesAcrossDomainProject(context.Background())
+ if err != nil {
+ return err
+ }
+ if len(services) == 0 {
+ log.Info("no service found, no need to clear")
+ return nil
+ }
+ timeLimit := time.Now().Add(0 - serviceTTL)
+ log.Infof("clear no-instance services created before %s", timeLimit)
+ timeLimitStamp := strconv.FormatInt(timeLimit.Unix(), 10)
+
+ for domainProject, svcList := range services {
+ if len(svcList) == 0 {
+ continue
+ }
+ ctx, err := ctxFromDomainProject(domainProject)
+ if err != nil {
+ log.Errorf(err, "get domain project context failed")
+ continue
+ }
+ for _, svc := range svcList {
+ if svc == nil {
+ continue
+ }
+ ok, err := shouldClear(ctx, timeLimitStamp, svc)
+ if err != nil {
+ log.Errorf(err, "check service clear necessity failed")
+ continue
+ }
+ if !ok {
+ continue
+ }
+ //delete this service
+ svcCtxStr := "domainProject: " + domainProject + ", " +
+ "env: " + svc.Environment + ", " +
+ "service: " + util.StringJoin([]string{svc.AppId, svc.ServiceName, svc.Version}, apt.SPLIT)
+ delSvcReq := &pb.DeleteServiceRequest{
+ ServiceId: svc.ServiceId,
+ Force: true, //force delete
+ }
+ delSvcResp, err := apt.ServiceAPI.Delete(ctx, delSvcReq)
+ if err != nil {
+ log.Errorf(err, "clear service failed, %s", svcCtxStr)
+ continue
+ }
+ if delSvcResp.Response.GetCode() != pb.Response_SUCCESS {
+ log.Errorf(nil, "clear service failed, %s, %s", delSvcResp.Response.GetMessage(), svcCtxStr)
+ continue
+ }
+ log.Warnf("clear service success, %s", svcCtxStr)
+ }
+ }
+ return nil
+}
+
+func ctxFromDomainProject(domainProject string) (ctx context.Context, err error) {
+ splitIndex := strings.Index(domainProject, apt.SPLIT)
+ if splitIndex == -1 {
+ return nil, errors.New("invalid domainProject: " + domainProject)
+ }
+ domain := domainProject[:splitIndex]
+ project := domainProject[splitIndex+1:]
+ return util.SetDomainProject(context.Background(), domain, project), nil
+}
+
+//check whether a service should be cleared
+func shouldClear(ctx context.Context, timeLimitStamp string, svc *pb.MicroService) (bool, error) {
+ //ignore a service if it is created after timeLimitStamp
+ if svc.Timestamp > timeLimitStamp {
+ return false, nil
+ }
+ getInstsReq := &pb.GetInstancesRequest{
+ ConsumerServiceId: svc.ServiceId,
+ ProviderServiceId: svc.ServiceId,
+ }
+ getInstsResp, err := apt.InstanceAPI.GetInstances(ctx, getInstsReq)
+ if err != nil {
+ return false, err
+ }
+ if getInstsResp.Response.GetCode() != pb.Response_SUCCESS {
+ return false, errors.New("get instance failed: " + getInstsResp.Response.GetMessage())
+ }
+ //ignore a service if it has instances
+ if len(getInstsResp.Instances) > 0 {
+ return false, nil
+ }
+ return true, nil
+}
diff --git a/server/task/clear_service_test.go b/server/task/clear_service_test.go
new file mode 100644
index 0000000..29ac868
--- /dev/null
+++ b/server/task/clear_service_test.go
@@ -0,0 +1,133 @@
+package task_test
+
+// initialize
+import _ "github.com/apache/servicecomb-service-center/server/init"
+import _ "github.com/apache/servicecomb-service-center/server/bootstrap"
+import _ "github.com/apache/servicecomb-service-center/server"
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ apt "github.com/apache/servicecomb-service-center/server/core"
+ pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
+ "github.com/apache/servicecomb-service-center/server/task"
+ "github.com/astaxie/beego"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+var _ = BeforeSuite(func() {
+ beego.AppConfig.Set("registry_plugin", "etcd")
+ //clear service created in last test
+ time.Sleep(timeLimit)
+ task.ClearNoInstanceServices(timeLimit)
+})
+
+// map[domainProject][serviceName]*serviceCleanInfo
+var svcCleanInfos = make(map[string]map[string]*serviceCleanInfo)
+var timeLimit = 2 * time.Second
+
+type serviceCleanInfo struct {
+ ServiceName string
+ ServiceId string
+ WithInstance bool
+ ShouldClear bool
+}
+
+func TestTask(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Task Suite")
+}
+
+func getContext(domain string, project string) context.Context {
+ return util.SetContext(
+ util.SetDomainProject(context.Background(), domain, project),
+ serviceUtil.CTX_NOCACHE, "1")
+}
+
+func createService(domain string, project string, name string, withInstance bool, shouldClear bool) {
+ By(fmt.Sprintf("create service: %s, with instance: %t, should clear: %t", name, withInstance, shouldClear))
+ svc := &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ AppId: "clear",
+ ServiceName: name,
+ Version: "1.0",
+ },
+ }
+ if withInstance {
+ svc.Instances = []*pb.MicroServiceInstance{
+ &pb.MicroServiceInstance{
+ Endpoints: []string{"http://127.0.0.1:80"},
+ HostName: "1",
+ },
+ }
+ }
+ ctx := getContext(domain, project)
+ svcResp, err := apt.ServiceAPI.Create(ctx, svc)
+ Expect(err).To(BeNil())
+ Expect(svcResp).NotTo(BeNil())
+ Expect(svcResp.Response.GetCode()).To(Equal(pb.Response_SUCCESS))
+ info := &serviceCleanInfo{
+ ServiceName: name,
+ ServiceId: svcResp.ServiceId,
+ WithInstance: withInstance,
+ ShouldClear: shouldClear,
+ }
+ domainProject := domain + apt.SPLIT + project
+ m, ok := svcCleanInfos[domainProject]
+ if !ok {
+ m = make(map[string]*serviceCleanInfo)
+ svcCleanInfos[domainProject] = m
+ }
+ m[name] = info
+}
+
+func checkServiceCleared(domain string, project string) {
+ domainProject := domain + apt.SPLIT + project
+ m := svcCleanInfos[domainProject]
+ for _, v := range m {
+ By(fmt.Sprintf("check cleared, service: %s, should be cleared: %t", v.ServiceName, v.ShouldClear))
+ getSvcReq := &pb.GetServiceRequest{
+ ServiceId: v.ServiceId,
+ }
+ ctx := getContext(domain, project)
+ getSvcResp, err := apt.ServiceAPI.GetOne(ctx, getSvcReq)
+ Expect(err).To(BeNil())
+ Expect(getSvcResp).NotTo(BeNil())
+ Expect(getSvcResp.Response.GetCode() == pb.Response_SUCCESS).To(Equal(!v.ShouldClear))
+ }
+}
+
+func serviceClearCheckFunc(domain string, project string) func() {
+ return func() {
+ var err error
+ It("should run clear task success", func() {
+ withInstance := true
+ withNoInstance := false
+ shouldClear := true
+ shouldNotClear := false
+
+ createService(domain, project, "svc1", withNoInstance, shouldClear)
+ createService(domain, project, "svc2", withInstance, shouldNotClear)
+ time.Sleep(timeLimit)
+ createService(domain, project, "svc3", withNoInstance, shouldNotClear)
+ createService(domain, project, "svc4", withInstance, shouldNotClear)
+
+ err = task.ClearNoInstanceServices(timeLimit)
+ Expect(err).To(BeNil())
+
+ checkServiceCleared(domain, project)
+ })
+ }
+}
+
+var _ = Describe("clear service", func() {
+ Describe("domain project 1", serviceClearCheckFunc("default1", "default"))
+ Describe("domain project 2", serviceClearCheckFunc("default2", "default"))
+})