blob: ce0672e5a03539dff3ca8664e32824bbd2ef8f35 [file] [log] [blame]
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
package transferleader
import (
"context"
"encoding/json"
"sync"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/looplab/fsm"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// The transfer-leader FSM advances through its states in a fixed order:
//
//	StateBegin -> StateCloseOldLeader -> StateOpenNewLeader -> StateFinish
//
// The CloseOldLeader step sends a close-shard request to the old leader node,
// but only when an old leader node name was supplied. The OpenNewLeader step
// sends an open-shard request to the new leader node.

// Event names that drive the transfer-leader FSM.
const (
	eventCloseOldLeader = "EventCloseOldLeader"
	eventOpenNewLeader  = "EventOpenNewLeader"
	eventFinish         = "EventFinish"
)

// State names of the transfer-leader FSM.
const (
	stateBegin          = "StateBegin"
	stateCloseOldLeader = "StateCloseOldLeader"
	stateOpenNewLeader  = "StateOpenNewLeader"
	stateFinish         = "StateFinish"
)
// transferLeaderEvents lists the only legal transitions. Each event has exactly
// one source state and moves the FSM a single step forward.
var transferLeaderEvents = fsm.Events{
	{Name: eventCloseOldLeader, Src: []string{stateBegin}, Dst: stateCloseOldLeader},
	{Name: eventOpenNewLeader, Src: []string{stateCloseOldLeader}, Dst: stateOpenNewLeader},
	{Name: eventFinish, Src: []string{stateOpenNewLeader}, Dst: stateFinish},
}

// transferLeaderCallbacks maps each event name to the function that does the
// work for that step.
var transferLeaderCallbacks = fsm.Callbacks{
	eventCloseOldLeader: closeOldLeaderCallback,
	eventOpenNewLeader:  openNewShardCallback,
	eventFinish:         finishCallback,
}
// Procedure moves leadership of a single shard from an old leader node to a
// new leader node. It does this by driving the transferLeaderEvents FSM.
type Procedure struct {
	// fsm tracks the current step. NewProcedure creates it at stateBegin.
	fsm *fsm.FSM
	// params holds the inputs given to NewProcedure.
	params ProcedureParams
	// relatedVersionInfo holds the cluster and shard versions captured in NewProcedure.
	relatedVersionInfo procedure.RelatedVersionInfo
	// Protect the state. Readers take RLock (State, convertToMeta);
	// writers go through updateStateWithLock.
	lock sync.RWMutex
	state procedure.State
}
// rawData is the form of a Procedure that convertToMeta JSON-encodes into
// procedure.Meta.RawData before the procedure is saved in storage.
//
// NOTE(review): snapshot is unexported, so encoding/json skips it. The cluster
// snapshot is therefore NOT part of the persisted bytes, even though
// convertToMeta fills it in. Confirm this is intended.
type rawData struct {
	ID uint64
	// FsmState is the FSM's current state name at the time of persisting.
	FsmState string
	State procedure.State
	snapshot metadata.Snapshot
	ShardID storage.ShardID
	OldLeaderNodeName string
	NewLeaderNodeName string
}
// callbackRequest is the argument Start passes to fsm.Event. Each FSM callback
// recovers it with procedure.GetRequestFromEvent.
type callbackRequest struct {
	// ctx is Start's context; the callbacks use it for their dispatch calls.
	ctx context.Context
	// p is the procedure whose params the callbacks act on.
	p *Procedure
}
// ProcedureParams are the inputs to NewProcedure.
type ProcedureParams struct {
	// ID identifies the procedure. It is also written as the ID of the persisted meta.
	ID uint64
	// Dispatch sends the close-shard and open-shard requests to nodes.
	Dispatch eventdispatch.Dispatch
	// Storage is where the procedure's progress is persisted.
	Storage procedure.Storage
	// ClusterSnapshot is the topology the procedure validates against and takes
	// shard and cluster versions from.
	ClusterSnapshot metadata.Snapshot
	// ShardID is the shard whose leadership is transferred.
	ShardID storage.ShardID
	// OldLeaderNodeName is the node expected to hold the shard now. When empty,
	// no close-shard request is sent and leader validation is skipped.
	OldLeaderNodeName string
	// NewLeaderNodeName is the node that receives the open-shard request.
	NewLeaderNodeName string
}
// NewProcedure builds a transfer-leader procedure for params.ShardID.
//
// It first validates the shard against params.ClusterSnapshot.Topology. When
// params.OldLeaderNodeName is set, it also checks that this name matches the
// node recorded for the shard. It then captures the shard and cluster versions
// the procedure depends on.
//
// The returned procedure is in procedure.StateInit, with its FSM at stateBegin.
func NewProcedure(params ProcedureParams) (procedure.Procedure, error) {
	topology := params.ClusterSnapshot.Topology
	if err := validateClusterTopology(topology, params.ShardID, params.OldLeaderNodeName); err != nil {
		return nil, err
	}

	versionInfo, err := buildRelatedVersionInfo(params)
	if err != nil {
		return nil, err
	}

	p := &Procedure{
		params:             params,
		relatedVersionInfo: versionInfo,
		state:              procedure.StateInit,
	}
	p.fsm = fsm.NewFSM(stateBegin, transferLeaderEvents, transferLeaderCallbacks)
	return p, nil
}
// buildRelatedVersionInfo records the versions this procedure depends on: the
// cluster view version and the version of the one shard being moved.
//
// It returns an error wrapping metadata.ErrShardNotFound when the shard has no
// view in the snapshot topology.
func buildRelatedVersionInfo(params ProcedureParams) (procedure.RelatedVersionInfo, error) {
	topology := params.ClusterSnapshot.Topology

	view, ok := topology.ShardViewsMapping[params.ShardID]
	if !ok {
		return procedure.RelatedVersionInfo{}, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in topology, shardID:%d", params.ShardID)
	}

	return procedure.RelatedVersionInfo{
		ClusterID:        topology.ClusterView.ClusterID,
		ShardWithVersion: map[storage.ShardID]uint64{params.ShardID: view.Version},
		ClusterVersion:   topology.ClusterView.Version,
	}, nil
}
// validateClusterTopology checks that shardID has a view in topology. When
// oldLeaderNodeName is non-empty, it also checks that every shard-node entry
// recorded for shardID names oldLeaderNodeName.
//
// It returns:
//   - metadata.ErrShardNotFound if the shard has no view, or if
//     oldLeaderNodeName is set but the cluster view lists no shard nodes at all;
//   - metadata.ErrNodeNotFound if a shard-node entry for shardID names a
//     different node.
//
// NOTE(review): validation passes when oldLeaderNodeName is set and shard nodes
// exist, but none of them belongs to shardID. Confirm that this is intended.
func validateClusterTopology(topology metadata.Topology, shardID storage.ShardID, oldLeaderNodeName string) error {
	if _, ok := topology.ShardViewsMapping[shardID]; !ok {
		log.Error("shard not found", zap.Uint64("shardID", uint64(shardID)))
		return metadata.ErrShardNotFound
	}

	// Without a claimed old leader there is nothing more to verify.
	if oldLeaderNodeName == "" {
		return nil
	}

	shardNodes := topology.ClusterView.ShardNodes
	if len(shardNodes) == 0 {
		log.Error("shard not exist in any node", zap.Uint32("shardID", uint32(shardID)))
		return metadata.ErrShardNotFound
	}

	for _, sn := range shardNodes {
		if sn.ID != shardID || sn.NodeName == oldLeaderNodeName {
			continue
		}
		log.Error("shard leader node not match", zap.String("requestOldLeaderNodeName", oldLeaderNodeName), zap.String("actualOldLeaderNodeName", sn.NodeName))
		return metadata.ErrNodeNotFound
	}
	return nil
}
// ID returns the procedure ID supplied in ProcedureParams.
func (p *Procedure) ID() uint64 { return p.params.ID }

// Typ always reports procedure.TransferLeader.
func (p *Procedure) Typ() procedure.Typ { return procedure.TransferLeader }

// RelatedVersionInfo returns the version info captured by NewProcedure.
func (p *Procedure) RelatedVersionInfo() procedure.RelatedVersionInfo { return p.relatedVersionInfo }

// Priority always reports procedure.PriorityHigh.
func (p *Procedure) Priority() procedure.Priority { return procedure.PriorityHigh }
// Start runs the procedure to completion.
//
// It marks the procedure procedure.StateRunning. It then repeatedly persists
// the current progress and fires the next FSM event until the FSM reaches
// stateFinish. At that point the state becomes procedure.StateFinished and is
// persisted once more.
//
// Error handling:
//   - If an FSM event fails, the state becomes procedure.StateFailed and the
//     wrapped error is returned.
//   - A persist failure is returned without changing the state.
//   - If the FSM is ever in a state this procedure does not handle, the
//     procedure is marked failed instead of spinning the loop forever.
func (p *Procedure) Start(ctx context.Context) error {
	p.updateStateWithLock(procedure.StateRunning)

	transferLeaderRequest := callbackRequest{
		ctx: ctx,
		p:   p,
	}

	for {
		current := p.fsm.Current()
		switch current {
		case stateBegin:
			if err := p.persist(ctx); err != nil {
				return errors.WithMessage(err, "transferLeader procedure persist")
			}
			if err := p.fsm.Event(eventCloseOldLeader, transferLeaderRequest); err != nil {
				p.updateStateWithLock(procedure.StateFailed)
				return errors.WithMessage(err, "transferLeader procedure close old leader")
			}
		case stateCloseOldLeader:
			if err := p.persist(ctx); err != nil {
				return errors.WithMessage(err, "transferLeader procedure persist")
			}
			if err := p.fsm.Event(eventOpenNewLeader, transferLeaderRequest); err != nil {
				p.updateStateWithLock(procedure.StateFailed)
				return errors.WithMessage(err, "transferLeader procedure open new leader")
			}
		case stateOpenNewLeader:
			if err := p.persist(ctx); err != nil {
				return errors.WithMessage(err, "transferLeader procedure persist")
			}
			if err := p.fsm.Event(eventFinish, transferLeaderRequest); err != nil {
				p.updateStateWithLock(procedure.StateFailed)
				return errors.WithMessage(err, "transferLeader procedure finish")
			}
		case stateFinish:
			// TODO: The state update sequence here is inconsistent with the previous one. Consider reconstructing the state update logic of the state machine.
			p.updateStateWithLock(procedure.StateFinished)
			if err := p.persist(ctx); err != nil {
				return errors.WithMessage(err, "transferLeader procedure persist")
			}
			return nil
		default:
			// Without this branch an unknown state (e.g. one restored from
			// corrupted persisted data) would make this loop busy-spin forever.
			p.updateStateWithLock(procedure.StateFailed)
			return errors.Errorf("transferLeader procedure unknown fsm state:%s", current)
		}
	}
}
// persist serializes the procedure with convertToMeta and writes the result
// through Storage.CreateOrUpdate. Either failure is returned with context added.
func (p *Procedure) persist(ctx context.Context) error {
	meta, err := p.convertToMeta()
	if err != nil {
		return errors.WithMessage(err, "convert to meta")
	}
	if err = p.params.Storage.CreateOrUpdate(ctx, meta); err != nil {
		return errors.WithMessage(err, "createOrUpdate procedure storage")
	}
	return nil
}
// Cancel only marks the procedure procedure.StateCancelled and never fails.
// Start's loop does not consult this state, so Cancel does not interrupt a
// Start call that is already running.
func (p *Procedure) Cancel(_ context.Context) error {
	p.updateStateWithLock(procedure.StateCancelled)
	return nil
}

// State returns the current lifecycle state, read under the read lock.
func (p *Procedure) State() procedure.State {
	p.lock.RLock()
	current := p.state
	p.lock.RUnlock()
	return current
}
// closeOldLeaderCallback is the FSM callback for eventCloseOldLeader.
//
// When the procedure names an old leader node, it asks that node to close the
// shard; otherwise it does nothing. A failure to recover the request or to
// dispatch the close cancels the event through procedure.CancelEventWithLog.
func closeOldLeaderCallback(event *fsm.Event) {
	req, err := procedure.GetRequestFromEvent[callbackRequest](event)
	if err != nil {
		procedure.CancelEventWithLog(event, err, "get request from event")
		return
	}
	ctx := req.ctx

	// Nothing to close when no old leader was given.
	if len(req.p.params.OldLeaderNodeName) == 0 {
		return
	}

	// Log the real shard ID here; the procedure ID has its own "procedureID" field.
	log.Info("try to close shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(req.p.params.ShardID)), zap.String("oldLeader", req.p.params.OldLeaderNodeName))
	closeShardRequest := eventdispatch.CloseShardRequest{
		ShardID: uint32(req.p.params.ShardID),
	}
	if err := req.p.params.Dispatch.CloseShard(ctx, req.p.params.OldLeaderNodeName, closeShardRequest); err != nil {
		procedure.CancelEventWithLog(event, err, "close shard", zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("oldLeaderName", req.p.params.OldLeaderNodeName))
		return
	}
	log.Info("close shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(req.p.params.ShardID)), zap.String("oldLeader", req.p.params.OldLeaderNodeName))
}
// openNewShardCallback is the FSM callback for eventOpenNewLeader.
//
// It asks the new leader node to open the shard as leader, at a version one
// above the shard's version in the cluster snapshot. Failures (request
// recovery, a missing shard view, or dispatch) cancel the event through
// procedure.CancelEventWithLog.
func openNewShardCallback(event *fsm.Event) {
	req, err := procedure.GetRequestFromEvent[callbackRequest](event)
	if err != nil {
		procedure.CancelEventWithLog(event, err, "get request from event")
		return
	}
	params := &req.p.params

	shardView, found := params.ClusterSnapshot.Topology.ShardViewsMapping[params.ShardID]
	if !found {
		procedure.CancelEventWithLog(event, metadata.ErrShardNotFound, "shard not found in topology", zap.Uint64("shardID", uint64(params.ShardID)))
		return
	}

	// The new leader opens the shard at the version right after the snapshot's.
	openShardRequest := eventdispatch.OpenShardRequest{
		Shard: metadata.ShardInfo{
			ID:      params.ShardID,
			Role:    storage.ShardRoleLeader,
			Version: shardView.Version + 1,
		},
	}

	log.Info("try to open shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(params.ShardID)), zap.String("newLeader", params.NewLeaderNodeName))
	if err := params.Dispatch.OpenShard(req.ctx, params.NewLeaderNodeName, openShardRequest); err != nil {
		procedure.CancelEventWithLog(event, err, "open shard", zap.Uint32("shardID", uint32(params.ShardID)), zap.String("newLeaderNode", params.NewLeaderNodeName))
		return
	}
	log.Info("open shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(params.ShardID)), zap.String("newLeader", params.NewLeaderNodeName))
}
// finishCallback is the FSM callback for eventFinish. It does no remote work
// and only logs the completed transfer.
func finishCallback(event *fsm.Event) {
	req, err := procedure.GetRequestFromEvent[callbackRequest](event)
	if err != nil {
		procedure.CancelEventWithLog(event, err, "get request from event")
		return
	}

	p := req.p
	log.Info("transfer leader finish",
		zap.Uint64("procedureID", p.ID()),
		zap.Uint32("shardID", uint32(p.params.ShardID)),
		zap.String("oldLeaderNode", p.params.OldLeaderNodeName),
		zap.String("newLeaderNode", p.params.NewLeaderNodeName),
	)
}
// updateStateWithLock overwrites the lifecycle state while holding the write lock.
func (p *Procedure) updateStateWithLock(state procedure.State) {
	p.lock.Lock()
	p.state = state
	p.lock.Unlock()
}
// TODO: Consider refactor meta procedure convertor function, encapsulate as a tool function.

// convertToMeta snapshots the procedure into a procedure.Meta whose RawData is
// the JSON encoding of a rawData value.
//
// The read lock is held throughout, so rawData.State and Meta.State always
// observe the same p.state. rawData.snapshot is unexported, so encoding/json
// omits it from RawData.
//
// An encoding failure is reported as procedure.ErrEncodeRawData.
func (p *Procedure) convertToMeta() (procedure.Meta, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	data := rawData{
		ID:                p.params.ID,
		FsmState:          p.fsm.Current(),
		ShardID:           p.params.ShardID,
		snapshot:          p.params.ClusterSnapshot,
		OldLeaderNodeName: p.params.OldLeaderNodeName,
		NewLeaderNodeName: p.params.NewLeaderNodeName,
		State:             p.state,
	}
	rawDataBytes, err := json.Marshal(data)
	if err != nil {
		// Report the procedure ID (previously the shard ID was printed under this label).
		return procedure.Meta{}, procedure.ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%v, err:%v", p.params.ID, err)
	}

	return procedure.Meta{
		ID:      p.params.ID,
		Typ:     procedure.TransferLeader,
		State:   p.state,
		RawData: rawDataBytes,
	}, nil
}