Syncer supports enabling RBAC (#1487)

diff --git a/.github/workflows/eventbase-ci.yml b/.github/workflows/eventbase-ci.yml
index b0918b7..87dfd75 100644
--- a/.github/workflows/eventbase-ci.yml
+++ b/.github/workflows/eventbase-ci.yml
@@ -13,7 +13,7 @@
         uses: actions/checkout@v1
       - name: UT test
         run: |
-          sudo docker-compose -f ./scripts/docker-compose.yaml up -d
+          sudo docker compose -f ./scripts/docker-compose.yaml up -d
           sleep 20
           export TEST_DB_MODE=mongo
           export TEST_DB_URI=mongodb://127.0.0.1:27017
@@ -31,7 +31,7 @@
         uses: actions/checkout@v1
       - name: UT for etcd
         run: |
-          time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379
+          time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379
           while ! nc -z 127.0.0.1 2379; do
             sleep 1
           done
diff --git a/.github/workflows/static_check.yml b/.github/workflows/static_check.yml
index 404a1c8..4b3394c 100644
--- a/.github/workflows/static_check.yml
+++ b/.github/workflows/static_check.yml
@@ -40,7 +40,7 @@
         uses: actions/checkout@v1
       - name: UT-MONGO
         run: |
-          sudo docker-compose -f ./scripts/docker-compose.yaml up -d
+          sudo docker compose -f ./scripts/docker-compose.yaml up -d
           sleep 20
           bash -x scripts/ut_test_in_docker.sh mongo
   integration-test:
diff --git a/docker-compose.yml b/docker-compose.yml
index c7be231..e50d3d9 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -17,7 +17,7 @@
 version: '3'
 services:
   etcd:
-    image: 'quay.io/coreos/etcd:latest'
+    image: 'quay.io/coreos/etcd:v3.5.15'
     # restart: always
     #ports:
     #  - "2379:2379"
diff --git a/etc/conf/syncer.yaml b/etc/conf/syncer.yaml
index 784f86a..11b65a6 100644
--- a/etc/conf/syncer.yaml
+++ b/etc/conf/syncer.yaml
@@ -1,11 +1,13 @@
 sync:
   enableOnStart: false
+  rbacEnabled: false # when true, syncer's API requires an rbac token and the peer token below is sent to the peer; service-center rbac must be enabled as well
   peers:
     - name: dc
       kind: servicecomb
       endpoints: ["127.0.0.1:30105"]
       # only allow mode implemented in incremental approach like push, watch(such as pub/sub, long polling)
       mode: [push]
+      token: # token used to communicate with this peer; takes effect only when rbacEnabled is true
   tombstone:
     retire:
       # use linux crontab not Quartz cron
diff --git a/scripts/integration_test.sh b/scripts/integration_test.sh
index 6b4c4a9..a5984f7 100755
--- a/scripts/integration_test.sh
+++ b/scripts/integration_test.sh
@@ -41,7 +41,7 @@
 docker rm -f etcd
 kill -9 $(ps aux | grep 'service-center' | awk '{print $2}')
 set -e
-sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
+sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
 while ! nc -z 127.0.0.1 2379; do
   echo "Waiting Etcd to launch on 2379..."
   sleep 1
diff --git a/scripts/ut_test_in_docker.sh b/scripts/ut_test_in_docker.sh
index 946305c..422b722 100644
--- a/scripts/ut_test_in_docker.sh
+++ b/scripts/ut_test_in_docker.sh
@@ -31,7 +31,7 @@
 
 if [ "${db_name}" == "etcd" ];then
   echo "${green}Starting etcd in docker${reset}"
-  docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
+  docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
   while ! nc -z 127.0.0.1 2379; do
     echo "Waiting Etcd to launch on 2379..."
     sleep 1
diff --git a/syncer/config/config.go b/syncer/config/config.go
index de01f5e..a568d5c 100644
--- a/syncer/config/config.go
+++ b/syncer/config/config.go
@@ -21,9 +21,10 @@
 	"fmt"
 	"path/filepath"
 
+	"github.com/go-chassis/go-archaius"
+
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
-	"github.com/go-chassis/go-archaius"
 )
 
 var config Config
@@ -33,8 +34,12 @@
 }
 
 type Sync struct {
-	EnableOnStart bool    `yaml:"enableOnStart"`
-	Peers         []*Peer `yaml:"peers"`
+	EnableOnStart bool `yaml:"enableOnStart"`
+	// When RbacEnabled is true, syncer's API requires the rbac token,
+	// and service-center also provides the rbac token to communicate with peer.
+	// At the same time, service-center rbac must be enabled.
+	RbacEnabled bool    `yaml:"rbacEnabled"`
+	Peers       []*Peer `yaml:"peers"`
 }
 
 type Peer struct {
@@ -42,6 +47,8 @@
 	Kind      string   `yaml:"kind"`
 	Endpoints []string `yaml:"endpoints"`
 	Mode      []string `yaml:"mode"`
+	// The token to communicate with peer, this takes effect only when RbacEnabled is true
+	Token string `yaml:"token"`
 }
 
 func Init() error {
diff --git a/syncer/rpc/auth.go b/syncer/rpc/auth.go
new file mode 100644
index 0000000..9f3ce60
--- /dev/null
+++ b/syncer/rpc/auth.go
@@ -0,0 +1,73 @@
+package rpc
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/go-chassis/cari/rbac"
+	"github.com/go-chassis/go-chassis/v2/security/authr"
+	"github.com/go-chassis/go-chassis/v2/server/restful"
+	"google.golang.org/grpc/metadata"
+
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/syncer/config"
+)
+
+// errWrongAccountNorRole is returned by auth when the token is valid but its
+// account is not RbacAllowedAccountName or its roles lack RbacAllowedRoleName.
+var errWrongAccountNorRole = fmt.Errorf("account should be %s, and roles should contain %s", RbacAllowedAccountName, RbacAllowedRoleName)
+
+// auth checks the rbac token carried in the incoming gRPC metadata of ctx.
+//
+// It returns nil right away when sync rbac is disabled. Otherwise the
+// authorization header must have the form "<scheme> <token>", the token must
+// pass authr.Authenticate, and the account in its claims must be
+// RbacAllowedAccountName with RbacAllowedRoleName among its roles.
+func auth(ctx context.Context) error {
+	if !config.GetConfig().Sync.RbacEnabled {
+		return nil
+	}
+	md, ok := metadata.FromIncomingContext(ctx)
+	if !ok {
+		return rbac.NewError(rbac.ErrNoAuthHeader, "")
+	}
+
+	// A header that is present but blank is reported as missing, as the message states.
+	authHeader := md.Get(restful.HeaderAuth)
+	if len(authHeader) == 0 || authHeader[0] == "" {
+		return rbac.NewError(rbac.ErrNoAuthHeader, fmt.Sprintf("header %s not found nor content empty", restful.HeaderAuth))
+	}
+
+	// The scheme word (s[0]) is not checked; only the two-part shape is required.
+	s := strings.Split(authHeader[0], " ")
+	if len(s) != 2 {
+		return rbac.ErrInvalidHeader
+	}
+	to := s[1]
+
+	claims, err := authr.Authenticate(ctx, to)
+	if err != nil {
+		return err
+	}
+	m, ok := claims.(map[string]interface{})
+	if !ok {
+		log.Error("claims convert failed", rbac.ErrConvert)
+		return rbac.ErrConvert
+	}
+	account, err := rbac.GetAccount(m)
+	if err != nil {
+		log.Error("get account from token failed", err)
+		return err
+	}
+
+	if account.Name != RbacAllowedAccountName {
+		return errWrongAccountNorRole
+	}
+	for _, role := range account.Roles {
+		if role == RbacAllowedRoleName {
+			return nil
+		}
+	}
+	return errWrongAccountNorRole
+}
diff --git a/syncer/rpc/auth_test.go b/syncer/rpc/auth_test.go
new file mode 100644
index 0000000..2cc1bcb
--- /dev/null
+++ b/syncer/rpc/auth_test.go
@@ -0,0 +1,132 @@
+package rpc
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"testing"
+
+	"github.com/go-chassis/cari/pkg/errsvc"
+	"github.com/go-chassis/cari/rbac"
+	"github.com/go-chassis/go-chassis/v2/security/authr"
+	"github.com/go-chassis/go-chassis/v2/server/restful"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/metadata"
+
+	"github.com/apache/servicecomb-service-center/syncer/config"
+)
+
+// testAuth is a stub authenticator whose tokens are plain JSON-encoded claims.
+type testAuth struct{}
+
+// Login is not exercised by these tests; it always returns an empty token.
+func (testAuth) Login(ctx context.Context, user string, password string, opts ...authr.LoginOption) (string, error) {
+	return "", nil
+}
+
+// Authenticate decodes token as a JSON object and returns it as the claims.
+func (testAuth) Authenticate(ctx context.Context, token string) (interface{}, error) {
+	// Decode first, then return: reading claim in the same return statement as
+	// the Unmarshal call would rely on an evaluation order Go does not specify.
+	var claim map[string]interface{}
+	if err := json.Unmarshal([]byte(token), &claim); err != nil {
+		return nil, err
+	}
+	return claim, nil
+}
+
+// Test_auth covers auth with rbac disabled, with a missing or malformed
+// authorization header, and with tokens of a wrong and of the allowed account.
+func Test_auth(t *testing.T) {
+	// use the custom auth plugin
+	authr.Install("test", func(opts *authr.Options) (authr.Authenticator, error) {
+		return testAuth{}, nil
+	})
+	assert.NoError(t, authr.Init(authr.WithPlugin("test")))
+
+	// wantNoAuthHeader passes when err wraps an errsvc.Error with code rbac.ErrNoAuthHeader.
+	wantNoAuthHeader := func(t assert.TestingT, err error, i ...interface{}) bool {
+		var errSvcErr *errsvc.Error
+		if !assert.True(t, errors.As(err, &errSvcErr)) {
+			return false // errSvcErr is nil here, do not dereference it
+		}
+		return assert.Equal(t, rbac.ErrNoAuthHeader, errSvcErr.Code)
+	}
+
+	type args struct {
+		ctx context.Context
+	}
+	tests := []struct {
+		name string
+		// rbacEnabled is written to the global config before each case, so no
+		// case depends on the config left behind by an earlier one.
+		rbacEnabled bool
+		args        args
+		wantErr     assert.ErrorAssertionFunc
+	}{
+		{
+			name:        "sync rbac disables",
+			rbacEnabled: false,
+			args: args{
+				ctx: context.Background(), // rbac disabled, empty ctx should pass the auth
+			},
+			wantErr: assert.NoError,
+		},
+		{
+			name:        "no header",
+			rbacEnabled: true,
+			args: args{
+				ctx: context.Background(), // rbac enabled, empty ctx should not pass the auth
+			},
+			wantErr: wantNoAuthHeader,
+		},
+		{
+			name:        "with header but no auth header",
+			rbacEnabled: true,
+			args: args{
+				ctx: metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"fake": "fake"})),
+			},
+			wantErr: wantNoAuthHeader,
+		},
+		{
+			name:        "auth header format error",
+			rbacEnabled: true,
+			args: args{
+				ctx: metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{restful.HeaderAuth: "fake"})),
+			},
+			wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+				return assert.Equal(t, rbac.ErrInvalidHeader, err)
+			},
+		},
+		{
+			name:        "wrong account nor role",
+			rbacEnabled: true,
+			args: args{
+				ctx: metadata.NewIncomingContext(context.Background(),
+					metadata.New(map[string]string{restful.HeaderAuth: `Bear {"account":"x","roles":["x"]}`})),
+			},
+			wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+				return assert.Equal(t, errWrongAccountNorRole, err)
+			},
+		},
+		{
+			name:        "valid token",
+			rbacEnabled: true,
+			args: args{
+				ctx: metadata.NewIncomingContext(context.Background(),
+					metadata.New(map[string]string{restful.HeaderAuth: `Bear {"account":"sync-user","roles":["sync-admin"]}`})),
+			},
+			wantErr: assert.NoError,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			config.SetConfig(config.Config{
+				Sync: &config.Sync{
+					RbacEnabled: tt.rbacEnabled,
+				}})
+			tt.wantErr(t, auth(tt.args.ctx), fmt.Sprintf("auth(%v)", tt.args.ctx))
+		})
+	}
+}
diff --git a/syncer/rpc/server.go b/syncer/rpc/server.go
index 990f789..dc6efee 100644
--- a/syncer/rpc/server.go
+++ b/syncer/rpc/server.go
@@ -22,18 +22,21 @@
 	"fmt"
 	"time"
 
-	"github.com/apache/servicecomb-service-center/syncer/service/replicator"
-	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
-
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
 	"github.com/apache/servicecomb-service-center/syncer/config"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
 )
 
 const (
 	HealthStatusConnected = "CONNECTED"
 	HealthStatusAbnormal  = "ABNORMAL"
 	HealthStatusClose     = "CLOSE"
+	HealthStatusAuthFail  = "AuthFail"
+
+	RbacAllowedAccountName = "sync-user"
+	RbacAllowedRoleName    = "sync-admin"
 )
 
 func NewServer() *Server {
@@ -49,6 +52,12 @@
 }
 
 func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Results, error) {
+	err := auth(ctx)
+	if err != nil {
+		log.Error("auth failed", err)
+		return generateFailedResults(events, err)
+	}
+
 	log.Info(fmt.Sprintf("start sync: %s", events.Flag()))
 
 	res := s.replicator.Persist(ctx, events)
@@ -56,6 +65,20 @@
 	return s.toResults(res), nil
 }
 
+// generateFailedResults marks every event in events as resource.Fail with the
+// text of err as message; the returned error is always nil.
+func generateFailedResults(events *v1sync.EventList, err error) (*v1sync.Results, error) {
+	var evts []*v1sync.Event
+	if events != nil {
+		evts = events.Events
+	}
+	failed := make(map[string]*v1sync.Result, len(evts))
+	for _, evt := range evts {
+		failed[evt.Id] = &v1sync.Result{Code: resource.Fail, Message: err.Error()}
+	}
+	return &v1sync.Results{Results: failed}, nil
+}
+
 func (s *Server) toResults(results []*resource.Result) *v1sync.Results {
 	syncResult := make(map[string]*v1sync.Result, len(results))
 	for _, r := range results {
@@ -69,11 +92,18 @@
 	}
 }
 
-func (s *Server) Health(_ context.Context, _ *v1sync.HealthRequest) (*v1sync.HealthReply, error) {
+func (s *Server) Health(ctx context.Context, _ *v1sync.HealthRequest) (*v1sync.HealthReply, error) {
 	resp := &v1sync.HealthReply{
 		Status:         HealthStatusConnected,
 		LocalTimestamp: time.Now().UnixNano(),
 	}
+	err := auth(ctx)
+	if err != nil {
+		resp.Status = HealthStatusAuthFail
+		log.Error("auth failed", err)
+		return resp, nil
+	}
+
 	// TODO enable to close syncer
 	if !config.GetConfig().Sync.EnableOnStart {
 		resp.Status = HealthStatusClose
diff --git a/syncer/rpc/server_test.go b/syncer/rpc/server_test.go
new file mode 100644
index 0000000..4123c1a
--- /dev/null
+++ b/syncer/rpc/server_test.go
@@ -0,0 +1,90 @@
+package rpc
+
+import (
+	"context"
+	"testing"
+
+	"github.com/go-chassis/cari/rbac"
+	"github.com/stretchr/testify/assert"
+
+	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
+	"github.com/apache/servicecomb-service-center/syncer/config"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
+)
+
+// testReplicator is a stub replicator that reports success for "constant_id".
+type testReplicator struct{}
+
+// Replicate ignores el and returns one successful result keyed "constant_id".
+func (testReplicator) Replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) {
+	return &v1sync.Results{Results: map[string]*v1sync.Result{
+		"constant_id": &v1sync.Result{
+			Code: resource.Success,
+		},
+	}}, nil
+}
+
+// Persist ignores el and returns one successful result for event "constant_id".
+func (testReplicator) Persist(ctx context.Context, el *v1sync.EventList) []*resource.Result {
+	return []*resource.Result{
+		&resource.Result{
+			EventID: "constant_id",
+			Status:  resource.Success,
+		},
+	}
+}
+
+// TestServer_Sync checks that Sync fails every event when rbac is enabled and
+// the request carries no token, and delegates to the replicator otherwise.
+func TestServer_Sync(t *testing.T) {
+	s := NewServer()
+
+	// rbac enabled, should sync failed and return auth failed message
+	config.SetConfig(config.Config{
+		Sync: &config.Sync{
+			RbacEnabled: true,
+		}})
+	events := &v1sync.EventList{Events: []*v1sync.Event{
+		{
+			Id: "evt1",
+		},
+		{
+			Id: "evt2",
+		},
+	}}
+
+	expectedRst := map[string]*v1sync.Result{
+		"evt1": &v1sync.Result{
+			Code:    resource.Fail,
+			Message: rbac.NewError(rbac.ErrNoAuthHeader, "").Error(),
+		},
+
+		"evt2": &v1sync.Result{
+			Code:    resource.Fail,
+			Message: rbac.NewError(rbac.ErrNoAuthHeader, "").Error(),
+		},
+	}
+	rst, err := s.Sync(context.Background(), events)
+	assert.NoError(t, err)
+	// assert.Equal compares like reflect.DeepEqual but prints both values on failure
+	assert.Equal(t, expectedRst, rst.Results)
+
+	rst, err = s.Sync(context.Background(), nil) // nil input
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(rst.Results))
+
+	// rbac disabled, should sync success(with the mock replicator)
+	config.SetConfig(config.Config{
+		Sync: &config.Sync{
+			RbacEnabled: false,
+		}})
+	expectedRst = map[string]*v1sync.Result{
+		"constant_id": &v1sync.Result{
+			Code: resource.Success,
+		},
+	}
+	s.replicator = &testReplicator{}
+	rst, err = s.Sync(context.Background(), events)
+	assert.NoError(t, err)
+	assert.Equal(t, expectedRst, rst.Results)
+}
diff --git a/syncer/server/server.go b/syncer/server/server.go
index c880d55..095ae61 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -18,6 +18,9 @@
 package server
 
 import (
+	"github.com/go-chassis/go-chassis/v2"
+	chassisServer "github.com/go-chassis/go-chassis/v2/core/server"
+
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	syncv1 "github.com/apache/servicecomb-service-center/syncer/api/v1"
 	"github.com/apache/servicecomb-service-center/syncer/config"
@@ -25,8 +28,6 @@
 	"github.com/apache/servicecomb-service-center/syncer/rpc"
 	"github.com/apache/servicecomb-service-center/syncer/service/admin"
 	"github.com/apache/servicecomb-service-center/syncer/service/sync"
-	"github.com/go-chassis/go-chassis/v2"
-	chassisServer "github.com/go-chassis/go-chassis/v2/core/server"
 )
 
 // Run register chassis schema and run syncer services before chassis.Run()
@@ -40,6 +41,15 @@
 		return
 	}
 
+	if len(config.GetConfig().Sync.Peers) <= 0 {
+		log.Warn("peers parameter configuration is empty")
+		return
+	}
+
+	if config.GetConfig().Sync.RbacEnabled {
+		log.Info("syncer rbac enabled")
+	}
+
 	chassis.RegisterSchema("grpc", rpc.NewServer(),
 		chassisServer.WithRPCServiceDesc(&syncv1.EventService_ServiceDesc))
 
diff --git a/syncer/service/admin/health.go b/syncer/service/admin/health.go
index 99c9db7..cdba735 100644
--- a/syncer/service/admin/health.go
+++ b/syncer/service/admin/health.go
@@ -20,13 +20,17 @@
 import (
 	"context"
 	"errors"
+	"fmt"
 	"time"
 
+	"github.com/go-chassis/go-chassis/v2/server/restful"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
 
 	"github.com/apache/servicecomb-service-center/client"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	pkgrpc "github.com/apache/servicecomb-service-center/pkg/rpc"
+	"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
 	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
 	syncerclient "github.com/apache/servicecomb-service-center/syncer/client"
 	"github.com/apache/servicecomb-service-center/syncer/config"
@@ -59,25 +63,15 @@
 	Mode      []string `json:"mode"`
 	Endpoints []string `json:"endpoints"`
 	Status    string   `json:"status"`
+	Token     string   `json:"-"`
 }
 
 func Init() {
 	cfg := config.GetConfig()
-	if cfg.Sync == nil {
-		log.Warn("sync config is empty")
-		return
-	}
-	if !cfg.Sync.EnableOnStart {
-		log.Info("syncer is disabled")
-		return
-	}
-	if len(cfg.Sync.Peers) <= 0 {
-		log.Warn("peers parameter configuration is empty")
-		return
-	}
 	peerInfos = make([]*PeerInfo, 0, len(cfg.Sync.Peers))
 	for _, c := range cfg.Sync.Peers {
 		if len(c.Endpoints) <= 0 {
+			log.Warn("no endpoints of peer: " + c.Name)
 			continue
 		}
 		p := &Peer{
@@ -86,10 +80,21 @@
 			Mode:      c.Mode,
 			Endpoints: c.Endpoints,
 		}
-		conn, err := newRPCConn(p.Endpoints)
-		if err == nil {
-			peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: conn})
+		if config.GetConfig().Sync.RbacEnabled {
+			plainToken, err := cipher.Decrypt(c.Token)
+			if err != nil {
+				log.Error(fmt.Sprintf("decrypt token of peer %s failed, use original content", c.Name), err)
+				plainToken = c.Token
+			}
+			p.Token = plainToken
 		}
+
+		conn, err := newRPCConn(p.Endpoints)
+		if err != nil {
+			log.Error(fmt.Sprintf("new client failed for peer: %s", c.Name), err)
+			continue
+		}
+		peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: conn})
 	}
 }
 
@@ -103,7 +108,7 @@
 		if len(peerInfo.Peer.Endpoints) <= 0 {
 			continue
 		}
-		status := getPeerStatus(peerInfo.Peer.Name, peerInfo.ClientConn)
+		status := getPeerStatus(peerInfo)
 		resp.Peers = append(resp.Peers, &Peer{
 			Name:      peerInfo.Peer.Name,
 			Kind:      peerInfo.Peer.Kind,
@@ -117,19 +122,25 @@
 	return resp, nil
 }
 
-func getPeerStatus(peerName string, clientConn *grpc.ClientConn) string {
-	if clientConn == nil {
+func getPeerStatus(peerInfo *PeerInfo) string {
+	if peerInfo.ClientConn == nil {
 		log.Warn("clientConn is nil")
 		return rpc.HealthStatusAbnormal
 	}
 	local := time.Now().UnixNano()
-	set := client.NewSet(clientConn)
-	reply, err := set.EventServiceClient.Health(context.Background(), &v1sync.HealthRequest{})
+	set := client.NewSet(peerInfo.ClientConn)
+	ctx := context.Background()
+	if config.GetConfig().Sync.RbacEnabled {
+		ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
+			restful.HeaderAuth: "Bearer " + peerInfo.Peer.Token,
+		}))
+	}
+	reply, err := set.EventServiceClient.Health(ctx, &v1sync.HealthRequest{})
 	if err != nil || reply == nil {
 		log.Error("get peer health failed", err)
 		return rpc.HealthStatusAbnormal
 	}
-	reportClockDiff(peerName, local, reply.LocalTimestamp)
+	reportClockDiff(peerInfo.Peer.Name, local, reply.LocalTimestamp)
 	return reply.Status
 }
 
@@ -159,3 +170,8 @@
 		TLSConfig:   syncerclient.RPClientConfig(),
 	})
 }
+
+// Peers returns the peer list built by Init; the slice is shared, not copied.
+func Peers() []*PeerInfo {
+	return peerInfos
+}
diff --git a/syncer/service/replicator/replicator.go b/syncer/service/replicator/replicator.go
index d7f1e3d..52adfc2 100644
--- a/syncer/service/replicator/replicator.go
+++ b/syncer/service/replicator/replicator.go
@@ -21,16 +21,20 @@
 	"context"
 	"fmt"
 
+	"github.com/go-chassis/foundation/gopool"
+	"github.com/go-chassis/go-chassis/v2/server/restful"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+
 	"github.com/apache/servicecomb-service-center/client"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/rpc"
 	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
 	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
 	syncerclient "github.com/apache/servicecomb-service-center/syncer/client"
 	"github.com/apache/servicecomb-service-center/syncer/config"
 	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
-	"github.com/go-chassis/foundation/gopool"
-	"google.golang.org/grpc"
 )
 
 const (
@@ -48,7 +52,8 @@
 )
 
 var (
-	conn *grpc.ClientConn
+	conn      *grpc.ClientConn
+	peerToken = ""
 )
 
 func Work() error {
@@ -68,8 +73,7 @@
 }
 
 func InitSyncClient() error {
-	cfg := config.GetConfig()
-	peer := cfg.Sync.Peers[0]
+	peer := config.GetConfig().Sync.Peers[0]
 	log.Info(fmt.Sprintf("peer is %v", peer))
 	var err error
 	conn, err = rpc.GetRoundRobinLbConn(&rpc.Config{
@@ -78,7 +82,19 @@
 		ServiceName: serviceName,
 		TLSConfig:   syncerclient.RPClientConfig(),
 	})
-	return err
+	if err != nil {
+		log.Error("get rpc client failed", err)
+		return err
+	}
+	if !config.GetConfig().Sync.RbacEnabled {
+		return nil
+	}
+	peerToken, err = cipher.Decrypt(peer.Token)
+	if err != nil {
+		log.Error("decrypt peer token failed, use original content", err)
+		peerToken = peer.Token
+	}
+	return nil
 }
 
 func Close() {
@@ -161,6 +177,11 @@
 	}
 
 	log.Info(fmt.Sprintf("page count %d to sync", len(els)))
+	if config.GetConfig().Sync.RbacEnabled {
+		ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
+			restful.HeaderAuth: "Bearer " + peerToken,
+		}))
+	}
 
 	for _, in := range els {
 		res, err := set.EventServiceClient.Sync(ctx, in)