[bugfix] missing sync task when sync is enabled (#1262)

diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index d20c6ee..917f270 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -1428,13 +1428,13 @@
 			}
 
 			service.Schemas = nonExistSchemaIds
-			opt, err := eutil.UpdateService(domainProject, serviceID, service)
+			opts, err := eutil.UpdateService(ctx, domainProject, serviceID, service)
 			if err != nil {
 				log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
 					serviceID, remoteIP), err)
 				return pb.NewError(pb.ErrInternal, err.Error())
 			}
-			pluginOps = append(pluginOps, opt)
+			pluginOps = append(pluginOps, opts...)
 		} else {
 			if len(nonExistSchemaIds) != 0 {
 				errInfo := fmt.Errorf("non-existent schemaIDs %v", nonExistSchemaIds)
@@ -1447,7 +1447,10 @@
 					return pb.NewError(pb.ErrInternal, err.Error())
 				}
 				if !exist {
-					opts := schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, serviceID, needUpdateSchema)
+					opts, err := putSchema(ctx, domainProject, serviceID, needUpdateSchema)
+					if err != nil {
+						return pb.NewError(pb.ErrInternal, err.Error())
+					}
 					pluginOps = append(pluginOps, opts...)
 				} else {
 					log.Warn(fmt.Sprintf("schema[%s/%s] and it's summary already exist, skip to update, operator: %s",
@@ -1458,7 +1461,7 @@
 
 		for _, schema := range needAddSchemas {
 			log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
-			opts := schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, service.ServiceId, schema)
+			opts, _ := putSchema(ctx, domainProject, service.ServiceId, schema)
 			pluginOps = append(pluginOps, opts...)
 		}
 	} else {
@@ -1474,32 +1477,32 @@
 		var schemaIDs []string
 		for _, schema := range needAddSchemas {
 			log.Info(fmt.Sprintf("add new schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
-			opts := schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, service.ServiceId, schema)
+			opts, _ := putSchema(ctx, domainProject, service.ServiceId, schema)
 			pluginOps = append(pluginOps, opts...)
 			schemaIDs = append(schemaIDs, schema.SchemaId)
 		}
 
 		for _, schema := range needUpdateSchemas {
 			log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
-			opts := schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, serviceID, schema)
+			opts, _ := putSchema(ctx, domainProject, serviceID, schema)
 			pluginOps = append(pluginOps, opts...)
 			schemaIDs = append(schemaIDs, schema.SchemaId)
 		}
 
 		for _, schema := range needDeleteSchemas {
 			log.Info(fmt.Sprintf("delete non-existent schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
-			opts := schemaWithDatabaseOpera(etcdadpt.OpDel, domainProject, serviceID, schema)
+			opts, _ := deleteSchema(ctx, domainProject, serviceID, schema)
 			pluginOps = append(pluginOps, opts...)
 		}
 
 		service.Schemas = schemaIDs
-		opt, err := eutil.UpdateService(domainProject, serviceID, service)
+		opts, err := eutil.UpdateService(ctx, domainProject, serviceID, service)
 		if err != nil {
 			log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
 				serviceID, remoteIP), err)
 			return pb.NewError(pb.ErrInternal, err.Error())
 		}
-		pluginOps = append(pluginOps, opt)
+		pluginOps = append(pluginOps, opts...)
 	}
 
 	if len(pluginOps) != 0 {
@@ -1575,28 +1578,31 @@
 
 		if len(microService.Schemas) == 0 {
 			microService.Schemas = append(microService.Schemas, schemaID)
-			opt, err := eutil.UpdateService(domainProject, serviceID, microService)
+			opts, err := eutil.UpdateService(ctx, domainProject, serviceID, microService)
 			if err != nil {
 				log.Error(fmt.Sprintf("modify schema[%s/%s] failed, update microService.Schemas failed, operator: %s",
 					serviceID, schemaID, remoteIP), err)
 				return pb.NewError(pb.ErrInternal, err.Error())
 			}
-			pluginOps = append(pluginOps, opt)
+			pluginOps = append(pluginOps, opts...)
 		}
 	} else {
 		if !isExist {
 			microService.Schemas = append(microService.Schemas, schemaID)
-			opt, err := eutil.UpdateService(domainProject, serviceID, microService)
+			opts, err := eutil.UpdateService(ctx, domainProject, serviceID, microService)
 			if err != nil {
 				log.Error(fmt.Sprintf("modify schema[%s/%s] failed, update microService.Schemas failed, operator: %s",
 					serviceID, schemaID, remoteIP), err)
 				return pb.NewError(pb.ErrInternal, err.Error())
 			}
-			pluginOps = append(pluginOps, opt)
+			pluginOps = append(pluginOps, opts...)
 		}
 	}
 
-	opts := commitSchemaInfo(domainProject, serviceID, schema)
+	opts, err := commitSchemaInfo(ctx, domainProject, serviceID, schema)
+	if err != nil {
+		return pb.NewError(pb.ErrInternal, err.Error())
+	}
 	pluginOps = append(pluginOps, opts...)
 
 	resp, err := etcdadpt.TxnWithCmp(ctx, pluginOps,
diff --git a/datasource/etcd/schema.go b/datasource/etcd/schema.go
index 186749a..f32c5eb 100644
--- a/datasource/etcd/schema.go
+++ b/datasource/etcd/schema.go
@@ -253,6 +253,12 @@
 		serviceKey := path.GenerateServiceKey(domainProject, serviceID)
 		existContentOptions = append(existContentOptions,
 			etcdadpt.OpPut(etcdadpt.WithStrKey(serviceKey), etcdadpt.WithValue(body)))
+		syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, body, sync.WithOpts(map[string]string{"key": serviceKey}))
+		if err != nil {
+			log.Error("fail to create update opts", err)
+			return err
+		}
+		existContentOptions = append(existContentOptions, syncOpts...)
 	}
 	newContentOptions := append(existContentOptions,
 		etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), etcdadpt.WithStrValue(content.Content)))
@@ -389,14 +395,22 @@
 		log.Error(fmt.Sprintf("schema[%s] is reference by service", hash), nil)
 		return discovery.NewError(discovery.ErrInvalidParams, "Schema has reference.")
 	}
-
 	contentKey := path.GenerateServiceSchemaContentKey(domainProject, hash)
-	success, err := etcdadpt.Delete(ctx, contentKey)
+	opts := []etcdadpt.OpOptions{
+		etcdadpt.OpDel(etcdadpt.WithStrKey(contentKey)),
+	}
+	delOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, contentKey, contentKey, sync.WithOpts(map[string]string{"key": contentKey}))
+	if err != nil {
+		log.Error("fail to create del opts", err)
+		return err
+	}
+	opts = append(opts, delOpts...)
+	resp, err := etcdadpt.TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.ExistKey(contentKey)), nil)
 	if err != nil {
 		log.Error(fmt.Sprintf("delete schema content[%s] failed", hash), err)
 		return err
 	}
-	if !success {
+	if !resp.Succeeded {
 		log.Error(fmt.Sprintf("delete schema content[%s] failed", hash), schema.ErrSchemaContentNotFound)
 		return schema.ErrSchemaContentNotFound
 	}
diff --git a/datasource/etcd/schema_test.go b/datasource/etcd/schema_test.go
index c26b947..9106c57 100644
--- a/datasource/etcd/schema_test.go
+++ b/datasource/etcd/schema_test.go
@@ -113,7 +113,8 @@
 			}
 			tasks, err := task.List(schemaContext(), &listTaskReq)
 			assert.NoError(t, err)
-			assert.Equal(t, 3, len(tasks))
+			// append the schemaID into service.Schemas if schemaID is new will create a kv task
+			assert.Equal(t, 4, len(tasks))
 			err = task.Delete(schemaContext(), tasks...)
 			assert.NoError(t, err)
 		})
diff --git a/datasource/etcd/sync_test.go b/datasource/etcd/sync_test.go
index 2bc8521..1fe4603 100644
--- a/datasource/etcd/sync_test.go
+++ b/datasource/etcd/sync_test.go
@@ -200,7 +200,8 @@
 			}
 			tasks, err := task.List(syncAllContext(), &listTaskReq)
 			assert.NoError(t, err)
-			assert.Equal(t, 3, len(tasks))
+			// append the schemaID into service.Schemas if schemaID is new will create a kv task
+			assert.Equal(t, 4, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
 		})
diff --git a/datasource/etcd/util.go b/datasource/etcd/util.go
index 2c641e0..25e9fd3 100644
--- a/datasource/etcd/util.go
+++ b/datasource/etcd/util.go
@@ -22,16 +22,18 @@
 	"fmt"
 	"strings"
 
-	"github.com/apache/servicecomb-service-center/datasource"
-	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
-	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
-	serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
-	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/util"
 	pb "github.com/go-chassis/cari/discovery"
 	"github.com/go-chassis/cari/pkg/errsvc"
 	"github.com/go-chassis/foundation/gopool"
 	"github.com/little-cui/etcdadpt"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+	serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/pkg/util"
 )
 
 type ServiceDetailOpt struct {
@@ -105,15 +107,50 @@
 	return true, nil
 }
 
-func schemaWithDatabaseOpera(invoke etcdadpt.Operation, domainProject string, serviceID string, schema *pb.Schema) []etcdadpt.OpOptions {
-	pluginOps := make([]etcdadpt.OpOptions, 0)
+func putSchema(ctx context.Context, domainProject string, serviceID string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) {
+	opts := make([]etcdadpt.OpOptions, 0)
 	key := path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId)
-	opt := invoke(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema))
-	pluginOps = append(pluginOps, opt)
+	onPutOpt := etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema))
+	opts = append(opts, onPutOpt)
+	syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, schema.Schema, sync.WithOpts(map[string]string{"key": key}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+		return opts, err
+	}
+	opts = append(opts, syncOpts...)
 	keySummary := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schema.SchemaId)
-	opt = invoke(etcdadpt.WithStrKey(keySummary), etcdadpt.WithStrValue(schema.Summary))
-	pluginOps = append(pluginOps, opt)
-	return pluginOps
+	onPutOpt = etcdadpt.OpPut(etcdadpt.WithStrKey(keySummary), etcdadpt.WithStrValue(schema.Summary))
+	opts = append(opts, onPutOpt)
+	syncOpts, err = sync.GenUpdateOpts(ctx, datasource.ResourceKV, schema.Summary, sync.WithOpts(map[string]string{"key": keySummary}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+		return opts, err
+	}
+	opts = append(opts, syncOpts...)
+	return opts, nil
+}
+
+func deleteSchema(ctx context.Context, domainProject string, serviceID string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) {
+	opts := make([]etcdadpt.OpOptions, 0)
+	key := path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId)
+	onDelOpt := etcdadpt.OpDel(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema))
+	opts = append(opts, onDelOpt)
+	syncOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, key, schema.Schema, sync.WithOpts(map[string]string{"key": key}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+		return opts, err
+	}
+	opts = append(opts, syncOpts...)
+	keySummary := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schema.SchemaId)
+	onDelOpt = etcdadpt.OpDel(etcdadpt.WithStrKey(keySummary), etcdadpt.WithStrValue(schema.Summary))
+	opts = append(opts, onDelOpt)
+	syncOpts, err = sync.GenDeleteOpts(ctx, datasource.ResourceKV, keySummary, schema.Summary, sync.WithOpts(map[string]string{"key": keySummary}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+		return opts, err
+	}
+	opts = append(opts, syncOpts...)
+	return opts, nil
 }
 
 func isExistSchemaID(service *pb.MicroService, schemas []*pb.Schema) bool {
@@ -127,13 +164,22 @@
 	return true
 }
 
-func commitSchemaInfo(domainProject string, serviceID string, schema *pb.Schema) []etcdadpt.OpOptions {
+func commitSchemaInfo(ctx context.Context, domainProject string, serviceID string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) {
 	if len(schema.Summary) != 0 {
-		return schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, serviceID, schema)
+		opts, err := putSchema(ctx, domainProject, serviceID, schema)
+		return opts, err
 	}
+	opts := make([]etcdadpt.OpOptions, 0)
 	key := path.GenerateServiceSchemaKey(domainProject, serviceID, schema.SchemaId)
 	opt := etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithStrValue(schema.Schema))
-	return []etcdadpt.OpOptions{opt}
+	opts = append(opts, opt)
+	syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, schema.Schema, sync.WithOpts(map[string]string{"key": key}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+		return opts, err
+	}
+	opts = append(opts, syncOpts...)
+	return opts, nil
 }
 
 func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) func(context.Context) {
diff --git a/datasource/etcd/util/dependency_util.go b/datasource/etcd/util/dependency_util.go
index 601bc50..6065831 100644
--- a/datasource/etcd/util/dependency_util.go
+++ b/datasource/etcd/util/dependency_util.go
@@ -24,12 +24,15 @@
 	"fmt"
 	"strings"
 
-	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
-	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
-	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/util"
 	pb "github.com/go-chassis/cari/discovery"
 	"github.com/little-cui/etcdadpt"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+	esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/pkg/util"
 )
 
 func GetConsumerIds(ctx context.Context, domainProject string, provider *pb.MicroService) ([]string, error) {
@@ -120,11 +123,19 @@
 
 	id := util.StringJoin([]string{provider.AppId, provider.ServiceName}, "_")
 	key := path.GenerateConsumerDependencyQueueKey(domainProject, consumer.ServiceId, id)
-	override, err := etcdadpt.InsertBytes(ctx, key, data)
+	opts := make([]etcdadpt.OpOptions, 0)
+	opts = append(opts, etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)))
+	syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceKV, data, esync.WithOpts(map[string]string{"key": key}))
+	if err != nil {
+		log.Error("fail to create sync opts", err)
+		return pb.NewError(pb.ErrInternal, err.Error())
+	}
+	opts = append(opts, syncOpts...)
+	resp, err := etcdadpt.Instance().TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.NotExistKey(key)), nil)
 	if err != nil {
 		return err
 	}
-	if override {
+	if resp.Succeeded {
 		log.Info(fmt.Sprintf("put in queue[%s/%s]: consumer[%s/%s/%s/%s] -> provider[%s/%s/%s]", consumer.ServiceId, id,
 			consumer.Environment, consumer.AppId, consumer.ServiceName, consumer.Version,
 			provider.Environment, provider.AppId, provider.ServiceName))
diff --git a/datasource/etcd/util/microservice_util.go b/datasource/etcd/util/microservice_util.go
index 997a6f5..ec596ec 100644
--- a/datasource/etcd/util/microservice_util.go
+++ b/datasource/etcd/util/microservice_util.go
@@ -23,15 +23,17 @@
 	"fmt"
 	"strings"
 
+	pb "github.com/go-chassis/cari/discovery"
+	"github.com/little-cui/etcdadpt"
+
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/config"
-	pb "github.com/go-chassis/cari/discovery"
-	"github.com/little-cui/etcdadpt"
 )
 
 /*
@@ -237,16 +239,23 @@
 	return services, nil
 }
 
-func UpdateService(domainProject string, serviceID string, service *pb.MicroService) (opt etcdadpt.OpOptions, err error) {
-	opt = etcdadpt.OpOptions{}
+func UpdateService(ctx context.Context, domainProject string, serviceID string, service *pb.MicroService) ([]etcdadpt.OpOptions, error) {
+	opts := make([]etcdadpt.OpOptions, 0)
 	key := path.GenerateServiceKey(domainProject, serviceID)
 	data, err := json.Marshal(service)
 	if err != nil {
 		log.Error("marshal service file failed", err)
-		return
+		return opts, err
 	}
-	opt = etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data))
-	return
+	opt := etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data))
+	opts = append(opts, opt)
+	syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, data, sync.WithOpts(map[string]string{"key": key}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+		return opts, err
+	}
+	opts = append(opts, syncOpts...)
+	return opts, nil
 }
 
 func GetOneDomainProjectServiceCount(ctx context.Context, domainProject string) (int64, error) {
diff --git a/server/service/rbac/rbac.go b/server/service/rbac/rbac.go
index 58047b2..7cc2ffd 100644
--- a/server/service/rbac/rbac.go
+++ b/server/service/rbac/rbac.go
@@ -23,14 +23,16 @@
 	"errors"
 	"io/ioutil"
 
-	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/server/config"
-	"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
 	"github.com/go-chassis/cari/pkg/errsvc"
 	"github.com/go-chassis/cari/rbac"
 	"github.com/go-chassis/go-archaius"
 	"github.com/go-chassis/go-chassis/v2/security/authr"
 	"github.com/go-chassis/go-chassis/v2/security/secret"
+
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/server/config"
+	"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
+	"github.com/apache/servicecomb-service-center/server/service/sync"
 )
 
 const (
@@ -131,7 +133,8 @@
 		Roles:    []string{rbac.RoleAdmin},
 		Password: pwd,
 	}
-	err := CreateAccount(context.Background(), a)
+	ctx := sync.SetContext(context.Background())
+	err := CreateAccount(ctx, a)
 	if err == nil {
 		log.Info("root account init success")
 		return