| //Copyright 2017 Huawei Technologies Co., Ltd |
| // |
| //Licensed under the Apache License, Version 2.0 (the "License"); |
| //you may not use this file except in compliance with the License. |
| //You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| //Unless required by applicable law or agreed to in writing, software |
| //distributed under the License is distributed on an "AS IS" BASIS, |
| //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| //See the License for the specific language governing permissions and |
| //limitations under the License. |
| package util |
| |
| import ( |
| "encoding/json" |
| "errors" |
| "fmt" |
| "github.com/ServiceComb/service-center/pkg/cache" |
| "github.com/ServiceComb/service-center/pkg/util" |
| apt "github.com/ServiceComb/service-center/server/core" |
| "github.com/ServiceComb/service-center/server/core/backend" |
| "github.com/ServiceComb/service-center/server/core/backend/store" |
| pb "github.com/ServiceComb/service-center/server/core/proto" |
| scerr "github.com/ServiceComb/service-center/server/error" |
| "github.com/ServiceComb/service-center/server/infra/registry" |
| "github.com/ServiceComb/service-center/server/mux" |
| "golang.org/x/net/context" |
| "strings" |
| "time" |
| ) |
| |
| var consumerCache *cache.Cache |
| var providerCache *cache.Cache |
| |
| /* |
| 缓存2分钟过期 |
| 1分钟周期缓存consumers 遍历所有serviceid并查询consumers 做缓存 |
| 当发现新查询到的consumers列表变成0时则不做cache set操作 |
| 这样当consumers关系完全被删除也有1分钟的时间窗让实例变化推送到相应的consumers里 1分鐘后緩存也會自動清理 |
| 实例推送中的依赖发现实时性为T+1分钟 |
| */ |
| func init() { |
| d, _ := time.ParseDuration("2m") |
| consumerCache = cache.New(d, d) |
| providerCache = cache.New(d, d) |
| } |
| |
| func GetConsumersInCache(ctx context.Context, domainProject string, providerId string, provider *pb.MicroService) ([]string, error) { |
| // 查询所有consumer |
| dr := NewProviderDependencyRelation(ctx, domainProject, providerId, provider) |
| consumerIds, err := dr.GetDependencyConsumerIds() |
| if err != nil { |
| util.Logger().Errorf(err, "Get dependency consumerIds failed.%s", providerId) |
| return nil, err |
| } |
| |
| if len(consumerIds) == 0 { |
| util.Logger().Warnf(nil, "Get consumer for publish from database is empty. %s, get from cache", providerId) |
| consumerIds, found := consumerCache.Get(providerId) |
| if found && len(consumerIds.([]string)) > 0 { |
| return consumerIds.([]string), nil |
| } |
| return nil, nil |
| } |
| |
| return consumerIds, nil |
| } |
| |
| func GetProvidersInCache(ctx context.Context, domainProject string, consumerId string, consumer *pb.MicroService) ([]string, error) { |
| // 查询所有provider |
| dr := NewConsumerDependencyRelation(ctx, domainProject, consumerId, consumer) |
| providerIds, err := dr.GetDependencyProviderIds() |
| if err != nil { |
| util.Logger().Errorf(err, "Get dependency providerIds failed.%s", consumerId) |
| return nil, err |
| } |
| |
| if len(providerIds) == 0 { |
| util.Logger().Warnf(nil, "Get consumer for publish from database is empty.%s , get from cache", consumerId) |
| providerIds, found := providerCache.Get(consumerId) |
| if found && len(providerIds.([]string)) > 0 { |
| return providerIds.([]string), nil |
| } |
| return nil, nil |
| } |
| |
| return providerIds, nil |
| } |
| |
| func RefreshDependencyCache(ctx context.Context, domainProject string, serviceId string, service *pb.MicroService) error { |
| dr := NewDependencyRelation(ctx, domainProject, serviceId, service, serviceId, service) |
| consumerIds, err := dr.GetDependencyConsumerIds() |
| if err != nil { |
| util.Logger().Errorf(err, "%s,refresh dependency cache failed, get consumerIds failed.", serviceId) |
| return err |
| } |
| providerIds, err := dr.GetDependencyProviderIds() |
| if err != nil { |
| util.Logger().Errorf(err, "%s,refresh dependency cache failed, get providerIds failed.", serviceId) |
| return err |
| } |
| MsCache().Set(serviceId, service, 5*time.Minute) |
| if len(consumerIds) == 0 { |
| util.Logger().Infof("refresh dependency cache: this services %s has no consumer dependency.", serviceId) |
| } else { |
| consumerCache.Set(serviceId, consumerIds, 5*time.Minute) |
| } |
| if len(providerIds) == 0 { |
| util.Logger().Infof("refresh dependency cache: this services %s has no consumer dependency.", serviceId) |
| } else { |
| providerCache.Set(serviceId, providerIds, 5*time.Minute) |
| } |
| return nil |
| } |
| |
| func GetConsumerIds(ctx context.Context, domainProject string, provider *pb.MicroService) (allow []string, deny []string, _ error) { |
| if provider == nil || len(provider.ServiceId) == 0 { |
| return nil, nil, fmt.Errorf("invalid provider") |
| } |
| |
| //todo 删除服务,最后实例推送有误差 |
| providerRules, err := GetRulesUtil(util.SetContext(util.CloneContext(ctx), "cacheOnly", "1"), |
| domainProject, provider.ServiceId) |
| if err != nil { |
| return nil, nil, err |
| } |
| if len(providerRules) == 0 { |
| return getConsumerIdsWithFilter(ctx, domainProject, provider.ServiceId, provider, noFilter) |
| } |
| |
| rf := RuleFilter{ |
| DomainProject: domainProject, |
| Provider: provider, |
| ProviderRules: providerRules, |
| } |
| |
| allow, deny, err = getConsumerIdsWithFilter(ctx, domainProject, provider.ServiceId, provider, rf.Filter) |
| if err != nil { |
| return nil, nil, err |
| } |
| return allow, deny, nil |
| } |
| |
| func getConsumerIdsWithFilter(ctx context.Context, domainProject, providerId string, provider *pb.MicroService, |
| filter func(ctx context.Context, consumerId string) (bool, error)) (allow []string, deny []string, err error) { |
| consumerIds, err := GetConsumersInCache(ctx, domainProject, providerId, provider) |
| if err != nil { |
| return nil, nil, err |
| } |
| return filterConsumerIds(ctx, consumerIds, filter) |
| } |
| |
| func filterConsumerIds(ctx context.Context, consumerIds []string, |
| filter func(ctx context.Context, consumerId string) (bool, error)) (allow []string, deny []string, err error) { |
| l := len(consumerIds) |
| if l == 0 { |
| return nil, nil, nil |
| } |
| allowIdx, denyIdx := 0, l |
| consumers := make([]string, l) |
| for _, consumerId := range consumerIds { |
| ok, err := filter(ctx, consumerId) |
| if err != nil { |
| return nil, nil, err |
| } |
| if ok { |
| consumers[allowIdx] = consumerId |
| allowIdx++ |
| } else { |
| denyIdx-- |
| consumers[denyIdx] = consumerId |
| } |
| } |
| return consumers[:allowIdx], consumers[denyIdx:], nil |
| } |
| |
| func noFilter(_ context.Context, _ string) (bool, error) { |
| return true, nil |
| } |
| |
| func GetProviderIdsByConsumerId(ctx context.Context, domainProject, consumerId string, service *pb.MicroService) (allow []string, deny []string, _ error) { |
| providerIdsInCache, err := GetProvidersInCache(ctx, domainProject, consumerId, service) |
| if err != nil { |
| return nil, nil, err |
| } |
| l := len(providerIdsInCache) |
| rf := RuleFilter{ |
| DomainProject: domainProject, |
| } |
| allowIdx, denyIdx := 0, l |
| providerIds := make([]string, l) |
| copyCtx := util.SetContext(util.CloneContext(ctx), "cacheOnly", "1") |
| for _, providerId := range providerIdsInCache { |
| provider, err := GetService(ctx, domainProject, providerId) |
| if provider == nil { |
| continue |
| } |
| providerRules, err := GetRulesUtil(copyCtx, domainProject, provider.ServiceId) |
| if err != nil { |
| return nil, nil, err |
| } |
| if len(providerRules) == 0 { |
| providerIds[allowIdx] = providerId |
| allowIdx++ |
| continue |
| } |
| rf.Provider = provider |
| rf.ProviderRules = providerRules |
| ok, err := rf.Filter(ctx, consumerId) |
| if err != nil { |
| return nil, nil, err |
| } |
| if ok { |
| providerIds[allowIdx] = providerId |
| allowIdx++ |
| } else { |
| denyIdx-- |
| providerIds[denyIdx] = providerId |
| } |
| } |
| return providerIds[:allowIdx], providerIds[denyIdx:], nil |
| } |
| |
| func ProviderDependencyRuleExist(ctx context.Context, domainProject string, provider *pb.MicroServiceKey, consumer *pb.MicroServiceKey) (bool, error) { |
| providerKey := apt.GenerateProviderDependencyRuleKey(domainProject, provider) |
| consumers, err := TransferToMicroServiceDependency(ctx, providerKey) |
| if err != nil { |
| return false, err |
| } |
| if len(consumers.Dependency) != 0 { |
| isEqual, err := containServiceDependency(consumers.Dependency, consumer) |
| if err != nil { |
| return false, err |
| } |
| if isEqual { |
| //删除之前的依赖 |
| return true, nil |
| } |
| } |
| return false, nil |
| } |
| |
| func AddServiceVersionRule(ctx context.Context, domainProject string, provider *pb.MicroServiceKey, consumer *pb.MicroServiceKey, consumerId string) error { |
| //创建依赖一致 |
| exist, err := ProviderDependencyRuleExist(ctx, domainProject, provider, consumer) |
| if exist || err != nil { |
| return err |
| } |
| |
| lock, err := mux.Lock(mux.GLOBAL_LOCK) |
| if err != nil { |
| return err |
| } |
| err = CreateDependencyRuleForFind(ctx, domainProject, provider, consumer) |
| lock.Unlock() |
| return err |
| } |
| |
| func UpdateServiceForAddDependency(ctx context.Context, consumerId string, providers []*pb.DependencyKey, domainProject string) error { |
| conServiceKey := apt.GenerateServiceKey(domainProject, consumerId) |
| service, err := GetService(ctx, domainProject, consumerId) |
| if err != nil { |
| util.Logger().Errorf(err, "create dependency faild: get service failed. consumerId %s", consumerId) |
| return err |
| } |
| if service == nil { |
| util.Logger().Errorf(nil, "create dependency faild: service not exist.serviceId %s", consumerId) |
| return errors.New("Get service is empty") |
| } |
| |
| service.Providers = providers |
| data, err := json.Marshal(service) |
| if err != nil { |
| util.Logger().Errorf(err, "create dependency faild: marshal service failed.") |
| return err |
| } |
| _, err = backend.Registry().Do(ctx, |
| registry.PUT, |
| registry.WithStrKey(conServiceKey), |
| registry.WithValue(data)) |
| if err != nil { |
| util.Logger().Errorf(err, "create dependency faild: commit service data into etcd failed.") |
| return err |
| } |
| return nil |
| } |
| |
| func DeleteDependencyForService(ctx context.Context, consumer *pb.MicroServiceKey, serviceId string) ([]registry.PluginOp, error) { |
| ops := []registry.PluginOp{} |
| opsTmps := []registry.PluginOp{} |
| domainProject := consumer.Tenant |
| flag := map[string]bool{} |
| //删除依赖规则 |
| conKey := apt.GenerateConsumerDependencyRuleKey(domainProject, consumer) |
| providerValue, err := TransferToMicroServiceDependency(ctx, conKey) |
| if err != nil { |
| return nil, err |
| } |
| if providerValue != nil && len(providerValue.Dependency) != 0 { |
| proProkey := "" |
| for _, providerRule := range providerValue.Dependency { |
| proProkey = apt.GenerateProviderDependencyRuleKey(domainProject, providerRule) |
| consumers, err := TransferToMicroServiceDependency(ctx, proProkey) |
| if err != nil { |
| return nil, err |
| } |
| err = deleteDependencyRuleUtil(ctx, consumers, consumer, proProkey) |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| util.Logger().Debugf("conKey is %s.", conKey) |
| ops = append(ops, registry.OpDel(registry.WithStrKey(conKey))) |
| } |
| //作为provider的依赖规则 |
| providerKey := apt.GenerateProviderDependencyRuleKey(domainProject, consumer) |
| |
| util.Logger().Debugf("providerKey is %s", providerKey) |
| ops = append(ops, registry.OpDel(registry.WithStrKey(providerKey))) |
| |
| //删除依赖关系 |
| opsTmps, err = deleteDependencyUtil(ctx, "c", domainProject, serviceId, flag) |
| if err != nil { |
| return nil, err |
| } |
| ops = append(ops, opsTmps...) |
| util.Logger().Debugf("flag is %s", flag) |
| opsTmps, err = deleteDependencyUtil(ctx, "p", domainProject, serviceId, flag) |
| if err != nil { |
| return nil, err |
| } |
| util.Logger().Debugf("flag is %s", flag) |
| ops = append(ops, opsTmps...) |
| return ops, nil |
| } |
| |
| func TransferToMicroServiceDependency(ctx context.Context, key string) (*pb.MicroServiceDependency, error) { |
| microServiceDependency := &pb.MicroServiceDependency{ |
| Dependency: []*pb.MicroServiceKey{}, |
| } |
| |
| opts := append(FromContext(ctx), registry.WithStrKey(key)) |
| res, err := store.Store().DependencyRule().Search(ctx, opts...) |
| if err != nil { |
| util.Logger().Errorf(nil, "Get dependency rule failed.") |
| return nil, err |
| } |
| if len(res.Kvs) != 0 { |
| err = json.Unmarshal(res.Kvs[0].Value, microServiceDependency) |
| if err != nil { |
| util.Logger().Errorf(nil, "Unmarshal res failed.") |
| return nil, err |
| } |
| } else { |
| util.Logger().Infof("for key %s, no mircroServiceDependency stored", key) |
| } |
| return microServiceDependency, nil |
| } |
| |
| func deleteDependencyRuleUtil(ctx context.Context, microServiceDependency *pb.MicroServiceDependency, service *pb.MicroServiceKey, serviceKey string) error { |
| for key, serviceTmp := range microServiceDependency.Dependency { |
| if ok := equalServiceDependency(serviceTmp, service); ok { |
| microServiceDependency.Dependency = append(microServiceDependency.Dependency[:key], microServiceDependency.Dependency[key+1:]...) |
| util.Logger().Debugf("delete versionRule from %s", serviceTmp.ServiceName) |
| break |
| } |
| } |
| opts := []registry.PluginOpOption{} |
| if len(microServiceDependency.Dependency) == 0 { |
| opts = append(opts, registry.DEL, registry.WithStrKey(serviceKey)) |
| util.Logger().Debugf("serviceKey is .", serviceKey) |
| util.Logger().Debugf("After deleting versionRule from %s,provider's consumer is empty.", serviceKey) |
| |
| } else { |
| data, err := json.Marshal(microServiceDependency) |
| if err != nil { |
| util.Logger().Errorf(nil, "Marshal tmpValue failed.") |
| return err |
| } |
| opts = append(opts, registry.PUT, registry.WithStrKey(serviceKey), registry.WithValue(data)) |
| util.Logger().Debugf("serviceKey is %s.", serviceKey) |
| } |
| _, err := backend.Registry().Do(ctx, opts...) |
| if err != nil { |
| util.Logger().Errorf(err, "Submit update dependency failed.") |
| return err |
| } |
| return nil |
| } |
| |
| func equalServiceDependency(serviceA *pb.MicroServiceKey, serviceB *pb.MicroServiceKey) bool { |
| stringA := toString(serviceA) |
| stringB := toString(serviceB) |
| if stringA == stringB { |
| return true |
| } |
| return false |
| } |
| |
| func toString(in *pb.MicroServiceKey) string { |
| return apt.GenerateProviderDependencyRuleKey(in.Tenant, in) |
| } |
| |
| func deleteDependencyUtil(ctx context.Context, serviceType string, domainProject string, serviceId string, flag map[string]bool) ([]registry.PluginOp, error) { |
| serviceKey := apt.GenerateServiceDependencyKey(serviceType, domainProject, serviceId, "") |
| rsp, err := store.Store().Dependency().Search(ctx, |
| registry.WithStrKey(serviceKey), |
| registry.WithPrefix()) |
| if err != nil { |
| return nil, err |
| } |
| ops := []registry.PluginOp{} |
| if rsp != nil { |
| serviceTmpId := "" |
| serviceTmpKey := "" |
| deleteKey := "" |
| for _, kv := range rsp.Kvs { |
| tmpKeyArr := strings.Split(util.BytesToStringWithNoCopy(kv.Key), "/") |
| serviceTmpId = tmpKeyArr[len(tmpKeyArr)-1] |
| if serviceType == "p" { |
| serviceTmpKey = apt.GenerateConsumerDependencyKey(domainProject, serviceTmpId, serviceId) |
| deleteKey = util.StringJoin([]string{"c", serviceTmpId, serviceId}, "/") |
| } else { |
| serviceTmpKey = apt.GenerateProviderDependencyKey(domainProject, serviceTmpId, serviceId) |
| deleteKey = util.StringJoin([]string{"p", serviceTmpId, serviceId}, "/") |
| } |
| if _, ok := flag[serviceTmpKey]; ok { |
| util.Logger().Debugf("serviceTmpKey is more exist.%s", serviceTmpKey) |
| continue |
| } |
| flag[serviceTmpKey] = true |
| util.Logger().Infof("delete dependency %s", deleteKey) |
| ops = append(ops, registry.OpDel(registry.WithStrKey(serviceTmpKey))) |
| } |
| util.Logger().Infof("delete dependency serviceKey is %s", serviceType+"/"+serviceId) |
| ops = append(ops, registry.OpDel(registry.WithStrKey(serviceKey), registry.WithPrefix())) |
| } |
| return ops, nil |
| } |
| |
| func CreateDependencyRule(ctx context.Context, dep *Dependency) error { |
| //更新consumer的providers的值,consumer的版本是确定的 |
| consumerFlag := strings.Join([]string{dep.Consumer.AppId, dep.Consumer.ServiceName, dep.Consumer.Version}, "/") |
| |
| conKey := apt.GenerateConsumerDependencyRuleKey(dep.DomainProject, dep.Consumer) |
| |
| oldProviderRules, err := TransferToMicroServiceDependency(ctx, conKey) |
| if err != nil { |
| util.Logger().Errorf(err, "maintain dependency rule failed, consumer %s: get consumer depedency rule failed.", consumerFlag) |
| return err |
| } |
| |
| unExistDependencyRuleList := make([]*pb.MicroServiceKey, 0, len(oldProviderRules.Dependency)) |
| newDependencyRuleList := make([]*pb.MicroServiceKey, 0, len(dep.ProvidersRule)) |
| existDependencyRuleList := make([]*pb.MicroServiceKey, 0, len(oldProviderRules.Dependency)) |
| for _, oldProviderRule := range oldProviderRules.Dependency { |
| if ok, _ := containServiceDependency(dep.ProvidersRule, oldProviderRule); !ok { |
| unExistDependencyRuleList = append(unExistDependencyRuleList, oldProviderRule) |
| } else { |
| existDependencyRuleList = append(existDependencyRuleList, oldProviderRule) |
| } |
| } |
| for _, tmpProviderRule := range dep.ProvidersRule { |
| if ok, _ := containServiceDependency(existDependencyRuleList, tmpProviderRule); !ok { |
| newDependencyRuleList = append(newDependencyRuleList, tmpProviderRule) |
| } |
| } |
| |
| dep.err = make(chan error, 5) |
| dep.chanNum = 0 |
| if len(unExistDependencyRuleList) != 0 { |
| util.Logger().Infof("Unexist dependency rule remove for consumer %s, %v, ", consumerFlag, unExistDependencyRuleList) |
| dep.removedDependencyRuleList = unExistDependencyRuleList |
| dep.RemoveConsumerOfProviderRule() |
| } |
| |
| if len(newDependencyRuleList) != 0 { |
| util.Logger().Infof("New dependency rule add for consumer %s, %v, ", consumerFlag, newDependencyRuleList) |
| dep.NewDependencyRuleList = newDependencyRuleList |
| dep.AddConsumerOfProviderRule() |
| } |
| |
| err = dep.UpdateProvidersRuleOfConsumer(conKey) |
| if err != nil { |
| return err |
| } |
| |
| if dep.chanNum != 0 { |
| for tmpErr := range dep.err { |
| dep.chanNum-- |
| if tmpErr != nil { |
| return tmpErr |
| } |
| if 0 == dep.chanNum { |
| close(dep.err) |
| } |
| } |
| } |
| return nil |
| } |
| |
| func CreateDependencyRuleForFind(ctx context.Context, domainProject string, provider *pb.MicroServiceKey, consumer *pb.MicroServiceKey) error { |
| //更新consumer的providers的值,consumer的版本是确定的 |
| consumerFlag := strings.Join([]string{consumer.AppId, consumer.ServiceName, consumer.Version}, "/") |
| conKey := apt.GenerateConsumerDependencyRuleKey(domainProject, consumer) |
| |
| oldProviderRules, err := TransferToMicroServiceDependency(ctx, conKey) |
| if err != nil { |
| util.Logger().Errorf(err, "get dependency rule failed, consumer %s: get consumer depedency rule failed.", consumerFlag) |
| return err |
| } |
| if isDependencyAll(oldProviderRules) { |
| util.Logger().Infof("find update dep rule, exist * for %v", consumer) |
| return nil |
| } |
| opts := make([]registry.PluginOp, 0) |
| if oldProviderRule := isNeedUpdate(oldProviderRules.Dependency, provider); oldProviderRule != nil { |
| opt, err := deleteConsumerDepOfProviderRule(ctx, domainProject, oldProviderRule, consumer) |
| if err != nil { |
| util.Logger().Errorf(err, "marshal consumerDepRules failed for delete consumer rule from provider rule's dep.%s", consumerFlag) |
| return err |
| } |
| util.Logger().Infof("delete consumer %v from provider dep %v", consumer, oldProviderRule) |
| opts = append(opts, opt) |
| |
| oldRule, opt, err := updateDepRuleUtil(conKey, oldProviderRules, provider) |
| if err != nil { |
| util.Logger().Errorf(err, "update provider rule into consumer's dep rule failed, %s", consumerFlag) |
| return err |
| } |
| util.Logger().Infof("update provider %v(from version '%s') into consumer dep %s", provider, oldRule, consumerFlag) |
| opts = append(opts, opt) |
| |
| opt, err = updateProviderRuleDep(ctx, domainProject, provider, consumer) |
| if err != nil { |
| return err |
| } |
| opts = append(opts, opt) |
| } else { |
| if !isExist(oldProviderRules.Dependency, provider) { |
| opt, err := addDepRuleUtil(conKey, oldProviderRules, provider) |
| if err != nil { |
| util.Logger().Errorf(err, "add provider rule into consumer's dep rule failed, %s", consumerFlag) |
| return err |
| } |
| util.Logger().Infof("add consumer dep, %s, add %v", consumerFlag, provider) |
| opts = append(opts, opt) |
| } |
| |
| proKey := apt.GenerateProviderDependencyRuleKey(domainProject, provider) |
| consumerDepRules, err := TransferToMicroServiceDependency(ctx, proKey) |
| if err != nil { |
| util.Logger().Errorf(err, "get consumer rule of provider failed,%v", provider) |
| return err |
| } |
| if !isExist(consumerDepRules.Dependency, consumer) { |
| opt, err := addDepRuleUtil(proKey, consumerDepRules, consumer) |
| if err != nil { |
| util.Logger().Errorf(err, "add consumer rule into provider's dep rule failed,%s", consumerFlag) |
| return err |
| } |
| util.Logger().Infof("add provider dep, %s, add %v", consumerFlag, provider) |
| opts = append(opts, opt) |
| } |
| } |
| |
| if len(opts) != 0 { |
| _, err = backend.Registry().Txn(ctx, opts) |
| if err != nil { |
| util.Logger().Errorf(err, "update dep rule for consumer failed, %s", consumerFlag) |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func updateProviderRuleDep(ctx context.Context, domainProject string, providerRule , consumer *pb.MicroServiceKey) (registry.PluginOp, error) { |
| proKey := apt.GenerateProviderDependencyRuleKey(domainProject, providerRule) |
| consumerDepRules, err := TransferToMicroServiceDependency(ctx, proKey) |
| opt := registry.PluginOp{} |
| if err != nil { |
| util.Logger().Errorf(err, "get provider rule's dep failed, providerRule %v, consumer %v", providerRule, consumer) |
| return opt, err |
| } |
| opt, err = addDepRuleUtil(proKey, consumerDepRules, consumer) |
| if err != nil { |
| util.Logger().Errorf(err, "add consumer into provider's dep rule failed, providerRule %v, consumer %v", providerRule, consumer) |
| return opt, err |
| } |
| return opt, nil |
| } |
| |
| func isDependencyAll(dep *pb.MicroServiceDependency) bool { |
| for _, servicedep := range dep.Dependency { |
| if servicedep.ServiceName == "*" { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func isExist(services []*pb.MicroServiceKey, service *pb.MicroServiceKey) bool { |
| for _, tmp := range services { |
| if equalServiceDependency(tmp, service) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func deleteConsumerDepOfProviderRule(ctx context.Context, domainProject string, providerRule *pb.MicroServiceKey, deleteConsumer *pb.MicroServiceKey) (registry.PluginOp, error) { |
| proKey := apt.GenerateProviderDependencyRuleKey(domainProject, providerRule) |
| consumerDepRules, err := TransferToMicroServiceDependency(ctx, proKey) |
| if err != nil { |
| return registry.PluginOp{}, err |
| } |
| return deleteDepRuleUtil(proKey, consumerDepRules, deleteConsumer) |
| } |
| |
| func deleteDepRuleUtil(key string, deps *pb.MicroServiceDependency, deleteDepRule *pb.MicroServiceKey) (registry.PluginOp, error) { |
| for key, consumerDepRule := range deps.Dependency { |
| if equalServiceDependency(consumerDepRule, deleteDepRule) { |
| deps.Dependency = append(deps.Dependency[:key], deps.Dependency[key+1:]...) |
| break |
| } |
| } |
| data, err := json.Marshal(deps) |
| if err != nil { |
| return registry.PluginOp{}, err |
| } |
| return registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)), nil |
| } |
| |
| func addDepRuleUtil(key string, deps *pb.MicroServiceDependency, updateDepRule *pb.MicroServiceKey) (registry.PluginOp, error) { |
| deps.Dependency = append(deps.Dependency, updateDepRule) |
| data, err := json.Marshal(deps) |
| if err != nil { |
| util.Logger().Errorf(err, "marshal consumerDepRules failed for delete consumer rule from provider rule's dep.") |
| return registry.PluginOp{}, err |
| } |
| return registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)), nil |
| } |
| |
| func updateDepRuleUtil(key string, deps *pb.MicroServiceDependency, updateDepRule *pb.MicroServiceKey) (string, registry.PluginOp, error) { |
| oldRule := "" |
| for _, serviceRule := range deps.Dependency { |
| if serviceRule.Environment == updateDepRule.Environment && |
| serviceRule.AppId == updateDepRule.AppId && |
| serviceRule.ServiceName == updateDepRule.ServiceName && |
| serviceRule.Version != updateDepRule.Version { |
| oldRule = serviceRule.Version |
| serviceRule.Version = updateDepRule.Version |
| break |
| } |
| } |
| data, err := json.Marshal(deps) |
| if err != nil { |
| return oldRule, registry.PluginOp{}, err |
| } |
| return oldRule, registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)), nil |
| } |
| |
| func isNeedUpdate(services []*pb.MicroServiceKey, service *pb.MicroServiceKey) *pb.MicroServiceKey { |
| for _, tmp := range services { |
| if tmp.Environment == service.Environment && |
| tmp.AppId == service.AppId && |
| tmp.ServiceName == service.ServiceName && |
| tmp.Version != service.Version { |
| return tmp |
| } |
| } |
| return nil |
| } |
| |
| func containServiceDependency(services []*pb.MicroServiceKey, service *pb.MicroServiceKey) (bool, error) { |
| if services == nil || service == nil { |
| return false, errors.New("Invalid params input.") |
| } |
| for _, value := range services { |
| rst := equalServiceDependency(service, value) |
| if rst { |
| return true, nil |
| } |
| } |
| return false, nil |
| } |
| |
| // fuzzyMatch: 是否使用模糊规则 |
| func validateMicroServiceKey(in *pb.MicroServiceKey, fuzzyMatch bool) error { |
| if fuzzyMatch { |
| // provider的ServiceName, Version支持模糊规则 |
| return apt.ProviderMsValidator.Validate(in) |
| } else { |
| return apt.DependencyMSValidator.Validate(in) |
| } |
| } |
| |
| func BadParamsResponse(detailErr string) *pb.CreateDependenciesResponse { |
| util.Logger().Errorf(nil, "Request params is invalid.") |
| if len(detailErr) == 0 { |
| detailErr = "Request params is invalid." |
| } |
| return &pb.CreateDependenciesResponse{ |
| Response: pb.CreateResponse(scerr.ErrInvalidParams, detailErr), |
| } |
| } |
| |
| func ParamsChecker(consumerInfo *pb.MicroServiceKey, providersInfo []*pb.MicroServiceKey) *pb.CreateDependenciesResponse { |
| if err := validateMicroServiceKey(consumerInfo, false); err != nil { |
| return BadParamsResponse(err.Error()) |
| } |
| if providersInfo == nil { |
| return BadParamsResponse("Invalid request body for provider info.") |
| } |
| flag := make(map[string]bool, len(providersInfo)) |
| for _, providerInfo := range providersInfo { |
| //存在带*的情况,后面的数据就不校验了 |
| if providerInfo.ServiceName == "*" { |
| util.Logger().Debugf("%s 's provider contains *.", consumerInfo.ServiceName) |
| break |
| } |
| if len(providerInfo.AppId) == 0 { |
| providerInfo.AppId = consumerInfo.AppId |
| } |
| if err := validateMicroServiceKey(providerInfo, true); err != nil { |
| return BadParamsResponse(err.Error()) |
| } |
| |
| version := providerInfo.Version |
| providerInfo.Version = "" |
| if _, ok := flag[toString(providerInfo)]; ok { |
| return BadParamsResponse("Invalid request body for provider info.Duplicate provider or (serviceName and appid is same).") |
| } else { |
| flag[toString(providerInfo)] = true |
| } |
| providerInfo.Version = version |
| } |
| return nil |
| } |
| |
| type Dependency struct { |
| ConsumerId string |
| DomainProject string |
| removedDependencyRuleList []*pb.MicroServiceKey |
| NewDependencyRuleList []*pb.MicroServiceKey |
| err chan error |
| chanNum int8 |
| Consumer *pb.MicroServiceKey |
| ProvidersRule []*pb.MicroServiceKey |
| } |
| |
| func (dep *Dependency) RemoveConsumerOfProviderRule() { |
| dep.chanNum++ |
| go dep.removeConsumerOfProviderRule() |
| } |
| |
| func (dep *Dependency) removeConsumerOfProviderRule() { |
| ctx := context.TODO() |
| opts := make([]registry.PluginOp, 0, len(dep.removedDependencyRuleList)) |
| for _, providerRule := range dep.removedDependencyRuleList { |
| proProkey := apt.GenerateProviderDependencyRuleKey(dep.DomainProject, providerRule) |
| util.Logger().Debugf("This proProkey is %s.", proProkey) |
| consumerValue, err := TransferToMicroServiceDependency(ctx, proProkey) |
| if err != nil { |
| dep.err <- err |
| return |
| } |
| for key, tmp := range consumerValue.Dependency { |
| if ok := equalServiceDependency(tmp, dep.Consumer); ok { |
| consumerValue.Dependency = append(consumerValue.Dependency[:key], consumerValue.Dependency[key+1:]...) |
| break |
| } |
| util.Logger().Debugf("tmp and dep.Consumer not equal, tmp %v, consumer %v", tmp, dep.Consumer) |
| } |
| //删除后,如果不存在依赖规则了,就删除该provider的依赖规则,如果有,则更新该依赖规则 |
| if len(consumerValue.Dependency) == 0 { |
| opts = append(opts, registry.OpDel(registry.WithStrKey(proProkey))) |
| continue |
| } |
| data, err := json.Marshal(consumerValue) |
| if err != nil { |
| util.Logger().Errorf(nil, "Marshal tmpValue failed.") |
| dep.err <- err |
| return |
| } |
| opts = append(opts, registry.OpPut( |
| registry.WithStrKey(proProkey), |
| registry.WithValue(data))) |
| } |
| if len(opts) != 0 { |
| _, err := backend.Registry().Txn(ctx, opts) |
| if err != nil { |
| dep.err <- err |
| return |
| } |
| } |
| dep.err <- nil |
| } |
| |
| func (dep *Dependency) AddConsumerOfProviderRule() { |
| dep.chanNum++ |
| go dep.addConsumerOfProviderRule() |
| } |
| |
| func (dep *Dependency) addConsumerOfProviderRule() { |
| ctx := context.TODO() |
| opts := []registry.PluginOp{} |
| for _, prividerRule := range dep.NewDependencyRuleList { |
| proProkey := apt.GenerateProviderDependencyRuleKey(dep.DomainProject, prividerRule) |
| tmpValue, err := TransferToMicroServiceDependency(ctx, proProkey) |
| if err != nil { |
| dep.err <- err |
| return |
| } |
| tmpValue.Dependency = append(tmpValue.Dependency, dep.Consumer) |
| |
| data, errMarshal := json.Marshal(tmpValue) |
| if errMarshal != nil { |
| util.Logger().Errorf(nil, "Marshal tmpValue failed.") |
| dep.err <- errors.New("Marshal tmpValue failed.") |
| return |
| } |
| opts = append(opts, registry.OpPut( |
| registry.WithStrKey(proProkey), |
| registry.WithValue(data))) |
| if prividerRule.ServiceName == "*" { |
| break |
| } |
| } |
| if len(opts) != 0 { |
| _, err := backend.Registry().Txn(ctx, opts) |
| if err != nil { |
| dep.err <- err |
| return |
| } |
| } |
| dep.err <- nil |
| } |
| |
| func (dep *Dependency) UpdateProvidersRuleOfConsumer(conKey string) error { |
| dependency := &pb.MicroServiceDependency{ |
| Dependency: dep.ProvidersRule, |
| } |
| data, err := json.Marshal(dependency) |
| if err != nil { |
| util.Logger().Errorf(nil, "Marshal tmpValue fialed.") |
| return err |
| } |
| _, err = backend.Registry().Do(context.TODO(), |
| registry.PUT, |
| registry.WithStrKey(conKey), |
| registry.WithValue(data)) |
| if err != nil { |
| util.Logger().Errorf(nil, "Upload dependency rule failed.") |
| return err |
| } |
| return nil |
| } |
| |
| type DependencyRelation struct { |
| ctx context.Context |
| domainProject string |
| consumerId string |
| consumer *pb.MicroService |
| providerId string |
| provider *pb.MicroService |
| } |
| |
| func NewProviderDependencyRelation(ctx context.Context, domainProject string, providerId string, provider *pb.MicroService) *DependencyRelation { |
| return NewDependencyRelation(ctx, domainProject, "", nil, providerId, provider) |
| } |
| |
| func NewConsumerDependencyRelation(ctx context.Context, domainProject string, consumerId string, consumer *pb.MicroService) *DependencyRelation { |
| return NewDependencyRelation(ctx, domainProject, consumerId, consumer, "", nil) |
| } |
| |
| func NewDependencyRelation(ctx context.Context, domainProject string, consumerId string, consumer *pb.MicroService, providerId string, provider *pb.MicroService) *DependencyRelation { |
| return &DependencyRelation{ |
| ctx: ctx, |
| domainProject: domainProject, |
| consumerId: consumerId, |
| consumer: consumer, |
| providerId: providerId, |
| provider: provider, |
| } |
| } |
| |
| func (dr *DependencyRelation) GetDependencyProviders() ([]*pb.MicroService, error) { |
| providerIds, err := dr.GetDependencyProviderIds() |
| if err != nil { |
| return nil, err |
| } |
| services := make([]*pb.MicroService, 0) |
| for _, providerId := range providerIds { |
| provider, err := GetService(dr.ctx, dr.domainProject, providerId) |
| if err != nil { |
| return nil, err |
| } |
| if provider == nil { |
| util.Logger().Warnf(nil, "Provider not exist, %s", providerId) |
| continue |
| } |
| services = append(services, provider) |
| } |
| return services, nil |
| } |
| |
| func (dr *DependencyRelation) GetDependencyProviderIds() ([]string, error) { |
| if dr.consumer == nil { |
| util.LOGGER.Infof("dr.consumer is nil ------->") |
| return nil, fmt.Errorf("Invalid consumer") |
| } |
| consumerMicroServiceKey := pb.MicroServiceToKey(dr.domainProject, dr.consumer) |
| |
| conKey := apt.GenerateConsumerDependencyRuleKey(dr.domainProject, consumerMicroServiceKey) |
| consumerDependency, err := TransferToMicroServiceDependency(dr.ctx, conKey) |
| if err != nil { |
| return nil, err |
| } |
| return dr.getDependencyProviderIds(consumerDependency.Dependency) |
| } |
| |
| func (dr *DependencyRelation) getDependencyProviderIds(providerRules []*pb.MicroServiceKey) ([]string, error) { |
| provideServiceIds := make([]string, 0) |
| opts := FromContext(dr.ctx) |
| for _, provider := range providerRules { |
| switch { |
| case provider.ServiceName == "*": |
| util.Logger().Infof("Rely all service,* type, consumerId %s", dr.consumerId) |
| splited := strings.Split(apt.GenerateServiceIndexKey(provider), "/") |
| allServiceKey := util.StringJoin(splited[:len(splited)-3], "/") + "/" |
| sopts := append(opts, |
| registry.WithStrKey(allServiceKey), |
| registry.WithPrefix()) |
| resp, err := store.Store().Service().Search(dr.ctx, sopts...) |
| if err != nil { |
| util.Logger().Errorf(err, "Add dependency failed, rely all service: get all services failed.") |
| return provideServiceIds, err |
| } |
| |
| for _, kv := range resp.Kvs { |
| provideServiceIds = append(provideServiceIds, util.BytesToStringWithNoCopy(kv.Value)) |
| } |
| return provideServiceIds, nil |
| default: |
| serviceIds, err := FindServiceIds(dr.ctx, provider.Version, provider) |
| if err != nil { |
| util.Logger().Errorf(err, "Get providerIds failed, service: %s/%s/%s", |
| provider.AppId, provider.ServiceName, provider.Version) |
| return provideServiceIds, err |
| } |
| if len(serviceIds) == 0 { |
| util.Logger().Warnf(nil, "Get providerIds is empty, service: %s/%s/%s does not exist", |
| provider.AppId, provider.ServiceName, provider.Version) |
| continue |
| } |
| provideServiceIds = append(provideServiceIds, serviceIds...) |
| } |
| } |
| return provideServiceIds, nil |
| } |
| |
| func (dr *DependencyRelation) GetDependencyConsumers() ([]*pb.MicroService, error) { |
| consumerDependAllList, err := dr.getDependencyConsumersOfProvider() |
| if err != nil { |
| util.Logger().Errorf(err, "Get consumers of provider rule failed, %s", dr.providerId) |
| return nil, err |
| } |
| consumers := make([]*pb.MicroService, 0) |
| |
| for _, consumer := range consumerDependAllList { |
| service, err := dr.getServiceByMicroServiceKey(dr.domainProject, consumer) |
| if err != nil { |
| return nil, err |
| } |
| if service == nil { |
| util.Logger().Warnf(nil, "Consumer not exist,%v", service) |
| continue |
| } |
| consumers = append(consumers, service) |
| } |
| return consumers, nil |
| } |
| |
| func (dr *DependencyRelation) getServiceByMicroServiceKey(domainProject string, service *pb.MicroServiceKey) (*pb.MicroService, error) { |
| serviceId, err := GetServiceId(dr.ctx, service) |
| if err != nil { |
| return nil, err |
| } |
| if len(serviceId) == 0 { |
| util.Logger().Warnf(nil, "Service not exist,%v", service) |
| return nil, nil |
| } |
| return GetService(dr.ctx, domainProject, serviceId) |
| } |
| |
| func (dr *DependencyRelation) GetDependencyConsumerIds() ([]string, error) { |
| consumerDependAllList, err := dr.getDependencyConsumersOfProvider() |
| if err != nil { |
| return nil, err |
| } |
| consumerIds := make([]string, 0) |
| for _, consumer := range consumerDependAllList { |
| consumerId, err := GetServiceId(context.TODO(), consumer) |
| if err != nil { |
| util.Logger().Errorf(err, "Get consumer failed, %v", consumer) |
| return nil, err |
| } |
| if len(consumerId) == 0 { |
| util.Logger().Warnf(nil, "Get consumer not exist, %v", consumer) |
| continue |
| } |
| consumerIds = append(consumerIds, consumerId) |
| } |
| return consumerIds, nil |
| |
| } |
| |
| func (dr *DependencyRelation) getDependencyConsumersOfProvider() ([]*pb.MicroServiceKey, error) { |
| if dr.provider == nil { |
| util.LOGGER.Infof("dr.provider is nil ------->") |
| return nil, fmt.Errorf("Invalid provider") |
| } |
| providerService := pb.MicroServiceToKey(dr.domainProject, dr.provider) |
| consumerDependAllList, err := dr.getConsumerOfDependAllServices() |
| if err != nil { |
| util.Logger().Errorf(err, "Get consumer that depend on all services failed, %s", dr.providerId) |
| return nil, err |
| } |
| |
| consumerDependList, err := dr.getConsumerOfSameServiceNameAndAppId(providerService) |
| if err != nil { |
| util.Logger().Errorf(err, "Get consumer that depend on same serviceName and appid rule failed, %s", dr.providerId) |
| return nil, err |
| } |
| consumerDependAllList = append(consumerDependAllList, consumerDependList...) |
| return consumerDependAllList, nil |
| } |
| |
| func (dr *DependencyRelation) getConsumerOfDependAllServices() ([]*pb.MicroServiceKey, error) { |
| providerService := pb.MicroServiceToKey(dr.domainProject, dr.provider) |
| providerService.ServiceName = "*" |
| relyAllKey := apt.GenerateProviderDependencyRuleKey(dr.domainProject, providerService) |
| opts := append(FromContext(dr.ctx), registry.WithStrKey(relyAllKey)) |
| rsp, err := store.Store().DependencyRule().Search(dr.ctx, opts...) |
| if err != nil { |
| util.Logger().Errorf(err, "get consumer that rely all service failed.") |
| return nil, err |
| } |
| dependency := &pb.MicroServiceDependency{} |
| if len(rsp.Kvs) != 0 { |
| util.Logger().Infof("consumer that rely all service exist.ServiceName: %s.", dr.provider.ServiceName) |
| err = json.Unmarshal(rsp.Kvs[0].Value, dependency) |
| if err != nil { |
| return nil, err |
| } |
| return dependency.Dependency, nil |
| } |
| return dependency.Dependency, nil |
| } |
| |
| func (dr *DependencyRelation) getConsumerOfSameServiceNameAndAppId(provider *pb.MicroServiceKey) ([]*pb.MicroServiceKey, error) { |
| providerVersion := provider.Version |
| provider.Version = "" |
| prefix := apt.GenerateProviderDependencyRuleKey(dr.domainProject, provider) |
| provider.Version = providerVersion |
| |
| opts := append(FromContext(dr.ctx), |
| registry.WithStrKey(prefix), |
| registry.WithPrefix()) |
| rsp, err := store.Store().DependencyRule().Search(dr.ctx, opts...) |
| if err != nil { |
| util.Logger().Errorf(err, "get all dependency rule failed: provider rule key %v.", provider) |
| return nil, err |
| } |
| |
| allConsumers := make([]*pb.MicroServiceKey, 0, len(rsp.Kvs)) |
| var latestServiceId []string |
| |
| for _, kv := range rsp.Kvs { |
| dependency := &pb.MicroServiceDependency{ |
| Dependency: []*pb.MicroServiceKey{}, |
| } |
| providerVersionRuleArr := strings.Split(util.BytesToStringWithNoCopy(kv.Key), "/") |
| providerVersionRule := providerVersionRuleArr[len(providerVersionRuleArr)-1] |
| if providerVersionRule == "latest" { |
| if latestServiceId == nil { |
| latestServiceId, err = FindServiceIds(dr.ctx, providerVersionRule, provider) |
| if err != nil { |
| util.Logger().Errorf(err, "Get latest service failed.") |
| return nil, err |
| } |
| } |
| if len(latestServiceId) == 0 { |
| util.Logger().Infof("%s 's providerId is empty,no this service.", provider.ServiceName) |
| continue |
| } |
| if dr.providerId != latestServiceId[0] { |
| continue |
| } |
| |
| } else { |
| if !VersionMatchRule(providerVersion, providerVersionRule) { |
| continue |
| } |
| } |
| |
| util.Logger().Debugf("providerETCD is %s", providerVersionRuleArr) |
| err = json.Unmarshal(kv.Value, dependency) |
| if err != nil { |
| util.Logger().Errorf(err, "Unmarshal consumers failed.") |
| return nil, err |
| } |
| allConsumers = append(allConsumers, dependency.Dependency...) |
| } |
| return allConsumers, nil |
| } |