[bugfix] missing sync task when sync is enabled (#1262)
diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index d20c6ee..917f270 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -1428,13 +1428,13 @@
}
service.Schemas = nonExistSchemaIds
- opt, err := eutil.UpdateService(domainProject, serviceID, service)
+ opts, err := eutil.UpdateService(ctx, domainProject, serviceID, service)
if err != nil {
log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
serviceID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
- pluginOps = append(pluginOps, opt)
+ pluginOps = append(pluginOps, opts...)
} else {
if len(nonExistSchemaIds) != 0 {
errInfo := fmt.Errorf("non-existent schemaIDs %v", nonExistSchemaIds)
@@ -1447,7 +1447,10 @@
return pb.NewError(pb.ErrInternal, err.Error())
}
if !exist {
- opts := schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, serviceID, needUpdateSchema)
+ opts, err := putSchema(ctx, domainProject, serviceID, needUpdateSchema)
+ if err != nil {
+ return pb.NewError(pb.ErrInternal, err.Error())
+ }
pluginOps = append(pluginOps, opts...)
} else {
log.Warn(fmt.Sprintf("schema[%s/%s] and it's summary already exist, skip to update, operator: %s",
@@ -1458,7 +1461,7 @@
for _, schema := range needAddSchemas {
log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
- opts := schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, service.ServiceId, schema)
+ opts, _ := putSchema(ctx, domainProject, service.ServiceId, schema)
pluginOps = append(pluginOps, opts...)
}
} else {
@@ -1474,32 +1477,32 @@
var schemaIDs []string
for _, schema := range needAddSchemas {
log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
- opts := schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, service.ServiceId, schema)
+ opts, _ := putSchema(ctx, domainProject, service.ServiceId, schema)
pluginOps = append(pluginOps, opts...)
schemaIDs = append(schemaIDs, schema.SchemaId)
}
for _, schema := range needUpdateSchemas {
log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
- opts := schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, serviceID, schema)
+ opts, _ := putSchema(ctx, domainProject, serviceID, schema)
pluginOps = append(pluginOps, opts...)
schemaIDs = append(schemaIDs, schema.SchemaId)
}
for _, schema := range needDeleteSchemas {
log.Info(fmt.Sprintf("delete non-existent schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
- opts := schemaWithDatabaseOpera(etcdadpt.OpDel, domainProject, serviceID, schema)
+ opts, _ := deleteSchema(ctx, domainProject, serviceID, schema)
pluginOps = append(pluginOps, opts...)
}
service.Schemas = schemaIDs
- opt, err := eutil.UpdateService(domainProject, serviceID, service)
+ opts, err := eutil.UpdateService(ctx, domainProject, serviceID, service)
if err != nil {
log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
serviceID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
- pluginOps = append(pluginOps, opt)
+ pluginOps = append(pluginOps, opts...)
}
if len(pluginOps) != 0 {
@@ -1575,28 +1578,31 @@
if len(microService.Schemas) == 0 {
microService.Schemas = append(microService.Schemas, schemaID)
- opt, err := eutil.UpdateService(domainProject, serviceID, microService)
+ opts, err := eutil.UpdateService(ctx, domainProject, serviceID, microService)
if err != nil {
log.Error(fmt.Sprintf("modify schema[%s/%s] failed, update microService.Schemas failed, operator: %s",
serviceID, schemaID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
- pluginOps = append(pluginOps, opt)
+ pluginOps = append(pluginOps, opts...)
}
} else {
if !isExist {
microService.Schemas = append(microService.Schemas, schemaID)
- opt, err := eutil.UpdateService(domainProject, serviceID, microService)
+ opts, err := eutil.UpdateService(ctx, domainProject, serviceID, microService)
if err != nil {
log.Error(fmt.Sprintf("modify schema[%s/%s] failed, update microService.Schemas failed, operator: %s",
serviceID, schemaID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
- pluginOps = append(pluginOps, opt)
+ pluginOps = append(pluginOps, opts...)
}
}
- opts := commitSchemaInfo(domainProject, serviceID, schema)
+ opts, err := commitSchemaInfo(ctx, domainProject, serviceID, schema)
+ if err != nil {
+ return pb.NewError(pb.ErrInternal, err.Error())
+ }
pluginOps = append(pluginOps, opts...)
resp, err := etcdadpt.TxnWithCmp(ctx, pluginOps,
diff --git a/datasource/etcd/schema.go b/datasource/etcd/schema.go
index 186749a..f32c5eb 100644
--- a/datasource/etcd/schema.go
+++ b/datasource/etcd/schema.go
@@ -253,6 +253,12 @@
serviceKey := path.GenerateServiceKey(domainProject, serviceID)
existContentOptions = append(existContentOptions,
etcdadpt.OpPut(etcdadpt.WithStrKey(serviceKey), etcdadpt.WithValue(body)))
+ syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, body, sync.WithOpts(map[string]string{"key": serviceKey}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return err
+ }
+ existContentOptions = append(existContentOptions, syncOpts...)
}
newContentOptions := append(existContentOptions,
etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), etcdadpt.WithStrValue(content.Content)))
@@ -389,14 +395,22 @@
log.Error(fmt.Sprintf("schema[%s] is reference by service", hash), nil)
return discovery.NewError(discovery.ErrInvalidParams, "Schema has reference.")
}
-
contentKey := path.GenerateServiceSchemaContentKey(domainProject, hash)
- success, err := etcdadpt.Delete(ctx, contentKey)
+ opts := []etcdadpt.OpOptions{
+ etcdadpt.OpDel(etcdadpt.WithStrKey(contentKey)),
+ }
+ delOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, contentKey, contentKey, sync.WithOpts(map[string]string{"key": contentKey}))
+ if err != nil {
+ log.Error("fail to create del opts", err)
+ return err
+ }
+ opts = append(opts, delOpts...)
+ resp, err := etcdadpt.TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.ExistKey(contentKey)), nil)
if err != nil {
log.Error(fmt.Sprintf("delete schema content[%s] failed", hash), err)
return err
}
- if !success {
+ if !resp.Succeeded {
log.Error(fmt.Sprintf("delete schema content[%s] failed", hash), schema.ErrSchemaContentNotFound)
return schema.ErrSchemaContentNotFound
}
diff --git a/datasource/etcd/schema_test.go b/datasource/etcd/schema_test.go
index c26b947..9106c57 100644
--- a/datasource/etcd/schema_test.go
+++ b/datasource/etcd/schema_test.go
@@ -113,7 +113,8 @@
}
tasks, err := task.List(schemaContext(), &listTaskReq)
assert.NoError(t, err)
- assert.Equal(t, 3, len(tasks))
+ // append the schemaID into service.Schemas if schemaID is new will create a kv task
+ assert.Equal(t, 4, len(tasks))
err = task.Delete(schemaContext(), tasks...)
assert.NoError(t, err)
})
diff --git a/datasource/etcd/sync_test.go b/datasource/etcd/sync_test.go
index 2bc8521..1fe4603 100644
--- a/datasource/etcd/sync_test.go
+++ b/datasource/etcd/sync_test.go
@@ -200,7 +200,8 @@
}
tasks, err := task.List(syncAllContext(), &listTaskReq)
assert.NoError(t, err)
- assert.Equal(t, 3, len(tasks))
+ // append the schemaID into service.Schemas if schemaID is new will create a kv task
+ assert.Equal(t, 4, len(tasks))
err = task.Delete(syncAllContext(), tasks...)
assert.NoError(t, err)
})
diff --git a/datasource/etcd/util.go b/datasource/etcd/util.go
index 2c641e0..25e9fd3 100644
--- a/datasource/etcd/util.go
+++ b/datasource/etcd/util.go
@@ -22,16 +22,18 @@
"fmt"
"strings"
- "github.com/apache/servicecomb-service-center/datasource"
- "github.com/apache/servicecomb-service-center/datasource/etcd/path"
- "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
- serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/util"
pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/go-chassis/foundation/gopool"
"github.com/little-cui/etcdadpt"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+ serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
)
type ServiceDetailOpt struct {
@@ -105,15 +107,50 @@
return true, nil
}
-func schemaWithDatabaseOpera(invoke etcdadpt.Operation, domainProject string, serviceID string, schema *pb.Schema) []etcdadpt.OpOptions {
- pluginOps := make([]etcdadpt.OpOptions, 0)
+func putSchema(ctx context.Context, domainProject string, serviceID string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) {
+ opts := make([]etcdadpt.OpOptions, 0)
key := path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId)
- opt := invoke(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema))
- pluginOps = append(pluginOps, opt)
+ onPutOpt := etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema))
+ opts = append(opts, onPutOpt)
+ syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, schema.Schema, sync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
keySummary := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schema.SchemaId)
- opt = invoke(etcdadpt.WithStrKey(keySummary), etcdadpt.WithStrValue(schema.Summary))
- pluginOps = append(pluginOps, opt)
- return pluginOps
+ onPutOpt = etcdadpt.OpPut(etcdadpt.WithStrKey(keySummary), etcdadpt.WithStrValue(schema.Summary))
+ opts = append(opts, onPutOpt)
+ syncOpts, err = sync.GenUpdateOpts(ctx, datasource.ResourceKV, schema.Summary, sync.WithOpts(map[string]string{"key": keySummary}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ return opts, nil
+}
+
+func deleteSchema(ctx context.Context, domainProject string, serviceID string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) {
+ opts := make([]etcdadpt.OpOptions, 0)
+ key := path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId)
+ onDelOpt := etcdadpt.OpDel(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema))
+ opts = append(opts, onDelOpt)
+ syncOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, key, schema.Schema, sync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ keySummary := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schema.SchemaId)
+ onDelOpt = etcdadpt.OpDel(etcdadpt.WithStrKey(keySummary), etcdadpt.WithStrValue(schema.Summary))
+ opts = append(opts, onDelOpt)
+ syncOpts, err = sync.GenDeleteOpts(ctx, datasource.ResourceKV, keySummary, schema.Summary, sync.WithOpts(map[string]string{"key": keySummary}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ return opts, nil
}
func isExistSchemaID(service *pb.MicroService, schemas []*pb.Schema) bool {
@@ -127,13 +164,22 @@
return true
}
-func commitSchemaInfo(domainProject string, serviceID string, schema *pb.Schema) []etcdadpt.OpOptions {
+func commitSchemaInfo(ctx context.Context, domainProject string, serviceID string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) {
if len(schema.Summary) != 0 {
- return schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, serviceID, schema)
+ opts, err := putSchema(ctx, domainProject, serviceID, schema)
+ return opts, err
}
+ opts := make([]etcdadpt.OpOptions, 0)
key := path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId)
opt := etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema))
- return []etcdadpt.OpOptions{opt}
+ opts = append(opts, opt)
+ syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, schema.Schema, sync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ return opts, nil
}
func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) func(context.Context) {
diff --git a/datasource/etcd/util/dependency_util.go b/datasource/etcd/util/dependency_util.go
index 601bc50..6065831 100644
--- a/datasource/etcd/util/dependency_util.go
+++ b/datasource/etcd/util/dependency_util.go
@@ -24,12 +24,15 @@
"fmt"
"strings"
- "github.com/apache/servicecomb-service-center/datasource/etcd/path"
- "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/util"
pb "github.com/go-chassis/cari/discovery"
"github.com/little-cui/etcdadpt"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+ esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
)
func GetConsumerIds(ctx context.Context, domainProject string, provider *pb.MicroService) ([]string, error) {
@@ -120,11 +123,19 @@
id := util.StringJoin([]string{provider.AppId, provider.ServiceName}, "_")
key := path.GenerateConsumerDependencyQueueKey(domainProject, consumer.ServiceId, id)
- override, err := etcdadpt.InsertBytes(ctx, key, data)
+ opts := make([]etcdadpt.OpOptions, 0)
+ opts = append(opts, etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)))
+ syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceKV, data, esync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return pb.NewError(pb.ErrInternal, err.Error())
+ }
+ opts = append(opts, syncOpts...)
+ resp, err := etcdadpt.Instance().TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.NotExistKey(key)), nil)
if err != nil {
return err
}
- if override {
+ if resp.Succeeded {
log.Info(fmt.Sprintf("put in queue[%s/%s]: consumer[%s/%s/%s/%s] -> provider[%s/%s/%s]", consumer.ServiceId, id,
consumer.Environment, consumer.AppId, consumer.ServiceName, consumer.Version,
provider.Environment, provider.AppId, provider.ServiceName))
diff --git a/datasource/etcd/util/microservice_util.go b/datasource/etcd/util/microservice_util.go
index 997a6f5..ec596ec 100644
--- a/datasource/etcd/util/microservice_util.go
+++ b/datasource/etcd/util/microservice_util.go
@@ -23,15 +23,17 @@
"fmt"
"strings"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/little-cui/etcdadpt"
+
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
- pb "github.com/go-chassis/cari/discovery"
- "github.com/little-cui/etcdadpt"
)
/*
@@ -237,16 +239,23 @@
return services, nil
}
-func UpdateService(domainProject string, serviceID string, service *pb.MicroService) (opt etcdadpt.OpOptions, err error) {
- opt = etcdadpt.OpOptions{}
+func UpdateService(ctx context.Context, domainProject string, serviceID string, service *pb.MicroService) ([]etcdadpt.OpOptions, error) {
+ opts := make([]etcdadpt.OpOptions, 0)
key := path.GenerateServiceKey(domainProject, serviceID)
data, err := json.Marshal(service)
if err != nil {
log.Error("marshal service file failed", err)
- return
+ return opts, err
}
- opt = etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data))
- return
+ opt := etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data))
+ opts = append(opts, opt)
+ syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, data, sync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ return opts, nil
}
func GetOneDomainProjectServiceCount(ctx context.Context, domainProject string) (int64, error) {
diff --git a/server/service/rbac/rbac.go b/server/service/rbac/rbac.go
index 58047b2..7cc2ffd 100644
--- a/server/service/rbac/rbac.go
+++ b/server/service/rbac/rbac.go
@@ -23,14 +23,16 @@
"errors"
"io/ioutil"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/server/config"
- "github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/go-chassis/cari/rbac"
"github.com/go-chassis/go-archaius"
"github.com/go-chassis/go-chassis/v2/security/authr"
"github.com/go-chassis/go-chassis/v2/security/secret"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/config"
+ "github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
+ "github.com/apache/servicecomb-service-center/server/service/sync"
)
const (
@@ -131,7 +133,8 @@
Roles: []string{rbac.RoleAdmin},
Password: pwd,
}
- err := CreateAccount(context.Background(), a)
+ ctx := sync.SetContext(context.Background())
+ err := CreateAccount(ctx, a)
if err == nil {
log.Info("root account init success")
return