Syncer supports to enable rbac (#1487)
diff --git a/.github/workflows/eventbase-ci.yml b/.github/workflows/eventbase-ci.yml
index b0918b7..87dfd75 100644
--- a/.github/workflows/eventbase-ci.yml
+++ b/.github/workflows/eventbase-ci.yml
@@ -13,7 +13,7 @@
uses: actions/checkout@v1
- name: UT test
run: |
- sudo docker-compose -f ./scripts/docker-compose.yaml up -d
+ sudo docker compose -f ./scripts/docker-compose.yaml up -d
sleep 20
export TEST_DB_MODE=mongo
export TEST_DB_URI=mongodb://127.0.0.1:27017
@@ -31,7 +31,7 @@
uses: actions/checkout@v1
- name: UT for etcd
run: |
- time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379
+ time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379
while ! nc -z 127.0.0.1 2379; do
sleep 1
done
diff --git a/.github/workflows/static_check.yml b/.github/workflows/static_check.yml
index 404a1c8..4b3394c 100644
--- a/.github/workflows/static_check.yml
+++ b/.github/workflows/static_check.yml
@@ -40,7 +40,7 @@
uses: actions/checkout@v1
- name: UT-MONGO
run: |
- sudo docker-compose -f ./scripts/docker-compose.yaml up -d
+ sudo docker compose -f ./scripts/docker-compose.yaml up -d
sleep 20
bash -x scripts/ut_test_in_docker.sh mongo
integration-test:
diff --git a/docker-compose.yml b/docker-compose.yml
index c7be231..e50d3d9 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -17,7 +17,7 @@
version: '3'
services:
etcd:
- image: 'quay.io/coreos/etcd:latest'
+ image: 'quay.io/coreos/etcd:v3.5.15'
# restart: always
#ports:
# - "2379:2379"
diff --git a/etc/conf/syncer.yaml b/etc/conf/syncer.yaml
index 784f86a..11b65a6 100644
--- a/etc/conf/syncer.yaml
+++ b/etc/conf/syncer.yaml
@@ -1,11 +1,13 @@
sync:
enableOnStart: false
+ rbacEnabled: false
peers:
- name: dc
kind: servicecomb
endpoints: ["127.0.0.1:30105"]
# only allow mode implemented in incremental approach like push, watch(such as pub/sub, long polling)
mode: [push]
+ token:
tombstone:
retire:
# use linux crontab not Quartz cron
diff --git a/scripts/integration_test.sh b/scripts/integration_test.sh
index 6b4c4a9..a5984f7 100755
--- a/scripts/integration_test.sh
+++ b/scripts/integration_test.sh
@@ -41,7 +41,7 @@
docker rm -f etcd
kill -9 $(ps aux | grep 'service-center' | awk '{print $2}')
set -e
-sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
+sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
while ! nc -z 127.0.0.1 2379; do
echo "Waiting Etcd to launch on 2379..."
sleep 1
diff --git a/scripts/ut_test_in_docker.sh b/scripts/ut_test_in_docker.sh
index 946305c..422b722 100644
--- a/scripts/ut_test_in_docker.sh
+++ b/scripts/ut_test_in_docker.sh
@@ -31,7 +31,7 @@
if [ "${db_name}" == "etcd" ];then
echo "${green}Starting etcd in docker${reset}"
- docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
+ docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
while ! nc -z 127.0.0.1 2379; do
echo "Waiting Etcd to launch on 2379..."
sleep 1
diff --git a/syncer/config/config.go b/syncer/config/config.go
index de01f5e..a568d5c 100644
--- a/syncer/config/config.go
+++ b/syncer/config/config.go
@@ -21,9 +21,10 @@
"fmt"
"path/filepath"
+ "github.com/go-chassis/go-archaius"
+
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/go-chassis/go-archaius"
)
var config Config
@@ -33,8 +34,12 @@
}
type Sync struct {
- EnableOnStart bool `yaml:"enableOnStart"`
- Peers []*Peer `yaml:"peers"`
+ EnableOnStart bool `yaml:"enableOnStart"`
+ // When RbacEnabled is true, syncer's API requires the rbac token,
+ // and service-center also provides the rbac token to communicate with peer.
+ // At the same time, service-center rbac must be enabled.
+ RbacEnabled bool `yaml:"rbacEnabled"`
+ Peers []*Peer `yaml:"peers"`
}
type Peer struct {
@@ -42,6 +47,8 @@
Kind string `yaml:"kind"`
Endpoints []string `yaml:"endpoints"`
Mode []string `yaml:"mode"`
+ // The token to communicate with peer, this takes effect only when RbacEnabled is true
+ Token string `yaml:"token"`
}
func Init() error {
diff --git a/syncer/rpc/auth.go b/syncer/rpc/auth.go
new file mode 100644
index 0000000..9f3ce60
--- /dev/null
+++ b/syncer/rpc/auth.go
@@ -0,0 +1,63 @@
+package rpc
+
+import (
+ "context"
+ "fmt"
+ "strings"
+
+ "github.com/go-chassis/cari/rbac"
+ "github.com/go-chassis/go-chassis/v2/security/authr"
+ "github.com/go-chassis/go-chassis/v2/server/restful"
+ "google.golang.org/grpc/metadata"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/syncer/config"
+)
+
+var errWrongAccountNorRole = fmt.Errorf("account should be %s, and roles should contain %s", RbacAllowedAccountName, RbacAllowedRoleName)
+
+func auth(ctx context.Context) error {
+ if !config.GetConfig().Sync.RbacEnabled {
+ return nil
+ }
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return rbac.NewError(rbac.ErrNoAuthHeader, "")
+ }
+
+ authHeader := md.Get(restful.HeaderAuth)
+ if len(authHeader) == 0 {
+ return rbac.NewError(rbac.ErrNoAuthHeader, fmt.Sprintf("header %s not found nor content empty", restful.HeaderAuth))
+ }
+
+ s := strings.Split(authHeader[0], " ")
+ if len(s) != 2 {
+ return rbac.ErrInvalidHeader
+ }
+ to := s[1]
+
+ claims, err := authr.Authenticate(ctx, to)
+ if err != nil {
+ return err
+ }
+ m, ok := claims.(map[string]interface{})
+ if !ok {
+ log.Error("claims convert failed", rbac.ErrConvert)
+ return rbac.ErrConvert
+ }
+ account, err := rbac.GetAccount(m)
+ if err != nil {
+ log.Error("get account from token failed", err)
+ return err
+ }
+
+ if account.Name != RbacAllowedAccountName {
+ return errWrongAccountNorRole
+ }
+ for _, role := range account.Roles {
+ if role == RbacAllowedRoleName {
+ return nil
+ }
+ }
+ return errWrongAccountNorRole
+}
diff --git a/syncer/rpc/auth_test.go b/syncer/rpc/auth_test.go
new file mode 100644
index 0000000..2cc1bcb
--- /dev/null
+++ b/syncer/rpc/auth_test.go
@@ -0,0 +1,130 @@
+package rpc
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "testing"
+
+ "github.com/go-chassis/cari/pkg/errsvc"
+ "github.com/go-chassis/cari/rbac"
+ "github.com/go-chassis/go-chassis/v2/security/authr"
+ "github.com/go-chassis/go-chassis/v2/server/restful"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc/metadata"
+
+ "github.com/apache/servicecomb-service-center/syncer/config"
+)
+
+type testAuth struct{}
+
+func (testAuth) Login(ctx context.Context, user string, password string, opts ...authr.LoginOption) (string, error) {
+ return "", nil
+}
+
+func (testAuth) Authenticate(ctx context.Context, token string) (interface{}, error) {
+ var claim map[string]interface{}
+ return claim, json.Unmarshal([]byte(token), &claim)
+}
+
+func Test_auth(t *testing.T) {
+ // use the custom auth plugin
+ authr.Install("test", func(opts *authr.Options) (authr.Authenticator, error) {
+ return testAuth{}, nil
+ })
+ assert.NoError(t, authr.Init(authr.WithPlugin("test")))
+
+ type args struct {
+ ctx context.Context
+ }
+ tests := []struct {
+ name string
+ preDo func()
+ args args
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "sync rbac disables",
+ preDo: func() {
+ config.SetConfig(config.Config{
+ Sync: &config.Sync{
+ RbacEnabled: false,
+ }})
+ },
+ args: args{
+ ctx: context.Background(), // rbac disabled, empty ctx should pass the auth
+ },
+ wantErr: assert.NoError,
+ },
+ {
+ name: "no header",
+ preDo: func() {
+ config.SetConfig(config.Config{
+ Sync: &config.Sync{
+ RbacEnabled: true,
+ }})
+ },
+ args: args{
+ ctx: context.Background(), // rbac enabled, empty ctx should not pass the auth
+ },
+ wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+ var errSvcErr *errsvc.Error
+ ok := errors.As(err, &errSvcErr)
+ assert.True(t, ok)
+
+ return assert.Equal(t, rbac.ErrNoAuthHeader, errSvcErr.Code)
+ },
+ },
+ {
+ name: "with header but no auth header",
+ args: args{
+ ctx: metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"fake": "fake"})),
+ },
+ wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+ var errSvcErr *errsvc.Error
+ ok := errors.As(err, &errSvcErr)
+ assert.True(t, ok)
+
+ return assert.Equal(t, rbac.ErrNoAuthHeader, errSvcErr.Code)
+ },
+ },
+ {
+ name: "auth header format error",
+ args: args{
+ ctx: metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{restful.HeaderAuth: "fake"})),
+ },
+ wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+ return assert.Equal(t, rbac.ErrInvalidHeader, err)
+ },
+ },
+ {
+ name: "wrong account nor role",
+ args: args{
+ ctx: metadata.NewIncomingContext(context.Background(),
+ metadata.New(map[string]string{restful.HeaderAuth: `Bear {"account":"x","roles":["x"]}`})),
+ },
+ wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+ return assert.Equal(t, errWrongAccountNorRole, err)
+ },
+ },
+ {
+ name: "valid token",
+ args: args{
+ ctx: metadata.NewIncomingContext(context.Background(),
+ metadata.New(map[string]string{restful.HeaderAuth: `Bear {"account":"sync-user","roles":["sync-admin"]}`})),
+ },
+ wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+ return assert.NoError(t, err)
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.preDo != nil {
+ tt.preDo()
+ }
+ tt.wantErr(t, auth(tt.args.ctx), fmt.Sprintf("auth(%v)", tt.args.ctx))
+ })
+ }
+}
diff --git a/syncer/rpc/server.go b/syncer/rpc/server.go
index 990f789..dc6efee 100644
--- a/syncer/rpc/server.go
+++ b/syncer/rpc/server.go
@@ -22,18 +22,21 @@
"fmt"
"time"
- "github.com/apache/servicecomb-service-center/syncer/service/replicator"
- "github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
-
"github.com/apache/servicecomb-service-center/pkg/log"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
"github.com/apache/servicecomb-service-center/syncer/config"
+ "github.com/apache/servicecomb-service-center/syncer/service/replicator"
+ "github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
)
const (
HealthStatusConnected = "CONNECTED"
HealthStatusAbnormal = "ABNORMAL"
HealthStatusClose = "CLOSE"
+ HealthStatusAuthFail = "AuthFail"
+
+ RbacAllowedAccountName = "sync-user"
+ RbacAllowedRoleName = "sync-admin"
)
func NewServer() *Server {
@@ -49,6 +52,12 @@
}
func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Results, error) {
+ err := auth(ctx)
+ if err != nil {
+ log.Error("auth failed", err)
+ return generateFailedResults(events, err)
+ }
+
log.Info(fmt.Sprintf("start sync: %s", events.Flag()))
res := s.replicator.Persist(ctx, events)
@@ -56,6 +65,20 @@
return s.toResults(res), nil
}
+func generateFailedResults(events *v1sync.EventList, err error) (*v1sync.Results, error) {
+ if events == nil || len(events.Events) == 0 {
+ return &v1sync.Results{Results: map[string]*v1sync.Result{}}, nil
+ }
+ rsts := make(map[string]*v1sync.Result, len(events.Events))
+ for _, evt := range events.Events {
+ rsts[evt.Id] = &v1sync.Result{
+ Code: resource.Fail,
+ Message: err.Error(),
+ }
+ }
+ return &v1sync.Results{Results: rsts}, nil
+}
+
func (s *Server) toResults(results []*resource.Result) *v1sync.Results {
syncResult := make(map[string]*v1sync.Result, len(results))
for _, r := range results {
@@ -69,11 +92,18 @@
}
}
-func (s *Server) Health(_ context.Context, _ *v1sync.HealthRequest) (*v1sync.HealthReply, error) {
+func (s *Server) Health(ctx context.Context, _ *v1sync.HealthRequest) (*v1sync.HealthReply, error) {
resp := &v1sync.HealthReply{
Status: HealthStatusConnected,
LocalTimestamp: time.Now().UnixNano(),
}
+ err := auth(ctx)
+ if err != nil {
+ resp.Status = HealthStatusAuthFail
+ log.Error("auth failed", err)
+ return resp, nil
+ }
+
// TODO enable to close syncer
if !config.GetConfig().Sync.EnableOnStart {
resp.Status = HealthStatusClose
diff --git a/syncer/rpc/server_test.go b/syncer/rpc/server_test.go
new file mode 100644
index 0000000..4123c1a
--- /dev/null
+++ b/syncer/rpc/server_test.go
@@ -0,0 +1,85 @@
+package rpc
+
+import (
+ "context"
+ "reflect"
+ "testing"
+
+ "github.com/go-chassis/cari/rbac"
+ "github.com/stretchr/testify/assert"
+
+ v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
+ "github.com/apache/servicecomb-service-center/syncer/config"
+ "github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
+)
+
+type testReplicator struct{}
+
+func (testReplicator) Replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) {
+ return &v1sync.Results{Results: map[string]*v1sync.Result{
+ "constant_id": &v1sync.Result{
+ Code: resource.Success,
+ },
+ }}, nil
+}
+
+func (testReplicator) Persist(ctx context.Context, el *v1sync.EventList) []*resource.Result {
+ return []*resource.Result{
+ &resource.Result{
+ EventID: "constant_id",
+ Status: resource.Success,
+ },
+ }
+}
+
+func TestServer_Sync(t *testing.T) {
+ s := NewServer()
+
+ // rbac enabled, should sync failed and return auth failed message
+ config.SetConfig(config.Config{
+ Sync: &config.Sync{
+ RbacEnabled: true,
+ }})
+ events := &v1sync.EventList{Events: []*v1sync.Event{
+ {
+ Id: "evt1",
+ },
+ {
+ Id: "evt2",
+ },
+ }}
+
+ expectedRst := map[string]*v1sync.Result{
+ "evt1": &v1sync.Result{
+ Code: resource.Fail,
+ Message: rbac.NewError(rbac.ErrNoAuthHeader, "").Error(),
+ },
+
+ "evt2": &v1sync.Result{
+ Code: resource.Fail,
+ Message: rbac.NewError(rbac.ErrNoAuthHeader, "").Error(),
+ },
+ }
+ rst, err := s.Sync(context.Background(), events)
+ assert.NoError(t, err)
+ assert.True(t, reflect.DeepEqual(expectedRst, rst.Results))
+
+ rst, err = s.Sync(context.Background(), nil) // nil input
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(rst.Results))
+
+ // rbac disabled, should sync success(with the mock replicator)
+ config.SetConfig(config.Config{
+ Sync: &config.Sync{
+ RbacEnabled: false,
+ }})
+ expectedRst = map[string]*v1sync.Result{
+ "constant_id": &v1sync.Result{
+ Code: resource.Success,
+ },
+ }
+ s.replicator = &testReplicator{}
+ rst, err = s.Sync(context.Background(), events)
+ assert.NoError(t, err)
+ assert.True(t, reflect.DeepEqual(expectedRst, rst.Results))
+}
diff --git a/syncer/server/server.go b/syncer/server/server.go
index c880d55..095ae61 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -18,6 +18,9 @@
package server
import (
+ "github.com/go-chassis/go-chassis/v2"
+ chassisServer "github.com/go-chassis/go-chassis/v2/core/server"
+
"github.com/apache/servicecomb-service-center/pkg/log"
syncv1 "github.com/apache/servicecomb-service-center/syncer/api/v1"
"github.com/apache/servicecomb-service-center/syncer/config"
@@ -25,8 +28,6 @@
"github.com/apache/servicecomb-service-center/syncer/rpc"
"github.com/apache/servicecomb-service-center/syncer/service/admin"
"github.com/apache/servicecomb-service-center/syncer/service/sync"
- "github.com/go-chassis/go-chassis/v2"
- chassisServer "github.com/go-chassis/go-chassis/v2/core/server"
)
// Run register chassis schema and run syncer services before chassis.Run()
@@ -40,6 +41,15 @@
return
}
+ if len(config.GetConfig().Sync.Peers) <= 0 {
+ log.Warn("peers parameter configuration is empty")
+ return
+ }
+
+ if config.GetConfig().Sync.RbacEnabled {
+ log.Info("syncer rbac enabled")
+ }
+
chassis.RegisterSchema("grpc", rpc.NewServer(),
chassisServer.WithRPCServiceDesc(&syncv1.EventService_ServiceDesc))
diff --git a/syncer/service/admin/health.go b/syncer/service/admin/health.go
index 99c9db7..cdba735 100644
--- a/syncer/service/admin/health.go
+++ b/syncer/service/admin/health.go
@@ -20,13 +20,17 @@
import (
"context"
"errors"
+ "fmt"
"time"
+ "github.com/go-chassis/go-chassis/v2/server/restful"
"google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
"github.com/apache/servicecomb-service-center/client"
"github.com/apache/servicecomb-service-center/pkg/log"
pkgrpc "github.com/apache/servicecomb-service-center/pkg/rpc"
+ "github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
syncerclient "github.com/apache/servicecomb-service-center/syncer/client"
"github.com/apache/servicecomb-service-center/syncer/config"
@@ -59,25 +63,15 @@
Mode []string `json:"mode"`
Endpoints []string `json:"endpoints"`
Status string `json:"status"`
+ Token string `json:"-"`
}
func Init() {
cfg := config.GetConfig()
- if cfg.Sync == nil {
- log.Warn("sync config is empty")
- return
- }
- if !cfg.Sync.EnableOnStart {
- log.Info("syncer is disabled")
- return
- }
- if len(cfg.Sync.Peers) <= 0 {
- log.Warn("peers parameter configuration is empty")
- return
- }
peerInfos = make([]*PeerInfo, 0, len(cfg.Sync.Peers))
for _, c := range cfg.Sync.Peers {
if len(c.Endpoints) <= 0 {
+ log.Warn("no endpoints of peer: " + c.Name)
continue
}
p := &Peer{
@@ -86,10 +80,21 @@
Mode: c.Mode,
Endpoints: c.Endpoints,
}
- conn, err := newRPCConn(p.Endpoints)
- if err == nil {
- peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: conn})
+ if config.GetConfig().Sync.RbacEnabled {
+ plainToken, err := cipher.Decrypt(c.Token)
+ if err != nil {
+ log.Error(fmt.Sprintf("decrypt token of peer %s failed, use original content", c.Name), err)
+ plainToken = c.Token
+ }
+ p.Token = plainToken
}
+
+ conn, err := newRPCConn(p.Endpoints)
+ if err != nil {
+ log.Error(fmt.Sprintf("new client failed for peer: %s", c.Name), err)
+ continue
+ }
+ peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: conn})
}
}
@@ -103,7 +108,7 @@
if len(peerInfo.Peer.Endpoints) <= 0 {
continue
}
- status := getPeerStatus(peerInfo.Peer.Name, peerInfo.ClientConn)
+ status := getPeerStatus(peerInfo)
resp.Peers = append(resp.Peers, &Peer{
Name: peerInfo.Peer.Name,
Kind: peerInfo.Peer.Kind,
@@ -117,19 +122,25 @@
return resp, nil
}
-func getPeerStatus(peerName string, clientConn *grpc.ClientConn) string {
- if clientConn == nil {
+func getPeerStatus(peerInfo *PeerInfo) string {
+ if peerInfo.ClientConn == nil {
log.Warn("clientConn is nil")
return rpc.HealthStatusAbnormal
}
local := time.Now().UnixNano()
- set := client.NewSet(clientConn)
- reply, err := set.EventServiceClient.Health(context.Background(), &v1sync.HealthRequest{})
+ set := client.NewSet(peerInfo.ClientConn)
+ ctx := context.Background()
+ if config.GetConfig().Sync.RbacEnabled {
+ ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
+ restful.HeaderAuth: "Bearer " + peerInfo.Peer.Token,
+ }))
+ }
+ reply, err := set.EventServiceClient.Health(ctx, &v1sync.HealthRequest{})
if err != nil || reply == nil {
log.Error("get peer health failed", err)
return rpc.HealthStatusAbnormal
}
- reportClockDiff(peerName, local, reply.LocalTimestamp)
+ reportClockDiff(peerInfo.Peer.Name, local, reply.LocalTimestamp)
return reply.Status
}
@@ -159,3 +170,7 @@
TLSConfig: syncerclient.RPClientConfig(),
})
}
+
+func Peers() []*PeerInfo {
+ return peerInfos
+}
diff --git a/syncer/service/replicator/replicator.go b/syncer/service/replicator/replicator.go
index d7f1e3d..52adfc2 100644
--- a/syncer/service/replicator/replicator.go
+++ b/syncer/service/replicator/replicator.go
@@ -21,16 +21,20 @@
"context"
"fmt"
+ "github.com/go-chassis/foundation/gopool"
+ "github.com/go-chassis/go-chassis/v2/server/restful"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+
"github.com/apache/servicecomb-service-center/client"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/rpc"
"github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
syncerclient "github.com/apache/servicecomb-service-center/syncer/client"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
- "github.com/go-chassis/foundation/gopool"
- "google.golang.org/grpc"
)
const (
@@ -48,7 +52,8 @@
)
var (
- conn *grpc.ClientConn
+ conn *grpc.ClientConn
+ peerToken = ""
)
func Work() error {
@@ -68,8 +73,7 @@
}
func InitSyncClient() error {
- cfg := config.GetConfig()
- peer := cfg.Sync.Peers[0]
+ peer := config.GetConfig().Sync.Peers[0]
log.Info(fmt.Sprintf("peer is %v", peer))
var err error
conn, err = rpc.GetRoundRobinLbConn(&rpc.Config{
@@ -78,7 +82,19 @@
ServiceName: serviceName,
TLSConfig: syncerclient.RPClientConfig(),
})
- return err
+ if err != nil {
+ log.Error("get rpc client failed", err)
+ return err
+ }
+ if !config.GetConfig().Sync.RbacEnabled {
+ return nil
+ }
+ peerToken, err = cipher.Decrypt(peer.Token)
+ if err != nil {
+ log.Error("decrypt peer token failed, use original content", err)
+ peerToken = peer.Token
+ }
+ return nil
}
func Close() {
@@ -161,6 +177,11 @@
}
log.Info(fmt.Sprintf("page count %d to sync", len(els)))
+ if config.GetConfig().Sync.RbacEnabled {
+ ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
+ restful.HeaderAuth: "Bearer " + peerToken,
+ }))
+ }
for _, in := range els {
res, err := set.EventServiceClient.Sync(ctx, in)