blob: 99c8f03b28c970457dedaa0057e2772a540364ed [file] [log] [blame]
//Copyright 2017 Huawei Technologies Co., Ltd
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
package service
import (
"errors"
"github.com/ServiceComb/service-center/pkg/util"
apt "github.com/ServiceComb/service-center/server/core"
"github.com/ServiceComb/service-center/server/core/backend"
"github.com/ServiceComb/service-center/server/core/backend/store"
pb "github.com/ServiceComb/service-center/server/core/proto"
scerr "github.com/ServiceComb/service-center/server/error"
"github.com/ServiceComb/service-center/server/infra/quota"
"github.com/ServiceComb/service-center/server/infra/registry"
"github.com/ServiceComb/service-center/server/plugin"
serviceUtil "github.com/ServiceComb/service-center/server/service/util"
"golang.org/x/net/context"
"net/http"
"strings"
)
// GetSchemaInfo returns the content of a single schema of a micro-service.
//
// The request is rejected with ErrInvalidParams when it is nil, when
// ServiceId or SchemaId is empty, or when apt.Validate reports an error.
// ErrServiceNotExists is reported when the service cannot be found in the
// domain/project parsed from ctx, and ErrSchemaNotExists when the schema
// lookup returns no entries.
//
// The returned error is non-nil only when the schema lookup itself fails;
// every other failure is conveyed through the Response field alone.
func (s *MicroServiceService) GetSchemaInfo(ctx context.Context, in *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error) {
	if in == nil || len(in.ServiceId) == 0 || len(in.SchemaId) == 0 {
		util.Logger().Errorf(nil, "get schema failed: invalid params.")
		return &pb.GetSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrInvalidParams, "Invalid request path."),
		}, nil
	}

	err := apt.Validate(in)
	if err != nil {
		// Pass the validation error to the logger so the reason is not lost.
		util.Logger().Errorf(err, "get schema failed, serviceId %s, schemaId %s: invalid params.", in.ServiceId, in.SchemaId)
		return &pb.GetSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()),
		}, nil
	}

	domainProject := util.ParseDomainProject(ctx)

	if !serviceUtil.ServiceExist(ctx, domainProject, in.ServiceId) {
		util.Logger().Errorf(nil, "get schema failed, serviceId %s, schemaId %s: service not exist.", in.ServiceId, in.SchemaId)
		return &pb.GetSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrServiceNotExists, "Service does not exist."),
		}, nil
	}

	key := apt.GenerateServiceSchemaKey(domainProject, in.ServiceId, in.SchemaId)
	opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key))
	resp, errDo := store.Store().Schema().Search(ctx, opts...)
	if errDo != nil {
		util.Logger().Errorf(errDo, "get schema failed, serviceId %s, schemaId %s: get schema info failed.", in.ServiceId, in.SchemaId)
		return &pb.GetSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrInternal, "Get schema info failed."),
		}, errDo
	}
	if resp.Count == 0 {
		// The lookup succeeded, so there is no error value to attach here.
		util.Logger().Errorf(nil, "get schema failed, serviceId %s, schemaId %s: schema not exists.", in.ServiceId, in.SchemaId)
		return &pb.GetSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrSchemaNotExists, "Do not have this schema info."),
		}, nil
	}
	return &pb.GetSchemaResponse{
		Response: pb.CreateResponse(pb.Response_SUCCESS, "Get schema info successfully."),
		Schema:   util.BytesToStringWithNoCopy(resp.Kvs[0].Value),
	}, nil
}
// DeleteSchema removes one schema of a micro-service together with its
// summary entry.
//
// The request is rejected with ErrInvalidParams when it is nil, when
// ServiceId or SchemaId is empty, or when apt.Validate reports an error.
// ErrServiceNotExists / ErrSchemaNotExists are reported when the service or
// the schema cannot be found. Both keys (schema and schema summary) are
// deleted in one registry transaction.
//
// The returned error is non-nil only when the existence check or the delete
// transaction fails; other failures are conveyed through Response alone.
func (s *MicroServiceService) DeleteSchema(ctx context.Context, request *pb.DeleteSchemaRequest) (*pb.DeleteSchemaResponse, error) {
	if request == nil || len(request.ServiceId) == 0 || len(request.SchemaId) == 0 {
		util.Logger().Errorf(nil, "delete schema failed: invalid params.")
		return &pb.DeleteSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrInvalidParams, "Invalid request path."),
		}, nil
	}

	err := apt.Validate(request)
	if err != nil {
		util.Logger().Errorf(err, "delete schema failed, serviceId %s, schemaId %s: invalid params.", request.ServiceId, request.SchemaId)
		return &pb.DeleteSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()),
		}, nil
	}

	domainProject := util.ParseDomainProject(ctx)

	if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
		util.Logger().Errorf(nil, "delete schema failed, serviceId %s, schemaId %s: service not exist.", request.ServiceId, request.SchemaId)
		return &pb.DeleteSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrServiceNotExists, "Service does not exist."),
		}, nil
	}

	key := apt.GenerateServiceSchemaKey(domainProject, request.ServiceId, request.SchemaId)
	exist, err := serviceUtil.CheckSchemaInfoExist(ctx, key)
	if err != nil {
		util.Logger().Errorf(err, "delete schema failed, serviceId %s, schemaId %s: get schema failed.", request.ServiceId, request.SchemaId)
		// The check itself failed, so whether the schema exists is unknown;
		// do not claim that it is missing.
		return &pb.DeleteSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrInternal, "Check schema info exist failed."),
		}, err
	}
	if !exist {
		util.Logger().Errorf(nil, "delete schema failed, serviceId %s, schemaId %s: schema not exist.", request.ServiceId, request.SchemaId)
		return &pb.DeleteSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrSchemaNotExists, "Schema info does not exist."),
		}, nil
	}

	// Delete the summary and the schema content in the same transaction.
	epSummaryKey := apt.GenerateServiceSchemaSummaryKey(domainProject, request.ServiceId, request.SchemaId)
	opts := []registry.PluginOp{
		registry.OpDel(registry.WithStrKey(epSummaryKey)),
		registry.OpDel(registry.WithStrKey(key)),
	}

	_, errDo := backend.Registry().Txn(ctx, opts)
	if errDo != nil {
		util.Logger().Errorf(errDo, "delete schema failed, serviceId %s, schemaId %s: delete schema from etcd failed.", request.ServiceId, request.SchemaId)
		return &pb.DeleteSchemaResponse{
			Response: pb.CreateResponse(scerr.ErrUnavailableBackend, "Delete schema info failed."),
		}, errDo
	}

	util.Logger().Infof("delete schema info successfully.%s", request.SchemaId)
	return &pb.DeleteSchemaResponse{
		Response: pb.CreateResponse(pb.Response_SUCCESS, "Delete schema info successfully."),
	}, nil
}
// ModifySchemas replaces or extends the set of schemas of a micro-service in
// one call by delegating to modifySchemas.
//
// A nil request or one that fails apt.Validate yields ErrInvalidParams;
// an unknown service yields ErrServiceNotExists. Errors reported by
// modifySchemas are copied into the Response.
//
// The returned error is non-nil only when the service lookup fails or when
// modifySchemas reports an error whose HTTP status is 500.
func (s *MicroServiceService) ModifySchemas(ctx context.Context, request *pb.ModifySchemasRequest) (*pb.ModifySchemasResponse, error) {
	// Guard against a nil request before any field is dereferenced, as the
	// other handlers in this file do.
	if request == nil {
		util.Logger().Errorf(nil, "modify schemas failed: invalid params.")
		return &pb.ModifySchemasResponse{
			Response: pb.CreateResponse(scerr.ErrInvalidParams, "Invalid request."),
		}, nil
	}

	err := apt.Validate(request)
	if err != nil {
		util.Logger().Errorf(err, "modify schemas failed: invalid params.")
		return &pb.ModifySchemasResponse{
			Response: pb.CreateResponse(scerr.ErrInvalidParams, "Invalid request."),
		}, nil
	}

	serviceId := request.ServiceId
	domainProject := util.ParseDomainProject(ctx)

	service, err := serviceUtil.GetService(ctx, domainProject, serviceId)
	if err != nil {
		util.Logger().Errorf(err, "modify schemas failed: get service failed. %s", serviceId)
		// This is a lookup failure, not a problem with the request.
		return &pb.ModifySchemasResponse{
			Response: pb.CreateResponse(scerr.ErrInternal, "Get service failed."),
		}, err
	}
	if service == nil {
		util.Logger().Errorf(nil, "modify schemas failed: service does not exist. %s", serviceId)
		return &pb.ModifySchemasResponse{
			Response: pb.CreateResponse(scerr.ErrServiceNotExists, "Service does not exist."),
		}, nil
	}

	respErr := modifySchemas(ctx, domainProject, service, request.Schemas)
	if respErr != nil {
		resp := &pb.ModifySchemasResponse{
			Response: pb.CreateResponse(respErr.Code, respErr.Detail),
		}
		// Only internal (500) errors are surfaced as a Go error as well.
		if respErr.StatusCode() == http.StatusInternalServerError {
			return resp, respErr
		}
		return resp, nil
	}

	return &pb.ModifySchemasResponse{
		Response: pb.CreateResponse(pb.Response_SUCCESS, "modify schemas info successfully."),
	}, nil
}
// schemasAnalysis classifies the requested schemas against what is stored and
// what the service declares. It returns, in this order:
//   - the requested schemas whose SchemaId is already present in schemasFromDb,
//   - the requested schemas whose SchemaId is not present in schemasFromDb,
//   - the SchemaIds of requested schemas that are not listed in
//     schemaIdsInService.
//
// Each result keeps the order of the schemas argument.
func schemasAnalysis(schemas []*pb.Schema, schemasFromDb []*pb.Schema, schemaIdsInService []string) ([]*pb.Schema, []*pb.Schema, []string) {
	toUpdate := make([]*pb.Schema, 0, len(schemas))
	toAdd := make([]*pb.Schema, 0, len(schemas))
	undeclaredIds := make([]string, 0, len(schemas))

	for _, requested := range schemas {
		// Is this schema already stored?
		stored := false
		for i := 0; i < len(schemasFromDb) && !stored; i++ {
			stored = requested.SchemaId == schemasFromDb[i].SchemaId
		}
		if stored {
			toUpdate = append(toUpdate, requested)
		} else {
			toAdd = append(toAdd, requested)
		}

		// Is this schema id declared on the service?
		declared := false
		for i := 0; i < len(schemaIdsInService) && !declared; i++ {
			declared = requested.SchemaId == schemaIdsInService[i]
		}
		if !declared {
			undeclaredIds = append(undeclaredIds, requested.SchemaId)
		}
	}

	return toUpdate, toAdd, undeclaredIds
}
// modifySchemas builds and commits the registry operations needed to apply
// the requested schemas to the given service.
//
// Behavior depends on service.Environment:
//
//   - pb.ENV_PROD, service declares no schema ids yet: quota is applied for
//     all requested schemas and the service's schema id list is set to the
//     requested ids.
//   - pb.ENV_PROD, service already declares schema ids: every requested id
//     must be declared (otherwise ErrInvalidParams); a schema already in the
//     database is written only when it has no summary entry yet.
//   - any other environment: schemas stored but not requested are deleted,
//     stored ones are overwritten, quota is applied for the net growth, and
//     undeclared ids are appended to the service's schema id list.
//
// In every case, requested schemas that are not in the database are added.
// All operations are committed together through backend.BatchCommit. It
// returns nil on success or a *scerr.Error describing the failure.
func modifySchemas(ctx context.Context, domainProject string, service *pb.MicroService, schemas []*pb.Schema) *scerr.Error {
	serviceId := service.ServiceId
	schemasFromDatabase, err := GetSchemasFromDatabase(ctx, domainProject, serviceId)
	if err != nil {
		// Attach the underlying error to the log entry instead of dropping it.
		util.Logger().Errorf(err, "modify schema failed: get schema from database failed, %s", serviceId)
		return scerr.NewError(scerr.ErrInternal, err.Error())
	}

	needUpdateSchemas, needAddSchemas, nonExistSchemaIds := schemasAnalysis(schemas, schemasFromDatabase, service.Schemas)

	pluginOps := make([]registry.PluginOp, 0)
	if service.Environment == pb.ENV_PROD {
		if len(service.Schemas) == 0 {
			_, ok, err := plugin.Plugins().Quota().Apply4Quotas(ctx, quota.SchemaQuotaType, domainProject, serviceId, int16(len(schemas)))
			if err != nil {
				util.Logger().Errorf(err, "Add schema info failed, check resource num failed, %s", serviceId)
				return scerr.NewError(scerr.ErrUnavailableQuota, err.Error())
			}
			if !ok {
				// err is nil on this path; the quota was simply exhausted.
				util.Logger().Errorf(nil, "Add schema info failed, reach the max size of schema, %s", serviceId)
				return scerr.NewError(scerr.ErrNotEnoughQuota, "reach the max size of schema")
			}

			// The service declared nothing, so every requested id is new.
			service.Schemas = nonExistSchemaIds
			opt, err := serviceUtil.UpdateService(domainProject, serviceId, service)
			if err != nil {
				util.Logger().Errorf(err, "update service failed , %s", serviceId)
				return scerr.NewError(scerr.ErrInternal, err.Error())
			}
			pluginOps = append(pluginOps, opt)
		} else {
			if len(nonExistSchemaIds) != 0 {
				util.Logger().Errorf(nil, "non-exist schemaId %#v", nonExistSchemaIds)
				return scerr.NewError(scerr.ErrInvalidParams, "non-exist schemaId")
			}
			for _, needUpdateSchema := range needUpdateSchemas {
				exist, err := isExistSchemaSummary(ctx, domainProject, serviceId, needUpdateSchema.SchemaId)
				if err != nil {
					return scerr.NewError(scerr.ErrInternal, err.Error())
				}
				// Only a stored schema without a summary may be rewritten here.
				if !exist {
					opts := schemaWithDatabaseOpera(registry.OpPut, domainProject, serviceId, needUpdateSchema)
					pluginOps = append(pluginOps, opts...)
				} else {
					util.Logger().Warnf(nil, "schema and summary more existed, skip,service %s, schemaId %s", service.ServiceId, needUpdateSchema.SchemaId)
				}
			}
		}
	} else {
		// Stored schemas that are absent from the request get deleted.
		needDeleteSchemas := make([]*pb.Schema, 0, len(schemasFromDatabase))
		for _, schemaFromDb := range schemasFromDatabase {
			exist := false
			for _, schema := range schemas {
				if schema.SchemaId == schemaFromDb.SchemaId {
					exist = true
					break
				}
			}
			if !exist {
				needDeleteSchemas = append(needDeleteSchemas, schemaFromDb)
			}
		}

		// Quota is only needed for the net increase in stored schemas.
		quotaSize := len(needAddSchemas) - len(needDeleteSchemas)
		if quotaSize > 0 {
			_, ok, err := plugin.Plugins().Quota().Apply4Quotas(ctx, quota.SchemaQuotaType, domainProject, serviceId, int16(quotaSize))
			if err != nil {
				util.Logger().Errorf(err, "Add schema info failed, check resource num failed, %s", serviceId)
				return scerr.NewError(scerr.ErrUnavailableQuota, err.Error())
			}
			if !ok {
				// err is nil on this path; the quota was simply exhausted.
				util.Logger().Errorf(nil, "Add schema info failed, reach the max size of schema, %s", serviceId)
				return scerr.NewError(scerr.ErrNotEnoughQuota, "reach the max size of schema")
			}
		}

		for _, schema := range needUpdateSchemas {
			util.Logger().Infof("update schema %v", schema)
			opts := schemaWithDatabaseOpera(registry.OpPut, domainProject, serviceId, schema)
			pluginOps = append(pluginOps, opts...)
		}

		for _, schema := range needDeleteSchemas {
			util.Logger().Infof("delete not exist schema %v", schema)
			opts := schemaWithDatabaseOpera(registry.OpDel, domainProject, serviceId, schema)
			pluginOps = append(pluginOps, opts...)
		}

		if len(nonExistSchemaIds) != 0 {
			service.Schemas = append(service.Schemas, nonExistSchemaIds...)
			opt, err := serviceUtil.UpdateService(domainProject, serviceId, service)
			if err != nil {
				util.Logger().Errorf(err, "update service failed , %s", serviceId)
				return scerr.NewError(scerr.ErrInternal, err.Error())
			}
			pluginOps = append(pluginOps, opt)
		}
	}

	for _, schema := range needAddSchemas {
		util.Logger().Infof("add new schema %v", schema)
		opts := schemaWithDatabaseOpera(registry.OpPut, domainProject, service.ServiceId, schema)
		pluginOps = append(pluginOps, opts...)
	}

	if len(pluginOps) != 0 {
		err = backend.BatchCommit(ctx, pluginOps)
		if err != nil {
			return scerr.NewError(scerr.ErrInternal, err.Error())
		}
	}
	return nil
}
// isExistSchemaId reports whether the SchemaId of every given schema is
// listed in service.Schemas. The first id that is not listed is logged and
// makes the function return false.
func isExistSchemaId(service *pb.MicroService, schemas []*pb.Schema) bool {
	declaredIds := service.Schemas
	for idx := range schemas {
		candidate := schemas[idx]
		if containsValueInSlice(declaredIds, candidate.SchemaId) {
			continue
		}
		util.Logger().Errorf(nil, "schema not exist schemaId: %s, serviceId %s", candidate.SchemaId, service.ServiceId)
		return false
	}
	return true
}
// schemaWithDatabaseOpera builds the pair of registry operations that apply
// invoke to a schema: first on the schema content key (with schema.Schema as
// value), then on the schema summary key (with schema.Summary as value).
func schemaWithDatabaseOpera(invoke registry.Operation, domainProject string, serviceId string, schema *pb.Schema) []registry.PluginOp {
	contentKey := apt.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)
	contentOp := invoke(registry.WithStrKey(contentKey), registry.WithStrValue(schema.Schema))

	summaryKey := apt.GenerateServiceSchemaSummaryKey(domainProject, serviceId, schema.SchemaId)
	summaryOp := invoke(registry.WithStrKey(summaryKey), registry.WithStrValue(schema.Summary))

	return []registry.PluginOp{contentOp, summaryOp}
}
// GetSchemasFromDatabase loads every schema stored for a service by issuing
// a prefix GET on the service's schema key. For each returned entry the
// SchemaId is the part of the key after its last "/" and Schema is the
// entry's value; Summary is not populated. A registry failure is logged and
// returned.
func GetSchemasFromDatabase(ctx context.Context, domainProject string, serviceId string) ([]*pb.Schema, error) {
	prefix := apt.GenerateServiceSchemaKey(domainProject, serviceId, "")
	util.Logger().Debugf("key is %s", prefix)

	resp, err := backend.Registry().Do(ctx,
		registry.GET,
		registry.WithPrefix(),
		registry.WithStrKey(prefix))
	if err != nil {
		util.Logger().Errorf(err, "Get schema of one service failed. %s", serviceId)
		return nil, err
	}

	result := make([]*pb.Schema, 0, len(resp.Kvs))
	for i := range resp.Kvs {
		entry := resp.Kvs[i]
		fullKey := util.BytesToStringWithNoCopy(entry.Key)
		// LastIndex yields -1 when there is no "/", so the whole key is kept.
		id := fullKey[strings.LastIndex(fullKey, "/")+1:]
		result = append(result, &pb.Schema{
			SchemaId: id,
			Schema:   util.BytesToStringWithNoCopy(entry.Value),
		})
	}
	return result, nil
}
// ModifySchema creates or updates a single schema of a micro-service.
//
// The request is first checked by canModifySchema; its error, if any, is
// copied into the Response and additionally returned as a Go error when its
// HTTP status is 500. The schema is then written by modifySchema, whose
// failures map to ErrInternal (with a Go error) or ErrInvalidParams.
func (s *MicroServiceService) ModifySchema(ctx context.Context, request *pb.ModifySchemaRequest) (*pb.ModifySchemaResponse, error) {
	domainProject := util.ParseDomainProject(ctx)

	if checkErr := s.canModifySchema(ctx, domainProject, request); checkErr != nil {
		rejected := &pb.ModifySchemaResponse{
			Response: pb.CreateResponse(checkErr.Code, checkErr.Detail),
		}
		if checkErr.StatusCode() != http.StatusInternalServerError {
			return rejected, nil
		}
		return rejected, checkErr
	}

	serviceId, schemaId := request.ServiceId, request.SchemaId
	target := &pb.Schema{
		SchemaId: schemaId,
		Summary:  request.Summary,
		Schema:   request.Schema,
	}

	err, isInnerErr := s.modifySchema(ctx, serviceId, target)
	if err == nil {
		util.Logger().Infof("update schema success: serviceId %s, schemaId %s.", serviceId, schemaId)
		return &pb.ModifySchemaResponse{
			Response: pb.CreateResponse(pb.Response_SUCCESS, "Modify schema info success."),
		}, nil
	}

	util.Logger().Errorf(err, "modify service schema failed")
	if !isInnerErr {
		// Caller-side problem: report it in the response only.
		return &pb.ModifySchemaResponse{
			Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()),
		}, nil
	}
	return &pb.ModifySchemaResponse{
		Response: pb.CreateResponse(scerr.ErrInternal, "Modify schema info failed."),
	}, err
}
// canModifySchema performs the pre-checks for ModifySchema.
//
// It returns ErrInvalidParams when the request is nil, when ServiceId or
// SchemaId is empty, or when apt.Validate fails; ErrUnavailableQuota when the
// quota plugin reports an error; and ErrNotEnoughQuota when the plugin denies
// one more schema. An empty Summary only produces a warning. It returns nil
// when the request may proceed.
func (s *MicroServiceService) canModifySchema(ctx context.Context, domainProject string, request *pb.ModifySchemaRequest) *scerr.Error {
	// Check for nil before touching any field so a missing request is
	// reported as invalid parameters rather than causing a panic.
	if request == nil || len(request.SchemaId) == 0 || len(request.ServiceId) == 0 {
		util.Logger().Errorf(nil, "update schema failed: invalid params.")
		return scerr.NewError(scerr.ErrInvalidParams, "serviceId or schemaId is nil")
	}
	serviceId := request.ServiceId
	schemaId := request.SchemaId

	err := apt.Validate(request)
	if err != nil {
		util.Logger().Errorf(err, "update schema failed, serviceId %s, schemaId %s: invalid params.", serviceId, schemaId)
		return scerr.NewError(scerr.ErrInvalidParams, err.Error())
	}

	_, ok, err := plugin.Plugins().Quota().Apply4Quotas(ctx, quota.SchemaQuotaType, domainProject, serviceId, 1)
	if err != nil {
		util.Logger().Errorf(err, "Add schema info failed, check resource num failed, %s, %s", serviceId, schemaId)
		return scerr.NewError(scerr.ErrUnavailableQuota, err.Error())
	}
	if !ok {
		// err is nil on this path; the quota was simply exhausted.
		util.Logger().Errorf(nil, "Add schema info failed, reach the max size of schema, %s, %s", serviceId, schemaId)
		return scerr.NewError(scerr.ErrNotEnoughQuota, "add schema failed, reach max size of schema")
	}

	if len(request.Summary) == 0 {
		util.Logger().Warnf(nil, "service %s schema %s is empty.", request.ServiceId, schemaId)
	}
	return nil
}
// modifySchema writes one schema of a service to the registry.
//
// It returns the error (nil on success) and a flag telling whether that
// error is an internal failure (true: service lookup, store lookup, service
// update or transaction failed) or a caller-side problem (false: service
// missing, or a change that is not allowed for a pb.ENV_PROD service).
//
// For a pb.ENV_PROD service:
//   - if the service declares schema ids, schema.SchemaId must be one of them;
//   - if the schema content already exists, the request must carry a Summary
//     and no summary may be stored yet, otherwise the change is refused;
//   - if the service declares no schema ids, the id is added to the service.
//
// For any other environment the id is added to the service when it is not
// declared yet. The service update (if any) and the schema write are
// committed in a single registry transaction.
func (s *MicroServiceService) modifySchema(ctx context.Context, serviceId string, schema *pb.Schema) (error, bool) {
	domainProject := util.ParseDomainProject(ctx)
	schemaId := schema.SchemaId

	service, err := serviceUtil.GetService(ctx, domainProject, serviceId)
	if err != nil {
		util.Logger().Errorf(err, "update schema failed, serviceId %s, schemaId %s: get service failed.", serviceId, schemaId)
		return err, true
	}
	if service == nil {
		// The format string must have exactly one verb per argument.
		util.Logger().Errorf(nil, "update schema failed, serviceId %s, schemaId %s: service not exist.", serviceId, schemaId)
		return errors.New("service non-exist"), false
	}
	util.Logger().Infof("modify schema, serviceId %s", service.ServiceId)

	pluginOps := make([]registry.PluginOp, 0)
	isExist := isExistSchemaId(service, []*pb.Schema{schema})

	if service.Environment == pb.ENV_PROD {
		if len(service.Schemas) != 0 && !isExist {
			return errors.New("non-exist schemaId, prod mode"), false
		}

		key := apt.GenerateServiceSchemaKey(domainProject, serviceId, schemaId)
		respSchema, err := store.Store().Schema().Search(ctx, registry.WithStrKey(key), registry.WithCountOnly())
		if err != nil {
			util.Logger().Errorf(err, "get schema summary failed, %s %s", serviceId, schemaId)
			return err, true
		}

		if respSchema.Count != 0 {
			if len(schema.Summary) == 0 {
				// No underlying error here; the change is simply refused.
				util.Logger().Errorf(nil, "prod mode, schema more exist, can not change, %s %s", serviceId, schemaId)
				return errors.New("schema more exist, can not change, prod mode"), false
			}

			exist, err := isExistSchemaSummary(ctx, domainProject, serviceId, schemaId)
			if err != nil {
				util.Logger().Errorf(err, "check schema summary is exist failed")
				// A failed store lookup is an internal error, not a bad request.
				return err, true
			}
			if exist {
				// No underlying error here; the change is simply refused.
				util.Logger().Errorf(nil, "prod mode, schema more exist, can not change, %s %s", serviceId, schemaId)
				return errors.New("schema more exist, can not change, prod mode"), false
			}
		}

		if len(service.Schemas) == 0 {
			service.Schemas = append(service.Schemas, schemaId)
			opt, err := serviceUtil.UpdateService(domainProject, serviceId, service)
			if err != nil {
				util.Logger().Errorf(err, "update service failed , %s", serviceId)
				return err, true
			}
			pluginOps = append(pluginOps, opt)
		}
	} else {
		if !isExist {
			service.Schemas = append(service.Schemas, schemaId)
			opt, err := serviceUtil.UpdateService(domainProject, serviceId, service)
			if err != nil {
				util.Logger().Errorf(err, "update service failed , %s", serviceId)
				return err, true
			}
			pluginOps = append(pluginOps, opt)
		}
	}

	opts := CommitSchemaInfo(domainProject, serviceId, schema)
	pluginOps = append(pluginOps, opts...)

	_, err = backend.Registry().Txn(ctx, pluginOps)
	if err != nil {
		util.Logger().Errorf(err, "commit update schema failed")
		return err, true
	}
	return nil, false
}
// isExistSchemaSummary reports whether a summary entry is stored for the
// given schema, using a count-only search on the summary key. When the
// search fails it returns true together with the error.
func isExistSchemaSummary(ctx context.Context, domainProject, serviceId, schemaId string) (bool, error) {
	summaryKey := apt.GenerateServiceSchemaSummaryKey(domainProject, serviceId, schemaId)
	found, searchErr := store.Store().SchemaSummary().Search(ctx, registry.WithStrKey(summaryKey), registry.WithCountOnly())
	if searchErr != nil {
		return true, searchErr
	}
	return found.Count != 0, nil
}
// CommitSchemaInfo returns the registry PUT operations for a schema. With a
// non-empty Summary both the schema content and its summary are written;
// with an empty Summary only the schema content is written.
func CommitSchemaInfo(domainProject string, serviceId string, schema *pb.Schema) []registry.PluginOp {
	if len(schema.Summary) == 0 {
		contentKey := apt.GenerateServiceSchemaKey(domainProject, serviceId, schema.SchemaId)
		return []registry.PluginOp{
			registry.OpPut(registry.WithStrKey(contentKey), registry.WithStrValue(schema.Schema)),
		}
	}
	return schemaWithDatabaseOpera(registry.OpPut, domainProject, serviceId, schema)
}
// containsValueInSlice reports whether value is an element of in. An empty
// value is never considered contained, and a nil or empty slice contains
// nothing.
func containsValueInSlice(in []string, value string) bool {
	if value == "" {
		return false
	}
	for idx := 0; idx < len(in); idx++ {
		if in[idx] == value {
			return true
		}
	}
	return false
}
// getSchemaSummary looks up the stored summary of a schema. It returns the
// value of the first matching entry, an empty string when nothing is stored,
// or the search error (after logging it).
func getSchemaSummary(ctx context.Context, domainProject string, serviceId string, schemaId string) (string, error) {
	summaryKey := apt.GenerateServiceSchemaSummaryKey(domainProject, serviceId, schemaId)
	found, searchErr := store.Store().SchemaSummary().Search(ctx, registry.WithStrKey(summaryKey))
	if searchErr != nil {
		util.Logger().Errorf(searchErr, "get %s schema %s summary failed", serviceId, schemaId)
		return "", searchErr
	}
	if len(found.Kvs) > 0 {
		return util.BytesToStringWithNoCopy(found.Kvs[0].Value), nil
	}
	return "", nil
}