// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

package server

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
	"github.com/CeresDB/ceresmeta/pkg/coderr"
	"github.com/CeresDB/ceresmeta/pkg/log"
	"github.com/CeresDB/ceresmeta/server/cluster"
	"github.com/CeresDB/ceresmeta/server/cluster/metadata"
	"github.com/CeresDB/ceresmeta/server/config"
	"github.com/CeresDB/ceresmeta/server/etcdutil"
	"github.com/CeresDB/ceresmeta/server/limiter"
	"github.com/CeresDB/ceresmeta/server/member"
	metagrpc "github.com/CeresDB/ceresmeta/server/service/grpc"
	"github.com/CeresDB/ceresmeta/server/service/http"
	"github.com/CeresDB/ceresmeta/server/status"
	"github.com/CeresDB/ceresmeta/server/storage"
	"github.com/pkg/errors"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/server/v3/embed"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)
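
// Server is the ceresmeta server. It wraps an embedded etcd server and runs the cluster manager,
// the gRPC/HTTP services and the background jobs on top of it.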
type Server struct {
isClosed int32
status *status.ServerStatus
cfg *config.Config
etcdCfg *embed.Config
	// The fields below are initialized after the server's Run is called.
clusterManager cluster.Manager
flowLimiter *limiter.FlowLimiter
	// member describes the membership of this node in the ceresmeta cluster.
member *member.Member
etcdCli *clientv3.Client
etcdSrv *embed.Etcd
// httpService contains http server and api set.
httpService *http.Service
// bgJobWg can be used to join with the background jobs.
bgJobWg sync.WaitGroup
// bgJobCancel can be used to cancel all pending background jobs.
bgJobCancel func()
}

// CreateServer creates the server instance without starting any services or background jobs.
func CreateServer(cfg *config.Config) (*Server, error) {
etcdCfg, err := cfg.GenEtcdConfig()
if err != nil {
return nil, err
}
srv := &Server{
isClosed: 0,
status: status.NewServerStatus(),
cfg: cfg,
etcdCfg: etcdCfg,
}
grpcService := metagrpc.NewService(cfg.GrpcHandleTimeout(), srv)
etcdCfg.ServiceRegister = func(grpcSrv *grpc.Server) {
grpcSrv.RegisterService(&metaservicepb.CeresmetaRpcService_ServiceDesc, grpcService)
}
return srv, nil
}

// Run runs the services and background jobs.
func (srv *Server) Run(ctx context.Context) error {
if err := srv.startEtcd(ctx); err != nil {
srv.status.Set(status.Terminated)
return err
}
if err := srv.startServer(ctx); err != nil {
srv.status.Set(status.Terminated)
return err
}
srv.startBgJobs(ctx)
srv.status.Set(status.StatusRunning)
return nil
}
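
// Close marks the server as closed, stops the background jobs, and closes the etcd client and the HTTP service.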
func (srv *Server) Close() {
	atomic.StoreInt32(&srv.isClosed, 1)

	srv.stopBgJobs()

	if srv.etcdCli != nil {
		if err := srv.etcdCli.Close(); err != nil {
			log.Error("failed to close etcd client", zap.Error(err))
		}
	}

	// httpService is nil if Close is called before startServer has completed.
	if srv.httpService != nil {
		if err := srv.httpService.Stop(); err != nil {
			log.Error("failed to close http service", zap.Error(err))
		}
	}
}
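
// IsClosed returns whether Close has been called on the server.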
func (srv *Server) IsClosed() bool {
return atomic.LoadInt32(&srv.isClosed) == 1
}
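
// startEtcd starts the embedded etcd server, waits until it is ready (or the configured start timeout expires),
// and then creates the etcd client and the cluster member on top of it.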
func (srv *Server) startEtcd(ctx context.Context) error {
etcdSrv, err := embed.StartEtcd(srv.etcdCfg)
if err != nil {
return ErrStartEtcd.WithCause(err)
}
newCtx, cancel := context.WithTimeout(ctx, srv.cfg.EtcdStartTimeout())
defer cancel()
select {
case <-etcdSrv.Server.ReadyNotify():
case <-newCtx.Done():
return ErrStartEtcdTimeout.WithCausef("timeout is:%v", srv.cfg.EtcdStartTimeout())
}
endpoints := []string{srv.etcdCfg.ACUrls[0].String()}
lgc := log.GetLoggerConfig()
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: srv.cfg.EtcdCallTimeout(),
LogConfig: lgc,
})
if err != nil {
return ErrCreateEtcdClient.WithCause(err)
}
srv.etcdCli = client
etcdLeaderGetter := &etcdutil.LeaderGetterWrapper{Server: etcdSrv.Server}
srv.member = member.NewMember("", uint64(etcdSrv.Server.ID()), srv.cfg.NodeName, endpoints[0], client, etcdLeaderGetter, srv.cfg.EtcdCallTimeout())
srv.etcdSrv = etcdSrv
return nil
}

// startServer starts the involved services.
func (srv *Server) startServer(_ context.Context) error {
if srv.cfg.MaxScanLimit <= 1 {
return ErrStartServer.WithCausef("scan limit must be greater than 1")
}
storage := storage.NewStorageWithEtcdBackend(srv.etcdCli, srv.cfg.StorageRootPath, storage.Options{
MaxScanLimit: srv.cfg.MaxScanLimit, MinScanLimit: srv.cfg.MinScanLimit,
})
manager, err := cluster.NewManagerImpl(storage, srv.etcdCli, srv.etcdCli, srv.cfg.StorageRootPath, srv.cfg.IDAllocatorStep)
if err != nil {
return errors.WithMessage(err, "start server")
}
srv.clusterManager = manager
srv.flowLimiter = limiter.NewFlowLimiter(srv.cfg.FlowLimiter)
api := http.NewAPI(manager, srv.status, http.NewForwardClient(srv.member, srv.cfg.HTTPPort), srv.flowLimiter)
httpService := http.NewHTTPService(srv.cfg.HTTPPort, time.Second*10, time.Second*10, api.NewAPIRouter())
go func() {
err := httpService.Start()
if err != nil {
log.Error("start http service failed", zap.Error(err))
}
}()
srv.httpService = httpService
log.Info("server started")
return nil
}
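
// startBgJobs starts the background jobs under a cancellable context derived from ctx.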
func (srv *Server) startBgJobs(ctx context.Context) {
	var bgJobCtx context.Context
	bgJobCtx, srv.bgJobCancel = context.WithCancel(ctx)

	// Register both jobs on the wait group before spawning them, so that a concurrent stopBgJobs
	// cannot observe a zero counter while the goroutines are still starting up.
	srv.bgJobWg.Add(2)
	go srv.watchLeader(bgJobCtx)
	go srv.watchEtcdLeaderPriority(bgJobCtx)
}
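
// stopBgJobs cancels all background jobs and waits for them to exit.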
func (srv *Server) stopBgJobs() {
	// bgJobCancel is nil if the background jobs were never started, e.g. when Run failed early.
	if srv.bgJobCancel != nil {
		srv.bgJobCancel()
	}
	srv.bgJobWg.Wait()
}
// watchLeader watches whether the leader of the cluster exists.
// Every node campaigns for the leadership if it finds the leader offline, and the elected node keeps the leadership
// after the election. The cluster leader is also kept consistent with the leader of the etcd cluster.
func (srv *Server) watchLeader(ctx context.Context) {
	// The job was already registered on bgJobWg by startBgJobs.
	defer srv.bgJobWg.Done()
watchCtx := &leaderWatchContext{
srv,
}
watcher := member.NewLeaderWatcher(watchCtx, srv.member, srv.cfg.LeaseTTLSec)
callbacks := &leadershipEventCallbacks{
srv: srv,
}
watcher.Watch(ctx, callbacks)
}
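
// watchEtcdLeaderPriority is a placeholder for watching the etcd leader priority; currently it does nothing
// except take part in the background job wait group.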
func (srv *Server) watchEtcdLeaderPriority(_ context.Context) {
	defer srv.bgJobWg.Done()
}
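
// createDefaultCluster creates the default cluster when the current node is the leader; an already existing
// default cluster is not treated as an error.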
func (srv *Server) createDefaultCluster(ctx context.Context) error {
resp, err := srv.member.GetLeaderAddr(ctx)
if err != nil {
log.Warn("get leader failed", zap.Error(err))
}
// Create default cluster by the leader.
if resp.IsLocal {
topologyType, err := metadata.ParseTopologyType(srv.cfg.TopologyType)
if err != nil {
return err
}
defaultCluster, err := srv.clusterManager.CreateCluster(ctx, srv.cfg.DefaultClusterName,
metadata.CreateClusterOpts{
NodeCount: uint32(srv.cfg.DefaultClusterNodeCount),
ReplicationFactor: uint32(srv.cfg.DefaultClusterReplicationFactor),
ShardTotal: uint32(srv.cfg.DefaultClusterShardTotal),
EnableSchedule: srv.cfg.EnableSchedule,
TopologyType: topologyType,
})
if err != nil {
log.Warn("create default cluster failed", zap.Error(err))
if coderr.Is(err, metadata.ErrClusterAlreadyExists.Code()) {
_, err = srv.clusterManager.GetCluster(ctx, srv.cfg.DefaultClusterName)
if err != nil {
return errors.WithMessage(err, "get default cluster failed")
}
}
} else {
log.Info("create default cluster succeed", zap.String("cluster", defaultCluster.GetMetadata().Name()))
}
}
return nil
}
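
// leaderWatchContext exposes the server state consulted by the leader watcher.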
type leaderWatchContext struct {
srv *Server
}
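
// ShouldStop reports whether the leader watcher should stop, which is the case once the server is closed.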
func (ctx *leaderWatchContext) ShouldStop() bool {
return ctx.srv.IsClosed()
}
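
// EtcdLeaderID returns the member ID of the current etcd leader.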
func (ctx *leaderWatchContext) EtcdLeaderID() uint64 {
return ctx.srv.etcdSrv.Server.Lead()
}
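
// GetClusterManager returns the cluster manager, which is initialized during Run.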
func (srv *Server) GetClusterManager() cluster.Manager {
return srv.clusterManager
}

// GetLeader returns the leader address of the ceresmeta cluster (with cache).
func (srv *Server) GetLeader(ctx context.Context) (member.GetLeaderAddrResp, error) {
return srv.member.GetLeaderAddr(ctx)
}
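
// GetFlowLimiter returns the flow limiter, or ErrFlowLimiterNotFound if it has not been initialized yet.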
func (srv *Server) GetFlowLimiter() (*limiter.FlowLimiter, error) {
if srv.flowLimiter == nil {
return nil, ErrFlowLimiterNotFound
}
return srv.flowLimiter, nil
}
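
// leadershipEventCallbacks implements the callbacks invoked by the leader watcher on leadership changes.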
type leadershipEventCallbacks struct {
srv *Server
}
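
// AfterElected starts the cluster manager and creates the default cluster once this node is elected as the leader.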
func (c *leadershipEventCallbacks) AfterElected(ctx context.Context) {
if err := c.srv.clusterManager.Start(ctx); err != nil {
panic(fmt.Sprintf("cluster manager fail to start, err:%v", err))
}
if err := c.srv.createDefaultCluster(ctx); err != nil {
panic(fmt.Sprintf("create default cluster failed, err:%v", err))
}
}
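
// BeforeTransfer stops the cluster manager before the leadership is transferred away from this node.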
func (c *leadershipEventCallbacks) BeforeTransfer(ctx context.Context) {
if err := c.srv.clusterManager.Stop(ctx); err != nil {
panic(fmt.Sprintf("cluster manager fail to stop, err:%v", err))
}
}