feat: support external etcd (#238)

Co-authored-by: xikai.wxk <xikai.wxk@antgroup.com>
diff --git a/config/example-cluster0.toml b/config/example-cluster0.toml
index 916bdfd..ba61f94 100644
--- a/config/example-cluster0.toml
+++ b/config/example-cluster0.toml
@@ -7,7 +7,8 @@
 node-name = "meta0"
 initial-cluster = "meta0=http://127.0.0.1:2380,meta1=http://127.0.0.1:12380,meta2=http://127.0.0.1:22380"
 default-cluster-node-count = 2
-default-http-port = 8080
+http-port = 8080
+grpc-port = 2379
 
 [log]
 level = "info"
diff --git a/config/example-cluster1.toml b/config/example-cluster1.toml
index 369b589..8198f70 100644
--- a/config/example-cluster1.toml
+++ b/config/example-cluster1.toml
@@ -7,7 +7,8 @@
 node-name = "meta1"
 initial-cluster = "meta0=http://127.0.0.1:2380,meta1=http://127.0.0.1:12380,meta2=http://127.0.0.1:22380"
 default-cluster-node-count = 2
-default-http-port = 8081
+http-port = 8081
+grpc-port = 12379
 
 [log]
 level = "info"
diff --git a/config/example-cluster2.toml b/config/example-cluster2.toml
index 88e718d..dc964f7 100644
--- a/config/example-cluster2.toml
+++ b/config/example-cluster2.toml
@@ -7,7 +7,8 @@
 node-name = "meta2"
 initial-cluster = "meta0=http://127.0.0.1:2380,meta1=http://127.0.0.1:12380,meta2=http://127.0.0.1:22380"
 default-cluster-node-count = 2
-default-http-port = 8082
+http-port = 8082
+grpc-port = 22379
 
 [log]
 level = "info"
diff --git a/go.mod b/go.mod
index 15c7365..61c5347 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@
 	github.com/mgechev/revive v1.2.1
 	github.com/pelletier/go-toml/v2 v2.0.6
 	github.com/pkg/errors v0.9.1
+	github.com/spaolacci/murmur3 v1.1.0
 	github.com/stretchr/testify v1.8.1
 	github.com/tikv/pd v2.1.19+incompatible
 	go.etcd.io/etcd/api/v3 v3.5.4
@@ -68,7 +69,6 @@
 	github.com/prometheus/procfs v0.6.0 // indirect
 	github.com/sirupsen/logrus v1.7.0 // indirect
 	github.com/soheilhy/cmux v0.1.5 // indirect
-	github.com/spaolacci/murmur3 v1.1.0 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
 	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
diff --git a/go.sum b/go.sum
index b6c86a5..9dfb1ca 100644
--- a/go.sum
+++ b/go.sum
@@ -18,10 +18,6 @@
 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230718090326-c8106d1700c6 h1:Nk07ueHmoB8ykYfoPy4V+dYkrvZYy+sAwta+mfMupag=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230718090326-c8106d1700c6/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230906055607-37f874106629 h1:qYiWnGNzOmKTwtFN2KeCFSggj1yNX+w6OG5ihNZjGA4=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230906055607-37f874106629/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
 github.com/CeresDB/ceresdbproto/golang v0.0.0-20230912105726-f0b0ff9a06d4 h1:I8Nzn5XRwxTJlTs5DLa+Alt2tUxsXy76JPNZwDTBf/E=
 github.com/CeresDB/ceresdbproto/golang v0.0.0-20230912105726-f0b0ff9a06d4/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
diff --git a/server/config/config.go b/server/config/config.go
index 67a910d..4576207 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -19,7 +19,7 @@
 )
 
 const (
-	defaultGrpcHandleTimeoutMs    int64 = 60 * 1000
+	defaultEnableEmbedEtcd        bool  = true
 	defaultInitialLimiterCapacity int   = 100 * 1000
 	defaultInitialLimiterRate     int   = 10 * 1000
 	defaultEnableLimiter          bool  = false
@@ -28,7 +28,16 @@
 	defaultMaxTxnOps                    = 128
 	defaultEtcdLeaseTTLSec              = 10
 
+	defaultGrpcHandleTimeoutMs int = 60 * 1000
+	// defaultGrpcServiceMaxSendMsgSize is the default max size of a sent message (200MB).
+	defaultGrpcServiceMaxSendMsgSize int = 200 * 1024 * 1024
+	// defaultGrpcServiceMaxRecvMsgSize is the default max size of a received message (100MB).
+	defaultGrpcServiceMaxRecvMsgSize int = 100 * 1024 * 1024
+	// GrpcServiceKeepAlivePingMinIntervalSec controls the min interval for one keepalive ping.
+	defaultGrpcServiceKeepAlivePingMinIntervalSec int = 20
+
 	defaultNodeNamePrefix          = "ceresmeta"
+	defaultEndpoint                = "127.0.0.1"
 	defaultRootPath                = "/ceresdb"
 	defaultClientUrls              = "http://0.0.0.0:2379"
 	defaultPeerUrls                = "http://0.0.0.0:2380"
@@ -57,6 +66,7 @@
 	defaultProcedureExecutingBatchSize = math.MaxUint32
 
 	defaultHTTPPort = 8080
+	defaultGrpcPort = 2379
 
 	defaultDataDir = "/tmp/ceresmeta"
 
@@ -85,14 +95,20 @@
 	EtcdLog     log.Config    `toml:"etcd-log" env:"ETCD_LOG"`
 	FlowLimiter LimiterConfig `toml:"flow-limiter" env:"FLOW_LIMITER"`
 
-	GrpcHandleTimeoutMs int64 `toml:"grpc-handle-timeout-ms" env:"GRPC_HANDLER_TIMEOUT_MS"`
-	EtcdStartTimeoutMs  int64 `toml:"etcd-start-timeout-ms" env:"ETCD_START_TIMEOUT_MS"`
-	EtcdCallTimeoutMs   int64 `toml:"etcd-call-timeout-ms" env:"ETCD_CALL_TIMEOUT_MS"`
-	EtcdMaxTxnOps       int64 `toml:"etcd-max-txn-ops" env:"ETCD_MAX_TXN_OPS"`
+	EnableEmbedEtcd    bool  `toml:"enable-embed-etcd" env:"ENABLE_EMBED_ETCD"`
+	EtcdStartTimeoutMs int64 `toml:"etcd-start-timeout-ms" env:"ETCD_START_TIMEOUT_MS"`
+	EtcdCallTimeoutMs  int64 `toml:"etcd-call-timeout-ms" env:"ETCD_CALL_TIMEOUT_MS"`
+	EtcdMaxTxnOps      int64 `toml:"etcd-max-txn-ops" env:"ETCD_MAX_TXN_OPS"`
+
+	GrpcHandleTimeoutMs                    int `toml:"grpc-handle-timeout-ms" env:"GRPC_HANDLER_TIMEOUT_MS"`
+	GrpcServiceMaxSendMsgSize              int `toml:"grpc-service-max-send-msg-size" env:"GRPC_SERVICE_MAX_SEND_MSG_SIZE"`
+	GrpcServiceMaxRecvMsgSize              int `toml:"grpc-service-max-recv-msg-size" env:"GRPC_SERVICE_MAX_RECV_MSG_SIZE"`
+	GrpcServiceKeepAlivePingMinIntervalSec int `toml:"grpc-service-keep-alive-ping-min-interval-sec" env:"GRPC_SERVICE_KEEP_ALIVE_PING_MIN_INTERVAL_SEC"`
 
 	LeaseTTLSec int64 `toml:"lease-sec" env:"LEASE_SEC"`
 
 	NodeName            string `toml:"node-name" env:"NODE_NAME"`
+	Addr                string `toml:"addr" env:"ADDR"`
 	DataDir             string `toml:"data-dir" env:"DATA_DIR"`
 	StorageRootPath     string `toml:"storage-root-path" env:"STORAGE_ROOT_PATH"`
 	InitialCluster      string `toml:"initial-cluster" env:"INITIAL_CLUSTER"`
@@ -136,7 +152,8 @@
 	AdvertiseClientUrls string `toml:"advertise-client-urls" env:"ADVERTISE_CLIENT_URLS"`
 	AdvertisePeerUrls   string `toml:"advertise-peer-urls" env:"ADVERTISE_PEER_URLS"`
 
-	HTTPPort int `toml:"default-http-port" env:"DEFAULT_HTTP_PORT"`
+	HTTPPort int `toml:"http-port" env:"HTTP_PORT"`
+	GrpcPort int `toml:"grpc-port" env:"GRPC_PORT"`
 }
 
 func (c *Config) GrpcHandleTimeout() time.Duration {
@@ -260,14 +277,20 @@
 			Enable: defaultEnableLimiter,
 		},
 
-		GrpcHandleTimeoutMs: defaultGrpcHandleTimeoutMs,
-		EtcdStartTimeoutMs:  defaultEtcdStartTimeoutMs,
-		EtcdCallTimeoutMs:   defaultCallTimeoutMs,
-		EtcdMaxTxnOps:       defaultMaxTxnOps,
+		EnableEmbedEtcd:    defaultEnableEmbedEtcd,
+		EtcdStartTimeoutMs: defaultEtcdStartTimeoutMs,
+		EtcdCallTimeoutMs:  defaultCallTimeoutMs,
+		EtcdMaxTxnOps:      defaultMaxTxnOps,
+
+		GrpcHandleTimeoutMs:                    defaultGrpcHandleTimeoutMs,
+		GrpcServiceMaxSendMsgSize:              defaultGrpcServiceMaxSendMsgSize,
+		GrpcServiceMaxRecvMsgSize:              defaultGrpcServiceMaxRecvMsgSize,
+		GrpcServiceKeepAlivePingMinIntervalSec: defaultGrpcServiceKeepAlivePingMinIntervalSec,
 
 		LeaseTTLSec: defaultEtcdLeaseTTLSec,
 
 		NodeName:        defaultNodeName,
+		Addr:            defaultEndpoint,
 		DataDir:         defaultDataDir,
 		StorageRootPath: defaultRootPath,
 
@@ -300,6 +323,7 @@
 		ProcedureExecutingBatchSize:     defaultProcedureExecutingBatchSize,
 
 		HTTPPort: defaultHTTPPort,
+		GrpcPort: defaultGrpcPort,
 	}
 
 	version := fs.Bool("version", false, "print version information")
diff --git a/server/etcdutil/get_leader.go b/server/etcdutil/get_leader.go
index 136ff18..8cd262b 100644
--- a/server/etcdutil/get_leader.go
+++ b/server/etcdutil/get_leader.go
@@ -7,13 +7,13 @@
 )
 
 type EtcdLeaderGetter interface {
-	EtcdLeaderID() uint64
+	EtcdLeaderID() (uint64, error)
 }
 
 type LeaderGetterWrapper struct {
 	Server *etcdserver.EtcdServer
 }
 
-func (w *LeaderGetterWrapper) EtcdLeaderID() uint64 {
-	return w.Server.Lead()
+func (w *LeaderGetterWrapper) EtcdLeaderID() (uint64, error) {
+	return w.Server.Lead(), nil
 }
diff --git a/server/member/member.go b/server/member/member.go
index 27a2a92..ba0c0e3 100644
--- a/server/member/member.go
+++ b/server/member/member.go
@@ -77,7 +77,7 @@
 		return nil, ErrInvalidLeaderValue.WithCause(err)
 	}
 
-	return &getLeaderResp{Leader: leader, Revision: leaderKv.ModRevision, IsLocal: leader.GetId() == m.ID}, nil
+	return &getLeaderResp{Leader: leader, Revision: leaderKv.ModRevision, IsLocal: leader.GetEndpoint() == m.Endpoint}, nil
 }
 
 // GetLeaderAddr gets the leader address of the cluster with memory cache.
@@ -91,7 +91,7 @@
 	}
 	return GetLeaderAddrResp{
 		LeaderEndpoint: m.leader.Endpoint,
-		IsLocal:        m.leader.GetId() == m.ID,
+		IsLocal:        m.leader.Endpoint == m.Endpoint,
 	}, nil
 }
 
@@ -148,7 +148,7 @@
 	}
 }
 
-func (m *Member) CampaignAndKeepLeader(ctx context.Context, leaseTTLSec int64, callbacks LeadershipEventCallbacks) error {
+func (m *Member) CampaignAndKeepLeader(ctx context.Context, leaseTTLSec int64, leadershipChecker LeadershipChecker, callbacks LeadershipEventCallbacks) error {
 	leaderVal, err := m.Marshal()
 	if err != nil {
 		return err
@@ -226,9 +226,9 @@
 				m.logger.Info("no longer a leader because lease has expired")
 				return nil
 			}
-			etcdLeader := m.etcdLeaderGetter.EtcdLeaderID()
-			if etcdLeader != m.ID {
-				m.logger.Info("etcd leader changed and should re-assign the leadership", zap.String("old-leader", m.Name), zap.Uint64("new-leader", etcdLeader))
+
+			if !leadershipChecker.ShouldCampaign(m) {
+				m.logger.Info("etcd leader changed and should re-assign the leadership", zap.String("old-leader", m.Name))
 				return nil
 			}
 		case <-ctx.Done():
diff --git a/server/member/watch_leader.go b/server/member/watch_leader.go
index 828174e..b6bc21d 100644
--- a/server/member/watch_leader.go
+++ b/server/member/watch_leader.go
@@ -6,6 +6,8 @@
 	"context"
 	"time"
 
+	"github.com/CeresDB/ceresdbproto/golang/pkg/metastoragepb"
+	"github.com/CeresDB/ceresmeta/pkg/assert"
 	"github.com/CeresDB/ceresmeta/pkg/log"
 	"github.com/CeresDB/ceresmeta/server/etcdutil"
 	"go.uber.org/zap"
@@ -25,10 +27,13 @@
 	ShouldStop() bool
 }
 
+// LeaderWatcher watches the changes of the CeresMeta cluster's leadership.
 type LeaderWatcher struct {
 	watchCtx    WatchContext
 	self        *Member
 	leaseTTLSec int64
+
+	leadershipChecker LeadershipChecker
 }
 
 type LeadershipEventCallbacks interface {
@@ -36,20 +41,64 @@
 	BeforeTransfer(ctx context.Context)
 }
 
-func NewLeaderWatcher(ctx WatchContext, self *Member, leaseTTLSec int64) *LeaderWatcher {
+// LeadershipChecker tells whether a member should campaign for the CeresMeta cluster's leadership, and whether the current leader is valid.
+type LeadershipChecker interface {
+	ShouldCampaign(self *Member) bool
+	IsValidLeader(memLeader *metastoragepb.Member) bool
+}
+
+// embeddedEtcdLeadershipChecker ensures that the CeresMeta cluster's leader is also the embedded etcd cluster's leader.
+type embeddedEtcdLeadershipChecker struct {
+	etcdLeaderGetter etcdutil.EtcdLeaderGetter
+}
+
+func (c embeddedEtcdLeadershipChecker) ShouldCampaign(self *Member) bool {
+	etcdLeaderID, err := c.etcdLeaderGetter.EtcdLeaderID()
+	assert.Assertf(err == nil, "EtcdLeaderID must exist")
+	return self.ID == etcdLeaderID
+}
+
+func (c embeddedEtcdLeadershipChecker) IsValidLeader(memLeader *metastoragepb.Member) bool {
+	etcdLeaderID, err := c.etcdLeaderGetter.EtcdLeaderID()
+	assert.Assertf(err == nil, "EtcdLeaderID must exist")
+	return memLeader.Id == etcdLeaderID
+}
+
+// externalEtcdLeadershipChecker has no preference over the leadership of the CeresMeta cluster: any member may campaign and any elected leader is treated as valid.
+type externalEtcdLeadershipChecker struct{}
+
+func (c externalEtcdLeadershipChecker) ShouldCampaign(_ *Member) bool {
+	return true
+}
+
+func (c externalEtcdLeadershipChecker) IsValidLeader(_ *metastoragepb.Member) bool {
+	return true
+}
+
+func NewLeaderWatcher(ctx WatchContext, self *Member, leaseTTLSec int64, embedEtcd bool) *LeaderWatcher {
+	var leadershipChecker LeadershipChecker
+	if embedEtcd {
+		leadershipChecker = embeddedEtcdLeadershipChecker{
+			etcdLeaderGetter: ctx,
+		}
+	} else {
+		leadershipChecker = externalEtcdLeadershipChecker{}
+	}
+
 	return &LeaderWatcher{
 		ctx,
 		self,
 		leaseTTLSec,
+		leadershipChecker,
 	}
 }
 
-// Watch watches the leader changes:
-//  1. Check whether the leader is valid (same as the etcd leader) if leader exists.
+// watchWithCheckEtcdLeader watches the leader changes:
+//  1. Check whether the leader is valid if leader exists.
 //     - Leader is valid: wait for the leader changes.
 //     - Leader is not valid: reset the leader by the current leader.
 //  2. Campaign the leadership if leader does not exist.
-//     - Elect the etcd leader as the ceresmeta leader.
+//     - Campaign the leader if this member should.
 //     - The leader keeps the leadership lease alive.
 //     - The other members keeps waiting for the leader changes.
 //
@@ -78,20 +127,20 @@
 		}
 
 		// Check whether leader exists.
-		leaderResp, err := l.self.getLeader(ctx)
+		resp, err := l.self.getLeader(ctx)
 		if err != nil {
 			logger.Error("fail to get leader", zap.Error(err))
 			wait = waitReasonFailEtcd
 			continue
 		}
 
-		etcdLeaderID := l.watchCtx.EtcdLeaderID()
-		if leaderResp.Leader == nil {
+		memLeader := resp.Leader
+		if memLeader == nil {
 			// Leader does not exist.
 			// A new leader should be elected and the etcd leader should be elected as the new leader.
-			if l.self.ID == etcdLeaderID {
+			if l.leadershipChecker.ShouldCampaign(l.self) {
 				// Campaign the leader and block until leader changes.
-				if err := l.self.CampaignAndKeepLeader(ctx, l.leaseTTLSec, callbacks); err != nil {
+				if err := l.self.CampaignAndKeepLeader(ctx, l.leaseTTLSec, l.leadershipChecker, callbacks); err != nil {
 					logger.Error("fail to campaign and keep leader", zap.Error(err))
 					wait = waitReasonFailEtcd
 				} else {
@@ -104,22 +153,22 @@
 			wait = waitReasonElectLeader
 		} else {
 			// Cache leader in memory.
-			l.self.leader = leaderResp.Leader
-			log.Info("update leader cache", zap.String("endpoint", leaderResp.Leader.Endpoint))
+			l.self.leader = memLeader
+			log.Info("update leader cache", zap.String("endpoint", memLeader.Endpoint))
 
 			// Leader does exist.
 			// A new leader should be elected (the leader should be reset by the current leader itself) if the leader is
 			// not the etcd leader.
-			if etcdLeaderID == leaderResp.Leader.Id {
+			if l.leadershipChecker.IsValidLeader(memLeader) {
 				// watch the leader and block until leader changes.
-				l.self.WaitForLeaderChange(ctx, leaderResp.Revision)
+				l.self.WaitForLeaderChange(ctx, resp.Revision)
 				logger.Warn("leader changes and stop watching")
 				continue
 			}
 
-			// The leader is not etcd leader and this node is leader so reset it.
-			if leaderResp.Leader.Id == l.self.ID {
-				if err := l.self.ResetLeader(ctx); err != nil {
+			// The current leader is not valid; reset it if this member should campaign for the leadership.
+			if l.leadershipChecker.ShouldCampaign(l.self) {
+				if err = l.self.ResetLeader(ctx); err != nil {
 					logger.Error("fail to reset leader", zap.Error(err))
 					wait = waitReasonFailEtcd
 				}
diff --git a/server/member/watch_leader_test.go b/server/member/watch_leader_test.go
index 68339ed..6314294 100644
--- a/server/member/watch_leader_test.go
+++ b/server/member/watch_leader_test.go
@@ -23,8 +23,8 @@
 	return ctx.stopped
 }
 
-func (ctx *mockWatchCtx) EtcdLeaderID() uint64 {
-	return ctx.srv.Lead()
+func (ctx *mockWatchCtx) EtcdLeaderID() (uint64, error) {
+	return ctx.srv.Lead(), nil
 }
 
 func TestWatchLeaderSingle(t *testing.T) {
@@ -40,7 +40,7 @@
 	rpcTimeout := time.Duration(10) * time.Second
 	leaseTTLSec := int64(1)
 	mem := NewMember("", uint64(etcd.Server.ID()), "mem0", "", client, leaderGetter, rpcTimeout)
-	leaderWatcher := NewLeaderWatcher(watchCtx, mem, leaseTTLSec)
+	leaderWatcher := NewLeaderWatcher(watchCtx, mem, leaseTTLSec, true)
 
 	ctx, cancelWatch := context.WithCancel(context.Background())
 	watchedDone := make(chan struct{}, 1)
diff --git a/server/server.go b/server/server.go
index a180ef9..591cea4 100644
--- a/server/server.go
+++ b/server/server.go
@@ -5,6 +5,7 @@
 import (
 	"context"
 	"fmt"
+	"net"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -27,6 +28,7 @@
 	"go.etcd.io/etcd/server/v3/embed"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 type Server struct {
@@ -79,7 +81,22 @@
 
 // Run runs the services and background jobs.
 func (srv *Server) Run(ctx context.Context) error {
-	if err := srv.startEtcd(ctx); err != nil {
+	// If enableEmbedEtcd is true, the grpc server is started in the same process as the etcd server.
+	if srv.cfg.EnableEmbedEtcd {
+		if err := srv.startEmbedEtcd(ctx); err != nil {
+			srv.status.Set(status.Terminated)
+			return err
+		}
+	} else {
+		// If enableEmbedEtcd is false, a standalone grpc server is started in a background goroutine.
+		go func() {
+			if err := srv.startGrpcServer(ctx); err != nil {
+				srv.status.Set(status.Terminated)
+				log.Fatal("Grpc serve failed", zap.Error(err))
+			}
+		}()
+	}
+	if err := srv.initEtcdClient(); err != nil {
 		srv.status.Set(status.Terminated)
 		return err
 	}
@@ -91,7 +108,6 @@
 
 	srv.startBgJobs(ctx)
 	srv.status.Set(status.StatusRunning)
-
 	return nil
 }
 
@@ -117,7 +133,7 @@
 	return atomic.LoadInt32(&srv.isClosed) == 1
 }
 
-func (srv *Server) startEtcd(ctx context.Context) error {
+func (srv *Server) startEmbedEtcd(ctx context.Context) error {
 	etcdSrv, err := embed.StartEtcd(srv.etcdCfg)
 	if err != nil {
 		return ErrStartEtcd.WithCause(err)
@@ -131,23 +147,53 @@
 	case <-newCtx.Done():
 		return ErrStartEtcdTimeout.WithCausef("timeout is:%v", srv.cfg.EtcdStartTimeout())
 	}
+	srv.etcdSrv = etcdSrv
 
-	endpoints := []string{srv.etcdCfg.ACUrls[0].String()}
+	return nil
+}
+
+func (srv *Server) initEtcdClient() error {
+	etcdEndpoints := make([]string, 0, len(srv.etcdCfg.ACUrls))
+	for _, url := range srv.etcdCfg.ACUrls {
+		etcdEndpoints = append(etcdEndpoints, url.String())
+	}
 	lgc := log.GetLoggerConfig()
 	client, err := clientv3.New(clientv3.Config{
-		Endpoints:   endpoints,
+		Endpoints:   etcdEndpoints,
 		DialTimeout: srv.cfg.EtcdCallTimeout(),
 		LogConfig:   lgc,
 	})
 	if err != nil {
 		return ErrCreateEtcdClient.WithCause(err)
 	}
-
 	srv.etcdCli = client
-	etcdLeaderGetter := &etcdutil.LeaderGetterWrapper{Server: etcdSrv.Server}
 
-	srv.member = member.NewMember("", uint64(etcdSrv.Server.ID()), srv.cfg.NodeName, endpoints[0], client, etcdLeaderGetter, srv.cfg.EtcdCallTimeout())
-	srv.etcdSrv = etcdSrv
+	endpoint := fmt.Sprintf("%s:%d", srv.cfg.Addr, srv.cfg.GrpcPort)
+	if srv.etcdSrv != nil {
+		etcdLeaderGetter := &etcdutil.LeaderGetterWrapper{Server: srv.etcdSrv.Server}
+		srv.member = member.NewMember(srv.cfg.StorageRootPath, uint64(srv.etcdSrv.Server.ID()), srv.cfg.NodeName, srv.etcdCfg.ACUrls[0].String(), client, etcdLeaderGetter, srv.cfg.EtcdCallTimeout())
+	} else {
+		srv.member = member.NewMember(srv.cfg.StorageRootPath, 0, srv.cfg.NodeName, endpoint, client, nil, srv.cfg.EtcdCallTimeout())
+	}
+	return nil
+}
+
+func (srv *Server) startGrpcServer(_ context.Context) error {
+	opts := srv.buildGrpcOptions()
+	server := grpc.NewServer(opts...)
+
+	grpcService := metagrpc.NewService(srv.cfg.GrpcHandleTimeout(), srv)
+	server.RegisterService(&metaservicepb.CeresmetaRpcService_ServiceDesc, grpcService)
+	addr := fmt.Sprintf(":%d", srv.cfg.GrpcPort)
+	lis, err := net.Listen("tcp", addr)
+	if err != nil {
+		return errors.Wrapf(err, "listen on %s failed", addr)
+	}
+
+	if err = server.Serve(lis); err != nil {
+		return errors.Wrap(err, "serve failed")
+	}
+
 	return nil
 }
 
@@ -210,7 +256,8 @@
 	watchCtx := &leaderWatchContext{
 		srv,
 	}
-	watcher := member.NewLeaderWatcher(watchCtx, srv.member, srv.cfg.LeaseTTLSec)
+	// If embedded etcd is enabled, the watcher checks the leadership against the leader of the etcd cluster.
+	watcher := member.NewLeaderWatcher(watchCtx, srv.member, srv.cfg.LeaseTTLSec, srv.cfg.EnableEmbedEtcd)
 
 	callbacks := &leadershipEventCallbacks{
 		srv: srv,
@@ -259,6 +306,19 @@
 	return nil
 }
 
+// buildGrpcOptions builds the grpc server options (send/recv message size limits and keepalive enforcement policy) from srv.cfg.
+func (srv *Server) buildGrpcOptions() []grpc.ServerOption {
+	keepalivePolicy := keepalive.EnforcementPolicy{
+		MinTime:             time.Duration(srv.cfg.GrpcServiceKeepAlivePingMinIntervalSec) * time.Second,
+		PermitWithoutStream: true,
+	}
+	return []grpc.ServerOption{
+		grpc.MaxSendMsgSize(srv.cfg.GrpcServiceMaxSendMsgSize),
+		grpc.MaxRecvMsgSize(srv.cfg.GrpcServiceMaxRecvMsgSize),
+		grpc.KeepaliveEnforcementPolicy(keepalivePolicy),
+	}
+}
+
 type leaderWatchContext struct {
 	srv *Server
 }
@@ -267,8 +327,11 @@
 	return ctx.srv.IsClosed()
 }
 
-func (ctx *leaderWatchContext) EtcdLeaderID() uint64 {
-	return ctx.srv.etcdSrv.Server.Lead()
+func (ctx *leaderWatchContext) EtcdLeaderID() (uint64, error) {
+	if ctx.srv.etcdSrv != nil {
+		return ctx.srv.etcdSrv.Server.Lead(), nil
+	}
+	return 0, errors.WithMessage(member.ErrGetLeader, "no leader found")
 }
 
 func (srv *Server) GetClusterManager() cluster.Manager {
diff --git a/server/service/grpc/forward.go b/server/service/grpc/forward.go
index 58780c8..153e944 100644
--- a/server/service/grpc/forward.go
+++ b/server/service/grpc/forward.go
@@ -21,7 +21,6 @@
 	}
 
 	if forwardedAddr != "" {
-		log.Info("try to create ceresmeta client", zap.String("addr", forwardedAddr))
 		ceresmetaClient, err := s.getCeresmetaClient(ctx, forwardedAddr)
 		if err != nil {
 			return nil, errors.WithMessagef(err, "get forwarded ceresmeta client, addr:%s", forwardedAddr)
@@ -42,6 +41,7 @@
 func (s *Service) getForwardedGrpcClient(ctx context.Context, forwardedAddr string) (*grpc.ClientConn, error) {
 	client, ok := s.conns.Load(forwardedAddr)
 	if !ok {
+		log.Info("try to create ceresmeta client", zap.String("addr", forwardedAddr))
 		cc, err := service.GetClientConn(ctx, forwardedAddr)
 		if err != nil {
 			return nil, err