SCB-2094 Add list provider instances APIs (#726)
diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index d7793e8..7f26e45 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -36,10 +36,24 @@
scerr "github.com/apache/servicecomb-service-center/server/scerror"
"github.com/apache/servicecomb-service-center/server/service/cache"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
+ "sort"
"strconv"
"time"
)
+var clustersIndex = make(map[string]int)
+
+func init() {
+ var clusters []string
+ for name := range registry.Configuration().Clusters {
+ clusters = append(clusters, name)
+ }
+ sort.Strings(clusters)
+ for i, name := range clusters {
+ clustersIndex[name] = i
+ }
+}
+
// RegisterService() implement:
// 1. capsule request to etcd kv format
// 2. invoke etcd client to store data
@@ -616,6 +630,143 @@
}, nil
}
+func (ds *DataSource) GetInstances(ctx context.Context, request *pb.GetInstancesRequest) (*pb.GetInstancesResponse,
+ error) {
+ domainProject := util.ParseDomainProject(ctx)
+
+ service := &pb.MicroService{}
+ var err error
+ if len(request.ConsumerServiceId) > 0 {
+ service, err = serviceUtil.GetService(ctx, domainProject, request.ConsumerServiceId)
+ if err != nil {
+ log.Errorf(err, "get consumer failed, consumer[%s] find provider instances",
+ request.ConsumerServiceId, request.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response: proto.CreateResponse(scerr.ErrInternal, err.Error()),
+ }, err
+ }
+ if service == nil {
+ log.Errorf(nil, "consumer does not exist, consumer[%s] find provider instances",
+ request.ConsumerServiceId, request.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response: proto.CreateResponse(scerr.ErrServiceNotExists,
+ fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)),
+ }, nil
+ }
+ }
+
+ provider, err := serviceUtil.GetService(ctx, domainProject, request.ProviderServiceId)
+ if err != nil {
+ log.Errorf(err, "get provider failed, consumer[%s] find provider instances",
+ request.ConsumerServiceId, request.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response: proto.CreateResponse(scerr.ErrInternal, err.Error()),
+ }, err
+ }
+ if provider == nil {
+ log.Errorf(nil, "provider does not exist, consumer[%s] find provider instances",
+ request.ConsumerServiceId, request.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response: proto.CreateResponse(scerr.ErrServiceNotExists,
+ fmt.Sprintf("Provider[%s] does not exist.", request.ProviderServiceId)),
+ }, nil
+ }
+
+ findFlag := func() string {
+ return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instances",
+ request.ConsumerServiceId, service.Environment, service.AppId, service.ServiceName, service.Version,
+ provider.ServiceId, provider.Environment, provider.AppId, provider.ServiceName, provider.Version)
+ }
+
+ var item *cache.VersionRuleCacheItem
+ rev, _ := ctx.Value(util.CtxRequestRevision).(string)
+ item, err = cache.FindInstances.GetWithProviderID(ctx, service, proto.MicroServiceToKey(domainProject, provider),
+ &pb.HeartbeatSetElement{
+ ServiceId: request.ProviderServiceId,
+ }, request.Tags, rev)
+ if err != nil {
+ log.Errorf(err, "FindInstances.GetWithProviderID failed, %s failed", findFlag())
+ return &pb.GetInstancesResponse{
+ Response: proto.CreateResponse(scerr.ErrInternal, err.Error()),
+ }, err
+ }
+ if item == nil || len(item.ServiceIds) == 0 {
+ mes := fmt.Errorf("%s failed, provider instance does not exist", findFlag())
+ log.Errorf(mes, "FindInstances.GetWithProviderID failed")
+ return &pb.GetInstancesResponse{
+ Response: proto.CreateResponse(scerr.ErrServiceNotExists, mes.Error()),
+ }, nil
+ }
+
+ instances := item.Instances
+ if rev == item.Rev {
+ instances = nil // for gRPC
+ }
+ _ = util.SetContext(ctx, util.CtxResponseRevision, item.Rev)
+
+ return &pb.GetInstancesResponse{
+ Response: proto.CreateResponse(proto.Response_SUCCESS, "Query service instances successfully."),
+ Instances: instances,
+ }, nil
+}
+
+func (ds *DataSource) GetProviderInstances(ctx context.Context, request *pb.HeartbeatSetElement) (instances []*pb.MicroServiceInstance, rev string, err error) {
+ var (
+ maxRevs = make([]int64, len(clustersIndex))
+ counts = make([]int64, len(clustersIndex))
+ domainProject = util.ParseTargetDomainProject(ctx)
+ )
+ instances, err = ds.findInstances(ctx, domainProject, request.ServiceId, request.InstanceId, maxRevs, counts)
+ if err != nil {
+ return
+ }
+ return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
+}
+
+func (ds *DataSource) BatchGetProviderInstances(ctx context.Context, request *pb.BatchGetInstancesRequest) (instances []*pb.MicroServiceInstance, rev string, err error) {
+ var (
+ maxRevs = make([]int64, len(clustersIndex))
+ counts = make([]int64, len(clustersIndex))
+ domainProject = util.ParseTargetDomainProject(ctx)
+ )
+ if request == nil || len(request.ServiceIds) == 0 {
+ return nil, "", fmt.Errorf("invalid param BatchGetInstancesRequest")
+ }
+
+ for _, providerServiceID := range request.ServiceIds {
+ insts, err := ds.findInstances(ctx, domainProject, providerServiceID, "", maxRevs, counts)
+ if err != nil {
+ return nil, "", err
+ }
+ instances = append(instances, insts...)
+ }
+
+ return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
+}
+
+func (ds *DataSource) findInstances(ctx context.Context, domainProject, serviceID, instanceID string, maxRevs []int64, counts []int64) (instances []*pb.MicroServiceInstance, err error) {
+ key := apt.GenerateInstanceKey(domainProject, serviceID, instanceID)
+ opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key), registry.WithPrefix())
+ resp, err := backend.Store().Instance().Search(ctx, opts...)
+ if err != nil {
+ return nil, err
+ }
+ if len(resp.Kvs) == 0 {
+ return
+ }
+
+ for _, kv := range resp.Kvs {
+ if i, ok := clustersIndex[kv.ClusterName]; ok {
+ if kv.ModRevision > maxRevs[i] {
+ maxRevs[i] = kv.ModRevision
+ }
+ counts[i]++
+ }
+ instances = append(instances, kv.Value.(*pb.MicroServiceInstance))
+ }
+ return
+}
+
func (ds *DataSource) FindInstances(ctx context.Context, request *pb.FindInstancesRequest) (*pb.FindInstancesResponse,
error) {
provider := &pb.MicroServiceKey{
@@ -903,86 +1054,6 @@
}, nil
}
-func (ds *DataSource) GetInstances(ctx context.Context, request *pb.GetInstancesRequest) (*pb.GetInstancesResponse,
- error) {
- domainProject := util.ParseDomainProject(ctx)
-
- service := &pb.MicroService{}
- var err error
- if len(request.ConsumerServiceId) > 0 {
- service, err = serviceUtil.GetService(ctx, domainProject, request.ConsumerServiceId)
- if err != nil {
- log.Errorf(err, "get consumer failed, consumer[%s] find provider instances",
- request.ConsumerServiceId, request.ProviderServiceId)
- return &pb.GetInstancesResponse{
- Response: proto.CreateResponse(scerr.ErrInternal, err.Error()),
- }, err
- }
- if service == nil {
- log.Errorf(nil, "consumer does not exist, consumer[%s] find provider instances",
- request.ConsumerServiceId, request.ProviderServiceId)
- return &pb.GetInstancesResponse{
- Response: proto.CreateResponse(scerr.ErrServiceNotExists,
- fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)),
- }, nil
- }
- }
-
- provider, err := serviceUtil.GetService(ctx, domainProject, request.ProviderServiceId)
- if err != nil {
- log.Errorf(err, "get provider failed, consumer[%s] find provider instances",
- request.ConsumerServiceId, request.ProviderServiceId)
- return &pb.GetInstancesResponse{
- Response: proto.CreateResponse(scerr.ErrInternal, err.Error()),
- }, err
- }
- if provider == nil {
- log.Errorf(nil, "provider does not exist, consumer[%s] find provider instances",
- request.ConsumerServiceId, request.ProviderServiceId)
- return &pb.GetInstancesResponse{
- Response: proto.CreateResponse(scerr.ErrServiceNotExists,
- fmt.Sprintf("Provider[%s] does not exist.", request.ProviderServiceId)),
- }, nil
- }
-
- findFlag := func() string {
- return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instances",
- request.ConsumerServiceId, service.Environment, service.AppId, service.ServiceName, service.Version,
- provider.ServiceId, provider.Environment, provider.AppId, provider.ServiceName, provider.Version)
- }
-
- var item *cache.VersionRuleCacheItem
- rev, _ := ctx.Value(util.CtxRequestRevision).(string)
- item, err = cache.FindInstances.GetWithProviderID(ctx, service, proto.MicroServiceToKey(domainProject, provider),
- &pb.HeartbeatSetElement{
- ServiceId: request.ProviderServiceId,
- }, request.Tags, rev)
- if err != nil {
- log.Errorf(err, "FindInstances.GetWithProviderID failed, %s failed", findFlag())
- return &pb.GetInstancesResponse{
- Response: proto.CreateResponse(scerr.ErrInternal, err.Error()),
- }, err
- }
- if item == nil || len(item.ServiceIds) == 0 {
- mes := fmt.Errorf("%s failed, provider instance does not exist", findFlag())
- log.Errorf(mes, "FindInstances.GetWithProviderID failed")
- return &pb.GetInstancesResponse{
- Response: proto.CreateResponse(scerr.ErrServiceNotExists, mes.Error()),
- }, nil
- }
-
- instances := item.Instances
- if rev == item.Rev {
- instances = nil // for gRPC
- }
- _ = util.SetContext(ctx, util.CtxResponseRevision, item.Rev)
-
- return &pb.GetInstancesResponse{
- Response: proto.CreateResponse(proto.Response_SUCCESS, "Query service instances successfully."),
- Instances: instances,
- }, nil
-}
-
func (ds *DataSource) BatchFind(ctx context.Context, request *pb.BatchFindInstancesRequest) (
*pb.BatchFindInstancesResponse, error) {
response := &pb.BatchFindInstancesResponse{
diff --git a/datasource/ms.go b/datasource/ms.go
index 424530b..ad18b3c 100644
--- a/datasource/ms.go
+++ b/datasource/ms.go
@@ -25,6 +25,7 @@
// Attention: request validation must be finished before the following interface being invoked!!!
// MetadataManager contains the CRUD of registry metadata
type MetadataManager interface {
+ // Microservice management
RegisterService(ctx context.Context, request *pb.CreateServiceRequest) (*pb.CreateServiceResponse, error)
GetServices(ctx context.Context, request *pb.GetServicesRequest) (*pb.GetServicesResponse, error)
GetService(ctx context.Context, request *pb.GetServiceRequest) (*pb.GetServiceResponse, error)
@@ -39,9 +40,15 @@
GetDeleteServiceFunc(ctx context.Context, serviceID string, force bool,
serviceRespChan chan<- *pb.DelServicesRspInfo) func(context.Context)
+ // Instance management
RegisterInstance(ctx context.Context, request *pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error)
+ // GetInstances returns instances under the current domain
GetInstance(ctx context.Context, request *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error)
GetInstances(ctx context.Context, request *pb.GetInstancesRequest) (*pb.GetInstancesResponse, error)
+ // GetProviderInstances returns instances under the specified domain
+ GetProviderInstances(ctx context.Context, request *pb.HeartbeatSetElement) (instances []*pb.MicroServiceInstance, rev string, err error)
+ BatchGetProviderInstances(ctx context.Context, request *pb.BatchGetInstancesRequest) (instances []*pb.MicroServiceInstance, rev string, err error)
+ // FindInstances returns instances under the specified domain
FindInstances(ctx context.Context, request *pb.FindInstancesRequest) (*pb.FindInstancesResponse, error)
UpdateInstanceStatus(ctx context.Context, request *pb.UpdateInstanceStatusRequest) (
*pb.UpdateInstanceStatusResponse, error)
@@ -53,6 +60,7 @@
HeartbeatSet(ctx context.Context, request *pb.HeartbeatSetRequest) (*pb.HeartbeatSetResponse, error)
BatchFind(ctx context.Context, request *pb.BatchFindInstancesRequest) (*pb.BatchFindInstancesResponse, error)
+ // Schema management
ModifySchemas(ctx context.Context, request *pb.ModifySchemasRequest) (*pb.ModifySchemasResponse, error)
ModifySchema(ctx context.Context, request *pb.ModifySchemaRequest) (*pb.ModifySchemaResponse, error)
ExistSchema(ctx context.Context, request *pb.GetExistenceRequest) (*pb.GetExistenceResponse, error)
@@ -60,11 +68,13 @@
GetAllSchemas(ctx context.Context, request *pb.GetAllSchemaRequest) (*pb.GetAllSchemaResponse, error)
DeleteSchema(ctx context.Context, request *pb.DeleteSchemaRequest) (*pb.DeleteSchemaResponse, error)
+ // Tag management
AddTags(ctx context.Context, request *pb.AddServiceTagsRequest) (*pb.AddServiceTagsResponse, error)
GetTags(ctx context.Context, request *pb.GetServiceTagsRequest) (*pb.GetServiceTagsResponse, error)
UpdateTag(ctx context.Context, request *pb.UpdateServiceTagRequest) (*pb.UpdateServiceTagResponse, error)
DeleteTags(ctx context.Context, request *pb.DeleteServiceTagsRequest) (*pb.DeleteServiceTagsResponse, error)
+ // White/black list management
AddRule(ctx context.Context, request *pb.AddServiceRulesRequest) (*pb.AddServiceRulesResponse, error)
GetRule(ctx context.Context, request *pb.GetServiceRulesRequest) (*pb.GetServiceRulesResponse, error)
UpdateRule(ctx context.Context, request *pb.UpdateServiceRuleRequest) (*pb.UpdateServiceRuleResponse, error)
diff --git a/pkg/registry/struct.go b/pkg/registry/struct.go
index 453ce57..02b8eb9 100644
--- a/pkg/registry/struct.go
+++ b/pkg/registry/struct.go
@@ -104,7 +104,9 @@
}
type MicroServiceKey struct {
- Tenant string `protobuf:"bytes,1,opt,name=tenant" json:"tenant,omitempty"`
+ // Tenant: The format is "{domain}/{project}"
+ Tenant string `protobuf:"bytes,1,opt,name=tenant" json:"tenant,omitempty"`
+ // Deprecated: Use Tenant instead
Project string `protobuf:"bytes,2,opt,name=project" json:"project,omitempty"`
AppId string `protobuf:"bytes,3,opt,name=appId" json:"appId,omitempty"`
ServiceName string `protobuf:"bytes,4,opt,name=serviceName" json:"serviceName,omitempty"`
@@ -415,3 +417,7 @@
type MicroServiceDependency struct {
Dependency []*MicroServiceKey `json:"Dependency,omitempty"`
}
+
+type BatchGetInstancesRequest struct {
+ ServiceIds []string `json:"serviceIds,omitempty"`
+}