/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package etcd

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"time"

	"github.com/apache/servicecomb-service-center/syncer/service/event"
	pb "github.com/go-chassis/cari/discovery"
	"github.com/go-chassis/cari/pkg/errsvc"
	"github.com/go-chassis/cari/sync"
	"github.com/go-chassis/foundation/gopool"
	"github.com/little-cui/etcdadpt"

	"github.com/apache/servicecomb-service-center/datasource"
	"github.com/apache/servicecomb-service-center/datasource/etcd/cache"
	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
	"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
	esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
	eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
	"github.com/apache/servicecomb-service-center/datasource/schema"
	"github.com/apache/servicecomb-service-center/pkg/log"
	"github.com/apache/servicecomb-service-center/pkg/util"
	"github.com/apache/servicecomb-service-center/server/core"
	"github.com/apache/servicecomb-service-center/server/plugin/uuid"
	quotasvc "github.com/apache/servicecomb-service-center/server/service/quota"
)

type MetadataManager struct {
	// InstanceTTL options
	InstanceTTL        int64
	InstanceProperties map[string]string
}

// RegisterService implement:
// 1. capsule request to etcd kv format
// 2. invoke etcd client to store data
// 3. check etcd-client response && construct createServiceResponse
func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.CreateServiceRequest) (
	*pb.CreateServiceResponse, error) {
	remoteIP := util.GetIPFromContext(ctx)
	service := request.Service
	serviceFlag := util.StringJoin([]string{
		service.Environment, service.AppId, service.ServiceName, service.Version}, path.SPLIT)
	domainProject := util.ParseDomainProject(ctx)

	serviceKey := &pb.MicroServiceKey{
		Tenant:      domainProject,
		Environment: service.Environment,
		AppId:       service.AppId,
		ServiceName: service.ServiceName,
		Alias:       service.Alias,
		Version:     service.Version,
	}

	index := path.GenerateServiceIndexKey(serviceKey)

	// 产生全局service id
	requestServiceID := service.ServiceId
	if len(requestServiceID) == 0 {
		ctx = util.SetContext(ctx, uuid.ContextKey, index)
		service.ServiceId = uuid.Generator().GetServiceID(ctx)
	}

	data, err := json.Marshal(service)
	if err != nil {
		log.Error(fmt.Sprintf("create micro-service[%s] failed, json marshal service failed, operator: %s",
			serviceFlag, remoteIP), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	key := path.GenerateServiceKey(domainProject, service.ServiceId)
	alias := path.GenerateServiceAliasKey(serviceKey)

	opts := []etcdadpt.OpOptions{
		etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)),
		etcdadpt.OpPut(etcdadpt.WithStrKey(index), etcdadpt.WithStrValue(service.ServiceId)),
	}
	uniqueCmpOpts := []etcdadpt.CmpOptions{
		etcdadpt.NotExistKey(key),
		etcdadpt.NotExistKey(index),
	}
	failOpts := []etcdadpt.OpOptions{
		etcdadpt.OpGet(etcdadpt.WithStrKey(index)),
	}

	if len(serviceKey.Alias) > 0 {
		opts = append(opts, etcdadpt.OpPut(etcdadpt.WithStrKey(alias), etcdadpt.WithStrValue(service.ServiceId)))
		uniqueCmpOpts = append(uniqueCmpOpts, etcdadpt.NotExistKey(alias))
		failOpts = append(failOpts, etcdadpt.OpGet(etcdadpt.WithStrKey(alias)))
	}

	syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceService, request)
	if err != nil {
		log.Error("fail to create sync opts", err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	opts = append(opts, syncOpts...)

	resp, err := etcdadpt.TxnWithCmp(ctx, opts, uniqueCmpOpts, failOpts)
	if err != nil {
		log.Error(fmt.Sprintf("create micro-service[%s] failed, operator: %s",
			serviceFlag, remoteIP), err)
		return nil, pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}

	if resp.Succeeded {
		log.Info(fmt.Sprintf("create micro-service[%s][%s] successfully, operator: %s",
			service.ServiceId, serviceFlag, remoteIP))
		return &pb.CreateServiceResponse{
			ServiceId: service.ServiceId,
		}, nil
	}

	if len(requestServiceID) != 0 {
		if len(resp.Kvs) == 0 || requestServiceID != util.BytesToStringWithNoCopy(resp.Kvs[0].Value) {
			log.Warn(fmt.Sprintf("create micro-service[%s] failed, service already exists, operator: %s",
				serviceFlag, remoteIP))
			return nil, pb.NewError(pb.ErrServiceAlreadyExists,
				"ServiceID conflict or found the same service with different id.")
		}
	}

	if len(resp.Kvs) == 0 {
		// internal error?
		log.Error(fmt.Sprintf("create micro-service[%s] failed, unexpected txn response, operator: %s",
			serviceFlag, remoteIP), nil)
		return nil, pb.NewError(pb.ErrInternal, "Unexpected txn response.")
	}

	existServiceID := util.BytesToStringWithNoCopy(resp.Kvs[0].Value)
	log.Warn(fmt.Sprintf("create micro-service[%s][%s] failed, service already exists, operator: %s",
		existServiceID, serviceFlag, remoteIP))
	return &pb.CreateServiceResponse{
		ServiceId: existServiceID,
	}, nil
}

func (ds *MetadataManager) ListService(ctx context.Context, request *pb.GetServicesRequest) (
	*pb.GetServicesResponse, error) {
	services, err := eutil.GetAllServiceUtil(ctx)
	if err != nil {
		log.Error("get all services by domain failed", err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	return &pb.GetServicesResponse{
		Services: services,
	}, nil
}

func (ds *MetadataManager) GetService(ctx context.Context, request *pb.GetServiceRequest) (
	*pb.MicroService, error) {
	domainProject := util.ParseDomainProject(ctx)
	singleService, err := eutil.GetService(ctx, domainProject, request.ServiceId)

	if err != nil {
		if errors.Is(err, datasource.ErrNoData) {
			log.Debug(fmt.Sprintf("get micro-service[%s] failed, service does not exist in db", request.ServiceId))
			return nil, pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
		}
		log.Error(fmt.Sprintf("get micro-service[%s] failed, get service file failed", request.ServiceId), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	return singleService, nil
}

func (ds *MetadataManager) ListServiceDetail(ctx context.Context, request *pb.GetServicesInfoRequest) (
	*pb.GetServicesInfoResponse, error) {
	ctx = util.WithCacheOnly(ctx)

	optionMap := make(map[string]struct{}, len(request.Options))
	for _, opt := range request.Options {
		optionMap[opt] = struct{}{}
	}

	options := make([]string, 0, len(optionMap))
	if _, ok := optionMap["all"]; ok {
		optionMap["statistics"] = struct{}{}
		options = []string{"tags", "instances", "schemas", "dependencies"}
	} else {
		for opt := range optionMap {
			options = append(options, opt)
		}
	}

	var st *pb.Statistics
	if _, ok := optionMap["statistics"]; ok {
		var err error
		st, err = statistics(ctx, request.WithShared)
		if err != nil {
			return nil, pb.NewError(pb.ErrInternal, err.Error())
		}
		if len(optionMap) == 1 {
			return &pb.GetServicesInfoResponse{
				Statistics: st,
			}, nil
		}
	}

	//获取所有服务
	services, err := eutil.GetAllServiceUtil(ctx)
	if err != nil {
		log.Error("get all services by domain failed", err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	allServiceDetails := make([]*pb.ServiceDetail, 0, len(services))
	domainProject := util.ParseDomainProject(ctx)
	for _, service := range services {
		if !ds.filterServices(domainProject, request, service) {
			continue
		}

		serviceDetail, err := getServiceDetailUtil(ctx, ServiceDetailOpt{
			domainProject: domainProject,
			service:       service,
			countOnly:     request.CountOnly,
			options:       options,
		})
		if err != nil {
			return nil, pb.NewError(pb.ErrInternal, err.Error())
		}
		serviceDetail.MicroService = service
		tmpServiceDetail, err := datasource.NewServiceOverview(serviceDetail, ds.InstanceProperties)
		if err != nil {
			return nil, err
		}
		allServiceDetails = append(allServiceDetails, tmpServiceDetail)
	}

	return &pb.GetServicesInfoResponse{
		AllServicesDetail: allServiceDetails,
		Statistics:        st,
	}, nil
}

func (ds *MetadataManager) filterServices(domainProject string, request *pb.GetServicesInfoRequest, service *pb.MicroService) bool {
	if !request.WithShared && datasource.IsGlobal(pb.MicroServiceToKey(domainProject, service)) {
		return false
	}
	if len(request.Environment) > 0 && request.Environment != service.Environment {
		return false
	}
	if len(request.AppId) > 0 && request.AppId != service.AppId {
		return false
	}
	if len(request.ServiceName) > 0 && request.ServiceName != service.ServiceName {
		return false
	}
	return true
}

func (ds *MetadataManager) GetOverview(ctx context.Context, request *pb.GetServicesRequest) (
	*pb.Statistics, error) {
	ctx = util.WithCacheOnly(ctx)
	st, err := statistics(ctx, false)
	if err != nil {
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	return st, nil
}

func (ds *MetadataManager) ListApp(ctx context.Context, request *pb.GetAppsRequest) (*pb.GetAppsResponse, error) {
	domainProject := util.ParseDomainProject(ctx)
	key := path.GetServiceAppKey(domainProject, request.Environment, "")

	opts := append(eutil.FromContext(ctx),
		etcdadpt.WithStrKey(key),
		etcdadpt.WithPrefix(),
		etcdadpt.WithKeyOnly())

	resp, err := sd.ServiceIndex().Search(ctx, opts...)
	if err != nil {
		return nil, err
	}
	l := len(resp.Kvs)
	if l == 0 {
		return &pb.GetAppsResponse{}, nil
	}

	apps := make([]string, 0, l)
	appMap := make(map[string]struct{}, l)
	for _, keyValue := range resp.Kvs {
		key := path.GetInfoFromSvcIndexKV(keyValue.Key)
		if !request.WithShared && datasource.IsGlobal(key) {
			continue
		}
		if _, ok := appMap[key.AppId]; ok {
			continue
		}
		appMap[key.AppId] = struct{}{}
		apps = append(apps, key.AppId)
	}

	return &pb.GetAppsResponse{
		AppIds: apps,
	}, nil
}

func (ds *MetadataManager) ExistServiceByID(ctx context.Context, request *pb.GetExistenceByIDRequest) (*pb.GetExistenceByIDResponse, error) {
	domainProject := util.ParseDomainProject(ctx)
	return &pb.GetExistenceByIDResponse{
		Exist: eutil.ServiceExist(ctx, domainProject, request.ServiceId),
	}, nil
}

func (ds *MetadataManager) ExistService(ctx context.Context, request *pb.GetExistenceRequest) (string, error) {
	domainProject := util.ParseDomainProject(ctx)
	serviceFlag := util.StringJoin([]string{
		request.Environment, request.AppId, request.ServiceName, request.Version}, path.SPLIT)

	ids, exist, err := eutil.FindServiceIds(ctx, &pb.MicroServiceKey{
		Environment: request.Environment,
		AppId:       request.AppId,
		ServiceName: request.ServiceName,
		Alias:       request.ServiceName,
		Version:     request.Version,
		Tenant:      domainProject,
	}, true)
	if err != nil {
		log.Error(fmt.Sprintf("micro-service[%s] exist failed, find serviceIDs failed", serviceFlag), err)
		return "", pb.NewError(pb.ErrInternal, err.Error())
	}
	if !exist {
		log.Info(fmt.Sprintf("micro-service[%s] exist failed, service does not exist", serviceFlag))
		return "", pb.NewError(pb.ErrServiceNotExists, serviceFlag+" does not exist.")
	}
	if len(ids) == 0 {
		log.Info(fmt.Sprintf("micro-service[%s] exist failed, version mismatch", serviceFlag))
		return "", pb.NewError(pb.ErrServiceVersionNotExists, serviceFlag+" version mismatch.")
	}
	// 约定多个时，取较新版本
	return ids[0], nil
}

func (ds *MetadataManager) FindService(ctx context.Context, request *pb.MicroServiceKey) (*pb.GetServicesResponse, error) {
	copyKey := *request
	copyKey.Tenant = util.ParseDomainProject(ctx)
	copyKey.Version = ""
	key := path.GenerateServiceIndexKey(&copyKey)
	opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())

	resp, err := sd.ServiceIndex().Search(ctx, opts...)
	if err != nil {
		return nil, err
	}
	result := &pb.GetServicesResponse{}
	for _, kv := range resp.Kvs {
		svc, err := ds.GetService(ctx, &pb.GetServiceRequest{ServiceId: kv.Value.(string)})
		if err != nil {
			return nil, err
		}
		result.Services = append(result.Services, svc)
	}
	return result, nil
}

func (ds *MetadataManager) PutServiceProperties(ctx context.Context, request *pb.UpdateServicePropsRequest) error {
	remoteIP := util.GetIPFromContext(ctx)
	domainProject := util.ParseDomainProject(ctx)

	key := path.GenerateServiceKey(domainProject, request.ServiceId)
	microservice, err := eutil.GetService(ctx, domainProject, request.ServiceId)
	if err != nil {
		if errors.Is(err, datasource.ErrNoData) {
			log.Debug(fmt.Sprintf("service does not exist, update service[%s] properties failed, operator: %s",
				request.ServiceId, remoteIP))
			return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
		}
		log.Error(fmt.Sprintf("update service[%s] properties failed, get service file failed, operator: %s",
			request.ServiceId, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}

	copyServiceRef := *microservice
	copyServiceRef.Properties = request.Properties
	copyServiceRef.ModTimestamp = strconv.FormatInt(time.Now().Unix(), 10)

	data, err := json.Marshal(copyServiceRef)
	if err != nil {
		log.Error(fmt.Sprintf("update service[%s] properties failed, json marshal service failed, operator: %s",
			request.ServiceId, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}

	opts := []etcdadpt.OpOptions{
		etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)),
	}
	syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceService, request)
	if err != nil {
		log.Error("fail to create task", err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	opts = append(opts, syncOpts...)

	// Set key file
	resp, err := etcdadpt.TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.NotEqualVer(key, 0)), nil)
	if err != nil {
		log.Error(fmt.Sprintf("update service[%s] properties failed, operator: %s", request.ServiceId, remoteIP), err)
		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}
	if !resp.Succeeded {
		log.Error(fmt.Sprintf("update service[%s] properties failed, service does not exist, operator: %s",
			request.ServiceId, remoteIP), err)
		return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}

	log.Info(fmt.Sprintf("update service[%s] properties successfully, operator: %s", request.ServiceId, remoteIP))
	return nil
}

func (ds *MetadataManager) RegisterInstance(ctx context.Context, request *pb.RegisterInstanceRequest) (
	*pb.RegisterInstanceResponse, error) {
	instanceID, err := ds.registerInstance(ctx, request)
	if err != nil {
		return nil, err
	}
	return &pb.RegisterInstanceResponse{
		InstanceId: instanceID,
	}, nil
}

func (ds *MetadataManager) registerInstance(ctx context.Context, request *pb.RegisterInstanceRequest) (string, error) {
	remoteIP := util.GetIPFromContext(ctx)
	instance := request.Instance

	//允许自定义id
	if len(instance.InstanceId) > 0 {
		needRegister, err := ds.sendHeartbeatInstead(ctx, instance)
		if err != nil {
			return "", err
		}
		if !needRegister {
			return instance.InstanceId, nil
		}
	} else {
		instance.InstanceId = uuid.Generator().GetInstanceID(ctx)
	}

	ttl := ds.calcInstanceTTL(instance)
	instanceFlag := fmt.Sprintf("ttl %ds, endpoints %v, host '%s', serviceID %s",
		ttl, instance.Endpoints, instance.HostName, instance.ServiceId)

	//先以domain/project的方式组装
	domainProject := util.ParseDomainProject(ctx)

	instanceID := instance.InstanceId
	data, err := json.Marshal(instance)
	if err != nil {
		log.Error(fmt.Sprintf("register instance failed, %s, instanceID %s, operator %s",
			instanceFlag, instanceID, remoteIP), err)
		return "", pb.NewError(pb.ErrInternal, err.Error())
	}

	leaseID, err := etcdadpt.Instance().LeaseGrant(ctx, ttl)
	if err != nil {
		log.Error(fmt.Sprintf("grant lease failed, %s, operator: %s", instanceFlag, remoteIP), err)
		return "", pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}

	// build the request options
	key := path.GenerateInstanceKey(domainProject, instance.ServiceId, instanceID)
	hbKey := path.GenerateInstanceLeaseKey(domainProject, instance.ServiceId, instanceID)

	leaseOp := etcdadpt.WithLease(leaseID)
	resp, err := etcdadpt.TxnWithCmp(ctx,
		etcdadpt.Ops(
			etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data), leaseOp),
			etcdadpt.OpPut(etcdadpt.WithStrKey(hbKey), etcdadpt.WithStrValue(fmt.Sprintf("%d", leaseID)), leaseOp),
		),
		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, instance.ServiceId), 0)),
		nil)
	if err != nil {
		log.Error(fmt.Sprintf("register instance failed, %s, instanceID %s, operator %s",
			instanceFlag, instanceID, remoteIP), err)
		return "", pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}
	if !resp.Succeeded {
		log.Error(fmt.Sprintf("register instance failed, %s, instanceID %s, operator %s: service does not exist",
			instanceFlag, instanceID, remoteIP), nil)
		return "", pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}
	sendEvent(ctx, sync.CreateAction, datasource.ResourceInstance, request)
	log.Info(fmt.Sprintf("register instance %s, instanceID %s, operator %s",
		instanceFlag, instanceID, remoteIP))
	return instanceID, nil
}

func sendEvent(ctx context.Context, action string, resourceType string, resource interface{}) {
	if !util.EnableSync(ctx) {
		return
	}
	event.Publish(ctx, action, resourceType, resource)
}

func (ds *MetadataManager) calcInstanceTTL(instance *pb.MicroServiceInstance) int64 {
	if instance.HealthCheck == nil {
		instance.HealthCheck = &pb.HealthCheck{
			Mode:     pb.CHECK_BY_HEARTBEAT,
			Interval: datasource.DefaultLeaseRenewalInterval,
			Times:    datasource.DefaultLeaseRetryTimes,
		}
	}
	ttl := int64(instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1))
	if ds.InstanceTTL > 0 {
		ttl = ds.InstanceTTL
	}
	return ttl
}

func (ds *MetadataManager) sendHeartbeatInstead(ctx context.Context, instance *pb.MicroServiceInstance) (bool, error) {
	remoteIP := util.GetIPFromContext(ctx)
	// keep alive the lease ttl
	// there are two reasons for sending a heartbeat here:
	// 1. request the scenario the instance has been removed,
	//    the cast of registration operation can be reduced.
	// 2. request the self-protection scenario, the instance is unhealthy
	//    and needs to be re-registered.
	err := ds.SendHeartbeat(ctx, &pb.HeartbeatRequest{ServiceId: instance.ServiceId,
		InstanceId: instance.InstanceId})
	if err == nil {
		log.Info(fmt.Sprintf("register instance successful, reuse instance[%s/%s], operator %s",
			instance.ServiceId, instance.InstanceId, remoteIP))
		return false, nil
	}

	if errsvc.IsErrEqualCode(err, pb.ErrInstanceNotExists) {
		// register a new one
		return true, nil
	}

	log.Error(fmt.Sprintf("register service[%s]'s instance failed, endpoints %v, host '%s', operator %s",
		instance.ServiceId, instance.Endpoints, instance.HostName, remoteIP), err)
	return false, err
}

func (ds *MetadataManager) ExistInstance(ctx context.Context, request *pb.MicroServiceInstanceKey) (*pb.GetExistenceByIDResponse, error) {
	domainProject := util.ParseDomainProject(ctx)
	exist, err := eutil.ExistInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
	if err != nil {
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	return &pb.GetExistenceByIDResponse{
		Exist: exist,
	}, nil
}

func (ds *MetadataManager) GetInstance(ctx context.Context, request *pb.GetOneInstanceRequest) (
	*pb.GetOneInstanceResponse, error) {
	domainProject := util.ParseDomainProject(ctx)

	service := &pb.MicroService{}
	var err error
	if len(request.ConsumerServiceId) > 0 {
		service, err = eutil.GetService(ctx, domainProject, request.ConsumerServiceId)
		if err != nil {
			if errors.Is(err, datasource.ErrNoData) {
				log.Debug(fmt.Sprintf("consumer does not exist in db, consumer[%s] find provider instance[%s/%s]",
					request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId))
				return nil, pb.NewError(pb.ErrServiceNotExists,
					fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId))
			}
			log.Error(fmt.Sprintf("get consumer failed, consumer[%s] find provider instance[%s/%s]",
				request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId), err)
			return nil, pb.NewError(pb.ErrInternal, err.Error())
		}
	}

	provider, err := eutil.GetService(ctx, domainProject, request.ProviderServiceId)
	if err != nil {
		if errors.Is(err, datasource.ErrNoData) {
			log.Debug(fmt.Sprintf("provider does not exist in db, consumer[%s] find provider instance[%s/%s]",
				request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId))
			return nil, pb.NewError(pb.ErrServiceNotExists,
				fmt.Sprintf("Provider[%s] does not exist.", request.ProviderServiceId))
		}
		log.Error(fmt.Sprintf("get provider failed, consumer[%s] find provider instance[%s/%s]",
			request.ConsumerServiceId, request.ProviderServiceId, request.ProviderInstanceId), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	findFlag := func() string {
		return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instance[%s]",
			request.ConsumerServiceId, service.Environment, service.AppId, service.ServiceName, service.Version,
			provider.ServiceId, provider.Environment, provider.AppId, provider.ServiceName, provider.Version,
			request.ProviderInstanceId)
	}

	var item *cache.VersionRuleCacheItem
	rev, _ := ctx.Value(util.CtxRequestRevision).(string)
	item, err = cache.FindInstances.GetWithProviderID(ctx, service, pb.MicroServiceToKey(domainProject, provider),
		&pb.HeartbeatSetElement{
			ServiceId: request.ProviderServiceId, InstanceId: request.ProviderInstanceId,
		}, request.Tags, rev)
	if err != nil {
		log.Error(fmt.Sprintf("find Instances by providerID failed, %s failed", findFlag()), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	if item == nil || len(item.Instances) == 0 {
		mes := fmt.Errorf("%s failed, provider instance does not exist", findFlag())
		log.Error("find Instances by ProviderID failed", mes)
		return nil, pb.NewError(pb.ErrInstanceNotExists, mes.Error())
	}

	instance := item.Instances[0]
	if rev == item.Rev {
		instance = nil // for gRPC
	}
	_ = util.WithResponseRev(ctx, item.Rev)

	return &pb.GetOneInstanceResponse{
		Instance: instance,
	}, nil
}

func (ds *MetadataManager) ListInstance(ctx context.Context, request *pb.GetInstancesRequest) (*pb.GetInstancesResponse,
	error) {
	domainProject := util.ParseDomainProject(ctx)

	service := &pb.MicroService{}
	var err error
	if len(request.ConsumerServiceId) > 0 {
		service, err = eutil.GetService(ctx, domainProject, request.ConsumerServiceId)
		if err != nil {
			if errors.Is(err, datasource.ErrNoData) {
				log.Debug(fmt.Sprintf("consumer does not exist in db, consumer[%s] find provider[%s] instances",
					request.ConsumerServiceId, request.ProviderServiceId))
				return nil, pb.NewError(pb.ErrServiceNotExists,
					fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId))
			}
			log.Error(fmt.Sprintf("get consumer failed, consumer[%s] find provider[%s] instances",
				request.ConsumerServiceId, request.ProviderServiceId), err)
			return nil, pb.NewError(pb.ErrInternal, err.Error())
		}
	}

	provider, err := eutil.GetService(ctx, domainProject, request.ProviderServiceId)
	if err != nil {
		if errors.Is(err, datasource.ErrNoData) {
			log.Debug(fmt.Sprintf("provider does not exist, consumer[%s] find provider[%s] instances",
				request.ConsumerServiceId, request.ProviderServiceId))
			return nil, pb.NewError(pb.ErrServiceNotExists,
				fmt.Sprintf("Provider[%s] does not exist.", request.ProviderServiceId))
		}
		log.Error(fmt.Sprintf("get provider failed, consumer[%s] find provider[%s] instances",
			request.ConsumerServiceId, request.ProviderServiceId), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	findFlag := func() string {
		return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instances",
			request.ConsumerServiceId, service.Environment, service.AppId, service.ServiceName, service.Version,
			provider.ServiceId, provider.Environment, provider.AppId, provider.ServiceName, provider.Version)
	}

	var item *cache.VersionRuleCacheItem
	rev, _ := ctx.Value(util.CtxRequestRevision).(string)
	item, err = cache.FindInstances.GetWithProviderID(ctx, service, pb.MicroServiceToKey(domainProject, provider),
		&pb.HeartbeatSetElement{
			ServiceId: request.ProviderServiceId,
		}, request.Tags, rev)
	if err != nil {
		log.Error(fmt.Sprintf("FindInstances.GetWithProviderID failed, %s failed", findFlag()), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	if item == nil || len(item.ServiceIds) == 0 {
		err := fmt.Errorf("%s failed, provider instance does not exist", findFlag())
		log.Error("FindInstances.GetWithProviderID failed", err)
		return nil, pb.NewError(pb.ErrServiceNotExists, err.Error())
	}

	instances := item.Instances
	if rev == item.Rev {
		instances = nil // for gRPC
	}
	_ = util.WithResponseRev(ctx, item.Rev)

	return &pb.GetInstancesResponse{
		Instances: instances,
	}, nil
}

func (ds *MetadataManager) FindInstances(ctx context.Context, request *pb.FindInstancesRequest) (*pb.FindInstancesResponse,
	error) {
	provider := &pb.MicroServiceKey{
		Tenant:      util.ParseTargetDomainProject(ctx),
		Environment: request.Environment,
		AppId:       request.AppId,
		ServiceName: request.ServiceName,
		Alias:       request.Alias,
	}

	rev, ok := ctx.Value(util.CtxRequestRevision).(string)
	if !ok {
		err := errors.New("rev request context is not type string")
		log.Error("", err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	if datasource.IsGlobal(provider) {
		return ds.findSharedServiceInstance(ctx, request, provider, rev)
	}
	return ds.findInstance(ctx, request, provider, rev)
}

func (ds *MetadataManager) findInstance(ctx context.Context, request *pb.FindInstancesRequest,
	provider *pb.MicroServiceKey, rev string) (*pb.FindInstancesResponse, error) {
	var err error
	domainProject := util.ParseDomainProject(ctx)
	service := &pb.MicroService{Environment: request.Environment}
	if len(request.ConsumerServiceId) > 0 {
		service, err = eutil.GetService(ctx, domainProject, request.ConsumerServiceId)
		if err != nil {
			if errors.Is(err, datasource.ErrNoData) {
				log.Debug(fmt.Sprintf("consumer does not exist, consumer[%s] find provider[%s/%s/%s]",
					request.ConsumerServiceId, request.Environment, request.AppId, request.ServiceName))
				return nil, pb.NewError(pb.ErrServiceNotExists,
					fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId))
			}
			log.Error(fmt.Sprintf("get consumer failed, consumer[%s] find provider[%s/%s/%s]",
				request.ConsumerServiceId, request.Environment, request.AppId, request.ServiceName), err)
			return nil, pb.NewError(pb.ErrInternal, err.Error())
		}
		provider.Environment = service.Environment
	}

	// provider is not a shared micro-service,
	// only allow shared micro-service instances found request different domains.
	ctx = util.SetTargetDomainProject(ctx, util.ParseDomain(ctx), util.ParseProject(ctx))
	provider.Tenant = util.ParseTargetDomainProject(ctx)

	findFlag := fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s/%s/%s]",
		request.ConsumerServiceId, service.Environment, service.AppId, service.ServiceName, service.Version,
		provider.Environment, provider.AppId, provider.ServiceName)

	// cache
	var item *cache.VersionRuleCacheItem
	item, err = cache.FindInstances.Get(ctx, service, provider, request.Tags, rev)
	if err != nil {
		log.Error(fmt.Sprintf("FindInstancesCache.Get failed, %s failed", findFlag), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	if item == nil {
		err := fmt.Errorf("%s failed, provider does not exist", findFlag)
		log.Error("FindInstancesCache.Get failed", err)
		return nil, pb.NewError(pb.ErrServiceNotExists, err.Error())
	}

	// add dependency queue
	if len(request.ConsumerServiceId) > 0 &&
		len(item.ServiceIds) > 0 &&
		!cache.DependencyRule.ExistRule(ctx, request.ConsumerServiceId, provider) {
		provider, err = ds.reshapeProviderKey(ctx, provider, item.ServiceIds[0])
		if err != nil {
			return nil, err
		}
		if provider != nil {
			err = eutil.AddServiceVersionRule(ctx, domainProject, service, provider)
		} else {
			err := fmt.Errorf("%s failed, provider does not exist", findFlag)
			log.Error("AddServiceVersionRule failed", err)
			return nil, pb.NewError(pb.ErrServiceNotExists, err.Error())
		}
		if err != nil {
			log.Error(fmt.Sprintf("AddServiceVersionRule failed, %s failed", findFlag), err)
			return nil, pb.NewError(pb.ErrInternal, err.Error())
		}
	}

	return ds.genFindResult(ctx, rev, item)
}

func (ds *MetadataManager) findSharedServiceInstance(ctx context.Context, request *pb.FindInstancesRequest,
	provider *pb.MicroServiceKey, rev string) (*pb.FindInstancesResponse, error) {
	var err error
	service := &pb.MicroService{Environment: request.Environment}
	// it means the shared micro-services must be the same env with SC.
	provider.Environment = core.Service.Environment
	findFlag := fmt.Sprintf("find shared provider[%s/%s/%s/%s]", provider.Environment, provider.AppId, provider.ServiceName, provider.Version)

	// cache
	var item *cache.VersionRuleCacheItem
	item, err = cache.FindInstances.Get(ctx, service, provider, request.Tags, rev)
	if err != nil {
		log.Error(fmt.Sprintf("FindInstancesCache.Get failed, %s failed", findFlag), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	if item == nil {
		err := fmt.Errorf("%s failed, provider does not exist", findFlag)
		log.Error("FindInstancesCache.Get failed", err)
		return nil, pb.NewError(pb.ErrServiceNotExists, err.Error())
	}

	return ds.genFindResult(ctx, rev, item)
}

func (ds *MetadataManager) genFindResult(ctx context.Context, oldRev string, item *cache.VersionRuleCacheItem) (
	*pb.FindInstancesResponse, error) {
	instances := item.Instances
	if oldRev == item.Rev {
		instances = nil // for gRPC
	}
	// TODO support gRPC output context
	_ = util.WithResponseRev(ctx, item.Rev)
	return &pb.FindInstancesResponse{
		Instances: instances,
	}, nil
}

func (ds *MetadataManager) reshapeProviderKey(ctx context.Context, provider *pb.MicroServiceKey, providerID string) (
	*pb.MicroServiceKey, error) {
	// service name 可能是别名，所以重新获取
	providerService, err := eutil.GetService(ctx, provider.Tenant, providerID)
	if err != nil {
		return nil, err
	}

	provider = pb.MicroServiceToKey(provider.Tenant, providerService)
	provider.Version = datasource.AllVersions // just compatible to old version
	return provider, nil
}

func (ds *MetadataManager) PutInstance(ctx context.Context, request *pb.RegisterInstanceRequest) error {
	domainProject := util.ParseDomainProject(ctx)
	instance := request.Instance
	serviceID := instance.ServiceId
	instanceID := instance.InstanceId
	exist, err := eutil.ExistInstance(ctx, domainProject, serviceID, instanceID)
	if err != nil {
		log.Error(fmt.Sprintf("update instance[%s/%s] failed", serviceID, instanceID), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	if !exist {
		log.Error(fmt.Sprintf("update instance[%s/%s] failed, instance does not exist",
			serviceID, instanceID), nil)
		return pb.NewError(pb.ErrInstanceNotExists, "Service instance does not exist.")
	}

	if err := eutil.UpdateInstance(ctx, domainProject, instance); err != nil {
		log.Error(fmt.Sprintf("update instance[%s/%s] failed", serviceID, instanceID), err)
		return err
	}
	sendEvent(ctx, sync.UpdateAction, datasource.ResourceInstance, instance)
	log.Info(fmt.Sprintf("update instance[%s/%s] successfully", serviceID, instanceID))
	return nil
}

func (ds *MetadataManager) PutInstanceStatus(ctx context.Context, request *pb.UpdateInstanceStatusRequest) error {
	domainProject := util.ParseDomainProject(ctx)
	updateStatusFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId, request.Status}, path.SPLIT)

	instance, err := eutil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
	if err != nil {
		log.Error(fmt.Sprintf("update instance[%s] status failed", updateStatusFlag), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	if instance == nil {
		log.Error(fmt.Sprintf("update instance[%s] status failed, instance does not exist", updateStatusFlag), nil)
		return pb.NewError(pb.ErrInstanceNotExists, "Service instance does not exist.")
	}

	copyInstanceRef := *instance
	copyInstanceRef.Status = request.Status
	copyInstanceRef.ModTimestamp = strconv.FormatInt(time.Now().Unix(), 10)

	if err := eutil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
		log.Error(fmt.Sprintf("update instance[%s] status failed", updateStatusFlag), err)
		return err
	}
	sendEvent(ctx, sync.UpdateAction, datasource.ResourceInstance, copyInstanceRef)
	log.Info(fmt.Sprintf("update instance[%s] status successfully", updateStatusFlag))
	return nil
}

func (ds *MetadataManager) PutInstanceProperties(ctx context.Context, request *pb.UpdateInstancePropsRequest) error {
	domainProject := util.ParseDomainProject(ctx)
	instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, path.SPLIT)

	instance, err := eutil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
	if err != nil {
		log.Error(fmt.Sprintf("update instance[%s] properties failed", instanceFlag), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	if instance == nil {
		log.Error(fmt.Sprintf("update instance[%s] properties failed, instance does not exist", instanceFlag), nil)
		return pb.NewError(pb.ErrInstanceNotExists, "Service instance does not exist.")
	}

	copyInstanceRef := *instance
	copyInstanceRef.Properties = request.Properties
	copyInstanceRef.ModTimestamp = strconv.FormatInt(time.Now().Unix(), 10)

	if err := eutil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
		log.Error(fmt.Sprintf("update instance[%s] properties failed", instanceFlag), err)
		return err
	}

	sendEvent(ctx, sync.UpdateAction, datasource.ResourceInstance, copyInstanceRef)
	log.Info(fmt.Sprintf("update instance[%s] properties successfully", instanceFlag))
	return nil
}

func (ds *MetadataManager) SendManyHeartbeat(ctx context.Context, request *pb.HeartbeatSetRequest) (*pb.HeartbeatSetResponse, error) {
	domainProject := util.ParseDomainProject(ctx)

	heartBeatCount := len(request.Instances)
	existFlag := make(map[string]bool, heartBeatCount)
	instancesHbRst := make(chan *pb.InstanceHbRst, heartBeatCount)
	noMultiCounter := 0
	for _, heartbeatElement := range request.Instances {
		if _, ok := existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId]; ok {
			log.Warn(fmt.Sprintf("instance[%s/%s] is duplicate request heartbeat set",
				heartbeatElement.ServiceId, heartbeatElement.InstanceId))
			continue
		} else {
			existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true
			noMultiCounter++
		}
		gopool.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst, heartbeatElement))
	}
	count := 0
	instanceHbRstArr := make([]*pb.InstanceHbRst, 0, heartBeatCount)
	for heartbeat := range instancesHbRst {
		count++
		instanceHbRstArr = append(instanceHbRstArr, heartbeat)
		sendEvent(ctx, sync.UpdateAction, datasource.ResourceHeartbeat,
			&pb.HeartbeatRequest{ServiceId: heartbeat.ServiceId, InstanceId: heartbeat.InstanceId})
		if count == noMultiCounter {
			close(instancesHbRst)
		}
	}
	log.Info(fmt.Sprintf("batch update heartbeats, %v", instanceHbRstArr))
	return &pb.HeartbeatSetResponse{
		Instances: instanceHbRstArr,
	}, nil
}

func (ds *MetadataManager) UnregisterInstance(ctx context.Context, request *pb.UnregisterInstanceRequest) error {
	remoteIP := util.GetIPFromContext(ctx)
	domainProject := util.ParseDomainProject(ctx)
	serviceID := request.ServiceId
	instanceID := request.InstanceId

	instanceFlag := util.StringJoin([]string{serviceID, instanceID}, path.SPLIT)

	err := revokeInstance(ctx, domainProject, serviceID, instanceID)
	if err != nil {
		log.Error(fmt.Sprintf("unregister instance failed, instance[%s], operator %s: revoke instance failed",
			instanceFlag, remoteIP), err)
		return err
	}
	sendEvent(ctx, sync.DeleteAction, datasource.ResourceInstance, request)
	log.Info(fmt.Sprintf("unregister instance[%s], operator %s", instanceFlag, remoteIP))
	return nil
}

func (ds *MetadataManager) SendHeartbeat(ctx context.Context, request *pb.HeartbeatRequest) error {
	remoteIP := util.GetIPFromContext(ctx)
	domainProject := util.ParseDomainProject(ctx)
	instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, path.SPLIT)

	_, ttl, err := eutil.HeartbeatUtil(ctx, domainProject, request.ServiceId, request.InstanceId)
	if err != nil {
		log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s",
			instanceFlag, remoteIP), err)
		return err
	}

	if ttl == 0 {
		log.Error(fmt.Sprintf("heartbeat successful, but renew instance[%s] failed. operator %s", instanceFlag, remoteIP),
			errors.New("connect backend timed out"))
	} else {
		log.Info(fmt.Sprintf("heartbeat successful, renew instance[%s] ttl to %d. operator %s",
			instanceFlag, ttl, remoteIP))
	}
	sendEvent(ctx, sync.UpdateAction, datasource.ResourceHeartbeat, request)
	return nil
}

func (ds *MetadataManager) ListManyInstances(ctx context.Context, request *pb.GetAllInstancesRequest) (*pb.GetAllInstancesResponse, error) {
	domainProject := util.ParseDomainProject(ctx)
	key := path.GetInstanceRootKey(domainProject) + path.SPLIT
	opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
	kvs, err := sd.Instance().Search(ctx, opts...)
	if err != nil {
		return nil, err
	}
	resp := &pb.GetAllInstancesResponse{}
	for _, keyValue := range kvs.Kvs {
		instance, ok := keyValue.Value.(*pb.MicroServiceInstance)
		if !ok {
			log.Warn(fmt.Sprintf("Unexpected value format! %s", util.BytesToStringWithNoCopy(keyValue.Key)))
			continue
		}
		resp.Instances = append(resp.Instances, instance)
	}
	return resp, nil
}

func (ds *MetadataManager) ModifySchemas(ctx context.Context, request *pb.ModifySchemasRequest) (
	*pb.ModifySchemasResponse, error) {
	remoteIP := util.GetIPFromContext(ctx)
	serviceID := request.ServiceId
	domainProject := util.ParseDomainProject(ctx)

	serviceInfo, err := eutil.GetService(ctx, domainProject, serviceID)
	if err != nil {
		if errors.Is(err, datasource.ErrNoData) {
			log.Debug(fmt.Sprintf("modify service[%s] schemas failed, service does not exist in db, operator: %s",
				serviceID, remoteIP))
			return nil, pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
		}
		log.Error(fmt.Sprintf("modify service[%s] schemas failed, get service failed, operator: %s",
			serviceID, remoteIP), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	if respErr := ds.modifySchemas(ctx, domainProject, serviceInfo, request.Schemas); respErr != nil {
		log.Error(fmt.Sprintf("modify service[%s] schemas failed, operator: %s", serviceID, remoteIP), respErr)
		return nil, respErr
	}

	return &pb.ModifySchemasResponse{}, nil
}

func (ds *MetadataManager) ModifySchema(ctx context.Context, request *pb.ModifySchemaRequest) (
	*pb.ModifySchemaResponse, error) {
	remoteIP := util.GetIPFromContext(ctx)
	serviceID := request.ServiceId
	schemaID := request.SchemaId

	schema := pb.Schema{
		SchemaId: schemaID,
		Summary:  request.Summary,
		Schema:   request.Schema,
	}
	err := ds.modifySchema(ctx, serviceID, &schema)
	if err != nil {
		log.Error(fmt.Sprintf("modify schema[%s/%s] failed, operator: %s", serviceID, schemaID, remoteIP), err)
		return nil, err
	}

	log.Info(fmt.Sprintf("modify schema[%s/%s] successfully, operator: %s", serviceID, schemaID, remoteIP))
	return &pb.ModifySchemaResponse{}, nil
}

func (ds *MetadataManager) ExistSchema(ctx context.Context, request *pb.GetExistenceRequest) (
	*pb.GetExistenceResponse, error) {
	domainProject := util.ParseDomainProject(ctx)

	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
		log.Warn(fmt.Sprintf("schema[%s/%s] exist failed, service does not exist", request.ServiceId, request.SchemaId))
		return nil, pb.NewError(pb.ErrServiceNotExists, "service does not exist.")
	}

	key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, request.SchemaId)
	exist, err := checkSchemaInfoExist(ctx, key)
	if err != nil {
		log.Error(fmt.Sprintf("schema[%s/%s] exist failed, get schema failed", request.ServiceId, request.SchemaId), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	if !exist {
		log.Info(fmt.Sprintf("schema[%s/%s] exist failed, schema does not exist", request.ServiceId, request.SchemaId))
		return nil, schema.ErrSchemaNotFound
	}
	schemaSummary, err := getSchemaSummary(ctx, domainProject, request.ServiceId, request.SchemaId)
	if err != nil {
		log.Error(fmt.Sprintf("schema[%s/%s] exist failed, get schema summary failed",
			request.ServiceId, request.SchemaId), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}
	return &pb.GetExistenceResponse{
		SchemaId: request.SchemaId,
		Summary:  schemaSummary,
	}, nil
}

func (ds *MetadataManager) GetSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error) {
	domainProject := util.ParseDomainProject(ctx)

	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
		log.Error(fmt.Sprintf("get schema[%s/%s] failed, service does not exist",
			request.ServiceId, request.SchemaId), nil)
		return nil, pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}

	key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, request.SchemaId)
	opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key))
	resp, errDo := sd.Schema().Search(ctx, opts...)
	if errDo != nil {
		log.Error(fmt.Sprintf("get schema[%s/%s] failed", request.ServiceId, request.SchemaId), errDo)
		return nil, pb.NewError(pb.ErrUnavailableBackend, errDo.Error())
	}
	if resp.Count == 0 {
		log.Error(fmt.Sprintf("get schema[%s/%s] failed, schema does not exists",
			request.ServiceId, request.SchemaId), errDo)
		return nil, schema.ErrSchemaNotFound
	}

	schemaSummary, err := getSchemaSummary(ctx, domainProject, request.ServiceId, request.SchemaId)
	if err != nil {
		log.Error(fmt.Sprintf("get schema[%s/%s] failed, get schema summary failed",
			request.ServiceId, request.SchemaId), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	return &pb.GetSchemaResponse{
		Schema:        util.BytesToStringWithNoCopy(resp.Kvs[0].Value.([]byte)),
		SchemaSummary: schemaSummary,
	}, nil
}

func (ds *MetadataManager) GetAllSchemas(ctx context.Context, request *pb.GetAllSchemaRequest) (
	*pb.GetAllSchemaResponse, error) {
	domainProject := util.ParseDomainProject(ctx)

	service, err := eutil.GetService(ctx, domainProject, request.ServiceId)
	if err != nil {
		if errors.Is(err, datasource.ErrNoData) {
			log.Debug(fmt.Sprintf("get service[%s] all schemas failed, service does not exist in db", request.ServiceId))
			return nil, pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
		}
		log.Error(fmt.Sprintf("get service[%s] all schemas failed, get service failed", request.ServiceId), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	schemasList := service.Schemas
	if len(schemasList) == 0 {
		return &pb.GetAllSchemaResponse{
			Schemas: []*pb.Schema{},
		}, nil
	}

	key := path.GenerateServiceSchemaSummaryKey(domainProject, request.ServiceId, "")
	opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
	resp, errDo := sd.SchemaSummary().Search(ctx, opts...)
	if errDo != nil {
		log.Error(fmt.Sprintf("get service[%s] all schema summaries failed", request.ServiceId), errDo)
		return nil, pb.NewError(pb.ErrUnavailableBackend, errDo.Error())
	}

	respWithSchema := &kvstore.Response{}
	if request.WithSchema {
		key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, "")
		opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
		respWithSchema, errDo = sd.Schema().Search(ctx, opts...)
		if errDo != nil {
			log.Error(fmt.Sprintf("get service[%s] all schemas failed", request.ServiceId), errDo)
			return nil, pb.NewError(pb.ErrUnavailableBackend, errDo.Error())
		}
	}

	schemas := make([]*pb.Schema, 0, len(schemasList))
	for _, schemaID := range schemasList {
		tempSchema := &pb.Schema{}
		tempSchema.SchemaId = schemaID
		for _, summarySchema := range resp.Kvs {
			_, _, schemaIDOfSummary := path.GetInfoFromSchemaSummaryKV(summarySchema.Key)
			if schemaID == schemaIDOfSummary {
				tempSchema.Summary = summarySchema.Value.(string)
			}
		}

		for _, contentSchema := range respWithSchema.Kvs {
			_, _, schemaIDOfSchema := path.GetInfoFromSchemaKV(contentSchema.Key)
			if schemaID == schemaIDOfSchema {
				tempSchema.Schema = util.BytesToStringWithNoCopy(contentSchema.Value.([]byte))
			}
		}
		schemas = append(schemas, tempSchema)
	}

	return &pb.GetAllSchemaResponse{
		Schemas: schemas,
	}, nil
}

func (ds *MetadataManager) DeleteSchema(ctx context.Context, request *pb.DeleteSchemaRequest) error {
	remoteIP := util.GetIPFromContext(ctx)
	domainProject := util.ParseDomainProject(ctx)

	key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, request.SchemaId)
	exist, err := eutil.CheckSchemaInfoExist(ctx, key)
	if err != nil {
		log.Error(fmt.Sprintf("delete schema[%s/%s] failed, operator: %s",
			request.ServiceId, request.SchemaId, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	if !exist {
		log.Error(fmt.Sprintf("delete schema[%s/%s] failed, schema does not exist, operator: %s",
			request.ServiceId, request.SchemaId, remoteIP), nil)
		return schema.ErrSchemaNotFound
	}
	epSummaryKey := path.GenerateServiceSchemaSummaryKey(domainProject, request.ServiceId, request.SchemaId)
	opts := []etcdadpt.OpOptions{etcdadpt.OpDel(etcdadpt.WithStrKey(epSummaryKey)), etcdadpt.OpDel(etcdadpt.WithStrKey(key))}
	schemaKeyOpt, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, key, key)
	if err != nil {
		log.Error("fail to create delete opts", err)
		return err
	}
	opts = append(opts, schemaKeyOpt...)
	schemaSummaryKeyOpt, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, epSummaryKey, epSummaryKey)
	if err != nil {
		log.Error("fail to create delete opts", err)
		return err
	}
	opts = append(opts, schemaSummaryKeyOpt...)

	resp, errDo := etcdadpt.TxnWithCmp(ctx, opts,
		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, request.ServiceId), 0)),
		nil)

	if errDo != nil {
		log.Error(fmt.Sprintf("delete schema[%s/%s] failed, operator: %s",
			request.ServiceId, request.SchemaId, remoteIP), errDo)
		return pb.NewError(pb.ErrUnavailableBackend, errDo.Error())
	}
	if !resp.Succeeded {
		log.Error(fmt.Sprintf("delete schema[%s/%s] failed, service does not exist, operator: %s",
			request.ServiceId, request.SchemaId, remoteIP), nil)
		return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}

	log.Info(fmt.Sprintf("delete schema[%s/%s] info successfully, operator: %s",
		request.ServiceId, request.SchemaId, remoteIP))
	return nil
}

func (ds *MetadataManager) PutManyTags(ctx context.Context, request *pb.AddServiceTagsRequest) error {
	remoteIP := util.GetIPFromContext(ctx)
	domainProject := util.ParseDomainProject(ctx)

	// service id存在性校验
	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
		log.Error(fmt.Sprintf("add service[%s]'s tags %v failed, service does not exist, operator: %s",
			request.ServiceId, request.Tags, remoteIP), nil)
		return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}

	checkErr := eutil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, request.Tags)
	if checkErr != nil {
		log.Error(fmt.Sprintf("add service[%s]'s tags %v failed, operator: %s",
			request.ServiceId, request.Tags, remoteIP), checkErr)
		return checkErr
	}

	log.Info(fmt.Sprintf("add service[%s]'s tags %v successfully, operator: %s", request.ServiceId, request.Tags, remoteIP))
	return nil
}

func (ds *MetadataManager) ListTag(ctx context.Context, request *pb.GetServiceTagsRequest) (*pb.GetServiceTagsResponse, error) {
	var err error
	domainProject := util.ParseDomainProject(ctx)
	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
		log.Error(fmt.Sprintf("get service[%s]'s tags failed, service does not exist", request.ServiceId), err)
		return nil, pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}
	tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
	if err != nil {
		log.Error(fmt.Sprintf("get service[%s]'s tags failed, get tags failed", request.ServiceId), err)
		return nil, pb.NewError(pb.ErrInternal, err.Error())
	}

	return &pb.GetServiceTagsResponse{
		Tags: tags,
	}, nil
}

func (ds *MetadataManager) PutTag(ctx context.Context, request *pb.UpdateServiceTagRequest) error {
	var err error
	remoteIP := util.GetIPFromContext(ctx)
	tagFlag := util.StringJoin([]string{request.Key, request.Value}, path.SPLIT)
	domainProject := util.ParseDomainProject(ctx)

	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
		log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, service does not exist, operator: %s",
			request.ServiceId, tagFlag, remoteIP), err)
		return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}

	tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
	if err != nil {
		log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, get tag failed, operator: %s",
			request.ServiceId, tagFlag, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}

	//check if the tag exists
	if _, ok := tags[request.Key]; !ok {
		log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, tag does not exist, operator: %s",
			request.ServiceId, tagFlag, remoteIP), nil)
		return pb.NewError(pb.ErrTagNotExists, "Tag does not exist, please add one first.")
	}

	copyTags := make(map[string]string, len(tags))
	for k, v := range tags {
		copyTags[k] = v
	}
	copyTags[request.Key] = request.Value

	checkErr := eutil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, copyTags)
	if checkErr != nil {
		log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, operator: %s",
			request.ServiceId, tagFlag, remoteIP), checkErr)
		return checkErr
	}

	log.Info(fmt.Sprintf("update service[%s]'s tag[%s] successfully, operator: %s", request.ServiceId, tagFlag, remoteIP))
	return nil
}

func (ds *MetadataManager) DeleteManyTags(ctx context.Context, request *pb.DeleteServiceTagsRequest) error {
	remoteIP := util.GetIPFromContext(ctx)
	domainProject := util.ParseDomainProject(ctx)

	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
		log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, service does not exist, operator: %s",
			request.ServiceId, request.Keys, remoteIP), nil)
		return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}

	tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
	if err != nil {
		log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, get service tags failed, operator: %s",
			request.ServiceId, request.Keys, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}

	copyTags := make(map[string]string, len(tags))
	for k, v := range tags {
		copyTags[k] = v
	}
	for _, key := range request.Keys {
		if _, ok := copyTags[key]; !ok {
			log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, tag[%s] does not exist, operator: %s",
				request.ServiceId, request.Keys, key, remoteIP), nil)
			return pb.NewError(pb.ErrTagNotExists, "Delete tags failed for this key "+key+" does not exist.")
		}
		delete(copyTags, key)
	}

	// the capacity of tags may be 0
	data, err := json.Marshal(copyTags)
	if err != nil {
		log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, marshall service tags failed, operator: %s",
			request.ServiceId, request.Keys, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}

	key := path.GenerateServiceTagKey(domainProject, request.ServiceId)

	opts := etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)))
	syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, key, data,
		esync.WithOpts(map[string]string{"key": key}))
	if err != nil {
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	opts = append(opts, syncOpts...)

	resp, err := etcdadpt.TxnWithCmp(ctx, opts,
		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, request.ServiceId), 0)), nil)
	if err != nil {
		log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, operator: %s",
			request.ServiceId, request.Keys, remoteIP), err)
		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}
	if !resp.Succeeded {
		log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, service does not exist, operator: %s",
			request.ServiceId, request.Keys, remoteIP), err)
		return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}

	log.Info(fmt.Sprintf("delete service[%s]'s tags %v successfully, operator: %s", request.ServiceId, request.Keys, remoteIP))
	return nil
}

func (ds *MetadataManager) modifySchemas(ctx context.Context, domainProject string, service *pb.MicroService,
	schemas []*pb.Schema) error {
	remoteIP := util.GetIPFromContext(ctx)
	serviceID := service.ServiceId
	schemasFromDatabase, err := getSchemasFromDatabase(ctx, domainProject, serviceID)
	if err != nil {
		log.Error(fmt.Sprintf("modify service[%s] schemas failed, get schemas failed, operator: %s",
			serviceID, remoteIP), nil)
		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}

	needUpdateSchemas, needAddSchemas, needDeleteSchemas, _ :=
		datasource.SchemasAnalysis(schemas, schemasFromDatabase, service.Schemas)

	pluginOps := make([]etcdadpt.OpOptions, 0)
	quotaSize := len(needAddSchemas) - len(needDeleteSchemas)
	if quotaSize > 0 {
		errQuota := quotasvc.ApplySchema(ctx, serviceID, int64(quotaSize))
		if errQuota != nil {
			log.Error(fmt.Sprintf("modify service[%s] schemas failed, operator: %s", serviceID, remoteIP), errQuota)
			return errQuota
		}
	}

	var schemaIDs []string
	for _, schema := range needAddSchemas {
		log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		opts, _ := putSchema(ctx, domainProject, service.ServiceId, schema)
		pluginOps = append(pluginOps, opts...)
		schemaIDs = append(schemaIDs, schema.SchemaId)
	}

	for _, schema := range needUpdateSchemas {
		log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		opts, _ := putSchema(ctx, domainProject, serviceID, schema)
		pluginOps = append(pluginOps, opts...)
		schemaIDs = append(schemaIDs, schema.SchemaId)
	}

	for _, schema := range needDeleteSchemas {
		log.Info(fmt.Sprintf("delete non-existent schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
		opts, _ := deleteSchema(ctx, domainProject, serviceID, schema)
		pluginOps = append(pluginOps, opts...)
	}

	service.Schemas = schemaIDs
	opts, err := eutil.UpdateService(ctx, domainProject, serviceID, service)
	if err != nil {
		log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
			serviceID, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	pluginOps = append(pluginOps, opts...)

	if len(pluginOps) != 0 {
		resp, err := etcdadpt.TxnWithCmp(ctx, pluginOps,
			etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, serviceID), 0)),
			nil)
		if err != nil {
			return pb.NewError(pb.ErrUnavailableBackend, err.Error())
		}
		if !resp.Succeeded {
			return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
		}
	}
	return nil
}

func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, schema *pb.Schema) *errsvc.Error {
	remoteIP := util.GetIPFromContext(ctx)
	domainProject := util.ParseDomainProject(ctx)
	schemaID := schema.SchemaId

	microService, err := eutil.GetService(ctx, domainProject, serviceID)
	if err != nil {
		if errors.Is(err, datasource.ErrNoData) {
			log.Debug(fmt.Sprintf("microService does not exist, modify schema[%s/%s] failed, operator: %s",
				serviceID, schemaID, remoteIP))
			return pb.NewError(pb.ErrServiceNotExists, "Service does not exist")
		}
		log.Error(fmt.Sprintf("modify schema[%s/%s] failed, get `microService failed, operator: %s",
			serviceID, schemaID, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}

	var pluginOps []etcdadpt.OpOptions
	isExist := isExistSchemaID(microService, []*pb.Schema{schema})
	if !isExist {
		microService.Schemas = append(microService.Schemas, schemaID)
		opts, err := eutil.UpdateService(ctx, domainProject, serviceID, microService)
		if err != nil {
			log.Error(fmt.Sprintf("modify schema[%s/%s] failed, update microService.Schemas failed, operator: %s",
				serviceID, schemaID, remoteIP), err)
			return pb.NewError(pb.ErrInternal, err.Error())
		}
		pluginOps = append(pluginOps, opts...)
	}

	opts, err := commitSchemaInfo(ctx, domainProject, serviceID, schema)
	if err != nil {
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	pluginOps = append(pluginOps, opts...)

	resp, err := etcdadpt.TxnWithCmp(ctx, pluginOps,
		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, serviceID), 0)),
		nil)
	if err != nil {
		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}
	if !resp.Succeeded {
		return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}
	return nil
}

func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.DeleteServiceRequest) error {
	serviceID := request.ServiceId
	force := request.Force
	remoteIP := util.GetIPFromContext(ctx)
	domainProject := util.ParseDomainProject(ctx)

	title := "delete"
	if force {
		title = "force delete"
	}

	if serviceID == core.Service.ServiceId {
		err := errors.New("not allow to delete service center")
		log.Error(fmt.Sprintf("%s micro-service[%s] failed, operator: %s", title, serviceID, remoteIP), err)
		return pb.NewError(pb.ErrInvalidParams, err.Error())
	}

	microservice, err := eutil.GetService(ctx, domainProject, serviceID)
	if err != nil {
		if errors.Is(err, datasource.ErrNoData) {
			log.Debug(fmt.Sprintf("service does not exist, %s micro-service[%s] failed, operator: %s",
				title, serviceID, remoteIP))
			return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
		}
		log.Error(fmt.Sprintf("%s micro-service[%s] failed, get service file failed, operator: %s",
			title, serviceID, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}

	// 强制删除，则与该服务相关的信息删除，非强制删除： 如果作为该被依赖（作为provider，提供服务,且不是只存在自依赖）或者存在实例，则不能删除
	if !force {
		services, err := eutil.GetConsumerIds(ctx, domainProject, microservice)
		if err != nil {
			log.Error(fmt.Sprintf("delete micro-service[%s] failed, get service dependency failed, operator: %s",
				serviceID, remoteIP), err)
			return pb.NewError(pb.ErrInternal, err.Error())
		}
		if l := len(services); l > 1 || (l == 1 && services[0] != serviceID) {
			log.Error(fmt.Sprintf("delete micro-service[%s] failed, other services[%d] depend on it, operator: %s",
				serviceID, l, remoteIP), nil)
			return pb.NewError(pb.ErrDependedOnConsumer, "Can not delete this service, other service rely it.")
		}

		instancesKey := path.GenerateInstanceKey(domainProject, serviceID, "")
		rsp, err := sd.Instance().Search(ctx,
			etcdadpt.WithStrKey(instancesKey),
			etcdadpt.WithPrefix(),
			etcdadpt.WithCountOnly())
		if err != nil {
			log.Error(fmt.Sprintf("delete micro-service[%s] failed, get instances failed, operator: %s",
				serviceID, remoteIP), err)
			return pb.NewError(pb.ErrUnavailableBackend, err.Error())
		}

		if rsp.Count > 0 {
			log.Error(fmt.Sprintf("delete micro-service[%s] failed, service deployed instances[%d], operator: %s",
				serviceID, rsp.Count, remoteIP), nil)
			return pb.NewError(pb.ErrDeployedInstance, "Can not delete the service deployed instance(s).")
		}
	}

	serviceIDKey := path.GenerateServiceKey(domainProject, serviceID)
	serviceKey := &pb.MicroServiceKey{
		Tenant:      domainProject,
		Environment: microservice.Environment,
		AppId:       microservice.AppId,
		ServiceName: microservice.ServiceName,
		Version:     microservice.Version,
		Alias:       microservice.Alias,
	}
	opts := []etcdadpt.OpOptions{
		etcdadpt.OpDel(etcdadpt.WithStrKey(path.GenerateServiceIndexKey(serviceKey))),
		etcdadpt.OpDel(etcdadpt.WithStrKey(path.GenerateServiceAliasKey(serviceKey))),
		etcdadpt.OpDel(etcdadpt.WithStrKey(serviceIDKey)),
	}

	syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceService, serviceID,
		&pb.DeleteServiceRequest{ServiceId: serviceID, Force: force})
	if err != nil {
		log.Error("fail to sync opt", err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	opts = append(opts, syncOpts...)

	//删除依赖规则
	optDeleteDep, err := eutil.DeleteDependencyForDeleteService(domainProject, serviceID, serviceKey)
	if err != nil {
		log.Error(fmt.Sprintf("%s micro-service[%s] failed, delete dependency failed, operator: %s",
			title, serviceID, remoteIP), err)
		return pb.NewError(pb.ErrInternal, err.Error())
	}
	opts = append(opts, optDeleteDep)

	//删除schemas
	opts = append(opts, etcdadpt.OpDel(
		etcdadpt.WithStrKey(path.GenerateServiceSchemaKey(domainProject, serviceID, "")),
		etcdadpt.WithPrefix()))
	opts = append(opts, etcdadpt.OpDel(
		etcdadpt.WithStrKey(path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, "")),
		etcdadpt.WithPrefix()))
	opts = append(opts, etcdadpt.OpDel(
		etcdadpt.WithStrKey(path.GenerateServiceSchemaRefKey(domainProject, serviceID, "")),
		etcdadpt.WithPrefix()))

	//删除tags
	opts = append(opts, etcdadpt.OpDel(
		etcdadpt.WithStrKey(path.GenerateServiceTagKey(domainProject, serviceID))))

	//删除instances
	opts = append(opts, etcdadpt.OpDel(
		etcdadpt.WithStrKey(path.GenerateInstanceKey(domainProject, serviceID, "")),
		etcdadpt.WithPrefix()))
	opts = append(opts, etcdadpt.OpDel(
		etcdadpt.WithStrKey(path.GenerateInstanceLeaseKey(domainProject, serviceID, "")),
		etcdadpt.WithPrefix()))

	//删除实例
	err = eutil.DeleteServiceAllInstances(ctx, serviceID)
	if err != nil {
		log.Error(fmt.Sprintf("%s micro-service[%s] failed, revoke all instances failed, operator: %s",
			title, serviceID, remoteIP), err)
		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}

	resp, err := etcdadpt.TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.NotEqualVer(serviceIDKey, 0)), nil)
	if err != nil {
		log.Error(fmt.Sprintf("%s micro-service[%s] failed, operator: %s", title, serviceID, remoteIP), err)
		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
	}
	if !resp.Succeeded {
		log.Error(fmt.Sprintf("%s micro-service[%s] failed, service does not exist, operator: %s",
			title, serviceID, remoteIP), err)
		return pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
	}

	quotasvc.RemandService(ctx)

	log.Info(fmt.Sprintf("%s micro-service[%s] successfully, operator: %s", title, serviceID, remoteIP))
	return nil
}
