blob: 18e035a69696fa9c48fcb782c219a7845fa58aec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package service
import (
"fmt"
"strings"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
apt "github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
scerr "github.com/apache/servicecomb-service-center/server/error"
"github.com/apache/servicecomb-service-center/server/plugin"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/quota"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
"context"
)
func (s *MicroServiceService) GetSchemaInfo(ctx context.Context, in *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error) {
err := Validate(in)
if err != nil {
log.Errorf(nil, "get schema[%s/%s] failed", in.ServiceId, in.SchemaId)
return &pb.GetSchemaResponse{
Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()),
}, nil
}
domainProject := util.ParseDomainProject(ctx)
if !serviceUtil.ServiceExist(ctx, domainProject, in.ServiceId) {
log.Errorf(nil, "get schema[%s/%s] failed, service does not exist", in.ServiceId, in.SchemaId)
return &pb.GetSchemaResponse{
Response: pb.CreateResponse(scerr.ErrServiceNotExists, "Service does not exist."),
}, nil
}
key := apt.GenerateServiceSchemaKey(domainProject, in.ServiceId, in.SchemaId)
opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key))
resp, errDo := backend.Store().Schema().Search(ctx, opts...)
if errDo != nil {
log.Errorf(errDo, "get schema[%s/%s] failed", in.ServiceId, in.SchemaId)
return &pb.GetSchemaResponse{
Response: pb.CreateResponse(scerr.ErrUnavailableBackend, errDo.Error()),
}, errDo
}
if resp.Count == 0 {
log.Errorf(errDo, "get schema[%s/%s] failed, schema does not exists", in.ServiceId, in.SchemaId)
return &pb.GetSchemaResponse{
Response: pb.CreateResponse(scerr.ErrSchemaNotExists, "Do not have this schema info."),
}, nil
}
schemaSummary, err := getSchemaSummary(ctx, domainProject, in.ServiceId, in.SchemaId)
if err != nil {
log.Errorf(err, "get schema[%s/%s] failed, get schema summary failed", in.ServiceId, in.SchemaId)
return &pb.GetSchemaResponse{
Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
}, err
}
return &pb.GetSchemaResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Get schema info successfully."),
Schema: util.BytesToStringWithNoCopy(resp.Kvs[0].Value.([]byte)),
SchemaSummary: schemaSummary,
}, nil
}
func (s *MicroServiceService) GetAllSchemaInfo(ctx context.Context, in *pb.GetAllSchemaRequest) (*pb.GetAllSchemaResponse, error) {
err := Validate(in)
if err != nil {
log.Errorf(nil, "get service[%s] all schemas failed", in.ServiceId)
return &pb.GetAllSchemaResponse{
Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()),
}, nil
}
domainProject := util.ParseDomainProject(ctx)
service, err := serviceUtil.GetService(ctx, domainProject, in.ServiceId)
if err != nil {
log.Errorf(err, "get service[%s] all schemas failed, get service failed", in.ServiceId)
return &pb.GetAllSchemaResponse{
Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
}, err
}
if service == nil {
log.Errorf(nil, "get service[%s] all schemas failed, service does not exist", in.ServiceId)
return &pb.GetAllSchemaResponse{
Response: pb.CreateResponse(scerr.ErrServiceNotExists, "Service does not exist."),
}, nil
}
schemasList := service.Schemas
if schemasList == nil || len(schemasList) == 0 {
return &pb.GetAllSchemaResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Do not have this schema info."),
Schemas: []*pb.Schema{},
}, nil
}
key := apt.GenerateServiceSchemaSummaryKey(domainProject, in.ServiceId, "")
opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key), registry.WithPrefix())
resp, errDo := backend.Store().SchemaSummary().Search(ctx, opts...)
if errDo != nil {
log.Errorf(errDo, "get service[%s] all schema summaries failed", in.ServiceId)
return &pb.GetAllSchemaResponse{
Response: pb.CreateResponse(scerr.ErrUnavailableBackend, errDo.Error()),
}, errDo
}
respWithSchema := &discovery.Response{}
if in.WithSchema {
key := apt.GenerateServiceSchemaKey(domainProject, in.ServiceId, "")
opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key), registry.WithPrefix())
respWithSchema, errDo = backend.Store().Schema().Search(ctx, opts...)
if errDo != nil {
log.Errorf(errDo, "get service[%s] all schemas failed", in.ServiceId)
return &pb.GetAllSchemaResponse{
Response: pb.CreateResponse(scerr.ErrUnavailableBackend, errDo.Error()),
}, errDo
}
}
schemas := make([]*pb.Schema, 0, len(schemasList))
for _, schemaId := range schemasList {
tempSchema := &pb.Schema{}
tempSchema.SchemaId = schemaId
for _, summarySchema := range resp.Kvs {
_, _, schemaIdOfSummary := apt.GetInfoFromSchemaSummaryKV(summarySchema.Key)
if schemaId == schemaIdOfSummary {
tempSchema.Summary = summarySchema.Value.(string)
}
}
for _, contentSchema := range respWithSchema.Kvs {
_, _, schemaIdOfSchema := apt.GetInfoFromSchemaKV(contentSchema.Key)
if schemaId == schemaIdOfSchema {
tempSchema.Schema = util.BytesToStringWithNoCopy(contentSchema.Value.([]byte))
}
}
schemas = append(schemas, tempSchema)
}
return &pb.GetAllSchemaResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Get all schema info successfully."),
Schemas: schemas,
}, nil
}
func (s *MicroServiceService) DeleteSchema(ctx context.Context, in *pb.DeleteSchemaRequest) (*pb.DeleteSchemaResponse, error) {
remoteIP := util.GetIPFromContext(ctx)
err := Validate(in)
if err != nil {
log.Errorf(err, "delete schema[%s/%s] failed, operator: %s", in.ServiceId, in.SchemaId, remoteIP)
return &pb.DeleteSchemaResponse{
Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()),
}, nil
}
domainProject := util.ParseDomainProject(ctx)
if !serviceUtil.ServiceExist(ctx, domainProject, in.ServiceId) {
log.Errorf(nil, "delete schema[%s/%s] failed, service does not exist, operator: %s",
in.ServiceId, in.SchemaId, remoteIP)
return &pb.DeleteSchemaResponse{
Response: pb.CreateResponse(scerr.ErrServiceNotExists, "Service does not exist."),
}, nil
}
key := apt.GenerateServiceSchemaKey(domainProject, in.ServiceId, in.SchemaId)
exist, err := serviceUtil.CheckSchemaInfoExist(ctx, key)
if err != nil {
log.Errorf(err, "delete schema[%s/%s] failed, operator: %s", in.ServiceId, in.SchemaId, remoteIP)
return &pb.DeleteSchemaResponse{
Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
}, err
}
if !exist {
log.Errorf(nil, "delete schema[%s/%s] failed, schema does not exist, operator: %s",
in.ServiceId, in.SchemaId, remoteIP)
return &pb.DeleteSchemaResponse{
Response: pb.CreateResponse(scerr.ErrSchemaNotExists, "Schema info does not exist."),
}, nil
}
epSummaryKey := apt.GenerateServiceSchemaSummaryKey(domainProject, in.ServiceId, in.SchemaId)
opts := []registry.PluginOp{
registry.OpDel(registry.WithStrKey(epSummaryKey)),
registry.OpDel(registry.WithStrKey(key)),
}
resp, errDo := backend.Registry().TxnWithCmp(ctx, opts,
[]registry.CompareOp{registry.OpCmp(
registry.CmpVer(util.StringToBytesWithNoCopy(apt.GenerateServiceKey(domainProject, in.ServiceId))),
registry.CMP_NOT_EQUAL, 0)},
nil)
if errDo != nil {
log.Errorf(errDo, "delete schema[%s/%s] failed, operator: %s", in.ServiceId, in.SchemaId, remoteIP)
return &pb.DeleteSchemaResponse{
Response: pb.CreateResponse(scerr.ErrUnavailableBackend, errDo.Error()),
}, errDo
}
if !resp.Succeeded {
log.Errorf(nil, "delete schema[%s/%s] failed, service does not exist, operator: %s",
in.ServiceId, in.SchemaId, remoteIP)
return &pb.DeleteSchemaResponse{
Response: pb.CreateResponse(scerr.ErrServiceNotExists, "Service does not exist."),
}, nil
}
log.Infof("delete schema[%s/%s] info successfully, operator: %s", in.ServiceId, in.SchemaId, remoteIP)
return &pb.DeleteSchemaResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Delete schema info successfully."),
}, nil
}
// ModifySchemas covers all the schemas of a service.
// To cover the old schemas, ModifySchemas adds new schemas into, delete and
// modify the old schemas.
// 1. When the service is in production environment and schema is not editable:
// If the request contains a new schemaId (the number of schemaIds of
// the service is also required to be 0, or the request will be rejected),
// the new schemaId will be automatically added to the service information.
// Schema is only allowed to add.
// 2. Other cases:
// If the request contains a new schemaId,
// the new schemaId will be automatically added to the service information.
// Schema is allowed to add/delete/modify.
func (s *MicroServiceService) ModifySchemas(ctx context.Context, in *pb.ModifySchemasRequest) (*pb.ModifySchemasResponse, error) {
remoteIP := util.GetIPFromContext(ctx)
err := Validate(in)
if err != nil {
log.Errorf(err, "modify service[%s] schemas failed, operator: %s", in.ServiceId, remoteIP)
return &pb.ModifySchemasResponse{
Response: pb.CreateResponse(scerr.ErrInvalidParams, "Invalid request."),
}, nil
}
serviceId := in.ServiceId
domainProject := util.ParseDomainProject(ctx)
service, err := serviceUtil.GetService(ctx, domainProject, serviceId)
if err != nil {
log.Errorf(err, "modify service[%s] schemas failed, get service failed, operator: %s", serviceId, remoteIP)
return &pb.ModifySchemasResponse{
Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
}, err
}
if service == nil {
log.Errorf(nil, "modify service[%s] schemas failed, service does not exist, operator: %s",
serviceId, remoteIP)
return &pb.ModifySchemasResponse{
Response: pb.CreateResponse(scerr.ErrServiceNotExists, "Service does not exist."),
}, nil
}
respErr := s.modifySchemas(ctx, domainProject, service, in.Schemas)
if respErr != nil {
log.Errorf(nil, "modify service[%s] schemas failed, operator: %s", serviceId, remoteIP)
resp := &pb.ModifySchemasResponse{
Response: pb.CreateResponseWithSCErr(respErr),
}
if respErr.InternalError() {
return resp, respErr
}
return resp, nil
}
return &pb.ModifySchemasResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "modify schemas info successfully."),
}, nil
}
func schemasAnalysis(schemas []*pb.Schema, schemasFromDb []*pb.Schema, schemaIdsInService []string) (
[]*pb.Schema, []*pb.Schema, []*pb.Schema, []string) {
needUpdateSchemas := make([]*pb.Schema, 0, len(schemas))
needAddSchemas := make([]*pb.Schema, 0, len(schemas))
needDeleteSchemas := make([]*pb.Schema, 0, len(schemasFromDb))
nonExistSchemaIds := make([]string, 0, len(schemas))
duplicate := make(map[string]struct{})
for _, schema := range schemas {
if _, ok := duplicate[schema.SchemaId]; ok {
continue
}
duplicate[schema.SchemaId] = struct{}{}
exist := false
for _, schemaFromDb := range schemasFromDb {
if schema.SchemaId == schemaFromDb.SchemaId {
needUpdateSchemas = append(needUpdateSchemas, schema)
exist = true
break
}
}
if !exist {
needAddSchemas = append(needAddSchemas, schema)
}
exist = false
for _, schemaId := range schemaIdsInService {
if schema.SchemaId == schemaId {
exist = true
}
}
if !exist {
nonExistSchemaIds = append(nonExistSchemaIds, schema.SchemaId)
}
}
for _, schemaFromDb := range schemasFromDb {
exist := false
for _, schema := range schemas {
if schema.SchemaId == schemaFromDb.SchemaId {
exist = true
break
}
}
if !exist {
needDeleteSchemas = append(needDeleteSchemas, schemaFromDb)
}
}
return needUpdateSchemas, needAddSchemas, needDeleteSchemas, nonExistSchemaIds
}
func (s *MicroServiceService) modifySchemas(ctx context.Context, domainProject string, service *pb.MicroService, schemas []*pb.Schema) *scerr.Error {
remoteIP := util.GetIPFromContext(ctx)
serviceId := service.ServiceId
schemasFromDatabase, err := GetSchemasFromDatabase(ctx, domainProject, serviceId)
if err != nil {
log.Errorf(nil, "modify service[%s] schemas failed, get schemas failed, operator: %s",
serviceId, remoteIP)
return scerr.NewError(scerr.ErrUnavailableBackend, err.Error())
}
needUpdateSchemas, needAddSchemas, needDeleteSchemas, nonExistSchemaIds := schemasAnalysis(schemas, schemasFromDatabase, service.Schemas)
pluginOps := make([]registry.PluginOp, 0)
if !s.isSchemaEditable(service) {
if len(service.Schemas) == 0 {
res := quota.NewApplyQuotaResource(quota.SchemaQuotaType, domainProject, serviceId, int64(len(nonExistSchemaIds)))
rst := plugin.Plugins().Quota().Apply4Quotas(ctx, res)
errQuota := rst.Err
if errQuota != nil {
log.Errorf(errQuota, "modify service[%s] schemas failed, operator: %s", serviceId, remoteIP)
return errQuota
}
service.Schemas = nonExistSchemaIds
opt, err := serviceUtil.UpdateService(domainProject, serviceId, service)
if err != nil {
log.Errorf(err, "modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
serviceId, remoteIP)
return scerr.NewError(scerr.ErrInternal, err.Error())
}
pluginOps = append(pluginOps, opt)
} else {
if len(nonExistSchemaIds) != 0 {
errInfo := fmt.Errorf("Non-existent schemaIds %v", nonExistSchemaIds)
log.Errorf(errInfo, "modify service[%s] schemas failed, operator: %s", serviceId, remoteIP)
return scerr.NewError(scerr.ErrUndefinedSchemaId, errInfo.Error())
}
for _, needUpdateSchema := range needUpdateSchemas {
exist, err := isExistSchemaSummary(ctx, domainProject, serviceId, needUpdateSchema.SchemaId)
if err != nil {
return scerr.NewError(scerr.ErrInternal, err.Error())
}
if !exist {
opts := schemaWithDatabaseOpera(registry.OpPut, domainProject, serviceId, needUpdateSchema)
pluginOps = append(pluginOps, opts...)
} else {
log.Warnf("schema[%s/%s] and it's summary already exist, skip to update, operator: %s",
serviceId, needUpdateSchema.SchemaId, remoteIP)
}
}
}
for _, schema := range needAddSchemas {
log.Infof("add new schema[%s/%s], operator: %s", serviceId, schema.SchemaId, remoteIP)
opts := schemaWithDatabaseOpera(registry.OpPut, domainProject, service.ServiceId, schema)
pluginOps = append(pluginOps, opts...)
}
} else {
quotaSize := len(needAddSchemas) - len(needDeleteSchemas)
if quotaSize > 0 {
res := quota.NewApplyQuotaResource(quota.SchemaQuotaType, domainProject, serviceId, int64(quotaSize))
rst := plugin.Plugins().Quota().Apply4Quotas(ctx, res)
err := rst.Err
if err != nil {
log.Errorf(err, "modify service[%s] schemas failed, operator: %s", serviceId, remoteIP)
return err
}
}
var schemaIds []string
for _, schema := range needAddSchemas {
log.Infof("add new schema[%s/%s], operator: %s", serviceId, schema.SchemaId, remoteIP)
opts := schemaWithDatabaseOpera(registry.OpPut, domainProject, service.ServiceId, schema)
pluginOps = append(pluginOps, opts...)
schemaIds = append(schemaIds, schema.SchemaId)
}
for _, schema := range needUpdateSchemas {
log.Infof("update schema[%s/%s], operator: %s", serviceId, schema.SchemaId, remoteIP)
opts := schemaWithDatabaseOpera(registry.OpPut, domainProject, serviceId, schema)
pluginOps = append(pluginOps, opts...)
schemaIds = append(schemaIds, schema.SchemaId)
}
for _, schema := range needDeleteSchemas {
log.Infof("delete non-existent schema[%s/%s], operator: %s", serviceId, schema.SchemaId, remoteIP)
opts := schemaWithDatabaseOpera(registry.OpDel, domainProject, serviceId, schema)
pluginOps = append(pluginOps, opts...)
}
service.Schemas = schemaIds
opt, err := serviceUtil.UpdateService(domainProject, serviceId, service)
if err != nil {
log.Errorf(err, "modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
serviceId, remoteIP)
return scerr.NewError(scerr.ErrInternal, err.Error())
}
pluginOps = append(pluginOps, opt)
}
if len(pluginOps) != 0 {
resp, err := backend.BatchCommitWithCmp(ctx, pluginOps,
[]registry.CompareOp{registry.OpCmp(
registry.CmpVer(util.StringToBytesWithNoCopy(apt.GenerateServiceKey(domainProject, serviceId))),
registry.CMP_NOT_EQUAL, 0)},
nil)
if err != nil {
return scerr.NewError(scerr.ErrUnavailableBackend, err.Error())
}
if !resp.Succeeded {
return scerr.NewError(scerr.ErrServiceNotExists, "Service does not exist.")
}
}
return nil
}
func (s *MicroServiceService) isSchemaEditable(service *pb.MicroService) bool {
return (len(service.Environment) != 0 && service.Environment != pb.ENV_PROD) || s.schemaEditable
}
func isExistSchemaId(service *pb.MicroService, schemas []*pb.Schema) bool {
serviceSchemaIds := service.Schemas
for _, schema := range schemas {
if !containsValueInSlice(serviceSchemaIds, schema.SchemaId) {
log.Errorf(nil, "schema[%s/%s] does not exist schemaId", service.ServiceId, schema.SchemaId)
return false
}
}
return true
}
func schemaWithDatabaseOpera(invoke registry.Operation, domainProject string, serviceId string, schema *pb.Schema) []registry.PluginOp {
pluginOps := make([]registry.PluginOp, 0)
key := apt.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)
opt := invoke(registry.WithStrKey(key), registry.WithStrValue(schema.Schema))
pluginOps = append(pluginOps, opt)
keySummary := apt.GenerateServiceSchemaSummaryKey(domainProject, serviceId, schema.SchemaId)
opt = invoke(registry.WithStrKey(keySummary), registry.WithStrValue(schema.Summary))
pluginOps = append(pluginOps, opt)
return pluginOps
}
func GetSchemasFromDatabase(ctx context.Context, domainProject string, serviceId string) ([]*pb.Schema, error) {
key := apt.GenerateServiceSchemaKey(domainProject, serviceId, "")
resp, err := backend.Store().Schema().Search(ctx,
registry.WithPrefix(),
registry.WithStrKey(key))
if err != nil {
log.Errorf(err, "get service[%s]'s schema failed", serviceId)
return nil, err
}
schemas := make([]*pb.Schema, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
key := util.BytesToStringWithNoCopy(kv.Key)
tmp := strings.Split(key, "/")
schemaId := tmp[len(tmp)-1]
schema := util.BytesToStringWithNoCopy(kv.Value.([]byte))
schemaStruct := &pb.Schema{
SchemaId: schemaId,
Schema: schema,
}
schemas = append(schemas, schemaStruct)
}
return schemas, nil
}
// ModifySchema modifies a specific schema
// 1. When the service is in production environment and schema is not editable:
// If the request contains a new schemaId (the number of schemaIds of
// the service is also required to be 0, or the request will be rejected),
// the new schemaId will be automatically added to the service information.
// Schema is only allowed to add.
// 2. Other cases:
// If the request contains a new schemaId,
// the new schemaId will be automatically added to the service information.
// Schema is allowed to add/modify.
func (s *MicroServiceService) ModifySchema(ctx context.Context, request *pb.ModifySchemaRequest) (*pb.ModifySchemaResponse, error) {
remoteIP := util.GetIPFromContext(ctx)
domainProject := util.ParseDomainProject(ctx)
respErr := s.canModifySchema(ctx, domainProject, request)
if respErr != nil {
resp := &pb.ModifySchemaResponse{
Response: pb.CreateResponseWithSCErr(respErr),
}
if respErr.InternalError() {
return resp, respErr
}
return resp, nil
}
serviceId := request.ServiceId
schemaId := request.SchemaId
schema := pb.Schema{
SchemaId: schemaId,
Summary: request.Summary,
Schema: request.Schema,
}
err := s.modifySchema(ctx, serviceId, &schema)
if err != nil {
log.Errorf(err, "modify schema[%s/%s] failed, operator: %s", serviceId, schemaId, remoteIP)
resp := &pb.ModifySchemaResponse{
Response: pb.CreateResponseWithSCErr(err),
}
if err.InternalError() {
return resp, err
}
return resp, nil
}
log.Infof("modify schema[%s/%s] successfully, operator: %s", serviceId, schemaId, remoteIP)
return &pb.ModifySchemaResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "modify schema info success"),
}, nil
}
func (s *MicroServiceService) canModifySchema(ctx context.Context, domainProject string, in *pb.ModifySchemaRequest) *scerr.Error {
remoteIP := util.GetIPFromContext(ctx)
serviceId := in.ServiceId
schemaId := in.SchemaId
if len(schemaId) == 0 || len(serviceId) == 0 {
log.Errorf(nil, "update schema[%s/%s] failed, invalid params, operator: %s",
serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrInvalidParams, "serviceId or schemaId is nil")
}
err := Validate(in)
if err != nil {
log.Errorf(err, "update schema[%s/%s] failed, operator: %s", serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrInvalidParams, err.Error())
}
res := quota.NewApplyQuotaResource(quota.SchemaQuotaType, domainProject, serviceId, 1)
rst := plugin.Plugins().Quota().Apply4Quotas(ctx, res)
errQuota := rst.Err
if errQuota != nil {
log.Errorf(errQuota, "update schema[%s/%s] failed, operator: %s", serviceId, schemaId, remoteIP)
return errQuota
}
if len(in.Summary) == 0 {
log.Warnf("schema[%s/%s]'s summary is empty, operator: %s", serviceId, schemaId, remoteIP)
}
return nil
}
func (s *MicroServiceService) modifySchema(ctx context.Context, serviceId string, schema *pb.Schema) *scerr.Error {
remoteIP := util.GetIPFromContext(ctx)
domainProject := util.ParseDomainProject(ctx)
schemaId := schema.SchemaId
service, err := serviceUtil.GetService(ctx, domainProject, serviceId)
if err != nil {
log.Errorf(err, "modify schema[%s/%s] failed, get service failed, operator: %s",
serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrInternal, err.Error())
}
if service == nil {
log.Errorf(nil, "modify schema[%s/%s] failed, service does not exist, operator: %s",
serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrServiceNotExists, "Service does not exist")
}
var pluginOps []registry.PluginOp
isExist := isExistSchemaId(service, []*pb.Schema{schema})
if !s.isSchemaEditable(service) {
if len(service.Schemas) != 0 && !isExist {
return scerr.NewError(scerr.ErrUndefinedSchemaId, "Non-existent schemaId can't be added in "+pb.ENV_PROD)
}
key := apt.GenerateServiceSchemaKey(domainProject, serviceId, schemaId)
respSchema, err := backend.Store().Schema().Search(ctx, registry.WithStrKey(key), registry.WithCountOnly())
if err != nil {
log.Errorf(err, "modify schema[%s/%s] failed, get schema summary failed, operator: %s",
serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrUnavailableBackend, err.Error())
}
if respSchema.Count != 0 {
if len(schema.Summary) == 0 {
log.Errorf(err, "%s mode, schema[%s/%s] already exists, can not be changed, operator: %s",
pb.ENV_PROD, serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrModifySchemaNotAllow, "schema already exist, can not be changed in "+pb.ENV_PROD)
}
exist, err := isExistSchemaSummary(ctx, domainProject, serviceId, schemaId)
if err != nil {
log.Errorf(err, "check schema[%s/%s] summary existence failed, operator: %s",
serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrInternal, err.Error())
}
if exist {
log.Errorf(err, "%s mode, schema[%s/%s] already exist, can not be changed, operator: %s",
pb.ENV_PROD, serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrModifySchemaNotAllow, "schema already exist, can not be changed in "+pb.ENV_PROD)
}
}
if len(service.Schemas) == 0 {
service.Schemas = append(service.Schemas, schemaId)
opt, err := serviceUtil.UpdateService(domainProject, serviceId, service)
if err != nil {
log.Errorf(err, "modify schema[%s/%s] failed, update service.Schemas failed, operator: %s",
serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrInternal, err.Error())
}
pluginOps = append(pluginOps, opt)
}
} else {
if !isExist {
service.Schemas = append(service.Schemas, schemaId)
opt, err := serviceUtil.UpdateService(domainProject, serviceId, service)
if err != nil {
log.Errorf(err, "modify schema[%s/%s] failed, update service.Schemas failed, operator: %s",
serviceId, schemaId, remoteIP)
return scerr.NewError(scerr.ErrInternal, err.Error())
}
pluginOps = append(pluginOps, opt)
}
}
opts := CommitSchemaInfo(domainProject, serviceId, schema)
pluginOps = append(pluginOps, opts...)
resp, err := backend.Registry().TxnWithCmp(ctx, pluginOps,
[]registry.CompareOp{registry.OpCmp(
registry.CmpVer(util.StringToBytesWithNoCopy(apt.GenerateServiceKey(domainProject, serviceId))),
registry.CMP_NOT_EQUAL, 0)},
nil)
if err != nil {
return scerr.NewError(scerr.ErrUnavailableBackend, err.Error())
}
if !resp.Succeeded {
return scerr.NewError(scerr.ErrServiceNotExists, "Service does not exist.")
}
return nil
}
func isExistSchemaSummary(ctx context.Context, domainProject, serviceId, schemaId string) (bool, error) {
key := apt.GenerateServiceSchemaSummaryKey(domainProject, serviceId, schemaId)
resp, err := backend.Store().SchemaSummary().Search(ctx, registry.WithStrKey(key), registry.WithCountOnly())
if err != nil {
return true, err
}
if resp.Count == 0 {
return false, nil
}
return true, nil
}
func CommitSchemaInfo(domainProject string, serviceId string, schema *pb.Schema) []registry.PluginOp {
if len(schema.Summary) != 0 {
return schemaWithDatabaseOpera(registry.OpPut, domainProject, serviceId, schema)
} else {
key := apt.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)
opt := registry.OpPut(registry.WithStrKey(key), registry.WithStrValue(schema.Schema))
return []registry.PluginOp{opt}
}
}
func containsValueInSlice(in []string, value string) bool {
if in == nil || len(value) == 0 {
return false
}
for _, i := range in {
if i == value {
return true
}
}
return false
}
func getSchemaSummary(ctx context.Context, domainProject string, serviceId string, schemaId string) (string, error) {
key := apt.GenerateServiceSchemaSummaryKey(domainProject, serviceId, schemaId)
resp, err := backend.Store().SchemaSummary().Search(ctx,
registry.WithStrKey(key),
)
if err != nil {
log.Errorf(err, "get schema[%s/%s] summary failed", serviceId, schemaId)
return "", err
}
if len(resp.Kvs) == 0 {
return "", nil
}
return resp.Kvs[0].Value.(string), nil
}