blob: df0f23f4483d923e0af25d45829d6d9b4f767987 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"fmt"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/queue"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
"github.com/apache/servicecomb-service-center/server/mux"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
"golang.org/x/net/context"
"time"
)
// DependencyEventHandler handles events of the backend.DEPENDENCY_QUEUE type
// (see Type). OnEvent only records a signal; the queued dependency requests
// are processed by Handle, which is driven from the loop started in
// NewDependencyEventHandler.
type DependencyEventHandler struct {
// signals is the queue notify puts into and eventLoop receives from.
signals *queue.UniQueue
}
// Type reports backend.DEPENDENCY_QUEUE as the discovery type of this handler.
func (h *DependencyEventHandler) Type() discovery.Type {
return backend.DEPENDENCY_QUEUE
}
// OnEvent signals the event loop when evt is a create, update or init event.
// Events of any other type are ignored.
func (h *DependencyEventHandler) OnEvent(evt discovery.KvEvent) {
	t := evt.Type
	relevant := t == pb.EVT_CREATE || t == pb.EVT_UPDATE || t == pb.EVT_INIT
	if relevant {
		h.notify()
	}
}
// notify puts an empty struct value on h.signals, the queue whose Chan() the
// event loop receives from.
func (h *DependencyEventHandler) notify() {
	var signal struct{}
	h.signals.Put(signal)
}
// backoff returns retries + 1. When the backoff callback is non-nil it first
// blocks for the delay util.GetBackoff() yields for the given retries value
// and then invokes the callback.
func (h *DependencyEventHandler) backoff(backoff func(), retries int) int {
	next := retries + 1
	if backoff == nil {
		return next
	}

	delay := util.GetBackoff().Delay(retries)
	time.Sleep(delay)
	backoff()
	return next
}
// tryWithBackoff tries to take mux.DEP_QUEUE_LOCK and, when a lock is
// obtained, runs success while holding it.
//
// It returns the error (if any) together with the retry counter the caller
// should pass to the next call:
//   - mux.Try failed, or success returned an error: the error is logged and
//     returned with the value of h.backoff(backoff, retries);
//   - mux.Try returned neither a lock nor an error: nil and 0, success is not
//     called;
//   - success returned nil: nil and 0.
func (h *DependencyEventHandler) tryWithBackoff(success func() error, backoff func(), retries int) (error, int) {
	lock, err := mux.Try(mux.DEP_QUEUE_LOCK)
	if err != nil {
		log.Errorf(err, "try to lock %s failed", mux.DEP_QUEUE_LOCK)
		return err, h.backoff(backoff, retries)
	}
	if lock == nil {
		return nil, 0
	}

	// Unlock through defer so the lock is released even if success panics.
	// The closure scopes the defer to the success call, which keeps the
	// unlock ahead of the backoff delay below.
	err = func() error {
		defer lock.Unlock()
		return success()
	}()
	if err != nil {
		log.Errorf(err, "handle dependency event failed")
		return err, h.backoff(backoff, retries)
	}
	return nil, 0
}
// eventLoop hands a worker function to gopool.Go. The worker loops until its
// context is done; every value received from h.signals.Chan() triggers one
// tryWithBackoff call with Handle as the work and notify as the backoff
// callback. The retry counter returned by each call is fed into the next one.
func (h *DependencyEventHandler) eventLoop() {
	worker := func(ctx context.Context) {
		var retries int
		for {
			select {
			case <-ctx.Done():
				return
			case <-h.signals.Chan():
				// The returned error is dropped here; tryWithBackoff logs it.
				_, retries = h.tryWithBackoff(h.Handle, h.notify, retries)
			}
		}
	}
	gopool.Go(worker)
}
// DependencyEventHandlerResource bundles one dependency queue entry with the
// request stored in it. Handle creates one per key-value and adds it to the
// dependency tree.
type DependencyEventHandlerResource struct {
// dep is the consumer dependency request (Handle takes it from kv.Value).
dep *pb.ConsumerDependency
// kv is the queue entry itself; isAddToLeft compares its ModRevision and
// removeKV deletes it by Key and Version.
kv *discovery.KeyValue
// domainProject is the domain/project value Handle extracts from kv.Key.
domainProject string
}
// NewDependencyEventHandlerResource returns a new resource holding the given
// dependency request, the key-value it came from and its domain/project.
func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv *discovery.KeyValue, domainProject string) *DependencyEventHandlerResource {
	res := &DependencyEventHandlerResource{
		dep:           dep,
		kv:            kv,
		domainProject: domainProject,
	}
	return res
}
// isAddToLeft is the function Handle passes to util.NewTree. It reports true
// when the ModRevision of addRes is less than or equal to that of centerNode,
// and false when it is greater.
//
// Both addRes and centerNode.Res must hold a *DependencyEventHandlerResource;
// any other dynamic type makes the type assertion panic.
func isAddToLeft(centerNode *util.Node, addRes interface{}) bool {
	added := addRes.(*DependencyEventHandlerResource)
	center := centerNode.Res.(*DependencyEventHandlerResource)
	return added.kv.ModRevision <= center.kv.ModRevision
}
// Handle searches the dependency queue for every entry under the queue root
// key, inserts the entries into a tree ordered by isAddToLeft and applies
// dependencyRuleHandle to them through an in-order traversal.
//
// Domain/projects whose queue key carries core.DEPS_QUEUE_UUID are collected
// and passed to CleanUp when Handle returns, whatever the traversal result.
//
// It returns the search error, or whatever the traversal returns. An empty
// search result is not an error.
func (h *DependencyEventHandler) Handle() error {
	rootKey := core.GetServiceDependencyQueueRootKey("")
	resp, err := backend.Store().DependencyQueue().Search(context.Background(),
		registry.WithStrKey(rootKey),
		registry.WithPrefix())
	if err != nil {
		return err
	}

	// Nothing queued, nothing to maintain.
	if len(resp.Kvs) == 0 {
		return nil
	}

	tree := util.NewTree(isAddToLeft)
	toCleanUp := make(map[string]struct{})
	// Deferred before the map is filled: the map is shared by reference, so
	// CleanUp sees every domain/project added in the loop below.
	defer h.CleanUp(toCleanUp)

	for _, kv := range resp.Kvs {
		request := kv.Value.(*pb.ConsumerDependency)

		_, domainProject, uuid := core.GetInfoFromDependencyQueueKV(kv.Key)
		if uuid == core.DEPS_QUEUE_UUID {
			toCleanUp[domainProject] = struct{}{}
		}

		tree.AddNode(NewDependencyEventHandlerResource(request, kv, domainProject))
	}

	return tree.InOrderTraversal(tree.GetRoot(), h.dependencyRuleHandle)
}
// dependencyRuleHandle applies a single queued dependency request; Handle
// passes it to the tree traversal as the per-node callback.
//
// res must hold a *DependencyEventHandlerResource. When the request has
// Override set the rule is written with serviceUtil.CreateDependencyRule,
// otherwise with serviceUtil.AddDependencyRule. After the rule is modified
// the queue entry is removed through removeKV.
//
// It returns an error when modifying the rule or removing the entry fails.
func (h *DependencyEventHandler) dependencyRuleHandle(res interface{}) error {
	ctx := context.Background()

	item := res.(*DependencyEventHandlerResource)
	r := item.dep

	// Human readable consumer identity, used in log and error messages only.
	consumerParts := []string{r.Consumer.Environment, r.Consumer.AppId, r.Consumer.ServiceName, r.Consumer.Version}
	consumerFlag := util.StringJoin(consumerParts, "/")

	domainProject := item.domainProject
	consumerKeys := pb.DependenciesToKeys([]*pb.MicroServiceKey{r.Consumer}, domainProject)
	providerKeys := pb.DependenciesToKeys(r.Providers, domainProject)

	var dep serviceUtil.Dependency
	dep.DomainProject = domainProject
	dep.Consumer = consumerKeys[0]
	dep.ProvidersRule = providerKeys

	var err error
	if r.Override {
		err = serviceUtil.CreateDependencyRule(ctx, &dep)
	} else {
		err = serviceUtil.AddDependencyRule(ctx, &dep)
	}
	if err != nil {
		log.Errorf(err, "modify dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
		return fmt.Errorf("override: %t, consumer is %s, %s", r.Override, consumerFlag, err.Error())
	}

	err = h.removeKV(ctx, item.kv)
	if err != nil {
		log.Errorf(err, "remove dependency rule failed, override: %t, consumer %s", r.Override, consumerFlag)
		return err
	}

	log.Infof("maintain dependency [%v] successfully", r)
	return nil
}
// removeKV deletes kv.Key from the registry inside a compare transaction
// whose condition is built from registry.CmpVer(kv.Key), registry.CMP_EQUAL
// and kv.Version.
//
// It returns an error only when the transaction call itself fails. When the
// response reports Succeeded == false the fact is logged at info level and
// nil is returned.
func (h *DependencyEventHandler) removeKV(ctx context.Context, kv *discovery.KeyValue) error {
	reg := backend.Registry()

	deleteOps := []registry.PluginOp{
		registry.OpDel(registry.WithKey(kv.Key)),
	}
	versionUnchanged := []registry.CompareOp{
		registry.OpCmp(registry.CmpVer(kv.Key), registry.CMP_EQUAL, kv.Version),
	}

	resp, err := reg.TxnWithCmp(ctx, deleteOps, versionUnchanged, nil)
	if err != nil {
		return fmt.Errorf("can not remove the dependency %s request, %s", util.BytesToStringWithNoCopy(kv.Key), err.Error())
	}

	if resp.Succeeded {
		return nil
	}
	log.Infof("the dependency %s request is changed", util.BytesToStringWithNoCopy(kv.Key))
	return nil
}
// CleanUp calls serviceUtil.CleanUpDependencyRules for every key of
// domainProjects. A failure is logged and does not stop the remaining
// domain/projects from being processed.
func (h *DependencyEventHandler) CleanUp(domainProjects map[string]struct{}) {
	for domainProject := range domainProjects {
		err := serviceUtil.CleanUpDependencyRules(context.Background(), domainProject)
		if err == nil {
			continue
		}
		log.Errorf(err, "clean up '%s' dependency rules failed", domainProject)
	}
}
// NewDependencyEventHandler returns a handler with a fresh signal queue and
// starts its event loop before returning.
func NewDependencyEventHandler() *DependencyEventHandler {
	handler := new(DependencyEventHandler)
	handler.signals = queue.NewUniQueue()
	handler.eventLoop()
	return handler
}