| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except request compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to request writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package mongo |
| |
| import ( |
| "context" |
| "crypto/sha1" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "reflect" |
| "regexp" |
| "sort" |
| "strconv" |
| "strings" |
| "time" |
| |
| "github.com/go-chassis/cari/discovery" |
| "github.com/jinzhu/copier" |
| "go.mongodb.org/mongo-driver/bson" |
| "go.mongodb.org/mongo-driver/mongo" |
| "go.mongodb.org/mongo-driver/mongo/options" |
| |
| "github.com/apache/servicecomb-service-center/datasource" |
| "github.com/apache/servicecomb-service-center/datasource/mongo/client" |
| "github.com/apache/servicecomb-service-center/datasource/mongo/client/dao" |
| "github.com/apache/servicecomb-service-center/datasource/mongo/client/model" |
| "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat" |
| "github.com/apache/servicecomb-service-center/datasource/mongo/sd" |
| mutil "github.com/apache/servicecomb-service-center/datasource/mongo/util" |
| "github.com/apache/servicecomb-service-center/pkg/gopool" |
| "github.com/apache/servicecomb-service-center/pkg/log" |
| "github.com/apache/servicecomb-service-center/pkg/util" |
| apt "github.com/apache/servicecomb-service-center/server/core" |
| "github.com/apache/servicecomb-service-center/server/plugin/quota" |
| "github.com/apache/servicecomb-service-center/server/plugin/uuid" |
| ) |
| |
| const baseTen = 10 |
| |
| func (ds *DataSource) RegisterService(ctx context.Context, request *discovery.CreateServiceRequest) (*discovery.CreateServiceResponse, error) { |
| service := request.Service |
| remoteIP := util.GetIPFromContext(ctx) |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| serviceFlag := util.StringJoin([]string{ |
| service.Environment, service.AppId, service.ServiceName, service.Version}, "/") |
| requestServiceID := service.ServiceId |
| |
| if len(requestServiceID) == 0 { |
| ctx = util.SetContext(ctx, uuid.ContextKey, util.StringJoin([]string{domain, project, service.Environment, service.AppId, service.ServiceName, service.Alias, service.Version}, "/")) |
| service.ServiceId = uuid.Generator().GetServiceID(ctx) |
| } |
| service.Timestamp = strconv.FormatInt(time.Now().Unix(), 10) |
| service.ModTimestamp = service.Timestamp |
| // the service unique index in table is (serviceId/serviceEnv,serviceAppid,servicename,serviceVersion) |
| if len(service.Alias) != 0 { |
| serviceID, err := GetServiceID(ctx, &discovery.MicroServiceKey{ |
| Environment: service.Environment, |
| AppId: service.AppId, |
| ServiceName: service.ServiceName, |
| Version: service.Version, |
| Alias: service.Alias, |
| }) |
| if err != nil && !errors.Is(err, datasource.ErrNoData) { |
| return &discovery.CreateServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrUnavailableBackend, err.Error()), |
| }, err |
| } |
| if len(serviceID) != 0 { |
| if len(requestServiceID) != 0 && requestServiceID != serviceID { |
| log.Warn(fmt.Sprintf("create micro-service[%s] failed, service already exists, operator: %s", |
| serviceFlag, remoteIP)) |
| return &discovery.CreateServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceAlreadyExists, |
| "ServiceID conflict or found the same service with different id."), |
| }, nil |
| } |
| return &discovery.CreateServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "register service successfully"), |
| ServiceId: serviceID, |
| }, nil |
| } |
| } |
| insertRes, err := client.GetMongoClient().Insert(ctx, model.CollectionService, &model.Service{Domain: domain, Project: project, Service: service}) |
| if err != nil { |
| if client.IsDuplicateKey(err) { |
| serviceIDInner, err := GetServiceID(ctx, &discovery.MicroServiceKey{ |
| Environment: service.Environment, |
| AppId: service.AppId, |
| ServiceName: service.ServiceName, |
| Version: service.Version, |
| }) |
| if err != nil && !errors.Is(err, datasource.ErrNoData) { |
| return &discovery.CreateServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrUnavailableBackend, err.Error()), |
| }, err |
| } |
| // serviceid conflict with the service in the database |
| if len(requestServiceID) != 0 && serviceIDInner != requestServiceID { |
| return &discovery.CreateServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceAlreadyExists, |
| "ServiceID conflict or found the same service with different id."), |
| }, nil |
| } |
| return &discovery.CreateServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "register service successfully"), |
| ServiceId: serviceIDInner, |
| }, nil |
| } |
| log.Error(fmt.Sprintf("create micro-service[%s] failed, service already exists, operator: %s", |
| serviceFlag, remoteIP), err) |
| return &discovery.CreateServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| |
| log.Info(fmt.Sprintf("create micro-service[%s][%s] successfully,operator: %s", service.ServiceId, insertRes.InsertedID, remoteIP)) |
| |
| return &discovery.CreateServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Register service successfully"), |
| ServiceId: service.ServiceId, |
| }, nil |
| } |
| |
| func (ds *DataSource) GetServices(ctx context.Context, request *discovery.GetServicesRequest) (*discovery.GetServicesResponse, error) { |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| |
| filter := bson.M{model.ColumnDomain: domain, model.ColumnProject: project} |
| |
| services, err := dao.GetMicroServices(ctx, filter) |
| if err != nil { |
| return &discovery.GetServicesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "get services data failed."), |
| }, nil |
| } |
| |
| return &discovery.GetServicesResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get all services successfully."), |
| Services: services, |
| }, nil |
| } |
| |
| func (ds *DataSource) GetApplications(ctx context.Context, request *discovery.GetAppsRequest) (*discovery.GetAppsResponse, error) { |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| |
| filter := bson.M{ |
| model.ColumnDomain: domain, |
| model.ColumnProject: project, |
| mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnEnv}): request.Environment} |
| |
| services, err := dao.GetMicroServices(ctx, filter) |
| if err != nil { |
| return &discovery.GetAppsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "get services data failed."), |
| }, nil |
| } |
| l := len(services) |
| if l == 0 { |
| return &discovery.GetAppsResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get all applications successfully."), |
| }, nil |
| } |
| apps := make([]string, 0, l) |
| hash := make(map[string]struct{}, l) |
| for _, svc := range services { |
| if !request.WithShared && apt.IsGlobal(discovery.MicroServiceToKey(util.ParseDomainProject(ctx), svc)) { |
| continue |
| } |
| if _, ok := hash[svc.AppId]; ok { |
| continue |
| } |
| hash[svc.AppId] = struct{}{} |
| apps = append(apps, svc.AppId) |
| } |
| return &discovery.GetAppsResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get all applications successfully."), |
| AppIds: apps, |
| }, nil |
| } |
| |
| func (ds *DataSource) GetService(ctx context.Context, request *discovery.GetServiceRequest) (*discovery.GetServiceResponse, error) { |
| svc, err := GetServiceByID(ctx, request.ServiceId) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("service %s not exist in db", request.ServiceId)) |
| return &discovery.GetServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "Service not exist."), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("failed to get single service %s from mongo", request.ServiceId), err) |
| return &discovery.GetServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "get service data from mongodb failed."), |
| }, err |
| } |
| return &discovery.GetServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get service successfully."), |
| Service: svc.Service, |
| }, nil |
| } |
| |
| func (ds *DataSource) ExistServiceByID(ctx context.Context, request *discovery.GetExistenceByIDRequest) (*discovery.GetExistenceByIDResponse, error) { |
| exist, err := ServiceExistID(ctx, request.ServiceId) |
| if err != nil { |
| return &discovery.GetExistenceByIDResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "Check service exist failed."), |
| Exist: false, |
| }, err |
| } |
| |
| return &discovery.GetExistenceByIDResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Check ExistService successfully."), |
| Exist: exist, |
| }, nil |
| } |
| |
| func (ds *DataSource) ExistService(ctx context.Context, request *discovery.GetExistenceRequest) (*discovery.GetExistenceResponse, error) { |
| domainProject := util.ParseDomainProject(ctx) |
| serviceFlag := util.StringJoin([]string{ |
| request.Environment, request.AppId, request.ServiceName, request.Version}, "/") |
| |
| ids, exist, err := FindServiceIds(ctx, request.Version, &discovery.MicroServiceKey{ |
| Environment: request.Environment, |
| AppId: request.AppId, |
| ServiceName: request.ServiceName, |
| Alias: request.ServiceName, |
| Version: request.Version, |
| Tenant: domainProject, |
| }) |
| if err != nil { |
| log.Error(fmt.Sprintf("micro-service[%s] exist failed, find serviceIDs failed", serviceFlag), err) |
| return &discovery.GetExistenceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| if !exist { |
| log.Info(fmt.Sprintf("micro-service[%s] exist failed, service does not exist", serviceFlag)) |
| return &discovery.GetExistenceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, serviceFlag+" does not exist."), |
| }, nil |
| } |
| if len(ids) == 0 { |
| log.Info(fmt.Sprintf("micro-service[%s] exist failed, version mismatch", serviceFlag)) |
| return &discovery.GetExistenceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceVersionNotExists, serviceFlag+" version mismatch."), |
| }, nil |
| } |
| return &discovery.GetExistenceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "get service id successfully."), |
| ServiceId: ids[0], // 约定多个时,取较新版本 |
| }, nil |
| } |
| |
| func (ds *DataSource) UnregisterService(ctx context.Context, request *discovery.DeleteServiceRequest) (*discovery.DeleteServiceResponse, error) { |
| res, err := ds.DelServicePri(ctx, request.ServiceId, request.Force) |
| if err != nil { |
| return &discovery.DeleteServiceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "Delete service failed"), |
| }, err |
| } |
| return &discovery.DeleteServiceResponse{ |
| Response: res, |
| }, nil |
| } |
| |
| func (ds *DataSource) DelServicePri(ctx context.Context, serviceID string, force bool) (*discovery.Response, error) { |
| remoteIP := util.GetIPFromContext(ctx) |
| title := "delete" |
| if force { |
| title = "force delete" |
| } |
| |
| if serviceID == apt.Service.ServiceId { |
| log.Error(fmt.Sprintf("%s micro-service %s failed, operator: %s", title, serviceID, remoteIP), mutil.ErrNotAllowDeleteSC) |
| return discovery.CreateResponse(discovery.ErrInvalidParams, mutil.ErrNotAllowDeleteSC.Error()), nil |
| } |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(serviceID)) |
| microservice, err := dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("%s micro-service %s failed, service does not exist, operator: %s", |
| title, serviceID, remoteIP)) |
| return discovery.CreateResponse(discovery.ErrServiceNotExists, "Service does not exist."), nil |
| } |
| log.Error(fmt.Sprintf("%s micro-service %s failed, get service file failed, operator: %s", |
| title, serviceID, remoteIP), err) |
| return discovery.CreateResponse(discovery.ErrInternal, err.Error()), err |
| } |
| // 强制删除,则与该服务相关的信息删除,非强制删除: 如果作为该被依赖(作为provider,提供服务,且不是只存在自依赖)或者存在实例,则不能删除 |
| if !force { |
| dr := NewProviderDependencyRelation(ctx, util.ParseDomainProject(ctx), microservice.Service) |
| services, err := dr.GetDependencyConsumerIds() |
| if err != nil { |
| log.Error(fmt.Sprintf("delete micro-service[%s] failed, get service dependency failed, operator: %s", |
| serviceID, remoteIP), err) |
| return discovery.CreateResponse(discovery.ErrInternal, err.Error()), err |
| } |
| if l := len(services); l > 1 || (l == 1 && services[0] != serviceID) { |
| log.Error(fmt.Sprintf("delete micro-service[%s] failed, other services[%d] depend on it, operator: %s", |
| serviceID, l, remoteIP), err) |
| return discovery.CreateResponse(discovery.ErrDependedOnConsumer, "Can not delete this service, other service rely it."), err |
| } |
| //todo wait for dep interface |
| instancesExist, err := client.GetMongoClient().DocExist(ctx, model.CollectionInstance, bson.M{mutil.ConnectWithDot([]string{model.ColumnInstance, model.ColumnServiceID}): serviceID}) |
| if err != nil { |
| log.Error(fmt.Sprintf("delete micro-service[%s] failed, get instances number failed, operator: %s", |
| serviceID, remoteIP), err) |
| return discovery.CreateResponse(discovery.ErrUnavailableBackend, err.Error()), err |
| } |
| if instancesExist { |
| log.Error(fmt.Sprintf("delete micro-service[%s] failed, service deployed instances, operator: %s", |
| serviceID, remoteIP), nil) |
| return discovery.CreateResponse(discovery.ErrDeployedInstance, "Can not delete the service deployed instance(s)."), err |
| } |
| |
| } |
| |
| schemaOps := client.MongoOperation{Table: model.CollectionSchema, Models: []mongo.WriteModel{mongo.NewDeleteManyModel().SetFilter(bson.M{model.ColumnServiceID: serviceID})}} |
| rulesOps := client.MongoOperation{Table: model.CollectionRule, Models: []mongo.WriteModel{mongo.NewDeleteManyModel().SetFilter(bson.M{model.ColumnServiceID: serviceID})}} |
| instanceOps := client.MongoOperation{Table: model.CollectionInstance, Models: []mongo.WriteModel{mongo.NewDeleteManyModel().SetFilter(bson.M{mutil.ConnectWithDot([]string{model.ColumnInstance, model.ColumnServiceID}): serviceID})}} |
| serviceOps := client.MongoOperation{Table: model.CollectionService, Models: []mongo.WriteModel{mongo.NewDeleteOneModel().SetFilter(bson.M{mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnServiceID}): serviceID})}} |
| |
| err = client.GetMongoClient().MultiTableBatchUpdate(ctx, []client.MongoOperation{schemaOps, rulesOps, instanceOps, serviceOps}) |
| if err != nil { |
| log.Error(fmt.Sprintf("micro-service[%s] failed, operator: %s", serviceID, remoteIP), err) |
| return discovery.CreateResponse(discovery.ErrUnavailableBackend, err.Error()), err |
| } |
| |
| domainProject := util.ToDomainProject(microservice.Domain, microservice.Project) |
| serviceKey := &discovery.MicroServiceKey{ |
| Tenant: domainProject, |
| Environment: microservice.Service.Environment, |
| AppId: microservice.Service.AppId, |
| ServiceName: microservice.Service.ServiceName, |
| Version: microservice.Service.Version, |
| Alias: microservice.Service.Alias, |
| } |
| |
| err = DeleteDependencyForDeleteService(domainProject, serviceID, serviceKey) |
| if err != nil { |
| log.Error(fmt.Sprintf("micro-service[%s] failed, operator: %s", serviceID, remoteIP), err) |
| return discovery.CreateResponse(discovery.ErrUnavailableBackend, err.Error()), err |
| } |
| |
| return discovery.CreateResponse(discovery.ResponseSuccess, "Unregister service successfully."), nil |
| } |
| |
| func (ds *DataSource) UpdateService(ctx context.Context, request *discovery.UpdateServicePropsRequest) (*discovery.UpdateServicePropsResponse, error) { |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(request.ServiceId)) |
| setFilter := mutil.NewFilter( |
| mutil.ServiceModTime(strconv.FormatInt(time.Now().Unix(), baseTen)), |
| mutil.ServiceProperty(request.Properties), |
| ) |
| updateFilter := mutil.NewFilter( |
| mutil.Set(setFilter), |
| ) |
| err := dao.UpdateService(ctx, filter, updateFilter) |
| if err != nil { |
| log.Error(fmt.Sprintf("update service %s properties failed, update mongo failed", request.ServiceId), err) |
| return &discovery.UpdateServicePropsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrUnavailableBackend, "Update doc in mongo failed."), |
| }, nil |
| } |
| return &discovery.UpdateServicePropsResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Update service successfully."), |
| }, nil |
| } |
| |
| func (ds *DataSource) GetDeleteServiceFunc(ctx context.Context, serviceID string, force bool, serviceRespChan chan<- *discovery.DelServicesRspInfo) func(context.Context) { |
| return func(_ context.Context) { |
| serviceRst := &discovery.DelServicesRspInfo{ |
| ServiceId: serviceID, |
| ErrMessage: "", |
| } |
| resp, err := ds.DelServicePri(ctx, serviceID, force) |
| if err != nil { |
| serviceRst.ErrMessage = err.Error() |
| } else if resp.GetCode() != discovery.ResponseSuccess { |
| serviceRst.ErrMessage = resp.GetMessage() |
| } |
| |
| serviceRespChan <- serviceRst |
| } |
| } |
| |
| func (ds *DataSource) GetServiceDetail(ctx context.Context, request *discovery.GetServiceRequest) (*discovery.GetServiceDetailResponse, error) { |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(request.ServiceId)) |
| mgSvc, err := dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| return &discovery.GetServiceDetailResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "Service does not exist."), |
| }, nil |
| } |
| return &discovery.GetServiceDetailResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| svc := mgSvc.Service |
| key := &discovery.MicroServiceKey{ |
| Environment: svc.Environment, |
| AppId: svc.AppId, |
| ServiceName: svc.ServiceName, |
| } |
| filter = mutil.NewBasicFilter(ctx, |
| mutil.ServiceEnv(key.Environment), |
| mutil.ServiceAppID(key.AppId), |
| mutil.ServiceServiceName(key.ServiceName), |
| ) |
| versions, err := dao.GetServicesVersions(ctx, filter) |
| if err != nil { |
| log.Error(fmt.Sprintf("get service %s %s %s all versions failed", svc.Environment, svc.AppId, svc.ServiceName), err) |
| return &discovery.GetServiceDetailResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| options := []string{"tags", "rules", "instances", "schemas", "dependencies"} |
| serviceInfo, err := getServiceDetailUtil(ctx, mgSvc, false, options) |
| if err != nil { |
| return &discovery.GetServiceDetailResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| serviceInfo.MicroService = svc |
| serviceInfo.MicroServiceVersions = versions |
| return &discovery.GetServiceDetailResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get service successfully"), |
| Service: serviceInfo, |
| }, nil |
| |
| } |
| |
| func (ds *DataSource) GetServicesInfo(ctx context.Context, request *discovery.GetServicesInfoRequest) (*discovery.GetServicesInfoResponse, error) { |
| optionMap := make(map[string]struct{}, len(request.Options)) |
| for _, opt := range request.Options { |
| optionMap[opt] = struct{}{} |
| } |
| |
| options := make([]string, 0, len(optionMap)) |
| if _, ok := optionMap["all"]; ok { |
| optionMap["statistics"] = struct{}{} |
| options = []string{"tags", "rules", "instances", "schemas", "dependencies"} |
| } else { |
| for opt := range optionMap { |
| options = append(options, opt) |
| } |
| } |
| var st *discovery.Statistics |
| if _, ok := optionMap["statistics"]; ok { |
| var err error |
| st, err = statistics(ctx, request.WithShared) |
| if err != nil { |
| return &discovery.GetServicesInfoResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| if len(optionMap) == 1 { |
| return &discovery.GetServicesInfoResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Statistics successfully."), |
| Statistics: st, |
| }, nil |
| } |
| } |
| services, err := dao.GetServices(ctx, bson.M{}) |
| if err != nil { |
| log.Error("get all services by domain failed", err) |
| return &discovery.GetServicesInfoResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| allServiceDetails := make([]*discovery.ServiceDetail, 0, len(services)) |
| domainProject := util.ParseDomainProject(ctx) |
| for _, mgSvc := range services { |
| if !request.WithShared && apt.IsGlobal(discovery.MicroServiceToKey(domainProject, mgSvc.Service)) { |
| continue |
| } |
| if len(request.AppId) > 0 { |
| if request.AppId != mgSvc.Service.AppId { |
| continue |
| } |
| if len(request.ServiceName) > 0 && request.ServiceName != mgSvc.Service.ServiceName { |
| continue |
| } |
| } |
| |
| serviceDetail, err := getServiceDetailUtil(ctx, mgSvc, request.CountOnly, options) |
| if err != nil { |
| return &discovery.GetServicesInfoResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| serviceDetail.MicroService = mgSvc.Service |
| tmpServiceDetail := &discovery.ServiceDetail{} |
| err = copier.CopyWithOption(tmpServiceDetail, serviceDetail, copier.Option{DeepCopy: true}) |
| if err != nil { |
| return &discovery.GetServicesInfoResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| tmpServiceDetail.MicroService.Properties = nil |
| tmpServiceDetail.MicroService.Schemas = nil |
| instances := tmpServiceDetail.Instances |
| for _, instance := range instances { |
| instance.Properties = nil |
| } |
| allServiceDetails = append(allServiceDetails, tmpServiceDetail) |
| } |
| |
| return &discovery.GetServicesInfoResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get services info successfully."), |
| AllServicesDetail: allServiceDetails, |
| Statistics: nil, |
| }, nil |
| } |
| |
| func (ds *DataSource) GetServicesStatistics(ctx context.Context, request *discovery.GetServicesRequest) ( |
| *discovery.GetServicesInfoStatisticsResponse, error) { |
| ctx = util.WithCacheOnly(ctx) |
| var st *discovery.Statistics |
| var err error |
| st, err = statistics(ctx, true) |
| if err != nil { |
| return &discovery.GetServicesInfoStatisticsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| return &discovery.GetServicesInfoStatisticsResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get services statistics successfully."), |
| Statistics: st, |
| }, nil |
| } |
| |
| func (ds *DataSource) AddTags(ctx context.Context, request *discovery.AddServiceTagsRequest) (*discovery.AddServiceTagsResponse, error) { |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(request.ServiceId)) |
| setFilter := mutil.NewFilter( |
| mutil.Tags(request.Tags), |
| ) |
| updateFilter := mutil.NewFilter( |
| mutil.Set(setFilter), |
| ) |
| err := dao.UpdateService(ctx, filter, updateFilter) |
| if err == nil { |
| return &discovery.AddServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Add service tags successfully."), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("update service %s tags failed.", request.ServiceId), err) |
| if err == client.ErrNoDocuments { |
| return &discovery.AddServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, err.Error()), |
| }, nil |
| } |
| return &discovery.AddServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, nil |
| } |
| |
| func (ds *DataSource) GetTags(ctx context.Context, request *discovery.GetServiceTagsRequest) (*discovery.GetServiceTagsResponse, error) { |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(request.ServiceId)) |
| svc, err := dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("service %s not exist in db", request.ServiceId)) |
| return &discovery.GetServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "Service does not exist"), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("failed to get service %s tags", request.ServiceId), err) |
| return &discovery.GetServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, nil |
| } |
| return &discovery.GetServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get service tags successfully."), |
| Tags: svc.Tags, |
| }, nil |
| } |
| |
| func (ds *DataSource) UpdateTag(ctx context.Context, request *discovery.UpdateServiceTagRequest) (*discovery.UpdateServiceTagResponse, error) { |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(request.ServiceId)) |
| svc, err := dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("service %s not exist in db", request.ServiceId)) |
| return &discovery.UpdateServiceTagResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "Service does not exist"), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("failed to get %s tags", request.ServiceId), err) |
| return &discovery.UpdateServiceTagResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, nil |
| } |
| dataTags := svc.Tags |
| if len(dataTags) > 0 { |
| if _, ok := dataTags[request.Key]; !ok { |
| return &discovery.UpdateServiceTagResponse{ |
| Response: discovery.CreateResponse(discovery.ErrTagNotExists, "Tag does not exist"), |
| }, nil |
| } |
| } |
| newTags := make(map[string]string, len(dataTags)) |
| for k, v := range dataTags { |
| newTags[k] = v |
| } |
| newTags[request.Key] = request.Value |
| setFilter := mutil.NewFilter( |
| mutil.Tags(newTags), |
| ) |
| updateFilter := mutil.NewFilter( |
| mutil.Set(setFilter), |
| ) |
| err = dao.UpdateService(ctx, filter, updateFilter) |
| if err != nil { |
| log.Error(fmt.Sprintf("update service %s tags failed", request.ServiceId), err) |
| return &discovery.UpdateServiceTagResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, nil |
| } |
| return &discovery.UpdateServiceTagResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Update service tag success."), |
| }, nil |
| } |
| |
| func (ds *DataSource) DeleteTags(ctx context.Context, request *discovery.DeleteServiceTagsRequest) (*discovery.DeleteServiceTagsResponse, error) { |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(request.ServiceId)) |
| svc, err := dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("service %s not exist in db", request.ServiceId)) |
| return &discovery.DeleteServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "Service does not exist"), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("failed to get service %s tags", request.ServiceId), err) |
| return &discovery.DeleteServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, nil |
| } |
| dataTags := svc.Tags |
| newTags := make(map[string]string, len(dataTags)) |
| for k, v := range dataTags { |
| newTags[k] = v |
| } |
| if len(dataTags) > 0 { |
| for _, key := range request.Keys { |
| if _, ok := dataTags[key]; !ok { |
| return &discovery.DeleteServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrTagNotExists, "Tag does not exist"), |
| }, nil |
| } |
| delete(newTags, key) |
| } |
| } |
| setFilter := mutil.NewFilter( |
| mutil.Tags(newTags), |
| ) |
| updateFilter := mutil.NewFilter( |
| mutil.Set(setFilter), |
| ) |
| err = dao.UpdateService(ctx, filter, updateFilter) |
| if err != nil { |
| log.Error(fmt.Sprintf("delete service %s tags failed", request.ServiceId), err) |
| return &discovery.DeleteServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, nil |
| } |
| return &discovery.DeleteServiceTagsResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Update service tag success."), |
| }, nil |
| } |
| |
| func (ds *DataSource) GetSchema(ctx context.Context, request *discovery.GetSchemaRequest) (*discovery.GetSchemaResponse, error) { |
| exist, err := ServiceExistID(ctx, request.ServiceId) |
| if err != nil { |
| return &discovery.GetSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "GetSchema failed to check service exist."), |
| }, nil |
| } |
| if !exist { |
| return &discovery.GetSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "GetSchema service does not exist."), |
| }, nil |
| } |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceID(request.ServiceId), mutil.SchemaID(request.SchemaId)) |
| schema, err := dao.GetSchema(ctx, filter) |
| if err != nil { |
| return &discovery.GetSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "GetSchema failed from mongodb."), |
| }, nil |
| } |
| if schema == nil { |
| return &discovery.GetSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrSchemaNotExists, "Do not have this schema info."), |
| }, nil |
| } |
| return &discovery.GetSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get schema info successfully."), |
| Schema: schema.Schema, |
| SchemaSummary: schema.SchemaSummary, |
| }, nil |
| } |
| |
| func (ds *DataSource) GetAllSchemas(ctx context.Context, request *discovery.GetAllSchemaRequest) (*discovery.GetAllSchemaResponse, error) { |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceServiceID(request.ServiceId)) |
| svc, err := dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("service %s not exist in db", request.ServiceId)) |
| return &discovery.GetAllSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "GetAllSchemas failed for service not exist"), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("get service[%s] all schemas failed, get service failed", request.ServiceId), err) |
| return &discovery.GetAllSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| schemasList := svc.Service.Schemas |
| if len(schemasList) == 0 { |
| return &discovery.GetAllSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Do not have this schema info."), |
| Schemas: []*discovery.Schema{}, |
| }, nil |
| } |
| schemas := make([]*discovery.Schema, 0, len(schemasList)) |
| for _, schemaID := range schemasList { |
| tempSchema := &discovery.Schema{} |
| tempSchema.SchemaId = schemaID |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(request.ServiceId), mutil.SchemaID(schemaID)) |
| schema, err := dao.GetSchema(ctx, filter) |
| if err != nil { |
| return &discovery.GetAllSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| if schema == nil { |
| schemas = append(schemas, tempSchema) |
| continue |
| } |
| tempSchema.Summary = schema.SchemaSummary |
| if request.WithSchema { |
| tempSchema.Schema = schema.Schema |
| } |
| schemas = append(schemas, tempSchema) |
| } |
| return &discovery.GetAllSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get all schema info successfully."), |
| Schemas: schemas, |
| }, nil |
| } |
| |
| func (ds *DataSource) ExistSchema(ctx context.Context, request *discovery.GetExistenceRequest) (*discovery.GetExistenceResponse, error) { |
| exist, err := ServiceExistID(ctx, request.ServiceId) |
| if err != nil { |
| return &discovery.GetExistenceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "ExistSchema failed for get service failed"), |
| }, nil |
| } |
| if !exist { |
| return &discovery.GetExistenceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "ExistSchema failed for service not exist"), |
| }, nil |
| } |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceID(request.ServiceId), mutil.SchemaID(request.SchemaId)) |
| Schema, err := dao.GetSchema(ctx, filter) |
| if err != nil { |
| return &discovery.GetExistenceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "ExistSchema failed for get schema failed."), |
| }, nil |
| } |
| if Schema == nil { |
| return &discovery.GetExistenceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrSchemaNotExists, "ExistSchema failed for schema not exist."), |
| }, nil |
| } |
| return &discovery.GetExistenceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Schema exist."), |
| Summary: Schema.SchemaSummary, |
| SchemaId: Schema.SchemaID, |
| ServiceId: Schema.ServiceID, |
| }, nil |
| } |
| |
| func (ds *DataSource) DeleteSchema(ctx context.Context, request *discovery.DeleteSchemaRequest) (*discovery.DeleteSchemaResponse, error) { |
| exist, err := ServiceExistID(ctx, request.ServiceId) |
| if err != nil { |
| return &discovery.DeleteSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "DeleteSchema failed for get service failed."), |
| }, nil |
| } |
| if !exist { |
| return &discovery.DeleteSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "DeleteSchema failed for service not exist."), |
| }, nil |
| } |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceID(request.ServiceId), mutil.SchemaID(request.SchemaId)) |
| res, err := client.GetMongoClient().DocDelete(ctx, model.CollectionSchema, filter) |
| if err != nil { |
| return &discovery.DeleteSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrUnavailableBackend, "DeleteSchema failed for delete schema failed."), |
| }, err |
| } |
| if !res { |
| return &discovery.DeleteSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ErrSchemaNotExists, "DeleteSchema failed for schema not exist."), |
| }, nil |
| } |
| return &discovery.DeleteSchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Delete schema info successfully."), |
| }, nil |
| } |
| |
| func (ds *DataSource) ModifySchema(ctx context.Context, request *discovery.ModifySchemaRequest) (*discovery.ModifySchemaResponse, error) { |
| remoteIP := util.GetIPFromContext(ctx) |
| serviceID := request.ServiceId |
| schemaID := request.SchemaId |
| schema := discovery.Schema{ |
| SchemaId: request.SchemaId, |
| Summary: request.Summary, |
| Schema: request.Schema, |
| } |
| err := ds.modifySchema(ctx, request.ServiceId, &schema) |
| if err != nil { |
| log.Error(fmt.Sprintf("modify schema[%s/%s] failed, operator: %s", serviceID, schemaID, remoteIP), err) |
| resp := &discovery.ModifySchemaResponse{ |
| Response: discovery.CreateResponseWithSCErr(err), |
| } |
| if err.InternalError() { |
| return resp, err |
| } |
| return resp, nil |
| } |
| log.Info(fmt.Sprintf("modify schema[%s/%s] successfully, operator: %s", serviceID, schemaID, remoteIP)) |
| return &discovery.ModifySchemaResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "modify schema info success."), |
| }, nil |
| } |
| |
| func (ds *DataSource) ModifySchemas(ctx context.Context, request *discovery.ModifySchemasRequest) (*discovery.ModifySchemasResponse, error) { |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(request.ServiceId)) |
| svc, err := dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| return &discovery.ModifySchemasResponse{Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "Service not exist")}, nil |
| } |
| return &discovery.ModifySchemasResponse{Response: discovery.CreateResponse(discovery.ErrInternal, err.Error())}, err |
| } |
| respErr := ds.modifySchemas(ctx, svc.Service, request.Schemas) |
| if respErr != nil { |
| resp := &discovery.ModifySchemasResponse{ |
| Response: discovery.CreateResponseWithSCErr(respErr), |
| } |
| if respErr.InternalError() { |
| return resp, err |
| } |
| return resp, nil |
| } |
| return &discovery.ModifySchemasResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "modify schemas info success"), |
| }, nil |
| |
| } |
| |
| func (ds *DataSource) modifySchemas(ctx context.Context, service *discovery.MicroService, schemas []*discovery.Schema) *discovery.Error { |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| remoteIP := util.GetIPFromContext(ctx) |
| serviceID := service.ServiceId |
| filter := mutil.NewFilter(mutil.ServiceID(serviceID)) |
| schemasFromDatabase, err := dao.GetSchemas(ctx, filter) |
| if err != nil { |
| log.Error(fmt.Sprintf("modify service %s schemas failed, get schemas failed, operator: %s", serviceID, remoteIP), err) |
| return discovery.NewError(discovery.ErrUnavailableBackend, err.Error()) |
| } |
| |
| needUpdateSchemas, needAddSchemas, needDeleteSchemas, nonExistSchemaIds := |
| datasource.SchemasAnalysis(schemas, schemasFromDatabase, service.Schemas) |
| |
| var schemasOps []mongo.WriteModel |
| var serviceOps []mongo.WriteModel |
| if !ds.isSchemaEditable(service) { |
| if len(service.Schemas) == 0 { |
| res := quota.NewApplyQuotaResource(quota.TypeSchema, util.ParseDomainProject(ctx), serviceID, int64(len(nonExistSchemaIds))) |
| errQuota := quota.Apply(ctx, res) |
| if errQuota != nil { |
| log.Error(fmt.Sprintf("modify service[%s] schemas failed, operator: %s", serviceID, remoteIP), errQuota) |
| return errQuota |
| } |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceServiceID(serviceID)) |
| setFilter := mutil.NewFilter(mutil.ServiceSchemas(nonExistSchemaIds)) |
| updateFilter := mutil.NewFilter(mutil.Set(setFilter)) |
| serviceOps = append(serviceOps, mongo.NewUpdateOneModel().SetUpdate(updateFilter).SetFilter(filter)) |
| } else { |
| if len(nonExistSchemaIds) != 0 { |
| errInfo := fmt.Errorf("non-existent schemaIDs %v", nonExistSchemaIds) |
| log.Error(fmt.Sprintf("modify service %s schemas failed, operator: %s", serviceID, remoteIP), err) |
| return discovery.NewError(discovery.ErrUndefinedSchemaID, errInfo.Error()) |
| } |
| for _, needUpdateSchema := range needUpdateSchemas { |
| exist, err := dao.SchemaSummaryExist(ctx, serviceID, needUpdateSchema.SchemaId) |
| if err != nil { |
| return discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| if !exist { |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID), mutil.SchemaID(needUpdateSchema.SchemaId)) |
| setFilter := mutil.NewFilter( |
| mutil.Schema(needUpdateSchema.Schema), |
| mutil.SchemaSummary(needUpdateSchema.Summary), |
| ) |
| updateFilter := mutil.NewFilter( |
| mutil.Set(setFilter), |
| ) |
| schemasOps = append(schemasOps, mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(updateFilter)) |
| } else { |
| log.Warn(fmt.Sprintf("schema[%s/%s] and it's summary already exist, skip to update, operator: %s", |
| serviceID, needUpdateSchema.SchemaId, remoteIP)) |
| } |
| } |
| } |
| |
| for _, schema := range needAddSchemas { |
| log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP)) |
| schemasOps = append(schemasOps, mongo.NewInsertOneModel().SetDocument(&model.Schema{ |
| Domain: domain, |
| Project: project, |
| ServiceID: serviceID, |
| SchemaID: schema.SchemaId, |
| Schema: schema.Schema, |
| SchemaSummary: schema.Summary, |
| })) |
| } |
| } else { |
| quotaSize := len(needAddSchemas) - len(needDeleteSchemas) |
| if quotaSize > 0 { |
| res := quota.NewApplyQuotaResource(quota.TypeSchema, util.ParseDomainProject(ctx), serviceID, int64(quotaSize)) |
| err := quota.Apply(ctx, res) |
| if err != nil { |
| log.Error(fmt.Sprintf("modify service[%s] schemas failed, operator: %s", serviceID, remoteIP), err) |
| return err |
| } |
| } |
| var schemaIDs []string |
| for _, schema := range needAddSchemas { |
| log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP)) |
| schemasOps = append(schemasOps, mongo.NewInsertOneModel().SetDocument(&model.Schema{ |
| Domain: domain, |
| Project: project, |
| ServiceID: serviceID, |
| SchemaID: schema.SchemaId, |
| Schema: schema.Schema, |
| SchemaSummary: schema.Summary, |
| })) |
| schemaIDs = append(schemaIDs, schema.SchemaId) |
| } |
| |
| for _, schema := range needUpdateSchemas { |
| log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP)) |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID), mutil.SchemaID(schema.SchemaId)) |
| setFilter := mutil.NewFilter( |
| mutil.Schema(schema.Schema), |
| mutil.SchemaSummary(schema.Summary), |
| ) |
| updateFilter := mutil.NewFilter( |
| mutil.Set(setFilter), |
| ) |
| schemasOps = append(schemasOps, mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(updateFilter)) |
| schemaIDs = append(schemaIDs, schema.SchemaId) |
| } |
| |
| for _, schema := range needDeleteSchemas { |
| log.Info(fmt.Sprintf("delete non-existent schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP)) |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID), mutil.SchemaID(schema.SchemaId)) |
| schemasOps = append(schemasOps, mongo.NewDeleteOneModel().SetFilter(filter)) |
| } |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceServiceID(serviceID)) |
| setFilter := mutil.NewFilter(mutil.ServiceSchemas(schemaIDs)) |
| updateFilter := mutil.NewFilter(mutil.Set(setFilter)) |
| serviceOps = append(serviceOps, mongo.NewUpdateOneModel().SetUpdate(updateFilter).SetFilter(filter)) |
| } |
| if len(schemasOps) > 0 { |
| _, err = client.GetMongoClient().BatchUpdate(ctx, model.CollectionSchema, schemasOps) |
| if err != nil { |
| return discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| } |
| if len(serviceOps) > 0 { |
| _, err = client.GetMongoClient().BatchUpdate(ctx, model.CollectionService, serviceOps) |
| if err != nil { |
| return discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| } |
| return nil |
| } |
| |
| // modifySchema will be modified in the following cases |
| // 1.service have no relation --> update the schema && update the service |
| // 2.service is editable && service have relation with the schema --> update the shema |
| // 3.service is editable && service have no relation with the schema --> update the schema && update the service |
| // 4.service can't edit && service have relation with the schema && schema summary not exist --> update the schema |
| func (ds *DataSource) modifySchema(ctx context.Context, serviceID string, schema *discovery.Schema) *discovery.Error { |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceServiceID(serviceID)) |
| remoteIP := util.GetIPFromContext(ctx) |
| svc, err := dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| return discovery.NewError(discovery.ErrServiceNotExists, "Service does not exist.") |
| } |
| return discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| microservice := svc.Service |
| var isExist bool |
| for _, sid := range microservice.Schemas { |
| if sid == schema.SchemaId { |
| isExist = true |
| break |
| } |
| } |
| var newSchemas []string |
| if !ds.isSchemaEditable(microservice) { |
| if len(microservice.Schemas) != 0 && !isExist { |
| return discovery.NewError(discovery.ErrUndefinedSchemaID, "Non-existent schemaID can't be added request "+discovery.ENV_PROD) |
| } |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID), mutil.SchemaID(schema.SchemaId)) |
| respSchema, err := dao.GetSchema(ctx, filter) |
| if err != nil { |
| return discovery.NewError(discovery.ErrUnavailableBackend, err.Error()) |
| } |
| if respSchema != nil { |
| if len(schema.Summary) == 0 { |
| log.Error(fmt.Sprintf("modify schema %s %s failed, get schema summary failed, operator: %s", |
| serviceID, schema.SchemaId, remoteIP), err) |
| return discovery.NewError(discovery.ErrModifySchemaNotAllow, |
| "schema already exist, can not be changed request "+discovery.ENV_PROD) |
| } |
| if len(respSchema.SchemaSummary) != 0 { |
| log.Error(fmt.Sprintf("mode, schema %s %s already exist, can not be changed, operator: %s", |
| serviceID, schema.SchemaId, remoteIP), err) |
| return discovery.NewError(discovery.ErrModifySchemaNotAllow, "schema already exist, can not be changed request "+discovery.ENV_PROD) |
| } |
| } |
| if len(microservice.Schemas) == 0 { |
| copy(newSchemas, microservice.Schemas) |
| newSchemas = append(newSchemas, schema.SchemaId) |
| } |
| } else { |
| if !isExist { |
| copy(newSchemas, microservice.Schemas) |
| newSchemas = append(newSchemas, schema.SchemaId) |
| } |
| } |
| if len(newSchemas) != 0 { |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceServiceID(serviceID)) |
| setFilter := mutil.NewFilter( |
| mutil.ServiceSchemas(newSchemas), |
| ) |
| updateFilter := mutil.NewFilter( |
| mutil.Set(setFilter), |
| ) |
| err = dao.UpdateService(ctx, filter, updateFilter) |
| if err != nil { |
| return discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| } |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID), mutil.SchemaID(schema.SchemaId)) |
| setFilter := mutil.NewFilter( |
| mutil.Schema(schema.Schema), |
| mutil.SchemaSummary(schema.Summary), |
| ) |
| updateFilter := mutil.NewFilter(mutil.Set(setFilter)) |
| err = dao.UpdateSchema(ctx, filter, updateFilter, options.FindOneAndUpdate().SetUpsert(true)) |
| if err != nil { |
| return discovery.NewError(discovery.ErrInternal, err.Error()) |
| } |
| return nil |
| } |
| |
| func (ds *DataSource) AddRule(ctx context.Context, request *discovery.AddServiceRulesRequest) (*discovery.AddServiceRulesResponse, error) { |
| remoteIP := util.GetIPFromContext(ctx) |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| exist, err := ServiceExistID(ctx, request.ServiceId) |
| if err != nil { |
| log.Error(fmt.Sprintf("failed to add rules for service %s for get service failed", request.ServiceId), err) |
| return &discovery.AddServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "Failed to check service exist"), |
| }, nil |
| } |
| if !exist { |
| return &discovery.AddServiceRulesResponse{Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "Service does not exist")}, nil |
| } |
| res := quota.NewApplyQuotaResource(quota.TypeRule, util.ParseDomainProject(ctx), request.ServiceId, int64(len(request.Rules))) |
| errQuota := quota.Apply(ctx, res) |
| if errQuota != nil { |
| log.Error(fmt.Sprintf("add service[%s] rule failed, operator: %s", request.ServiceId, remoteIP), errQuota) |
| response := &discovery.AddServiceRulesResponse{ |
| Response: discovery.CreateResponseWithSCErr(errQuota), |
| } |
| if errQuota.InternalError() { |
| return response, errQuota |
| } |
| return response, nil |
| } |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(request.ServiceId)) |
| rules, err := dao.GetServiceRules(ctx, filter) |
| if err != nil { |
| return &discovery.AddServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| var ruleType string |
| if len(rules) != 0 { |
| ruleType = rules[0].RuleType |
| } |
| ruleIDs := make([]string, 0, len(request.Rules)) |
| for _, rule := range request.Rules { |
| if len(ruleType) == 0 { |
| ruleType = rule.RuleType |
| } else if ruleType != rule.RuleType { |
| return &discovery.AddServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrBlackAndWhiteRule, "Service can only contain one rule type,Black or white."), |
| }, nil |
| } |
| //the rule unique index is (serviceid,attribute,pattern) |
| filter = mutil.NewFilter( |
| mutil.ServiceID(request.ServiceId), |
| mutil.RuleAttribute(rule.Attribute), |
| mutil.RulePattern(rule.Pattern), |
| ) |
| exist, err := dao.RuleExist(ctx, filter) |
| if err != nil { |
| return &discovery.AddServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrUnavailableBackend, "Can not check rule if exist."), |
| }, nil |
| } |
| if exist { |
| continue |
| } |
| timestamp := strconv.FormatInt(time.Now().Unix(), 10) |
| ruleAdd := &model.Rule{ |
| Domain: util.ParseDomain(ctx), |
| Project: util.ParseProject(ctx), |
| ServiceID: request.ServiceId, |
| Rule: &discovery.ServiceRule{ |
| RuleId: util.GenerateUUID(), |
| RuleType: rule.RuleType, |
| Attribute: rule.Attribute, |
| Pattern: rule.Pattern, |
| Description: rule.Description, |
| Timestamp: timestamp, |
| ModTimestamp: timestamp, |
| }, |
| } |
| ruleIDs = append(ruleIDs, ruleAdd.Rule.RuleId) |
| _, err = client.GetMongoClient().Insert(ctx, model.CollectionRule, ruleAdd) |
| if err != nil { |
| return &discovery.AddServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| } |
| return &discovery.AddServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Add service rules successfully."), |
| RuleIds: ruleIDs, |
| }, nil |
| } |
| |
| func (ds *DataSource) GetRules(ctx context.Context, request *discovery.GetServiceRulesRequest) (*discovery.GetServiceRulesResponse, error) { |
| exist, err := ServiceExistID(ctx, request.ServiceId) |
| if err != nil { |
| return &discovery.GetServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "GetRules failed for get service failed."), |
| }, nil |
| } |
| if !exist { |
| return &discovery.GetServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "GetRules failed for service not exist."), |
| }, nil |
| } |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceID(request.ServiceId)) |
| rules, err := dao.GetServiceRules(ctx, filter) |
| if err != nil { |
| return &discovery.GetServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, nil |
| } |
| return &discovery.GetServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get service rules successfully."), |
| Rules: rules, |
| }, nil |
| } |
| |
| func (ds *DataSource) DeleteRule(ctx context.Context, request *discovery.DeleteServiceRulesRequest) (*discovery.DeleteServiceRulesResponse, error) { |
| exist, err := ServiceExistID(ctx, request.ServiceId) |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| if err != nil { |
| log.Error(fmt.Sprintf("failed to add tags for service %s for get service failed", request.ServiceId), err) |
| return &discovery.DeleteServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "Failed to check service exist"), |
| }, err |
| } |
| if !exist { |
| return &discovery.DeleteServiceRulesResponse{Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "Service not exist")}, nil |
| } |
| var delRules []mongo.WriteModel |
| for _, ruleID := range request.RuleIds { |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(request.ServiceId), mutil.RuleRuleID(ruleID)) |
| exist, err := dao.RuleExist(ctx, filter) |
| if err != nil { |
| return &discovery.DeleteServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| if !exist { |
| return &discovery.DeleteServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrRuleNotExists, "This rule does not exist."), |
| }, nil |
| } |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(request.ServiceId), mutil.RuleRuleID(ruleID)) |
| delRules = append(delRules, mongo.NewDeleteOneModel().SetFilter(filter)) |
| } |
| if len(delRules) > 0 { |
| _, err := client.GetMongoClient().BatchDelete(ctx, model.CollectionRule, delRules) |
| if err != nil { |
| return &discovery.DeleteServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| } |
| return &discovery.DeleteServiceRulesResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Delete service rules successfully."), |
| }, nil |
| } |
| |
| func (ds *DataSource) UpdateRule(ctx context.Context, request *discovery.UpdateServiceRuleRequest) (*discovery.UpdateServiceRuleResponse, error) { |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| exist, err := ServiceExistID(ctx, request.ServiceId) |
| if err != nil { |
| return &discovery.UpdateServiceRuleResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "UpdateRule failed for get service failed."), |
| }, nil |
| } |
| if !exist { |
| return &discovery.UpdateServiceRuleResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, "UpdateRule failed for service not exist."), |
| }, nil |
| } |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(request.ServiceId)) |
| rules, err := dao.GetServiceRules(ctx, filter) |
| if err != nil { |
| return &discovery.UpdateServiceRuleResponse{ |
| Response: discovery.CreateResponse(discovery.ErrUnavailableBackend, "UpdateRule failed for get rule."), |
| }, nil |
| } |
| if len(rules) >= 1 && rules[0].RuleType != request.Rule.RuleType { |
| return &discovery.UpdateServiceRuleResponse{ |
| Response: discovery.CreateResponse(discovery.ErrModifyRuleNotAllow, "Exist multiple rules, can not change rule type. Rule type is ."+rules[0].RuleType), |
| }, nil |
| } |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(request.ServiceId), mutil.RuleRuleID(request.RuleId)) |
| exist, err = dao.RuleExist(ctx, filter) |
| if err != nil { |
| return &discovery.UpdateServiceRuleResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, nil |
| } |
| if !exist { |
| return &discovery.UpdateServiceRuleResponse{ |
| Response: discovery.CreateResponse(discovery.ErrRuleNotExists, "This rule does not exist."), |
| }, nil |
| } |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(request.ServiceId), mutil.RuleRuleID(request.RuleId)) |
| setFilter := mutil.NewFilter( |
| mutil.RuleRuleType(request.Rule.RuleType), |
| mutil.RulePattern(request.Rule.Pattern), |
| mutil.RuleAttribute(request.Rule.Attribute), |
| mutil.RuleDescription(request.Rule.Description), |
| mutil.RuleModTime(strconv.FormatInt(time.Now().Unix(), baseTen)), |
| ) |
| updateFilter := mutil.NewFilter(mutil.Set(setFilter)) |
| err = dao.UpdateRule(ctx, filter, updateFilter) |
| if err != nil { |
| return &discovery.UpdateServiceRuleResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| return &discovery.UpdateServiceRuleResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Update service rules succesfully."), |
| }, nil |
| } |
| |
| func (ds *DataSource) isSchemaEditable(service *discovery.MicroService) bool { |
| return (len(service.Environment) != 0 && service.Environment != discovery.ENV_PROD) || ds.SchemaEditable |
| } |
| |
| func ServiceExistID(ctx context.Context, serviceID string) (bool, error) { |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(serviceID)) |
| return client.GetMongoClient().DocExist(ctx, model.CollectionService, filter) |
| } |
| |
| func getServiceDetailUtil(ctx context.Context, mgs *model.Service, countOnly bool, options []string) (*discovery.ServiceDetail, error) { |
| serviceDetail := new(discovery.ServiceDetail) |
| serviceID := mgs.Service.ServiceId |
| domainProject := util.ParseDomainProject(ctx) |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| if countOnly { |
| serviceDetail.Statics = new(discovery.Statistics) |
| } |
| for _, opt := range options { |
| expr := opt |
| switch expr { |
| case "tags": |
| serviceDetail.Tags = mgs.Tags |
| case "rules": |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(mgs.Service.ServiceId)) |
| rules, err := dao.GetServiceRules(ctx, filter) |
| if err != nil { |
| log.Error(fmt.Sprintf("get service %s's all rules failed", mgs.Service.ServiceId), err) |
| return nil, err |
| } |
| for _, rule := range rules { |
| rule.Timestamp = rule.ModTimestamp |
| } |
| serviceDetail.Rules = rules |
| case "instances": |
| if countOnly { |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.InstanceServiceID(serviceID)) |
| instanceCount, err := dao.CountInstance(ctx, filter) |
| if err != nil { |
| log.Error(fmt.Sprintf("get number of service [%s]'s instances failed", serviceID), err) |
| return nil, err |
| } |
| serviceDetail.Statics.Instances = &discovery.StInstance{ |
| Count: instanceCount, |
| } |
| continue |
| } |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.InstanceServiceID(serviceID)) |
| instances, err := dao.GetMicroServiceInstances(ctx, filter) |
| if err != nil { |
| log.Error(fmt.Sprintf("get service[%s]'s all instances failed", serviceID), err) |
| return nil, err |
| } |
| serviceDetail.Instances = instances |
| case "schemas": |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceID(serviceID)) |
| schemas, err := dao.GetSchemas(ctx, filter) |
| if err != nil { |
| log.Error(fmt.Sprintf("get service %s's all schemas failed", mgs.Service.ServiceId), err) |
| return nil, err |
| } |
| serviceDetail.SchemaInfos = schemas |
| case "dependencies": |
| service := mgs.Service |
| dr := NewDependencyRelation(ctx, domainProject, service, service) |
| consumers, err := dr.GetDependencyConsumers(WithoutSelfDependency(), WithSameDomainProject()) |
| if err != nil { |
| log.Error(fmt.Sprintf("get service[%s][%s/%s/%s/%s]'s all consumers failed", |
| service.ServiceId, service.Environment, service.AppId, service.ServiceName, service.Version), err) |
| } |
| providers, err := dr.GetDependencyProviders(WithoutSelfDependency(), WithSameDomainProject()) |
| if err != nil { |
| log.Error(fmt.Sprintf("get service[%s][%s/%s/%s/%s]'s all providers failed", |
| service.ServiceId, service.Environment, service.AppId, service.ServiceName, service.Version), err) |
| return nil, err |
| } |
| serviceDetail.Consumers = consumers |
| serviceDetail.Providers = providers |
| case "": |
| continue |
| default: |
| log.Info(fmt.Sprintf("request option %s is invalid", opt)) |
| } |
| } |
| return serviceDetail, nil |
| } |
| |
| // Instance management |
| func (ds *DataSource) RegisterInstance(ctx context.Context, request *discovery.RegisterInstanceRequest) (*discovery.RegisterInstanceResponse, error) { |
| remoteIP := util.GetIPFromContext(ctx) |
| instance := request.Instance |
| |
| // 允许自定义 id |
| if len(instance.InstanceId) > 0 { |
| resp, err := ds.Heartbeat(ctx, &discovery.HeartbeatRequest{ |
| InstanceId: instance.InstanceId, |
| ServiceId: instance.ServiceId, |
| }) |
| if err != nil || resp == nil { |
| log.Error(fmt.Sprintf("register service %s's instance failed, endpoints %s, host '%s', operator %s", |
| instance.ServiceId, instance.Endpoints, instance.HostName, remoteIP), err) |
| return &discovery.RegisterInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, nil |
| } |
| switch resp.Response.GetCode() { |
| case discovery.ResponseSuccess: |
| log.Info(fmt.Sprintf("register instance successful, reuse instance[%s/%s], operator %s", |
| instance.ServiceId, instance.InstanceId, remoteIP)) |
| return &discovery.RegisterInstanceResponse{ |
| Response: resp.Response, |
| InstanceId: instance.InstanceId, |
| }, nil |
| case discovery.ErrInstanceNotExists: |
| // register a new one |
| if request.Instance.HealthCheck == nil { |
| request.Instance.HealthCheck = &discovery.HealthCheck{ |
| Mode: discovery.CHECK_BY_HEARTBEAT, |
| Interval: apt.RegistryDefaultLeaseRenewalinterval, |
| Times: apt.RegistryDefaultLeaseRetrytimes, |
| } |
| } |
| return registryInstance(ctx, request) |
| default: |
| log.Error(fmt.Sprintf("register instance failed, reuse instance %s %s, operator %s", |
| instance.ServiceId, instance.InstanceId, remoteIP), err) |
| return &discovery.RegisterInstanceResponse{ |
| Response: resp.Response, |
| }, err |
| } |
| } |
| |
| if err := preProcessRegisterInstance(ctx, instance); err != nil { |
| log.Error(fmt.Sprintf("register service %s instance failed, endpoints %s, host %s operator %s", |
| instance.ServiceId, instance.Endpoints, instance.HostName, remoteIP), err) |
| return &discovery.RegisterInstanceResponse{ |
| Response: discovery.CreateResponseWithSCErr(err), |
| }, nil |
| } |
| return registryInstance(ctx, request) |
| } |
| |
| // GetInstance returns instance under the current domain |
| func (ds *DataSource) GetInstance(ctx context.Context, request *discovery.GetOneInstanceRequest) (*discovery.GetOneInstanceResponse, error) { |
| var service *model.Service |
| var err error |
| var serviceIDs []string |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| if len(request.ConsumerServiceId) > 0 { |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceServiceID(request.ConsumerServiceId)) |
| service, err = dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("consumer does not exist, consumer %s find provider instance %s %s", |
| request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId)) |
| return &discovery.GetOneInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, |
| fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)), |
| }, nil |
| } |
| log.Error(fmt.Sprintf(" get consumer failed, consumer %s find provider instance %s", |
| request.ConsumerServiceId, request.ProviderInstanceId), err) |
| return &discovery.GetOneInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| } |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.ServiceServiceID(request.ProviderServiceId)) |
| provider, err := dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("provider does not exist, consumer %s find provider instance %s %s", |
| request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId)) |
| return &discovery.GetOneInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, |
| fmt.Sprintf("Provider[%s] does not exist.", request.ProviderServiceId)), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("get provider failed, consumer %s find provider instance %s %s", |
| request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId), err) |
| return &discovery.GetOneInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| findFlag := func() string { |
| return fmt.Sprintf("consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instance[%s]", |
| request.ConsumerServiceId, service.Service.Environment, service.Service.AppId, service.Service.ServiceName, service.Service.Version, |
| provider.Service.ServiceId, provider.Service.Environment, provider.Service.AppId, provider.Service.ServiceName, provider.Service.Version, |
| request.ProviderInstanceId) |
| } |
| |
| domainProject := util.ParseDomainProject(ctx) |
| services, err := filterServices(ctx, discovery.MicroServiceToKey(domainProject, provider.Service)) |
| if err != nil { |
| log.Error(fmt.Sprintf("get instance failed %s", findFlag()), err) |
| return &discovery.GetOneInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| rev, _ := ctx.Value(util.CtxRequestRevision).(string) |
| serviceIDs = filterServiceIDs(ctx, request.ConsumerServiceId, request.Tags, services) |
| if len(serviceIDs) == 0 { |
| mes := fmt.Errorf("%s failed, provider instance does not exist", findFlag()) |
| log.Error("query service failed", mes) |
| return &discovery.GetOneInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInstanceNotExists, mes.Error()), |
| }, nil |
| } |
| instances, err := GetInstancesByServiceID(ctx, request.ProviderServiceId) |
| if err != nil { |
| log.Error(fmt.Sprintf("get instance failed %s", findFlag()), err) |
| return &discovery.GetOneInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| instance := instances[0] |
| // use explicit instanceId to query |
| if len(request.ProviderInstanceId) != 0 { |
| isExist := false |
| for _, ins := range instances { |
| if ins.InstanceId == request.ProviderInstanceId { |
| instance = ins |
| isExist = true |
| break |
| } |
| } |
| if !isExist { |
| mes := fmt.Errorf("%s failed, provider instance does not exist", findFlag()) |
| log.Error("get instance failed", mes) |
| return &discovery.GetOneInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInstanceNotExists, mes.Error()), |
| }, nil |
| } |
| } |
| |
| newRev, _ := formatRevision(request.ConsumerServiceId, instances) |
| if rev == newRev { |
| instance = nil // for gRPC |
| } |
| // TODO support gRPC output context |
| _ = util.WithResponseRev(ctx, newRev) |
| |
| return &discovery.GetOneInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get instance successfully."), |
| Instance: instance, |
| }, nil |
| } |
| |
| func (ds *DataSource) GetInstances(ctx context.Context, request *discovery.GetInstancesRequest) (*discovery.GetInstancesResponse, error) { |
| service := &model.Service{} |
| var err error |
| |
| if len(request.ConsumerServiceId) > 0 { |
| service, err = GetServiceByID(ctx, request.ConsumerServiceId) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("consumer does not exist, consumer %s find provider %s instances", |
| request.ConsumerServiceId, request.ProviderServiceId)) |
| return &discovery.GetInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, |
| fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("get consumer failed, consumer %s find provider %s instances", |
| request.ConsumerServiceId, request.ProviderServiceId), err) |
| return &discovery.GetInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| } |
| |
| provider, err := GetServiceByID(ctx, request.ProviderServiceId) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("provider does not exist, consumer %s find provider %s instances", |
| request.ConsumerServiceId, request.ProviderServiceId)) |
| return &discovery.GetInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, |
| fmt.Sprintf("provider[%s] does not exist.", request.ProviderServiceId)), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("get provider failed, consumer %s find provider instances %s", |
| request.ConsumerServiceId, request.ProviderServiceId), err) |
| return &discovery.GetInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| |
| findFlag := func() string { |
| return fmt.Sprintf("consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instances", |
| request.ConsumerServiceId, service.Service.Environment, service.Service.AppId, service.Service.ServiceName, service.Service.Version, |
| provider.Service.ServiceId, provider.Service.Environment, provider.Service.AppId, provider.Service.ServiceName, provider.Service.Version) |
| } |
| |
| rev, _ := ctx.Value(util.CtxRequestRevision).(string) |
| serviceIDs := filterServiceIDs(ctx, request.ConsumerServiceId, request.Tags, []*model.Service{provider}) |
| if len(serviceIDs) == 0 { |
| mes := fmt.Errorf("%s failed, provider does not exist", findFlag()) |
| log.Error("query service failed", mes) |
| return &discovery.GetInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, mes.Error()), |
| }, nil |
| } |
| instances, err := GetInstancesByServiceID(ctx, request.ProviderServiceId) |
| if err != nil { |
| log.Error(fmt.Sprintf("get instances failed %s", findFlag()), err) |
| return &discovery.GetInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| newRev, _ := formatRevision(request.ConsumerServiceId, instances) |
| if rev == newRev { |
| instances = nil // for gRPC |
| } |
| _ = util.WithResponseRev(ctx, newRev) |
| return &discovery.GetInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Query service instances successfully."), |
| Instances: instances, |
| }, nil |
| } |
| |
| // GetProviderInstances returns instances under the specified domain |
| func (ds *DataSource) GetProviderInstances(ctx context.Context, request *discovery.GetProviderInstancesRequest) (instances []*discovery.MicroServiceInstance, rev string, err error) { |
| filter := mutil.NewBasicFilter(ctx, mutil.InstanceServiceID(request.ProviderServiceId)) |
| instances, err = dao.GetMicroServiceInstances(ctx, filter) |
| if err != nil { |
| return |
| } |
| return instances, "", nil |
| } |
| |
| func (ds *DataSource) GetAllInstances(ctx context.Context, request *discovery.GetAllInstancesRequest) (*discovery.GetAllInstancesResponse, error) { |
| filter := mutil.NewBasicFilter(ctx) |
| findRes, err := client.GetMongoClient().Find(ctx, model.CollectionInstance, filter) |
| if err != nil { |
| return nil, err |
| } |
| resp := &discovery.GetAllInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get all instances successfully"), |
| } |
| |
| for findRes.Next(ctx) { |
| var instance model.Instance |
| err := findRes.Decode(&instance) |
| if err != nil { |
| return &discovery.GetAllInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| resp.Instances = append(resp.Instances, instance.Instance) |
| } |
| |
| return resp, nil |
| } |
| |
| func (ds *DataSource) BatchGetProviderInstances(ctx context.Context, request *discovery.BatchGetInstancesRequest) (instances []*discovery.MicroServiceInstance, rev string, err error) { |
| if request == nil || len(request.ServiceIds) == 0 { |
| return nil, "", mutil.ErrInvalidParam |
| } |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| |
| for _, providerServiceID := range request.ServiceIds { |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.InstanceServiceID(providerServiceID)) |
| findRes, err := client.GetMongoClient().Find(ctx, model.CollectionInstance, filter) |
| if err != nil { |
| return instances, "", nil |
| } |
| |
| for findRes.Next(ctx) { |
| var mongoInstance model.Instance |
| err := findRes.Decode(&mongoInstance) |
| if err == nil { |
| instances = append(instances, mongoInstance.Instance) |
| } |
| } |
| } |
| |
| return instances, "", nil |
| } |
| |
| // FindInstances returns instances under the specified domain |
| func (ds *DataSource) FindInstances(ctx context.Context, request *discovery.FindInstancesRequest) (*discovery.FindInstancesResponse, error) { |
| provider := &discovery.MicroServiceKey{ |
| Tenant: util.ParseTargetDomainProject(ctx), |
| Environment: request.Environment, |
| AppId: request.AppId, |
| ServiceName: request.ServiceName, |
| Alias: request.Alias, |
| Version: request.VersionRule, |
| } |
| rev, ok := ctx.Value(util.CtxRequestRevision).(string) |
| if !ok { |
| err := errors.New("rev request context is not type string") |
| log.Error("", err) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| |
| if apt.IsGlobal(provider) { |
| return ds.findSharedServiceInstance(ctx, request, provider, rev) |
| } |
| |
| return ds.findInstance(ctx, request, provider, rev) |
| } |
| |
| func (ds *DataSource) UpdateInstanceStatus(ctx context.Context, request *discovery.UpdateInstanceStatusRequest) (*discovery.UpdateInstanceStatusResponse, error) { |
| updateStatusFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId, request.Status}, "/") |
| |
| // todo finish get instance |
| filter := mutil.NewBasicFilter(ctx, mutil.InstanceServiceID(request.ServiceId), mutil.InstanceInstanceID(request.InstanceId)) |
| instance, err := dao.GetInstance(ctx, filter) |
| if err != nil { |
| log.Error(fmt.Sprintf("update instance %s status failed", updateStatusFlag), err) |
| return &discovery.UpdateInstanceStatusResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| if instance == nil { |
| log.Error(fmt.Sprintf("update instance %s status failed, instance does not exist", updateStatusFlag), err) |
| return &discovery.UpdateInstanceStatusResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInstanceNotExists, "Service instance does not exist."), |
| }, nil |
| } |
| |
| copyInstanceRef := *instance |
| copyInstanceRef.Instance.Status = request.Status |
| setFilter := mutil.NewFilter( |
| mutil.InstanceModTime(strconv.FormatInt(time.Now().Unix(), baseTen)), |
| mutil.InstanceStatus(copyInstanceRef.Instance.Status), |
| ) |
| updateFilter := mutil.NewFilter(mutil.Set(setFilter)) |
| if err := dao.UpdateInstance(ctx, filter, updateFilter); err != nil { |
| log.Error(fmt.Sprintf("update instance %s status failed", updateStatusFlag), err) |
| resp := &discovery.UpdateInstanceStatusResponse{ |
| Response: discovery.CreateResponseWithSCErr(err), |
| } |
| if err.InternalError() { |
| return resp, err |
| } |
| return resp, nil |
| } |
| |
| log.Info(fmt.Sprintf("update instance[%s] status successfully", updateStatusFlag)) |
| return &discovery.UpdateInstanceStatusResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Update service instance status successfully."), |
| }, nil |
| } |
| |
| func (ds *DataSource) UpdateInstanceProperties(ctx context.Context, request *discovery.UpdateInstancePropsRequest) (*discovery.UpdateInstancePropsResponse, error) { |
| instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, "/") |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| filter := mutil.NewDomainProjectFilter(domain, project, mutil.InstanceServiceID(request.ServiceId), mutil.InstanceInstanceID(request.InstanceId)) |
| instance, err := dao.GetInstance(ctx, filter) |
| if err != nil { |
| log.Error(fmt.Sprintf("update instance %s properties failed", instanceFlag), err) |
| return &discovery.UpdateInstancePropsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| if instance == nil { |
| log.Error(fmt.Sprintf("update instance %s properties failed, instance does not exist", instanceFlag), err) |
| return &discovery.UpdateInstancePropsResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInstanceNotExists, "Service instance does not exist."), |
| }, nil |
| } |
| |
| copyInstanceRef := *instance |
| copyInstanceRef.Instance.Properties = request.Properties |
| |
| // todo finish update instance |
| filter = mutil.NewDomainProjectFilter(domain, project, mutil.InstanceServiceID(request.ServiceId), mutil.InstanceInstanceID(request.InstanceId)) |
| setFilter := mutil.NewFilter( |
| mutil.InstanceModTime(strconv.FormatInt(time.Now().Unix(), baseTen)), |
| mutil.InstanceProperties(copyInstanceRef.Instance.Properties), |
| ) |
| updateFilter := mutil.NewFilter(mutil.Set(setFilter)) |
| if err := dao.UpdateInstance(ctx, filter, updateFilter); err != nil { |
| log.Error(fmt.Sprintf("update instance %s properties failed", instanceFlag), err) |
| resp := &discovery.UpdateInstancePropsResponse{ |
| Response: discovery.CreateResponseWithSCErr(err), |
| } |
| if err.InternalError() { |
| return resp, err |
| } |
| return resp, nil |
| } |
| |
| log.Info(fmt.Sprintf("update instance[%s] properties successfully", instanceFlag)) |
| return &discovery.UpdateInstancePropsResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Update service instance properties successfully."), |
| }, nil |
| } |
| |
| func (ds *DataSource) UnregisterInstance(ctx context.Context, request *discovery.UnregisterInstanceRequest) (*discovery.UnregisterInstanceResponse, error) { |
| remoteIP := util.GetIPFromContext(ctx) |
| serviceID := request.ServiceId |
| instanceID := request.InstanceId |
| |
| instanceFlag := util.StringJoin([]string{serviceID, instanceID}, "/") |
| |
| filter := mutil.NewBasicFilter(ctx, mutil.InstanceServiceID(serviceID), mutil.InstanceInstanceID(instanceID)) |
| result, err := client.GetMongoClient().Delete(ctx, model.CollectionInstance, filter) |
| if err != nil || result.DeletedCount == 0 { |
| log.Error(fmt.Sprintf("unregister instance failed, instance %s, operator %s revoke instance failed", instanceFlag, remoteIP), err) |
| return &discovery.UnregisterInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, "delete instance failed"), |
| }, err |
| } |
| |
| log.Info(fmt.Sprintf("unregister instance[%s], operator %s", instanceFlag, remoteIP)) |
| return &discovery.UnregisterInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Unregister service instance successfully."), |
| }, nil |
| } |
| |
| func (ds *DataSource) Heartbeat(ctx context.Context, request *discovery.HeartbeatRequest) (*discovery.HeartbeatResponse, error) { |
| remoteIP := util.GetIPFromContext(ctx) |
| instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, "/") |
| err := KeepAliveLease(ctx, request) |
| if err != nil { |
| log.Error(fmt.Sprintf("heartbeat failed, instance %s operator %s", instanceFlag, remoteIP), err) |
| resp := &discovery.HeartbeatResponse{ |
| Response: discovery.CreateResponseWithSCErr(err), |
| } |
| if err.InternalError() { |
| return resp, err |
| } |
| return resp, nil |
| } |
| return &discovery.HeartbeatResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, |
| "Update service instance heartbeat successfully."), |
| }, nil |
| } |
| |
| func (ds *DataSource) HeartbeatSet(ctx context.Context, request *discovery.HeartbeatSetRequest) (*discovery.HeartbeatSetResponse, error) { |
| domainProject := util.ParseDomainProject(ctx) |
| |
| heartBeatCount := len(request.Instances) |
| existFlag := make(map[string]bool, heartBeatCount) |
| instancesHbRst := make(chan *discovery.InstanceHbRst, heartBeatCount) |
| noMultiCounter := 0 |
| |
| for _, heartbeatElement := range request.Instances { |
| if _, ok := existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId]; ok { |
| log.Warn(fmt.Sprintf("instance[%s/%s] is duplicate request heartbeat set", |
| heartbeatElement.ServiceId, heartbeatElement.InstanceId)) |
| continue |
| } else { |
| existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true |
| noMultiCounter++ |
| } |
| gopool.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst, heartbeatElement)) |
| } |
| |
| count := 0 |
| successFlag := false |
| failFlag := false |
| instanceHbRstArr := make([]*discovery.InstanceHbRst, 0, heartBeatCount) |
| |
| for hbRst := range instancesHbRst { |
| count++ |
| if len(hbRst.ErrMessage) != 0 { |
| failFlag = true |
| } else { |
| successFlag = true |
| } |
| instanceHbRstArr = append(instanceHbRstArr, hbRst) |
| if count == noMultiCounter { |
| close(instancesHbRst) |
| } |
| } |
| |
| if !failFlag && successFlag { |
| log.Info(fmt.Sprintf("batch update heartbeats[%d] successfully", count)) |
| return &discovery.HeartbeatSetResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Heartbeat set successfully."), |
| Instances: instanceHbRstArr, |
| }, nil |
| } |
| |
| log.Info(fmt.Sprintf("batch update heartbeats failed %v", request.Instances)) |
| return &discovery.HeartbeatSetResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInstanceNotExists, "Heartbeat set failed."), |
| Instances: instanceHbRstArr, |
| }, nil |
| } |
| |
| func (ds *DataSource) BatchFind(ctx context.Context, request *discovery.BatchFindInstancesRequest) (*discovery.BatchFindInstancesResponse, error) { |
| response := &discovery.BatchFindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Batch query service instances successfully."), |
| } |
| |
| var err error |
| |
| response.Services, err = ds.batchFindServices(ctx, request) |
| if err != nil { |
| return &discovery.BatchFindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| |
| response.Instances, err = ds.batchFindInstances(ctx, request) |
| if err != nil { |
| return &discovery.BatchFindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| |
| return response, nil |
| } |
| |
| func registryInstance(ctx context.Context, request *discovery.RegisterInstanceRequest) (*discovery.RegisterInstanceResponse, error) { |
| domain := util.ParseDomain(ctx) |
| project := util.ParseProject(ctx) |
| remoteIP := util.GetIPFromContext(ctx) |
| instance := request.Instance |
| ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1) |
| |
| instanceFlag := fmt.Sprintf("ttl %ds, endpoints %v, host '%s', serviceID %s", |
| ttl, instance.Endpoints, instance.HostName, instance.ServiceId) |
| |
| instanceID := instance.InstanceId |
| data := &model.Instance{ |
| Domain: domain, |
| Project: project, |
| RefreshTime: time.Now(), |
| Instance: instance, |
| } |
| |
| insertRes, err := client.GetMongoClient().Insert(ctx, model.CollectionInstance, data) |
| if err != nil { |
| if client.IsDuplicateKey(err) { |
| return &discovery.RegisterInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Register service instance successfully."), |
| InstanceId: instanceID, |
| }, nil |
| } |
| log.Error(fmt.Sprintf("register instance failed %s instanceID %s operator %s", instanceFlag, instanceID, remoteIP), err) |
| return &discovery.RegisterInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ErrUnavailableBackend, err.Error()), |
| }, err |
| } |
| |
| // need to complete the instance offline function in time, so you need to check the heartbeat after registering the instance |
| err = heartbeat.Instance().CheckInstance(ctx, instance) |
| if err != nil { |
| log.Error(fmt.Sprintf("fail to check instance, instance[%s]. operator %s", instance.InstanceId, remoteIP), err) |
| } |
| |
| log.Info(fmt.Sprintf("register instance %s, instanceID %s, operator %s", |
| instanceFlag, insertRes.InsertedID, remoteIP)) |
| return &discovery.RegisterInstanceResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Register service instance successfully."), |
| InstanceId: instanceID, |
| }, nil |
| } |
| |
| func (ds *DataSource) findSharedServiceInstance(ctx context.Context, request *discovery.FindInstancesRequest, provider *discovery.MicroServiceKey, rev string) (*discovery.FindInstancesResponse, error) { |
| var err error |
| // it means the shared micro-services must be the same env with SC. |
| provider.Environment = apt.Service.Environment |
| findFlag := func() string { |
| return fmt.Sprintf("find shared provider[%s/%s/%s/%s]", provider.Environment, provider.AppId, provider.ServiceName, provider.Version) |
| } |
| basicFilterServices, err := servicesBasicFilter(ctx, provider) |
| if err != nil { |
| log.Error(fmt.Sprintf("find shared service instance failed %s", findFlag()), err) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| services, err := filterServices(ctx, provider) |
| if err != nil { |
| log.Error(fmt.Sprintf("find shared service instance failed %s", findFlag()), err) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| if services == nil && len(basicFilterServices) == 0 { |
| mes := fmt.Errorf("%s failed, provider does not exist", findFlag()) |
| log.Error("find shared service instance failed", mes) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, mes.Error()), |
| }, nil |
| } |
| serviceIDs := filterServiceIDs(ctx, request.ConsumerServiceId, request.Tags, services) |
| inFilter := mutil.NewFilter(mutil.In(serviceIDs)) |
| filter := mutil.NewFilter(mutil.InstanceServiceID(inFilter)) |
| option := &options.FindOptions{Sort: bson.M{mutil.ConnectWithDot([]string{model.ColumnInstance, model.ColumnVersion}): -1}} |
| instances, err := dao.GetMicroServiceInstances(ctx, filter, option) |
| if err != nil { |
| log.Error(fmt.Sprintf("find shared service instance failed %s", findFlag()), err) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| newRev, _ := formatRevision(request.ConsumerServiceId, instances) |
| if rev == newRev { |
| instances = nil // for gRPC |
| } |
| // TODO support gRPC output context |
| _ = util.WithResponseRev(ctx, newRev) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Query service instances successfully."), |
| Instances: instances, |
| }, nil |
| } |
| |
| func (ds *DataSource) findInstance(ctx context.Context, request *discovery.FindInstancesRequest, provider *discovery.MicroServiceKey, rev string) (*discovery.FindInstancesResponse, error) { |
| var err error |
| domainProject := util.ParseDomainProject(ctx) |
| service := &model.Service{Service: &discovery.MicroService{Environment: request.Environment}} |
| if len(request.ConsumerServiceId) > 0 { |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(request.ConsumerServiceId)) |
| service, err = dao.GetService(ctx, filter) |
| if err != nil { |
| if errors.Is(err, datasource.ErrNoData) { |
| log.Debug(fmt.Sprintf("consumer does not exist, consumer %s find provider %s/%s/%s/%s", |
| request.ConsumerServiceId, request.Environment, request.AppId, request.ServiceName, request.VersionRule)) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, |
| fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)), |
| }, nil |
| } |
| log.Error(fmt.Sprintf("get consumer failed, consumer %s find provider %s/%s/%s/%s", |
| request.ConsumerServiceId, request.Environment, request.AppId, request.ServiceName, request.VersionRule), err) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| provider.Environment = service.Service.Environment |
| } |
| |
| // provider is not a shared micro-service, |
| // only allow shared micro-service instances found request different domains. |
| ctx = util.SetTargetDomainProject(ctx, util.ParseDomain(ctx), util.ParseProject(ctx)) |
| provider.Tenant = util.ParseTargetDomainProject(ctx) |
| |
| findFlag := func() string { |
| return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s/%s/%s/%s]", |
| request.ConsumerServiceId, service.Service.Environment, service.Service.AppId, service.Service.ServiceName, service.Service.Version, |
| provider.Environment, provider.AppId, provider.ServiceName, provider.Version) |
| } |
| basicFilterServices, err := servicesBasicFilter(ctx, provider) |
| if err != nil { |
| log.Error(fmt.Sprintf("find instance failed %s", findFlag()), err) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| services, err := filterServices(ctx, provider) |
| if err != nil { |
| log.Error(fmt.Sprintf("find instance failed %s", findFlag()), err) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| if services == nil && len(basicFilterServices) == 0 { |
| mes := fmt.Errorf("%s failed, provider does not exist", findFlag()) |
| log.Error("find instance failed", mes) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, mes.Error()), |
| }, nil |
| } |
| serviceIDs := filterServiceIDs(ctx, request.ConsumerServiceId, request.Tags, services) |
| inFilter := mutil.NewFilter(mutil.In(serviceIDs)) |
| filter := mutil.NewFilter(mutil.InstanceServiceID(inFilter)) |
| option := &options.FindOptions{Sort: bson.M{mutil.ConnectWithDot([]string{model.ColumnInstance, model.ColumnVersion}): -1}} |
| instances, err := dao.GetMicroServiceInstances(ctx, filter, option) |
| if err != nil { |
| log.Error(fmt.Sprintf("find instance failed %s", findFlag()), err) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| // add dependency queue |
| if len(request.ConsumerServiceId) > 0 && |
| len(serviceIDs) > 0 { |
| provider, err = ds.reshapeProviderKey(ctx, provider, serviceIDs[0]) |
| if err != nil { |
| return nil, err |
| } |
| if provider != nil { |
| err = AddServiceVersionRule(ctx, domainProject, service.Service, provider) |
| } else { |
| mes := fmt.Errorf("%s failed, provider does not exist", findFlag()) |
| log.Error("add service version rule failed", mes) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrServiceNotExists, mes.Error()), |
| }, nil |
| } |
| if err != nil { |
| log.Error(fmt.Sprintf("add service version rule failed %s", findFlag()), err) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()), |
| }, err |
| } |
| } |
| newRev, _ := formatRevision(request.ConsumerServiceId, instances) |
| if rev == newRev { |
| instances = nil // for gRPC |
| } |
| // TODO support gRPC output context |
| _ = util.WithResponseRev(ctx, newRev) |
| return &discovery.FindInstancesResponse{ |
| Response: discovery.CreateResponse(discovery.ResponseSuccess, "Query service instances successfully."), |
| Instances: instances, |
| }, nil |
| } |
| |
| func (ds *DataSource) reshapeProviderKey(ctx context.Context, provider *discovery.MicroServiceKey, providerID string) (*discovery.MicroServiceKey, error) { |
| //维护version的规则,service name 可能是别名,所以重新获取 |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(providerID)) |
| providerService, err := dao.GetService(ctx, filter) |
| if err != nil { |
| return nil, err |
| } |
| |
| versionRule := provider.Version |
| provider = discovery.MicroServiceToKey(provider.Tenant, providerService.Service) |
| provider.Version = versionRule |
| return provider, nil |
| } |
| |
| func AddServiceVersionRule(ctx context.Context, domainProject string, consumer *discovery.MicroService, provider *discovery.MicroServiceKey) error { |
| consumerKey := discovery.MicroServiceToKey(domainProject, consumer) |
| exist, err := DependencyRuleExist(ctx, provider, consumerKey) |
| if exist || err != nil { |
| return err |
| } |
| |
| r := &discovery.ConsumerDependency{ |
| Consumer: consumerKey, |
| Providers: []*discovery.MicroServiceKey{provider}, |
| Override: false, |
| } |
| err = syncDependencyRule(ctx, domainProject, r) |
| |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func DependencyRuleExist(ctx context.Context, provider *discovery.MicroServiceKey, consumer *discovery.MicroServiceKey) (bool, error) { |
| targetDomainProject := provider.Tenant |
| if len(targetDomainProject) == 0 { |
| targetDomainProject = consumer.Tenant |
| } |
| consumerKey := GenerateConsumerDependencyRuleKey(consumer.Tenant, consumer) |
| existed, err := DependencyRuleExistUtil(ctx, consumerKey, provider) |
| if err != nil || existed { |
| return existed, err |
| } |
| providerKey := GenerateProviderDependencyRuleKey(targetDomainProject, provider) |
| return DependencyRuleExistUtil(ctx, providerKey, consumer) |
| } |
| |
| func DependencyRuleExistUtil(ctx context.Context, key bson.M, target *discovery.MicroServiceKey) (bool, error) { |
| compareData, err := TransferToMicroServiceDependency(ctx, key) |
| if err != nil { |
| return false, err |
| } |
| |
| if len(compareData.Dependency) != 0 { |
| isEqual, err := datasource.ContainServiceDependency(compareData.Dependency, target) |
| if err != nil { |
| return false, err |
| } |
| if isEqual { |
| return true, nil |
| } |
| } |
| return false, nil |
| } |
| |
| func KeepAliveLease(ctx context.Context, request *discovery.HeartbeatRequest) *discovery.Error { |
| _, err := heartbeat.Instance().Heartbeat(ctx, request) |
| if err != nil { |
| return discovery.NewError(discovery.ErrInstanceNotExists, err.Error()) |
| } |
| return nil |
| } |
| |
| func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *discovery.InstanceHbRst, element *discovery.HeartbeatSetElement) func(context.Context) { |
| return func(_ context.Context) { |
| hbRst := &discovery.InstanceHbRst{ |
| ServiceId: element.ServiceId, |
| InstanceId: element.InstanceId, |
| ErrMessage: "", |
| } |
| |
| req := &discovery.HeartbeatRequest{ |
| InstanceId: element.InstanceId, |
| ServiceId: element.ServiceId, |
| } |
| |
| err := KeepAliveLease(ctx, req) |
| if err != nil { |
| hbRst.ErrMessage = err.Error() |
| log.Error(fmt.Sprintf("heartbeat set failed %s %s", element.ServiceId, element.InstanceId), err) |
| } |
| instancesHbRst <- hbRst |
| } |
| } |
| |
| func (ds *DataSource) batchFindServices(ctx context.Context, request *discovery.BatchFindInstancesRequest) (*discovery.BatchFindResult, error) { |
| if len(request.Services) == 0 { |
| return nil, nil |
| } |
| cloneCtx := util.CloneContext(ctx) |
| |
| services := &discovery.BatchFindResult{} |
| failedResult := make(map[int32]*discovery.FindFailedResult) |
| for index, key := range request.Services { |
| findCtx := util.SetContext(cloneCtx, util.CtxRequestRevision, key.Rev) |
| resp, err := ds.FindInstances(findCtx, &discovery.FindInstancesRequest{ |
| ConsumerServiceId: request.ConsumerServiceId, |
| AppId: key.Service.AppId, |
| ServiceName: key.Service.ServiceName, |
| VersionRule: key.Service.Version, |
| Environment: key.Service.Environment, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| failed, ok := failedResult[resp.Response.GetCode()] |
| AppendFindResponse(findCtx, int64(index), resp.Response, resp.Instances, |
| &services.Updated, &services.NotModified, &failed) |
| if !ok && failed != nil { |
| failedResult[resp.Response.GetCode()] = failed |
| } |
| } |
| for _, result := range failedResult { |
| services.Failed = append(services.Failed, result) |
| } |
| return services, nil |
| } |
| |
| func (ds *DataSource) batchFindInstances(ctx context.Context, request *discovery.BatchFindInstancesRequest) (*discovery.BatchFindResult, error) { |
| if len(request.Instances) == 0 { |
| return nil, nil |
| } |
| cloneCtx := util.CloneContext(ctx) |
| // can not find the shared provider instances |
| cloneCtx = util.SetTargetDomainProject(cloneCtx, util.ParseDomain(ctx), util.ParseProject(ctx)) |
| |
| instances := &discovery.BatchFindResult{} |
| failedResult := make(map[int32]*discovery.FindFailedResult) |
| for index, key := range request.Instances { |
| getCtx := util.SetContext(cloneCtx, util.CtxRequestRevision, key.Rev) |
| resp, err := ds.GetInstance(getCtx, &discovery.GetOneInstanceRequest{ |
| ConsumerServiceId: request.ConsumerServiceId, |
| ProviderServiceId: key.Instance.ServiceId, |
| ProviderInstanceId: key.Instance.InstanceId, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| failed, ok := failedResult[resp.Response.GetCode()] |
| AppendFindResponse(getCtx, int64(index), resp.Response, []*discovery.MicroServiceInstance{resp.Instance}, |
| &instances.Updated, &instances.NotModified, &failed) |
| if !ok && failed != nil { |
| failedResult[resp.Response.GetCode()] = failed |
| } |
| } |
| for _, result := range failedResult { |
| instances.Failed = append(instances.Failed, result) |
| } |
| return instances, nil |
| } |
| |
| func AppendFindResponse(ctx context.Context, index int64, resp *discovery.Response, instances []*discovery.MicroServiceInstance, |
| updatedResult *[]*discovery.FindResult, notModifiedResult *[]int64, failedResult **discovery.FindFailedResult) { |
| if code := resp.GetCode(); code != discovery.ResponseSuccess { |
| if *failedResult == nil { |
| *failedResult = &discovery.FindFailedResult{ |
| Error: discovery.NewError(code, resp.GetMessage()), |
| } |
| } |
| (*failedResult).Indexes = append((*failedResult).Indexes, index) |
| return |
| } |
| iv, _ := ctx.Value(util.CtxRequestRevision).(string) |
| ov, _ := ctx.Value(util.CtxResponseRevision).(string) |
| if len(iv) > 0 && iv == ov { |
| *notModifiedResult = append(*notModifiedResult, index) |
| return |
| } |
| *updatedResult = append(*updatedResult, &discovery.FindResult{ |
| Index: index, |
| Instances: instances, |
| Rev: ov, |
| }) |
| } |
| |
| func preProcessRegisterInstance(ctx context.Context, instance *discovery.MicroServiceInstance) *discovery.Error { |
| if len(instance.Status) == 0 { |
| instance.Status = discovery.MSI_UP |
| } |
| |
| if len(instance.InstanceId) == 0 { |
| instance.InstanceId = uuid.Generator().GetInstanceID(ctx) |
| } |
| |
| instance.Timestamp = strconv.FormatInt(time.Now().Unix(), 10) |
| instance.ModTimestamp = instance.Timestamp |
| |
| // 这里应该根据租约计时 |
| renewalInterval := apt.RegistryDefaultLeaseRenewalinterval |
| retryTimes := apt.RegistryDefaultLeaseRetrytimes |
| if instance.HealthCheck == nil { |
| instance.HealthCheck = &discovery.HealthCheck{ |
| Mode: discovery.CHECK_BY_HEARTBEAT, |
| Interval: renewalInterval, |
| Times: retryTimes, |
| } |
| } else { |
| // Health check对象仅用于呈现服务健康检查逻辑,如果CHECK_BY_PLATFORM类型,表明由sidecar代发心跳,实例120s超时 |
| switch instance.HealthCheck.Mode { |
| case discovery.CHECK_BY_HEARTBEAT: |
| d := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1) |
| if d <= 0 { |
| return discovery.NewError(discovery.ErrInvalidParams, "invalid 'healthCheck' settings in request body.") |
| } |
| case discovery.CHECK_BY_PLATFORM: |
| // 默认120s |
| instance.HealthCheck.Interval = renewalInterval |
| instance.HealthCheck.Times = retryTimes |
| } |
| } |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(instance.ServiceId)) |
| microservice, err := dao.GetService(ctx, filter) |
| if err != nil { |
| return discovery.NewError(discovery.ErrServiceNotExists, "invalid 'serviceID' in request body.") |
| } |
| instance.Version = microservice.Service.Version |
| return nil |
| } |
| |
| // servicesBasicFilter query services with domain, project, env, appID, serviceName, alias |
| func servicesBasicFilter(ctx context.Context, key *discovery.MicroServiceKey) ([]*model.Service, error) { |
| tenant := strings.Split(key.Tenant, "/") |
| if len(tenant) != 2 { |
| return nil, errors.New("invalid 'domain' or 'project'") |
| } |
| filter := mutil.NewDomainProjectFilter(tenant[0], tenant[1], |
| mutil.ServiceEnv(key.Environment), |
| mutil.ServiceAppID(key.AppId), |
| mutil.ServiceServiceName(key.ServiceName), |
| mutil.ServiceAlias(key.Alias), |
| ) |
| rangeIdx := strings.Index(key.Version, "-") |
| // if the version number is clear, need to add the version number to query |
| switch { |
| case key.Version == "latest": |
| return dao.GetServices(ctx, filter) |
| case len(key.Version) > 0 && key.Version[len(key.Version)-1:] == "+": |
| return dao.GetServices(ctx, filter) |
| case rangeIdx > 0: |
| return dao.GetServices(ctx, filter) |
| default: |
| filter[mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnVersion})] = key.Version |
| return dao.GetServices(ctx, filter) |
| } |
| } |
| |
| func filterServices(ctx context.Context, key *discovery.MicroServiceKey) ([]*model.Service, error) { |
| tenant := strings.Split(key.Tenant, "/") |
| if len(tenant) != 2 { |
| return nil, errors.New("invalid 'domain' or 'project'") |
| } |
| rangeIdx := strings.Index(key.Version, "-") |
| filter := mutil.NewDomainProjectFilter(tenant[0], tenant[1], |
| mutil.ServiceEnv(key.Environment), |
| mutil.ServiceAppID(key.AppId), |
| mutil.ServiceServiceName(key.ServiceName), |
| mutil.ServiceAlias(key.Alias), |
| ) |
| switch { |
| case key.Version == "latest": |
| findOption := &options.FindOptions{Sort: bson.M{mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnVersion}): -1}} |
| return dao.GetServices(ctx, filter, findOption) |
| case len(key.Version) > 0 && key.Version[len(key.Version)-1:] == "+": |
| start := key.Version[:len(key.Version)-1] |
| filter[mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnVersion})] = bson.M{"$gte": start} |
| return dao.GetServices(ctx, filter) |
| case rangeIdx > 0: |
| start := key.Version[:rangeIdx] |
| end := key.Version[rangeIdx+1:] |
| filter[mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnVersion})] = bson.M{"$gte": start, "$lte": end} |
| return dao.GetServices(ctx, filter) |
| default: |
| filter[mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnVersion})] = key.Version |
| return dao.GetServices(ctx, filter) |
| } |
| } |
| |
| func filterServiceIDs(ctx context.Context, consumerID string, tags []string, services []*model.Service) []string { |
| var filterService []*model.Service |
| serviceIDs := make([]string, 0) |
| if len(services) == 0 { |
| return serviceIDs |
| } |
| filterService = filterTags(services, tags) |
| filterService = filterAccess(ctx, consumerID, filterService) |
| for _, service := range filterService { |
| serviceIDs = append(serviceIDs, service.Service.ServiceId) |
| } |
| return serviceIDs |
| } |
| |
| func filterTags(services []*model.Service, tags []string) []*model.Service { |
| if len(tags) == 0 { |
| return services |
| } |
| var newServices []*model.Service |
| for _, service := range services { |
| index := 0 |
| for ; index < len(tags); index++ { |
| if _, ok := service.Tags[tags[index]]; !ok { |
| break |
| } |
| } |
| if index == len(tags) { |
| newServices = append(newServices, service) |
| } |
| } |
| return newServices |
| } |
| |
| func filterAccess(ctx context.Context, consumerID string, services []*model.Service) []*model.Service { |
| newServices := make([]*model.Service, 0) |
| for _, service := range services { |
| if err := accessible(ctx, consumerID, service.Service.ServiceId); err != nil { |
| findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumerID, |
| service.Service.AppId, service.Service.ServiceName, service.Service.Version) |
| log.Error(fmt.Sprintf("accessible filter failed, %s", findFlag), err) |
| continue |
| } |
| newServices = append(newServices, service) |
| } |
| return newServices |
| } |
| |
| func accessible(ctx context.Context, consumerID string, providerID string) *discovery.Error { |
| if len(consumerID) == 0 { |
| return nil |
| } |
| |
| consumerDomain, consumerProject := util.ParseDomain(ctx), util.ParseProject(ctx) |
| providerDomain, providerProject := util.ParseTargetDomain(ctx), util.ParseTargetProject(ctx) |
| |
| filter := mutil.NewDomainProjectFilter(consumerDomain, consumerProject, mutil.ServiceServiceID(consumerID)) |
| consumerService, err := dao.GetService(ctx, filter) |
| if err != nil { |
| return discovery.NewError(discovery.ErrInternal, fmt.Sprintf("an error occurred in query consumer(%s)", err.Error())) |
| } |
| if consumerService == nil { |
| return discovery.NewError(discovery.ErrServiceNotExists, "consumer serviceID is invalid") |
| } |
| |
| filter = mutil.NewDomainProjectFilter(providerDomain, providerProject, mutil.ServiceServiceID(providerID)) |
| // 跨应用权限 |
| providerService, err := dao.GetService(ctx, filter) |
| if err != nil { |
| return discovery.NewError(discovery.ErrInternal, fmt.Sprintf("an error occurred in query provider(%s)", err.Error())) |
| } |
| if providerService == nil { |
| return discovery.NewError(discovery.ErrServiceNotExists, "provider serviceID is invalid") |
| } |
| err = allowAcrossDimension(ctx, providerService, consumerService) |
| if err != nil { |
| return discovery.NewError(discovery.ErrPermissionDeny, err.Error()) |
| } |
| |
| // 黑白名单 |
| filter = mutil.NewDomainProjectFilter(providerDomain, providerProject, mutil.ServiceID(providerID)) |
| rules, err := dao.GetRules(ctx, filter) |
| if err != nil { |
| return discovery.NewError(discovery.ErrInternal, fmt.Sprintf("an error occurred in query provider rules(%s)", err.Error())) |
| } |
| |
| if len(rules) == 0 { |
| return nil |
| } |
| return MatchRules(rules, consumerService.Service, consumerService.Tags) |
| } |
| |
| func MatchRules(rulesOfProvider []*model.Rule, consumer *discovery.MicroService, tagsOfConsumer map[string]string) *discovery.Error { |
| if consumer == nil { |
| return discovery.NewError(discovery.ErrInvalidParams, "consumer is nil") |
| } |
| |
| if len(rulesOfProvider) <= 0 { |
| return nil |
| } |
| if rulesOfProvider[0].Rule.RuleType == "WHITE" { |
| return patternWhiteList(rulesOfProvider, tagsOfConsumer, consumer) |
| } |
| return patternBlackList(rulesOfProvider, tagsOfConsumer, consumer) |
| } |
| |
| func parsePattern(v reflect.Value, rule *discovery.ServiceRule, tagsOfConsumer map[string]string, consumerID string) (string, *discovery.Error) { |
| if strings.HasPrefix(rule.Attribute, "tag_") { |
| key := rule.Attribute[4:] |
| value := tagsOfConsumer[key] |
| if len(value) == 0 { |
| log.Info(fmt.Sprintf("can not find service[%s] tag[%s]", consumerID, key)) |
| } |
| return value, nil |
| } |
| key := v.FieldByName(rule.Attribute) |
| if !key.IsValid() { |
| log.Error(fmt.Sprintf("can not find service[%s] field[%s], ruleID is %s", |
| consumerID, rule.Attribute, rule.RuleId), nil) |
| return "", discovery.NewError(discovery.ErrInternal, fmt.Sprintf("can not find field '%s'", rule.Attribute)) |
| } |
| return key.String(), nil |
| |
| } |
| |
| func patternWhiteList(rulesOfProvider []*model.Rule, tagsOfConsumer map[string]string, consumer *discovery.MicroService) *discovery.Error { |
| v := reflect.Indirect(reflect.ValueOf(consumer)) |
| consumerID := consumer.ServiceId |
| for _, rule := range rulesOfProvider { |
| value, err := parsePattern(v, rule.Rule, tagsOfConsumer, consumerID) |
| if err != nil { |
| return err |
| } |
| if len(value) == 0 { |
| continue |
| } |
| |
| match, _ := regexp.MatchString(rule.Rule.Pattern, value) |
| if match { |
| log.Info(fmt.Sprintf("consumer[%s][%s/%s/%s/%s] match white list, rule.Pattern is %s, value is %s", |
| consumerID, consumer.Environment, consumer.AppId, consumer.ServiceName, consumer.Version, |
| rule.Rule.Pattern, value)) |
| return nil |
| } |
| } |
| return discovery.NewError(discovery.ErrPermissionDeny, "not found in white list") |
| } |
| |
| func patternBlackList(rulesOfProvider []*model.Rule, tagsOfConsumer map[string]string, consumer *discovery.MicroService) *discovery.Error { |
| v := reflect.Indirect(reflect.ValueOf(consumer)) |
| consumerID := consumer.ServiceId |
| for _, rule := range rulesOfProvider { |
| var value string |
| value, err := parsePattern(v, rule.Rule, tagsOfConsumer, consumerID) |
| if err != nil { |
| return err |
| } |
| if len(value) == 0 { |
| continue |
| } |
| |
| match, _ := regexp.MatchString(rule.Rule.Pattern, value) |
| if match { |
| log.Warn(fmt.Sprintf("no permission to access, consumer[%s][%s/%s/%s/%s] match black list, rule.Pattern is %s, value is %s", |
| consumerID, consumer.Environment, consumer.AppId, consumer.ServiceName, consumer.Version, |
| rule.Rule.Pattern, value)) |
| return discovery.NewError(discovery.ErrPermissionDeny, "found in black list") |
| } |
| } |
| return nil |
| } |
| func allowAcrossDimension(ctx context.Context, providerService *model.Service, consumerService *model.Service) error { |
| if providerService.Service.AppId != consumerService.Service.AppId { |
| if len(providerService.Service.Properties) == 0 { |
| return fmt.Errorf("not allow across app access") |
| } |
| |
| if allowCrossApp, ok := providerService.Service.Properties[discovery.PropAllowCrossApp]; !ok || strings.ToLower(allowCrossApp) != "true" { |
| return fmt.Errorf("not allow across app access") |
| } |
| } |
| if !apt.IsGlobal(discovery.MicroServiceToKey(util.ParseTargetDomainProject(ctx), providerService.Service)) && |
| providerService.Service.Environment != consumerService.Service.Environment { |
| return fmt.Errorf("not allow across environment access") |
| } |
| return nil |
| } |
| |
| func GetServiceByID(ctx context.Context, serviceID string) (*model.Service, error) { |
| cacheService, ok := sd.Store().Service().Cache().Get(serviceID).(model.Service) |
| if !ok { |
| //no service in cache,get it from mongodb |
| filter := mutil.NewBasicFilter(ctx, mutil.ServiceServiceID(serviceID)) |
| return dao.GetService(ctx, filter) |
| } |
| return cacheToService(cacheService), nil |
| } |
| |
| func cacheToService(service model.Service) *model.Service { |
| return &model.Service{ |
| Domain: service.Domain, |
| Project: service.Project, |
| Tags: service.Tags, |
| Service: service.Service, |
| } |
| } |
| |
| func GetInstancesByServiceID(ctx context.Context, serviceID string) ([]*discovery.MicroServiceInstance, error) { |
| var res []*discovery.MicroServiceInstance |
| var cacheUnavailable bool |
| cacheInstances := sd.Store().Instance().Cache().GetIndexData(serviceID) |
| for _, instID := range cacheInstances { |
| inst, ok := sd.Store().Instance().Cache().Get(instID).(model.Instance) |
| if !ok { |
| cacheUnavailable = true |
| break |
| } |
| res = append(res, inst.Instance) |
| } |
| if cacheUnavailable || len(res) == 0 { |
| filter := mutil.NewFilter(mutil.InstanceServiceID(serviceID)) |
| option := &options.FindOptions{Sort: bson.M{mutil.ConnectWithDot([]string{model.ColumnInstance, model.ColumnVersion}): -1}} |
| res, err := dao.GetMicroServiceInstances(ctx, filter, option) |
| if err != nil { |
| return nil, err |
| } |
| return res, nil |
| } |
| return res, nil |
| } |
| |
| func DeleteDependencyForDeleteService(domainProject string, serviceID string, service *discovery.MicroServiceKey) error { |
| conDep := new(discovery.ConsumerDependency) |
| conDep.Consumer = service |
| conDep.Providers = []*discovery.MicroServiceKey{} |
| conDep.Override = true |
| err := syncDependencyRule(context.TODO(), domainProject, conDep) |
| if err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func formatRevision(consumerServiceID string, instances []*discovery.MicroServiceInstance) (string, error) { |
| if instances == nil { |
| return fmt.Sprintf("%x", sha1.Sum(util.StringToBytesWithNoCopy(consumerServiceID))), nil |
| } |
| copyInstance := make([]*discovery.MicroServiceInstance, len(instances)) |
| copy(copyInstance, instances) |
| sort.Sort(InstanceSlice(copyInstance)) |
| data, err := json.Marshal(copyInstance) |
| if err != nil { |
| log.Error("fail to marshal instance json", err) |
| return "", err |
| } |
| s := fmt.Sprintf("%s.%x", consumerServiceID, sha1.Sum(data)) |
| return fmt.Sprintf("%x", sha1.Sum(util.StringToBytesWithNoCopy(s))), nil |
| } |