feat: support external etcd (#238) Co-authored-by: xikai.wxk <xikai.wxk@antgroup.com>
diff --git a/config/example-cluster0.toml b/config/example-cluster0.toml index 916bdfd..ba61f94 100644 --- a/config/example-cluster0.toml +++ b/config/example-cluster0.toml
@@ -7,7 +7,8 @@ node-name = "meta0" initial-cluster = "meta0=http://127.0.0.1:2380,meta1=http://127.0.0.1:12380,meta2=http://127.0.0.1:22380" default-cluster-node-count = 2 -default-http-port = 8080 +http-port = 8080 +grpc-port = 2379 [log] level = "info"
diff --git a/config/example-cluster1.toml b/config/example-cluster1.toml index 369b589..8198f70 100644 --- a/config/example-cluster1.toml +++ b/config/example-cluster1.toml
@@ -7,7 +7,8 @@ node-name = "meta1" initial-cluster = "meta0=http://127.0.0.1:2380,meta1=http://127.0.0.1:12380,meta2=http://127.0.0.1:22380" default-cluster-node-count = 2 -default-http-port = 8081 +http-port = 8081 +grpc-port = 12379 [log] level = "info"
diff --git a/config/example-cluster2.toml b/config/example-cluster2.toml index 88e718d..dc964f7 100644 --- a/config/example-cluster2.toml +++ b/config/example-cluster2.toml
@@ -7,7 +7,8 @@ node-name = "meta2" initial-cluster = "meta0=http://127.0.0.1:2380,meta1=http://127.0.0.1:12380,meta2=http://127.0.0.1:22380" default-cluster-node-count = 2 -default-http-port = 8082 +http-port = 8082 +grpc-port = 22379 [log] level = "info"
diff --git a/go.mod b/go.mod index 15c7365..61c5347 100644 --- a/go.mod +++ b/go.mod
@@ -12,6 +12,7 @@ github.com/mgechev/revive v1.2.1 github.com/pelletier/go-toml/v2 v2.0.6 github.com/pkg/errors v0.9.1 + github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.8.1 github.com/tikv/pd v2.1.19+incompatible go.etcd.io/etcd/api/v3 v3.5.4 @@ -68,7 +69,6 @@ github.com/prometheus/procfs v0.6.0 // indirect github.com/sirupsen/logrus v1.7.0 // indirect github.com/soheilhy/cmux v0.1.5 // indirect - github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
diff --git a/go.sum b/go.sum index b6c86a5..9dfb1ca 100644 --- a/go.sum +++ b/go.sum
@@ -18,10 +18,6 @@ github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230718090326-c8106d1700c6 h1:Nk07ueHmoB8ykYfoPy4V+dYkrvZYy+sAwta+mfMupag= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230718090326-c8106d1700c6/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230906055607-37f874106629 h1:qYiWnGNzOmKTwtFN2KeCFSggj1yNX+w6OG5ihNZjGA4= -github.com/CeresDB/ceresdbproto/golang v0.0.0-20230906055607-37f874106629/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= github.com/CeresDB/ceresdbproto/golang v0.0.0-20230912105726-f0b0ff9a06d4 h1:I8Nzn5XRwxTJlTs5DLa+Alt2tUxsXy76JPNZwDTBf/E= github.com/CeresDB/ceresdbproto/golang v0.0.0-20230912105726-f0b0ff9a06d4/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
diff --git a/server/config/config.go b/server/config/config.go index 67a910d..4576207 100644 --- a/server/config/config.go +++ b/server/config/config.go
@@ -19,7 +19,7 @@ ) const ( - defaultGrpcHandleTimeoutMs int64 = 60 * 1000 + defaultEnableEmbedEtcd bool = true defaultInitialLimiterCapacity int = 100 * 1000 defaultInitialLimiterRate int = 10 * 1000 defaultEnableLimiter bool = false @@ -28,7 +28,16 @@ defaultMaxTxnOps = 128 defaultEtcdLeaseTTLSec = 10 + defaultGrpcHandleTimeoutMs int = 60 * 1000 + // GrpcServiceMaxSendMsgSize controls the max size of the sent message(200MB by default). + defaultGrpcServiceMaxSendMsgSize int = 200 * 1024 * 1024 + // GrpcServiceMaxRecvMsgSize controls the max size of the received message(100MB by default). + defaultGrpcServiceMaxRecvMsgSize int = 100 * 1024 * 1024 + // GrpcServiceKeepAlivePingMinIntervalSec controls the min interval for one keepalive ping. + defaultGrpcServiceKeepAlivePingMinIntervalSec int = 20 + defaultNodeNamePrefix = "ceresmeta" + defaultEndpoint = "127.0.0.1" defaultRootPath = "/ceresdb" defaultClientUrls = "http://0.0.0.0:2379" defaultPeerUrls = "http://0.0.0.0:2380" @@ -57,6 +66,7 @@ defaultProcedureExecutingBatchSize = math.MaxUint32 defaultHTTPPort = 8080 + defaultGrpcPort = 2379 defaultDataDir = "/tmp/ceresmeta" @@ -85,14 +95,20 @@ EtcdLog log.Config `toml:"etcd-log" env:"ETCD_LOG"` FlowLimiter LimiterConfig `toml:"flow-limiter" env:"FLOW_LIMITER"` - GrpcHandleTimeoutMs int64 `toml:"grpc-handle-timeout-ms" env:"GRPC_HANDLER_TIMEOUT_MS"` - EtcdStartTimeoutMs int64 `toml:"etcd-start-timeout-ms" env:"ETCD_START_TIMEOUT_MS"` - EtcdCallTimeoutMs int64 `toml:"etcd-call-timeout-ms" env:"ETCD_CALL_TIMEOUT_MS"` - EtcdMaxTxnOps int64 `toml:"etcd-max-txn-ops" env:"ETCD_MAX_TXN_OPS"` + EnableEmbedEtcd bool `toml:"enable-embed-etcd" env:"ENABLE_EMBED_ETCD"` + EtcdStartTimeoutMs int64 `toml:"etcd-start-timeout-ms" env:"ETCD_START_TIMEOUT_MS"` + EtcdCallTimeoutMs int64 `toml:"etcd-call-timeout-ms" env:"ETCD_CALL_TIMEOUT_MS"` + EtcdMaxTxnOps int64 `toml:"etcd-max-txn-ops" env:"ETCD_MAX_TXN_OPS"` + + GrpcHandleTimeoutMs int `toml:"grpc-handle-timeout-ms" env:"GRPC_HANDLER_TIMEOUT_MS"` + GrpcServiceMaxSendMsgSize int `toml:"grpc-service-max-send-msg-size" env:"GRPC_SERVICE_MAX_SEND_MSG_SIZE"` + GrpcServiceMaxRecvMsgSize int `toml:"grpc-service-max-recv-msg-size" env:"GRPC_SERVICE_MAX_RECV_MSG_SIZE"` + GrpcServiceKeepAlivePingMinIntervalSec int `toml:"grpc-service-keep-alive-ping-min-interval-sec" env:"GRPC_SERVICE_KEEP_ALIVE_PING_MIN_INTERVAL_SEC"` LeaseTTLSec int64 `toml:"lease-sec" env:"LEASE_SEC"` NodeName string `toml:"node-name" env:"NODE_NAME"` + Addr string `toml:"addr" env:"ADDR"` DataDir string `toml:"data-dir" env:"DATA_DIR"` StorageRootPath string `toml:"storage-root-path" env:"STORAGE_ROOT_PATH"` InitialCluster string `toml:"initial-cluster" env:"INITIAL_CLUSTER"` @@ -136,7 +152,8 @@ AdvertiseClientUrls string `toml:"advertise-client-urls" env:"ADVERTISE_CLIENT_URLS"` AdvertisePeerUrls string `toml:"advertise-peer-urls" env:"ADVERTISE_PEER_URLS"` - HTTPPort int `toml:"default-http-port" env:"DEFAULT_HTTP_PORT"` + HTTPPort int `toml:"http-port" env:"HTTP_PORT"` + GrpcPort int `toml:"grpc-port" env:"GRPC_PORT"` } func (c *Config) GrpcHandleTimeout() time.Duration { @@ -260,14 +277,20 @@ Enable: defaultEnableLimiter, }, - GrpcHandleTimeoutMs: defaultGrpcHandleTimeoutMs, - EtcdStartTimeoutMs: defaultEtcdStartTimeoutMs, - EtcdCallTimeoutMs: defaultCallTimeoutMs, - EtcdMaxTxnOps: defaultMaxTxnOps, + EnableEmbedEtcd: defaultEnableEmbedEtcd, + EtcdStartTimeoutMs: defaultEtcdStartTimeoutMs, + EtcdCallTimeoutMs: defaultCallTimeoutMs, + EtcdMaxTxnOps: defaultMaxTxnOps, + + GrpcHandleTimeoutMs: defaultGrpcHandleTimeoutMs, + GrpcServiceMaxSendMsgSize: defaultGrpcServiceMaxSendMsgSize, + GrpcServiceMaxRecvMsgSize: defaultGrpcServiceMaxRecvMsgSize, + GrpcServiceKeepAlivePingMinIntervalSec: defaultGrpcServiceKeepAlivePingMinIntervalSec, LeaseTTLSec: defaultEtcdLeaseTTLSec, NodeName: defaultNodeName, + Addr: defaultEndpoint, DataDir: defaultDataDir, StorageRootPath: defaultRootPath, @@ -300,6 +323,7 @@ ProcedureExecutingBatchSize: defaultProcedureExecutingBatchSize, HTTPPort: defaultHTTPPort, + GrpcPort: defaultGrpcPort, } version := fs.Bool("version", false, "print version information")
diff --git a/server/etcdutil/get_leader.go b/server/etcdutil/get_leader.go index 136ff18..8cd262b 100644 --- a/server/etcdutil/get_leader.go +++ b/server/etcdutil/get_leader.go
@@ -7,13 +7,13 @@ ) type EtcdLeaderGetter interface { - EtcdLeaderID() uint64 + EtcdLeaderID() (uint64, error) } type LeaderGetterWrapper struct { Server *etcdserver.EtcdServer } -func (w *LeaderGetterWrapper) EtcdLeaderID() uint64 { - return w.Server.Lead() +func (w *LeaderGetterWrapper) EtcdLeaderID() (uint64, error) { + return w.Server.Lead(), nil }
diff --git a/server/member/member.go b/server/member/member.go index 27a2a92..ba0c0e3 100644 --- a/server/member/member.go +++ b/server/member/member.go
@@ -77,7 +77,7 @@ return nil, ErrInvalidLeaderValue.WithCause(err) } - return &getLeaderResp{Leader: leader, Revision: leaderKv.ModRevision, IsLocal: leader.GetId() == m.ID}, nil + return &getLeaderResp{Leader: leader, Revision: leaderKv.ModRevision, IsLocal: leader.GetEndpoint() == m.Endpoint}, nil } // GetLeaderAddr gets the leader address of the cluster with memory cache. @@ -91,7 +91,7 @@ } return GetLeaderAddrResp{ LeaderEndpoint: m.leader.Endpoint, - IsLocal: m.leader.GetId() == m.ID, + IsLocal: m.leader.Endpoint == m.Endpoint, }, nil } @@ -148,7 +148,7 @@ } } -func (m *Member) CampaignAndKeepLeader(ctx context.Context, leaseTTLSec int64, callbacks LeadershipEventCallbacks) error { +func (m *Member) CampaignAndKeepLeader(ctx context.Context, leaseTTLSec int64, leadershipChecker LeadershipChecker, callbacks LeadershipEventCallbacks) error { leaderVal, err := m.Marshal() if err != nil { return err @@ -226,9 +226,9 @@ m.logger.Info("no longer a leader because lease has expired") return nil } - etcdLeader := m.etcdLeaderGetter.EtcdLeaderID() - if etcdLeader != m.ID { - m.logger.Info("etcd leader changed and should re-assign the leadership", zap.String("old-leader", m.Name), zap.Uint64("new-leader", etcdLeader)) + + if !leadershipChecker.ShouldCampaign(m) { + m.logger.Info("etcd leader changed and should re-assign the leadership", zap.String("old-leader", m.Name)) return nil } case <-ctx.Done():
diff --git a/server/member/watch_leader.go b/server/member/watch_leader.go index 828174e..b6bc21d 100644 --- a/server/member/watch_leader.go +++ b/server/member/watch_leader.go
@@ -6,6 +6,8 @@ "context" "time" + "github.com/CeresDB/ceresdbproto/golang/pkg/metastoragepb" + "github.com/CeresDB/ceresmeta/pkg/assert" "github.com/CeresDB/ceresmeta/pkg/log" "github.com/CeresDB/ceresmeta/server/etcdutil" "go.uber.org/zap" @@ -25,10 +27,13 @@ ShouldStop() bool } +// LeaderWatcher watches the changes of the CeresMeta cluster's leadership. type LeaderWatcher struct { watchCtx WatchContext self *Member leaseTTLSec int64 + + leadershipChecker LeadershipChecker } type LeadershipEventCallbacks interface { @@ -36,20 +41,64 @@ BeforeTransfer(ctx context.Context) } -func NewLeaderWatcher(ctx WatchContext, self *Member, leaseTTLSec int64) *LeaderWatcher { +// LeadershipChecker tells which member should campaign the CeresMeta cluster's leadership, and whether the current leader is valid. +type LeadershipChecker interface { + ShouldCampaign(self *Member) bool + IsValidLeader(memLeader *metastoragepb.Member) bool +} + +// embeddedEtcdLeadershipChecker ensures the CeresMeta cluster's leader as the embedded ETCD cluster's leader. +type embeddedEtcdLeadershipChecker struct { + etcdLeaderGetter etcdutil.EtcdLeaderGetter +} + +func (c embeddedEtcdLeadershipChecker) ShouldCampaign(self *Member) bool { + etcdLeaderID, err := c.etcdLeaderGetter.EtcdLeaderID() + assert.Assertf(err == nil, "EtcdLeaderID must exist") + return self.ID == etcdLeaderID +} + +func (c embeddedEtcdLeadershipChecker) IsValidLeader(memLeader *metastoragepb.Member) bool { + etcdLeaderID, err := c.etcdLeaderGetter.EtcdLeaderID() + assert.Assertf(err == nil, "EtcdLeaderID must exist") + return memLeader.Id == etcdLeaderID +} + +// externalEtcdLeadershipChecker has no preference over the leadership of the CeresMeta cluster, that is to say, the leadership is random. +type externalEtcdLeadershipChecker struct{} + +func (c externalEtcdLeadershipChecker) ShouldCampaign(_ *Member) bool { + return true +} + +func (c externalEtcdLeadershipChecker) IsValidLeader(_ *metastoragepb.Member) bool { + return true +} + +func NewLeaderWatcher(ctx WatchContext, self *Member, leaseTTLSec int64, embedEtcd bool) *LeaderWatcher { + var leadershipChecker LeadershipChecker + if embedEtcd { + leadershipChecker = embeddedEtcdLeadershipChecker{ + etcdLeaderGetter: ctx, + } + } else { + leadershipChecker = externalEtcdLeadershipChecker{} + } + return &LeaderWatcher{ ctx, self, leaseTTLSec, + leadershipChecker, } } -// Watch watches the leader changes: -// 1. Check whether the leader is valid (same as the etcd leader) if leader exists. +// watchWithCheckEtcdLeader watches the leader changes: +// 1. Check whether the leader is valid if leader exists. // - Leader is valid: wait for the leader changes. // - Leader is not valid: reset the leader by the current leader. // 2. Campaign the leadership if leader does not exist. -// - Elect the etcd leader as the ceresmeta leader. +// - Campaign the leader if this member should. // - The leader keeps the leadership lease alive. // - The other members keeps waiting for the leader changes. // @@ -78,20 +127,20 @@ } // Check whether leader exists. - leaderResp, err := l.self.getLeader(ctx) + resp, err := l.self.getLeader(ctx) if err != nil { logger.Error("fail to get leader", zap.Error(err)) wait = waitReasonFailEtcd continue } - etcdLeaderID := l.watchCtx.EtcdLeaderID() - if leaderResp.Leader == nil { + memLeader := resp.Leader + if memLeader == nil { // Leader does not exist. // A new leader should be elected and the etcd leader should be elected as the new leader. - if l.self.ID == etcdLeaderID { + if l.leadershipChecker.ShouldCampaign(l.self) { // Campaign the leader and block until leader changes. - if err := l.self.CampaignAndKeepLeader(ctx, l.leaseTTLSec, callbacks); err != nil { + if err := l.self.CampaignAndKeepLeader(ctx, l.leaseTTLSec, l.leadershipChecker, callbacks); err != nil { logger.Error("fail to campaign and keep leader", zap.Error(err)) wait = waitReasonFailEtcd } else { @@ -104,22 +153,22 @@ wait = waitReasonElectLeader } else { // Cache leader in memory. - l.self.leader = leaderResp.Leader - log.Info("update leader cache", zap.String("endpoint", leaderResp.Leader.Endpoint)) + l.self.leader = memLeader + log.Info("update leader cache", zap.String("endpoint", memLeader.Endpoint)) // Leader does exist. // A new leader should be elected (the leader should be reset by the current leader itself) if the leader is // not the etcd leader. - if etcdLeaderID == leaderResp.Leader.Id { + if l.leadershipChecker.IsValidLeader(memLeader) { // watch the leader and block until leader changes. - l.self.WaitForLeaderChange(ctx, leaderResp.Revision) + l.self.WaitForLeaderChange(ctx, resp.Revision) logger.Warn("leader changes and stop watching") continue } - // The leader is not etcd leader and this node is leader so reset it. - if leaderResp.Leader.Id == l.self.ID { - if err := l.self.ResetLeader(ctx); err != nil { + // This leader is not valid, reset it if this member will campaign this leadership. + if l.leadershipChecker.ShouldCampaign(l.self) { + if err = l.self.ResetLeader(ctx); err != nil { logger.Error("fail to reset leader", zap.Error(err)) wait = waitReasonFailEtcd }
diff --git a/server/member/watch_leader_test.go b/server/member/watch_leader_test.go index 68339ed..6314294 100644 --- a/server/member/watch_leader_test.go +++ b/server/member/watch_leader_test.go
@@ -23,8 +23,8 @@ return ctx.stopped } -func (ctx *mockWatchCtx) EtcdLeaderID() uint64 { - return ctx.srv.Lead() +func (ctx *mockWatchCtx) EtcdLeaderID() (uint64, error) { + return ctx.srv.Lead(), nil } func TestWatchLeaderSingle(t *testing.T) { @@ -40,7 +40,7 @@ rpcTimeout := time.Duration(10) * time.Second leaseTTLSec := int64(1) mem := NewMember("", uint64(etcd.Server.ID()), "mem0", "", client, leaderGetter, rpcTimeout) - leaderWatcher := NewLeaderWatcher(watchCtx, mem, leaseTTLSec) + leaderWatcher := NewLeaderWatcher(watchCtx, mem, leaseTTLSec, true) ctx, cancelWatch := context.WithCancel(context.Background()) watchedDone := make(chan struct{}, 1)
diff --git a/server/server.go b/server/server.go index a180ef9..591cea4 100644 --- a/server/server.go +++ b/server/server.go
@@ -5,6 +5,7 @@ import ( "context" "fmt" + "net" "sync" "sync/atomic" "time" @@ -27,6 +28,7 @@ "go.etcd.io/etcd/server/v3/embed" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) type Server struct { @@ -79,7 +81,22 @@ // Run runs the services and background jobs. func (srv *Server) Run(ctx context.Context) error { - if err := srv.startEtcd(ctx); err != nil { + // If enableEmbedEtcd is true, the grpc server is started in the same process as the etcd server. + if srv.cfg.EnableEmbedEtcd { + if err := srv.startEmbedEtcd(ctx); err != nil { + srv.status.Set(status.Terminated) + return err + } + } else { + // If enableEmbedEtcd is false, the grpc server is started in a separate process. + go func() { + if err := srv.startGrpcServer(ctx); err != nil { + srv.status.Set(status.Terminated) + log.Fatal("Grpc serve failed", zap.Error(err)) + } + }() + } + if err := srv.initEtcdClient(); err != nil { srv.status.Set(status.Terminated) return err } @@ -91,7 +108,6 @@ srv.startBgJobs(ctx) srv.status.Set(status.StatusRunning) - return nil } @@ -117,7 +133,7 @@ return atomic.LoadInt32(&srv.isClosed) == 1 } -func (srv *Server) startEtcd(ctx context.Context) error { +func (srv *Server) startEmbedEtcd(ctx context.Context) error { etcdSrv, err := embed.StartEtcd(srv.etcdCfg) if err != nil { return ErrStartEtcd.WithCause(err) @@ -131,23 +147,53 @@ case <-newCtx.Done(): return ErrStartEtcdTimeout.WithCausef("timeout is:%v", srv.cfg.EtcdStartTimeout()) } + srv.etcdSrv = etcdSrv - endpoints := []string{srv.etcdCfg.ACUrls[0].String()} + return nil +} + +func (srv *Server) initEtcdClient() error { + etcdEndpoints := make([]string, 0, len(srv.etcdCfg.ACUrls)) + for _, url := range srv.etcdCfg.ACUrls { + etcdEndpoints = append(etcdEndpoints, url.String()) + } lgc := log.GetLoggerConfig() client, err := clientv3.New(clientv3.Config{ - Endpoints: endpoints, + Endpoints: etcdEndpoints, DialTimeout: srv.cfg.EtcdCallTimeout(), LogConfig: lgc, }) if err != nil { return ErrCreateEtcdClient.WithCause(err) } - srv.etcdCli = client - etcdLeaderGetter := &etcdutil.LeaderGetterWrapper{Server: etcdSrv.Server} - srv.member = member.NewMember("", uint64(etcdSrv.Server.ID()), srv.cfg.NodeName, endpoints[0], client, etcdLeaderGetter, srv.cfg.EtcdCallTimeout()) - srv.etcdSrv = etcdSrv + endpoint := fmt.Sprintf("%s:%d", srv.cfg.Addr, srv.cfg.GrpcPort) + if srv.etcdSrv != nil { + etcdLeaderGetter := &etcdutil.LeaderGetterWrapper{Server: srv.etcdSrv.Server} + srv.member = member.NewMember(srv.cfg.StorageRootPath, uint64(srv.etcdSrv.Server.ID()), srv.cfg.NodeName, srv.etcdCfg.ACUrls[0].String(), client, etcdLeaderGetter, srv.cfg.EtcdCallTimeout()) + } else { + srv.member = member.NewMember(srv.cfg.StorageRootPath, 0, srv.cfg.NodeName, endpoint, client, nil, srv.cfg.EtcdCallTimeout()) + } + return nil +} + +func (srv *Server) startGrpcServer(_ context.Context) error { + opts := srv.buildGrpcOptions() + server := grpc.NewServer(opts...) + + grpcService := metagrpc.NewService(srv.cfg.GrpcHandleTimeout(), srv) + server.RegisterService(&metaservicepb.CeresmetaRpcService_ServiceDesc, grpcService) + addr := fmt.Sprintf(":%d", srv.cfg.GrpcPort) + lis, err := net.Listen("tcp", addr) + if err != nil { + return errors.Wrapf(err, "listen on %s failed", addr) + } + + if err = server.Serve(lis); err != nil { + return errors.Wrap(err, "serve failed") + } + return nil } @@ -210,7 +256,8 @@ watchCtx := &leaderWatchContext{ srv, } - watcher := member.NewLeaderWatcher(watchCtx, srv.member, srv.cfg.LeaseTTLSec) + // If enable embed etcd, we should watch the leader of the etcd cluster. + watcher := member.NewLeaderWatcher(watchCtx, srv.member, srv.cfg.LeaseTTLSec, srv.cfg.EnableEmbedEtcd) callbacks := &leadershipEventCallbacks{ srv: srv, @@ -259,6 +306,19 @@ return nil } +func (srv *Server) buildGrpcOptions() []grpc.ServerOption { + keepalivePolicy := keepalive.EnforcementPolicy{ + MinTime: time.Duration(srv.cfg.GrpcServiceKeepAlivePingMinIntervalSec) * time.Second, + PermitWithoutStream: true, + } + opts := []grpc.ServerOption{ + grpc.MaxSendMsgSize(srv.cfg.GrpcServiceMaxSendMsgSize), + grpc.MaxRecvMsgSize(srv.cfg.GrpcServiceMaxSendMsgSize), + grpc.KeepaliveEnforcementPolicy(keepalivePolicy), + } + return opts +} + type leaderWatchContext struct { srv *Server } @@ -267,8 +327,11 @@ return ctx.srv.IsClosed() } -func (ctx *leaderWatchContext) EtcdLeaderID() uint64 { - return ctx.srv.etcdSrv.Server.Lead() +func (ctx *leaderWatchContext) EtcdLeaderID() (uint64, error) { + if ctx.srv.etcdSrv != nil { + return ctx.srv.etcdSrv.Server.Lead(), nil + } + return 0, errors.WithMessage(member.ErrGetLeader, "no leader found") } func (srv *Server) GetClusterManager() cluster.Manager {
diff --git a/server/service/grpc/forward.go b/server/service/grpc/forward.go index 58780c8..153e944 100644 --- a/server/service/grpc/forward.go +++ b/server/service/grpc/forward.go
@@ -21,7 +21,6 @@ } if forwardedAddr != "" { - log.Info("try to create ceresmeta client", zap.String("addr", forwardedAddr)) ceresmetaClient, err := s.getCeresmetaClient(ctx, forwardedAddr) if err != nil { return nil, errors.WithMessagef(err, "get forwarded ceresmeta client, addr:%s", forwardedAddr) @@ -42,6 +41,7 @@ func (s *Service) getForwardedGrpcClient(ctx context.Context, forwardedAddr string) (*grpc.ClientConn, error) { client, ok := s.conns.Load(forwardedAddr) if !ok { + log.Info("try to create ceresmeta client", zap.String("addr", forwardedAddr)) cc, err := service.GetClientConn(ctx, forwardedAddr) if err != nil { return nil, err