| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package mongo |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| |
| dmongo "github.com/go-chassis/cari/db/mongo" |
| "github.com/go-chassis/cari/discovery" |
| "go.mongodb.org/mongo-driver/bson" |
| "go.mongodb.org/mongo-driver/mongo" |
| "go.mongodb.org/mongo-driver/mongo/options" |
| |
| "github.com/apache/servicecomb-service-center/datasource" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/path" |
| "github.com/apache/servicecomb-service-center/datasource/mongo/model" |
| mutil "github.com/apache/servicecomb-service-center/datasource/mongo/util" |
| "github.com/apache/servicecomb-service-center/pkg/log" |
| "github.com/apache/servicecomb-service-center/pkg/util" |
| ) |
| |
| type DepManager struct { |
| } |
| |
| func (ds *DepManager) ListConsumers(ctx context.Context, request *discovery.GetDependenciesRequest) (*discovery.GetProDependenciesResponse, error) { |
| domainProject := util.ParseDomainProject(ctx) |
| providerServiceID := request.ServiceId |
| provider, err := GetServiceByID(ctx, providerServiceID) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("query provider service failed, there is no provider %s in db", providerServiceID)) |
| return nil, discovery.NewError(discovery.ErrServiceNotExists, "Provider does not exist") |
| } |
| log.Error(fmt.Sprintf("query provider from db error, provider is %s", providerServiceID), err) |
| return nil, err |
| } |
| if provider == nil { |
| log.Error(fmt.Sprintf("ListConsumers failed for provider %s", providerServiceID), err) |
| return nil, discovery.NewError(discovery.ErrServiceNotExists, "Provider does not exist") |
| } |
| |
| services, err := GetConsumers(ctx, domainProject, provider.Service, ToDependencyFilterOptions(request)...) |
| if err != nil { |
| log.Error(fmt.Sprintf("ListConsumers failed, provider is %s/%s/%s/%s", |
| provider.Service.Environment, provider.Service.AppId, provider.Service.ServiceName, provider.Service.Version), err) |
| return nil, discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| |
| return &discovery.GetProDependenciesResponse{ |
| Consumers: services, |
| }, nil |
| } |
| |
| func (ds *DepManager) ListProviders(ctx context.Context, request *discovery.GetDependenciesRequest) (*discovery.GetConDependenciesResponse, error) { |
| domainProject := util.ParseDomainProject(ctx) |
| consumerID := request.ServiceId |
| consumer, err := GetServiceByID(ctx, consumerID) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("query consumer service failed, there is no consumer %s in db", consumerID)) |
| return nil, discovery.NewError(discovery.ErrServiceNotExists, "Consumer does not exist") |
| } |
| log.Error(fmt.Sprintf("query consumer from db error, consumer is %s", consumerID), err) |
| return nil, err |
| } |
| if consumer == nil { |
| log.Error(fmt.Sprintf("ListProviders failed for consumer %s does not exist", consumerID), err) |
| return nil, discovery.NewError(discovery.ErrServiceNotExists, "Consumer does not exist") |
| } |
| |
| services, err := GetProviders(ctx, domainProject, consumer.Service, ToDependencyFilterOptions(request)...) |
| if err != nil { |
| log.Error(fmt.Sprintf("query consumer failed, consumer is %s/%s/%s/%s", |
| consumer.Service.Environment, consumer.Service.AppId, consumer.Service.ServiceName, consumer.Service.Version), err) |
| return nil, discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| |
| return &discovery.GetConDependenciesResponse{ |
| Providers: services, |
| }, nil |
| } |
| |
| func (ds *DepManager) PutDependencies(ctx context.Context, dependencyInfos []*discovery.ConsumerDependency, override bool) error { |
| return updateDepTxn(ctx, dependencyInfos, override) |
| } |
| |
| func updateDepTxn(ctx context.Context, dependencyInfos []*discovery.ConsumerDependency, override bool) error { |
| return dmongo.GetClient().ExecTxn(ctx, func(sessionContext mongo.SessionContext) error { |
| domainProject := util.ParseDomainProject(ctx) |
| for _, dependency := range dependencyInfos { |
| consumerFlag := util.StringJoin([]string{ |
| dependency.Consumer.Environment, |
| dependency.Consumer.AppId, |
| dependency.Consumer.ServiceName, |
| dependency.Consumer.Version}, "/") |
| consumerInfo := discovery.DependenciesToKeys([]*discovery.MicroServiceKey{dependency.Consumer}, domainProject)[0] |
| providersInfo := discovery.DependenciesToKeys(dependency.Providers, domainProject) |
| |
| if err := datasource.ParamsChecker(consumerInfo, providersInfo); err != nil { |
| log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t consumer is %s", |
| override, consumerFlag), nil) |
| return err |
| } |
| |
| consumerID, err := GetServiceID(ctx, consumerInfo) |
| if err != nil && !errors.Is(err, datasource.ErrNoData) { |
| log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, get consumer %s id failed", |
| override, consumerFlag), err) |
| return discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| if len(consumerID) == 0 { |
| log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t consumer %s does not exist", |
| override, consumerFlag), err) |
| return discovery.NewError(discovery.ErrServiceNotExists, fmt.Sprintf("Consumer %s does not exist.", consumerFlag)) |
| } |
| |
| dependency.Override = override |
| if !override { |
| id := util.GenerateUUID() |
| |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| data := &model.ConsumerDep{ |
| Domain: domain, |
| Project: project, |
| ConsumerID: consumerID, |
| UUID: id, |
| ConsumerDep: dependency, |
| } |
| |
| insertRes, err := dmongo.GetClient().GetDB().Collection(model.CollectionDep).InsertOne(ctx, data) |
| if err != nil { |
| log.Error("failed to insert dep to mongodb", err) |
| return discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| log.Info(fmt.Sprintf("insert dep to mongodb success %s", insertRes.InsertedID)) |
| } |
| err = syncDependencyRule(ctx, domainProject, dependency) |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| }) |
| } |
| |
| func (ds *DepManager) DependencyHandle(_ context.Context) (err error) { |
| return nil |
| } |
| |
| func syncDependencyRule(ctx context.Context, domainProject string, r *discovery.ConsumerDependency) error { |
| |
| consumerInfo := discovery.DependenciesToKeys([]*discovery.MicroServiceKey{r.Consumer}, domainProject)[0] |
| providersInfo := discovery.DependenciesToKeys(r.Providers, domainProject) |
| |
| var dep datasource.Dependency |
| //var err error |
| dep.DomainProject = domainProject |
| dep.Consumer = consumerInfo |
| dep.ProvidersRule = providersInfo |
| // add mongo get dep here |
| |
| oldProviderRules, err := GetOldProviderRules(&dep) |
| if err != nil { |
| return err |
| } |
| if r.Override { |
| datasource.ParseOverrideRules(ctx, &dep, oldProviderRules) |
| } else { |
| datasource.ParseAddOrUpdateRules(ctx, &dep, oldProviderRules) |
| } |
| return updateDeps(domainProject, &dep) |
| } |
| |
| func GetOldProviderRules(dep *datasource.Dependency) (*discovery.MicroServiceDependency, error) { |
| microServiceDependency := &discovery.MicroServiceDependency{ |
| Dependency: []*discovery.MicroServiceKey{}, |
| } |
| filter := GenerateConsumerDependencyRuleKey(dep.DomainProject, dep.Consumer) |
| findRes := dmongo.GetClient().GetDB().Collection(model.CollectionDep).FindOne(context.TODO(), filter) |
| if findRes.Err() != nil { |
| log.Error(fmt.Sprintf("get dependency rule [%v] failed", filter), findRes.Err()) |
| return microServiceDependency, nil |
| } |
| |
| var depRule *model.DependencyRule |
| err := findRes.Decode(&depRule) |
| if err != nil { |
| return nil, err |
| } |
| return depRule.Dep, nil |
| } |
| |
| func updateDeps(domainProject string, dep *datasource.Dependency) error { |
| var upsert = true |
| for _, r := range dep.DeleteDependencyRuleList { |
| filter := GenerateProviderDependencyRuleKey(domainProject, r) |
| _, err := dmongo.GetClient().GetDB().Collection(model.CollectionDep).UpdateMany(context.TODO(), filter, |
| bson.M{"$pull": bson.M{mutil.ConnectWithDot([]string{model.ColumnDep, model.ColumnDependency}): dep.Consumer}}) |
| if err != nil { |
| return err |
| } |
| } |
| for _, r := range dep.CreateDependencyRuleList { |
| filter := GenerateProviderDependencyRuleKey(domainProject, r) |
| data := bson.M{ |
| "$addToSet": bson.M{mutil.ConnectWithDot([]string{model.ColumnDep, model.ColumnDependency}): dep.Consumer}, |
| } |
| _, err := dmongo.GetClient().GetDB().Collection(model.CollectionDep).UpdateMany(context.TODO(), filter, |
| data, &options.UpdateOptions{Upsert: &upsert}) |
| if err != nil { |
| return err |
| } |
| } |
| filter := GenerateConsumerDependencyRuleKey(domainProject, dep.Consumer) |
| if len(dep.ProvidersRule) == 0 { |
| _, err := dmongo.GetClient().GetDB().Collection(model.CollectionDep).DeleteMany(context.TODO(), filter) |
| if err != nil { |
| return err |
| } |
| } else { |
| updateData := bson.M{ |
| "$set": bson.M{mutil.ConnectWithDot([]string{model.ColumnDep, model.ColumnDependency}): dep.ProvidersRule}, |
| } |
| _, err := dmongo.GetClient().GetDB().Collection(model.CollectionDep).UpdateMany(context.TODO(), filter, |
| updateData, &options.UpdateOptions{Upsert: &upsert}) |
| if err != nil { |
| return err |
| } |
| } |
| |
| err := CleanUpDepRules(context.TODO(), domainProject) |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func CleanUpDepRules(ctx context.Context, domainProject string) error { |
| if len(domainProject) == 0 { |
| return mutil.ErrInvalidDomainProject |
| } |
| |
| cache := make(map[*model.DelDepCacheKey]bool) |
| err := removeProviderRuleOfConsumer(ctx, domainProject, cache) |
| |
| if err != nil { |
| return err |
| } |
| |
| return removeProviderRuleKeys(ctx, domainProject, cache) |
| } |
| |
| func removeProviderRuleOfConsumer(ctx context.Context, domainProject string, cache map[*model.DelDepCacheKey]bool) error { |
| filter := GenerateConsumerDependencyRuleKey(domainProject, nil) |
| depRules, err := GetDepRules(ctx, filter) |
| if err != nil { |
| return err |
| } |
| for _, depRule := range depRules { |
| err := removeConsumerDeps(ctx, depRule, cache) |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func removeProviderRuleKeys(ctx context.Context, domainProject string, cache map[*model.DelDepCacheKey]bool) error { |
| filter := GenerateProviderDependencyRuleKey(domainProject, nil) |
| depRules, err := GetDepRules(ctx, filter) |
| if err != nil { |
| return err |
| } |
| for _, depRule := range depRules { |
| err := removeProviderDeps(ctx, depRule, cache) |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func GetDepRules(ctx context.Context, filter bson.M) ([]*model.DependencyRule, error) { |
| findRes, err := dmongo.GetClient().GetDB().Collection(model.CollectionDep).Find(ctx, filter) |
| if err != nil { |
| return nil, err |
| } |
| |
| var depRules []*model.DependencyRule |
| for findRes.Next(ctx) { |
| var depRule *model.DependencyRule |
| err := findRes.Decode(&depRule) |
| if err != nil { |
| return nil, err |
| } |
| depRules = append(depRules, depRule) |
| } |
| return depRules, nil |
| } |
| |
| func removeProviderDeps(ctx context.Context, depRule *model.DependencyRule, cache map[*model.DelDepCacheKey]bool) (err error) { |
| id := &model.DelDepCacheKey{ |
| Key: depRule.ServiceKey, |
| Type: path.DepsConsumer, |
| } |
| exist, ok := cache[id] |
| if !ok { |
| _, exist, err = FindServiceIds(ctx, depRule.ServiceKey, false) |
| if err != nil { |
| cache[id] = exist |
| return err |
| } |
| } |
| |
| filter := bson.M{ |
| model.ColumnServiceKey: depRule.ServiceKey, |
| } |
| if !exist { |
| _, err = dmongo.GetClient().GetDB().Collection(model.CollectionDep).DeleteOne(ctx, filter) |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func removeConsumerDeps(ctx context.Context, depRule *model.DependencyRule, cache map[*model.DelDepCacheKey]bool) (err error) { |
| var left []*discovery.MicroServiceKey |
| for _, key := range depRule.Dep.Dependency { |
| id := &model.DelDepCacheKey{ |
| Key: key, |
| Type: path.DepsProvider, |
| } |
| exist, ok := cache[id] |
| if !ok { |
| _, exist, err = FindServiceIds(ctx, key, false) |
| if err != nil { |
| return err |
| } |
| cache[id] = exist |
| } |
| |
| if exist { |
| left = append(left, key) |
| } |
| } |
| if len(depRule.Dep.Dependency) == len(left) { |
| return nil |
| } |
| |
| filter := bson.M{ |
| model.ColumnServiceKey: depRule.ServiceKey, |
| } |
| if len(left) == 0 { |
| _, err = dmongo.GetClient().GetDB().Collection(model.CollectionDep).DeleteOne(ctx, filter) |
| } else { |
| updateData := bson.M{ |
| "$set": bson.M{mutil.ConnectWithDot([]string{model.ColumnDep, model.ColumnDependency}): left}, |
| } |
| _, err = dmongo.GetClient().GetDB().Collection(model.CollectionDep).UpdateMany(ctx, filter, updateData) |
| } |
| if err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func TransferToMicroServiceDependency(ctx context.Context, filter bson.M) (*discovery.MicroServiceDependency, error) { |
| microServiceDependency := &discovery.MicroServiceDependency{ |
| Dependency: []*discovery.MicroServiceKey{}, |
| } |
| result := dmongo.GetClient().GetDB().Collection(model.CollectionDep).FindOne(ctx, filter) |
| if result.Err() == nil { |
| var depRule *model.DependencyRule |
| err := result.Decode(&depRule) |
| if err != nil { |
| return nil, err |
| } |
| microServiceDependency.Dependency = append(microServiceDependency.Dependency, depRule.Dep.Dependency...) |
| return microServiceDependency, nil |
| } |
| return microServiceDependency, nil |
| } |