feat: add split procedure (#111)

* feat: add split procedure

* refactor: refactor by cr

* refactor: refactor by cr
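
For reviewers, a minimal sketch of how the new split API can be exercised.
The listen address, route prefix, and all field values below are illustrative
assumptions; the JSON fields mirror the SplitRequest struct added in
server/service/http/api.go, and on success the server responds with the newly
allocated shard ID.

    // Illustrative sketch only; not part of this change.
    package main

    import (
    	"bytes"
    	"encoding/json"
    	"fmt"
    	"io"
    	"net/http"
    )

    func main() {
    	// Fields mirror http.SplitRequest; the values are assumptions.
    	body, err := json.Marshal(map[string]interface{}{
    		"clusterName": "defaultCluster",
    		"schemaName":  "testSchemaName",
    		"shardID":     0,
    		"splitTables": []string{"table0"},
    		"nodeName":    "node0",
    	})
    	if err != nil {
    		panic(err)
    	}
    	// Assumed local meta server address; adjust to your deployment.
    	resp, err := http.Post("http://127.0.0.1:8080/split", "application/json", bytes.NewReader(body))
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	respBody, _ := io.ReadAll(resp.Body)
    	fmt.Println(resp.Status, string(respBody))
    }
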
diff --git a/config.toml b/config.toml
index 08a8dfe..a9222b1 100644
--- a/config.toml
+++ b/config.toml
@@ -7,7 +7,7 @@
 data-dir = "/tmp/ceresmeta0/data"
 node-name = "meta0"
 initial-cluster = "meta0=http://127.0.0.1:2380"
-default-cluster-node-count = 1
+default-cluster-node-count = 2
 
 [etcd-log]
 file = "/tmp/ceresmeta0/etcd.log"
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index b8a1138..95c7692 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -7,6 +7,7 @@
 	"fmt"
 	"path"
 	"sync"
+	"time"
 
 	"github.com/CeresDB/ceresmeta/pkg/log"
 	"github.com/CeresDB/ceresmeta/server/id"
@@ -98,6 +99,13 @@
 			Tables: tableInfos,
 		}
 	}
+
+	for _, shardID := range shardIDs {
+		_, exists := result[shardID]
+		if !exists {
+			result[shardID] = ShardTables{}
+		}
+	}
 	return result
 }
 
@@ -132,6 +140,27 @@
 	return ret, nil
 }
 
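+// UpdateShardTables updates the shard view of each given shard, writing the new version to storage and refreshing the in-memory topology.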
+func (c *Cluster) UpdateShardTables(ctx context.Context, shardTablesArr []ShardTables) error {
+	for _, shardTables := range shardTablesArr {
+		tableIDs := make([]storage.TableID, 0, len(shardTables.Tables))
+		for _, table := range shardTables.Tables {
+			tableIDs = append(tableIDs, table.ID)
+		}
+
+		_, err := c.topologyManager.UpdateShardView(ctx, storage.ShardView{
+			ShardID:   shardTables.Shard.ID,
+			Version:   shardTables.Shard.Version,
+			TableIDs:  tableIDs,
+			CreatedAt: uint64(time.Now().UnixMilli()),
+		})
+		if err != nil {
+			return errors.WithMessage(err, "update shard tables")
+		}
+	}
+
+	return nil
+}
+
 // GetOrCreateSchema the second output parameter bool: returns true if the schema was newly created.
 func (c *Cluster) GetOrCreateSchema(ctx context.Context, schemaName string) (storage.Schema, bool, error) {
 	return c.tableManager.GetOrCreateSchema(ctx, schemaName)
@@ -297,6 +326,13 @@
 	}, nil
 }
 
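+// GetClusterViewVersion returns the current version of the cluster view.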
+func (c *Cluster) GetClusterViewVersion() uint64 {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+
+	return c.topologyManager.GetVersion()
+}
+
 func (c *Cluster) GetClusterMinNodeCount() uint32 {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
diff --git a/server/cluster/topology_manager.go b/server/cluster/topology_manager.go
index a02788f..4e0bee6 100644
--- a/server/cluster/topology_manager.go
+++ b/server/cluster/topology_manager.go
@@ -42,6 +42,8 @@
 	UpdateClusterView(ctx context.Context, state storage.ClusterState, shardNodes []storage.ShardNode) error
 	// CreateShardViews create shardViews.
 	CreateShardViews(ctx context.Context, shardViews []CreateShardView) error
+	// UpdateShardView updates the shardView.
+	UpdateShardView(ctx context.Context, shardView storage.ShardView) (ShardVersionUpdate, error)
 }
 
 type ShardTableIDs struct {
@@ -238,6 +240,7 @@
 	// Update shardView in memory.
 	shardView.Version = prevVersion + 1
 	shardView.TableIDs = tableIDs
+	delete(m.tableShardMapping, tableID)
 
 	return ShardVersionUpdate{
 		ShardID:     shardID,
@@ -246,6 +249,31 @@
 	}, nil
 }
 
+func (m *TopologyManagerImpl) UpdateShardView(ctx context.Context, shardView storage.ShardView) (ShardVersionUpdate, error) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	if err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{
+		ClusterID:     m.clusterID,
+		ShardView:     shardView,
+		LatestVersion: shardView.Version - 1,
+	}); err != nil {
+		return ShardVersionUpdate{}, errors.WithMessage(err, "storage update shard view")
+	}
+
+	// Update shardView in memory.
+	m.shardTablesMapping[shardView.ShardID] = &shardView
+	for _, tableID := range shardView.TableIDs {
+		m.tableShardMapping[tableID] = shardView.ShardID
+	}
+
+	return ShardVersionUpdate{
+		ShardID:     shardView.ShardID,
+		CurrVersion: shardView.Version,
+		PrevVersion: shardView.Version - 1,
+	}, nil
+}
+
 func (m *TopologyManagerImpl) GetShardNodesByID(shardID storage.ShardID) ([]storage.ShardNode, error) {
 	m.lock.RLock()
 	defer m.lock.RUnlock()
diff --git a/server/cluster/types.go b/server/cluster/types.go
index 20e22c5..7831d4f 100644
--- a/server/cluster/types.go
+++ b/server/cluster/types.go
@@ -44,6 +44,10 @@
 	ShardVersionUpdate ShardVersionUpdate
 }
 
+type UpdateShardTablesResult struct {
+	ShardVersionUpdate ShardVersionUpdate
+}
+
 type ShardVersionUpdate struct {
 	ShardID     storage.ShardID
 	CurrVersion uint64
diff --git a/server/coordinator/procedure/common_test.go b/server/coordinator/procedure/common_test.go
index ec41e22..8a9ca42 100644
--- a/server/coordinator/procedure/common_test.go
+++ b/server/coordinator/procedure/common_test.go
@@ -16,7 +16,8 @@
 )
 
 const (
-	testTableName            = "testTable"
+	testTableName0           = "table0"
+	testTableName1           = "table1"
 	testSchemaName           = "testSchemaName"
 	nodeName0                = "node0"
 	nodeName1                = "node1"
diff --git a/server/coordinator/procedure/create_drop_table_test.go b/server/coordinator/procedure/create_drop_table_test.go
index a232507..a2a5959 100644
--- a/server/coordinator/procedure/create_drop_table_test.go
+++ b/server/coordinator/procedure/create_drop_table_test.go
@@ -21,12 +21,12 @@
 	testTableNum := 20
 	// Create table.
 	for i := 0; i < testTableNum; i++ {
-		tableName := fmt.Sprintf("%s_%d", testTableName, i)
+		tableName := fmt.Sprintf("%s_%d", testTableName0, i)
 		testCreateTable(t, dispatch, c, tableName)
 	}
 	// Check get table.
 	for i := 0; i < testTableNum; i++ {
-		tableName := fmt.Sprintf("%s_%d", testTableName, i)
+		tableName := fmt.Sprintf("%s_%d", testTableName0, i)
 		table, b, err := c.GetTable(testSchemaName, tableName)
 		re.NoError(err)
 		re.Equal(b, true)
@@ -47,12 +47,12 @@
 
 	// Drop table.
 	for i := 0; i < testTableNum; i++ {
-		tableName := fmt.Sprintf("%s_%d", testTableName, i)
+		tableName := fmt.Sprintf("%s_%d", testTableName0, i)
 		testDropTable(t, dispatch, c, tableName)
 	}
 	// Check table not exists.
 	for i := 0; i < testTableNum; i++ {
-		tableName := fmt.Sprintf("%s_%d", testTableName, i)
+		tableName := fmt.Sprintf("%s_%d", testTableName0, i)
 		_, b, err := c.GetTable(testSchemaName, tableName)
 		re.NoError(err)
 		re.Equal(b, false)
diff --git a/server/coordinator/procedure/drop_table.go b/server/coordinator/procedure/drop_table.go
index 51c9d6d..8c3442b 100644
--- a/server/coordinator/procedure/drop_table.go
+++ b/server/coordinator/procedure/drop_table.go
@@ -52,11 +52,6 @@
 		log.Warn("drop non-existing table", zap.String("schema", request.rawReq.GetSchemaName()), zap.String("table", request.rawReq.GetName()))
 		return
 	}
-	result, err := request.cluster.DropTable(request.ctx, request.rawReq.GetSchemaName(), request.rawReq.GetName())
-	if err != nil {
-		cancelEventWithLog(event, err, "cluster drop table")
-		return
-	}
 
 	shardNodesResult, err := request.cluster.GetShardNodeByTableIDs([]storage.TableID{table.ID})
 	if err != nil {
@@ -64,6 +59,12 @@
 		return
 	}
 
+	result, err := request.cluster.DropTable(request.ctx, request.rawReq.GetSchemaName(), request.rawReq.GetName())
+	if err != nil {
+		cancelEventWithLog(event, err, "cluster drop table")
+		return
+	}
+
 	shardNodes, ok := shardNodesResult.ShardNodes[table.ID]
 	if !ok {
 		cancelEventWithLog(event, ErrShardLeaderNotFound, fmt.Sprintf("cluster get shard by table id, table:%v", table))
diff --git a/server/coordinator/procedure/factory.go b/server/coordinator/procedure/factory.go
index e7ccabc..6cd0939 100644
--- a/server/coordinator/procedure/factory.go
+++ b/server/coordinator/procedure/factory.go
@@ -48,13 +48,25 @@
 	ShardID           storage.ShardID
 	OldLeaderNodeName string
 	NewLeaderNodeName string
+	ClusterVersion    uint64
 }
 
-func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage) *Factory {
+type SplitRequest struct {
+	ClusterName    string
+	SchemaName     string
+	TableNames     []string
+	ShardID        storage.ShardID
+	NewShardID     storage.ShardID
+	TargetNodeName string
+	ClusterVersion uint64
+}
+
+func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage, manager cluster.Manager) *Factory {
 	return &Factory{
-		idAllocator: allocator,
-		dispatch:    dispatch,
-		storage:     storage,
+		idAllocator:    allocator,
+		dispatch:       dispatch,
+		storage:        storage,
+		clusterManager: manager,
 	}
 }
 
@@ -103,6 +115,22 @@
 		request.ShardID, request.OldLeaderNodeName, request.NewLeaderNodeName, id)
 }
 
+func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest) (Procedure, error) {
+	id, err := f.allocProcedureID(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	c, err := f.clusterManager.GetCluster(ctx, request.ClusterName)
+	if err != nil {
+		log.Error("cluster not found", zap.String("clusterName", request.ClusterName), zap.Error(err))
+		return nil, cluster.ErrClusterNotFound
+	}
+
+	procedure := NewSplitProcedure(id, f.dispatch, f.storage, c, request.SchemaName, request.ShardID, request.NewShardID, request.TableNames, request.TargetNodeName)
+	return procedure, nil
+}
+
 func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
 	id, err := f.idAllocator.Alloc(ctx)
 	if err != nil {
diff --git a/server/coordinator/procedure/scatter.go b/server/coordinator/procedure/scatter.go
index 93907cd..8ecc98e 100644
--- a/server/coordinator/procedure/scatter.go
+++ b/server/coordinator/procedure/scatter.go
@@ -126,6 +126,11 @@
 
 // Allocates shard ids across the registered nodes, and caller should ensure `minNodeCount <= len(allNodes)`.
 func allocNodeShards(shardTotal uint32, minNodeCount uint32, allNodes []cluster.RegisteredNode, shardIDs []storage.ShardID) ([]storage.ShardNode, error) {
+	// If more nodes are registered than required, use only the first minNodeCount of them.
+	if len(allNodes) > int(minNodeCount) {
+		allNodes = allNodes[:minNodeCount]
+	}
+
 	shards := make([]storage.ShardNode, 0, shardTotal)
 
 	perNodeShardCount := shardTotal / minNodeCount
diff --git a/server/coordinator/procedure/scatter_test.go b/server/coordinator/procedure/scatter_test.go
index e5e16fe..512405d 100644
--- a/server/coordinator/procedure/scatter_test.go
+++ b/server/coordinator/procedure/scatter_test.go
@@ -22,7 +22,9 @@
 	totalShardNum := c.GetTotalShardNum()
 	shardIDs := make([]storage.ShardID, 0, totalShardNum)
 	for i := uint32(0); i < totalShardNum; i++ {
-		shardIDs = append(shardIDs, storage.ShardID(i))
+		shardID, err := c.AllocShardID(ctx)
+		re.NoError(err)
+		shardIDs = append(shardIDs, storage.ShardID(shardID))
 	}
 	p := NewScatterProcedure(dispatch, c, 1, shardIDs)
 	go func() {
diff --git a/server/coordinator/procedure/split.go b/server/coordinator/procedure/split.go
new file mode 100644
index 0000000..644d691
--- /dev/null
+++ b/server/coordinator/procedure/split.go
@@ -0,0 +1,446 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package procedure
+
+import (
+	"context"
+	"encoding/json"
+	"strings"
+	"sync"
+
+	"github.com/CeresDB/ceresmeta/pkg/log"
+	"github.com/CeresDB/ceresmeta/server/cluster"
+	"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
+	"github.com/CeresDB/ceresmeta/server/storage"
+	"github.com/looplab/fsm"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+)
+
+const (
+	eventSplitCreateNewShardMetadata = "EventSplitCreateNewShardMetadata"
+	eventSplitCreateNewShardView     = "EventSplitCreateNewShardView"
+	eventSplitUpdateShardTables      = "EventSplitUpdateShardTables"
+	eventSplitOpenNewShard           = "EventSplitOpenNewShard"
+	eventSplitFinish                 = "EventSplitFinish"
+
+	stateSplitBegin                  = "StateBegin"
+	stateSplitCreateNewShardMetadata = "StateSplitCreateNewShardMetadata"
+	stateSplitCreateNewShardView     = "StateSplitCreateNewShardView"
+	stateSplitUpdateShardTables      = "StateSplitUpdateShardTables"
+	stateSplitOpenNewShard           = "StateOpenNewShard"
+	stateSplitFinish                 = "StateFinish"
+)
+
+var (
+	splitEvents = fsm.Events{
+		{Name: eventSplitCreateNewShardMetadata, Src: []string{stateSplitBegin}, Dst: stateSplitCreateNewShardMetadata},
+		{Name: eventSplitCreateNewShardView, Src: []string{stateSplitCreateNewShardMetadata}, Dst: stateSplitCreateNewShardView},
+		{Name: eventSplitUpdateShardTables, Src: []string{stateSplitCreateNewShardView}, Dst: stateSplitUpdateShardTables},
+		{Name: eventSplitOpenNewShard, Src: []string{stateSplitUpdateShardTables}, Dst: stateSplitOpenNewShard},
+		{Name: eventSplitFinish, Src: []string{stateSplitOpenNewShard}, Dst: stateSplitFinish},
+	}
+	splitCallbacks = fsm.Callbacks{
+		eventSplitCreateNewShardMetadata: splitOpenNewShardMetadataCallback,
+		eventSplitCreateNewShardView:     splitCreateShardViewCallback,
+		eventSplitUpdateShardTables:      splitUpdateShardTablesCallback,
+		eventSplitOpenNewShard:           splitOpenShardCallback,
+		eventSplitFinish:                 splitFinishCallback,
+	}
+)
+
+// SplitProcedure fsm: Begin -> CreateNewShardMetadata -> CreateNewShardView -> UpdateShardTables -> OpenNewShard -> Finish.
+type SplitProcedure struct {
+	id uint64
+
+	fsm *fsm.FSM
+
+	cluster  *cluster.Cluster
+	dispatch eventdispatch.Dispatch
+	storage  Storage
+
+	shardID        storage.ShardID
+	newShardID     storage.ShardID
+	tableNames     []string
+	targetNodeName string
+	schemaName     string
+
+	// Protect the state.
+	lock  sync.RWMutex
+	state State
+}
+
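+// NewSplitProcedure creates a SplitProcedure that moves the given tables from shardID to newShardID on targetNodeName.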
+func NewSplitProcedure(id uint64, dispatch eventdispatch.Dispatch, storage Storage, c *cluster.Cluster, schemaName string, shardID storage.ShardID, newShardID storage.ShardID, tableNames []string, targetNodeName string) *SplitProcedure {
+	splitFsm := fsm.NewFSM(
+		stateSplitBegin,
+		splitEvents,
+		splitCallbacks,
+	)
+
+	return &SplitProcedure{
+		fsm:            splitFsm,
+		id:             id,
+		cluster:        c,
+		dispatch:       dispatch,
+		shardID:        shardID,
+		newShardID:     newShardID,
+		targetNodeName: targetNodeName,
+		tableNames:     tableNames,
+		schemaName:     schemaName,
+		storage:        storage,
+	}
+}
+
+type splitCallbackRequest struct {
+	ctx context.Context
+
+	cluster  *cluster.Cluster
+	dispatch eventdispatch.Dispatch
+
+	shardID        storage.ShardID
+	newShardID     storage.ShardID
+	schemaName     string
+	tableNames     []string
+	targetNodeName string
+}
+
+func (p *SplitProcedure) ID() uint64 {
+	return p.id
+}
+
+func (p *SplitProcedure) Typ() Typ {
+	return Split
+}
+
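+// Start drives the split fsm; the procedure is persisted before each state transition so it can be resumed after a failover.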
+func (p *SplitProcedure) Start(ctx context.Context) error {
+	p.updateStateWithLock(StateRunning)
+
+	splitCallbackRequest := splitCallbackRequest{
+		ctx:            ctx,
+		cluster:        p.cluster,
+		dispatch:       p.dispatch,
+		shardID:        p.shardID,
+		newShardID:     p.newShardID,
+		schemaName:     p.schemaName,
+		tableNames:     p.tableNames,
+		targetNodeName: p.targetNodeName,
+	}
+
+	for {
+		switch p.fsm.Current() {
+		case stateSplitBegin:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "split procedure persist")
+			}
+			if err := p.fsm.Event(eventSplitCreateNewShardMetadata, splitCallbackRequest); err != nil {
+				p.updateStateWithLock(StateFailed)
+				return errors.WithMessage(err, "split procedure create new shard metadata")
+			}
+		case stateSplitCreateNewShardMetadata:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "split procedure persist")
+			}
+			if err := p.fsm.Event(eventSplitCreateNewShardView, splitCallbackRequest); err != nil {
+				p.updateStateWithLock(StateFailed)
+				return errors.WithMessage(err, "split procedure create new shard view")
+			}
+		case stateSplitCreateNewShardView:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "split procedure persist")
+			}
+			if err := p.fsm.Event(eventSplitUpdateShardTables, splitCallbackRequest); err != nil {
+				p.updateStateWithLock(StateFailed)
+				return errors.WithMessage(err, "split procedure update shard tables")
+			}
+		case stateSplitUpdateShardTables:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "split procedure persist")
+			}
+			if err := p.fsm.Event(eventSplitOpenNewShard, splitCallbackRequest); err != nil {
+				p.updateStateWithLock(StateFailed)
+				return errors.WithMessage(err, "split procedure open new shard")
+			}
+		case stateSplitOpenNewShard:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "split procedure persist")
+			}
+			if err := p.fsm.Event(eventSplitFinish, splitCallbackRequest); err != nil {
+				p.updateStateWithLock(StateFailed)
+				return errors.WithMessage(err, "split procedure finish")
+			}
+		case stateSplitFinish:
+			if err := p.persist(ctx); err != nil {
+				return errors.WithMessage(err, "split procedure persist")
+			}
+			p.updateStateWithLock(StateFinished)
+			return nil
+		}
+	}
+}
+
+func (p *SplitProcedure) Cancel(_ context.Context) error {
+	p.updateStateWithLock(StateCancelled)
+	return nil
+}
+
+func (p *SplitProcedure) State() State {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	return p.state
+}
+
+func (p *SplitProcedure) updateStateWithLock(state State) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	p.state = state
+}
+
+// splitOpenNewShardMetadataCallback creates the new shard metadata on the origin leader node; the table mapping is updated later in splitUpdateShardTablesCallback.
+func splitOpenNewShardMetadataCallback(event *fsm.Event) {
+	request, err := getRequestFromEvent[splitCallbackRequest](event)
+	if err != nil {
+		cancelEventWithLog(event, err, "get request from event")
+		return
+	}
+	ctx := request.ctx
+
+	// Validate cluster state.
+	curState := request.cluster.GetClusterState()
+	if curState != storage.ClusterStateStable {
+		cancelEventWithLog(event, cluster.ErrClusterStateInvalid, "cluster state must be stable")
+		return
+	}
+
+	// Validate tables.
+	shardTables := request.cluster.GetShardTables([]storage.ShardID{request.shardID}, request.targetNodeName)[request.shardID]
+	var tableNames []string
+	for _, table := range shardTables.Tables {
+		if request.schemaName == table.SchemaName {
+			tableNames = append(tableNames, table.Name)
+		}
+	}
+
+	if !IsSubSlice(request.tableNames, tableNames) {
+		cancelEventWithLog(event, cluster.ErrTableNotFound, "split tables not found in shard", zap.String("requestTableNames", strings.Join(request.tableNames, ",")), zap.String("tableNames", strings.Join(tableNames, ",")))
+		return
+	}
+
+	shardNodes, err := request.cluster.GetShardNodesByShardID(request.shardID)
+	if err != nil {
+		cancelEventWithLog(event, err, "cluster get shardNode by id")
+		return
+	}
+
+	var leaderShardNode storage.ShardNode
+	found := false
+	for _, shardNode := range shardNodes {
+		if shardNode.ShardRole == storage.ShardRoleLeader {
+			leaderShardNode = shardNode
+			found = true
+		}
+	}
+	if !found {
+		cancelEventWithLog(event, ErrShardLeaderNotFound, "shard leader not found")
+		return
+	}
+
+	// Create a new shard on origin node.
+	getNodeShardResult, err := request.cluster.GetNodeShards(ctx)
+	if err != nil {
+		cancelEventWithLog(event, err, "get node shards failed")
+		return
+	}
+
+	var updateShardNodes []storage.ShardNode
+	for _, shardNodeWithVersion := range getNodeShardResult.NodeShards {
+		updateShardNodes = append(updateShardNodes, shardNodeWithVersion.ShardNode)
+	}
+	updateShardNodes = append(updateShardNodes, storage.ShardNode{
+		ID:        request.newShardID,
+		ShardRole: storage.ShardRoleLeader,
+		NodeName:  leaderShardNode.NodeName,
+	})
+
+	// Update cluster view metadata.
+	if err = request.cluster.UpdateClusterView(ctx, storage.ClusterStateStable, updateShardNodes); err != nil {
+		cancelEventWithLog(event, err, "update cluster view failed")
+		return
+	}
+}
+
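+// splitCreateShardViewCallback creates an empty shard view for the new shard.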
+func splitCreateShardViewCallback(event *fsm.Event) {
+	request, err := getRequestFromEvent[splitCallbackRequest](event)
+	if err != nil {
+		cancelEventWithLog(event, err, "get request from event")
+		return
+	}
+	ctx := request.ctx
+
+	if err := request.cluster.CreateShardViews(ctx, []cluster.CreateShardView{{
+		ShardID: request.newShardID,
+		Tables:  []storage.TableID{},
+	}}); err != nil {
+		cancelEventWithLog(event, err, "create shard views")
+		return
+	}
+}
+
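+// splitOpenShardCallback asks the target node to open the new shard.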
+func splitOpenShardCallback(event *fsm.Event) {
+	request, err := getRequestFromEvent[splitCallbackRequest](event)
+	if err != nil {
+		cancelEventWithLog(event, err, "get request from event")
+		return
+	}
+	ctx := request.ctx
+
+	// Send open new shard request to CSE.
+	if err := request.dispatch.OpenShard(ctx, request.targetNodeName, eventdispatch.OpenShardRequest{
+		Shard: cluster.ShardInfo{
+			ID:      request.newShardID,
+			Role:    storage.ShardRoleLeader,
+			Version: 0,
+		},
+	}); err != nil {
+		cancelEventWithLog(event, err, "open shard failed")
+		return
+	}
+}
+
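+// splitUpdateShardTablesCallback moves the split tables from the origin shard view to the new shard view and bumps both shard versions.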
+func splitUpdateShardTablesCallback(event *fsm.Event) {
+	request, err := getRequestFromEvent[splitCallbackRequest](event)
+	if err != nil {
+		cancelEventWithLog(event, err, "get request from event")
+		return
+	}
+	ctx := request.ctx
+
+	originShardTables := request.cluster.GetShardTables([]storage.ShardID{request.shardID}, request.targetNodeName)[request.shardID]
+
+	// Find remaining tables in old shard.
+	var remainingTables []cluster.TableInfo
+
+	for _, tableInfo := range originShardTables.Tables {
+		found := false
+		for _, tableName := range request.tableNames {
+			if tableInfo.Name == tableName && tableInfo.SchemaName == request.schemaName {
+				found = true
+				break
+			}
+		}
+		if !found {
+			remainingTables = append(remainingTables, tableInfo)
+		}
+	}
+
+	// Update shard tables.
+	originShardTables.Tables = remainingTables
+	originShardTables.Shard.Version++
+
+	getNodeShardsResult, err := request.cluster.GetNodeShards(ctx)
+	if err != nil {
+		cancelEventWithLog(event, err, "get node shards")
+		return
+	}
+
+	// Find new shard in metadata.
+	var newShardInfo cluster.ShardInfo
+	found := false
+	for _, shardNodeWithVersion := range getNodeShardsResult.NodeShards {
+		if shardNodeWithVersion.ShardInfo.ID == request.newShardID {
+			newShardInfo = shardNodeWithVersion.ShardInfo
+			found = true
+			break
+		}
+	}
+	if !found {
+		cancelEventWithLog(event, cluster.ErrShardNotFound, "new shard not found", zap.Uint32("shardID", uint32(request.newShardID)))
+		return
+	}
+	newShardInfo.Version++
+
+	// Find split tables in metadata.
+	var tables []cluster.TableInfo
+	for _, tableName := range request.tableNames {
+		table, exists, err := request.cluster.GetTable(request.schemaName, tableName)
+		if err != nil {
+			cancelEventWithLog(event, err, "get table", zap.String("schemaName", request.schemaName), zap.String("tableName", tableName))
+			return
+		}
+		if !exists {
+			cancelEventWithLog(event, cluster.ErrTableNotFound, "table not found", zap.String("schemaName", request.schemaName), zap.String("tableName", tableName))
+			return
+		}
+		tables = append(tables, cluster.TableInfo{
+			ID:         table.ID,
+			Name:       table.Name,
+			SchemaID:   table.SchemaID,
+			SchemaName: request.schemaName,
+		})
+	}
+	newShardTables := cluster.ShardTables{
+		Shard:  newShardInfo,
+		Tables: tables,
+	}
+
+	if err := request.cluster.UpdateShardTables(ctx, []cluster.ShardTables{originShardTables, newShardTables}); err != nil {
+		cancelEventWithLog(event, err, "update shard tables")
+		return
+	}
+}
+
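+// splitFinishCallback only logs the result; all metadata changes happen in earlier states.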
+func splitFinishCallback(event *fsm.Event) {
+	request, err := getRequestFromEvent[splitCallbackRequest](event)
+	if err != nil {
+		cancelEventWithLog(event, err, "get request from event")
+		return
+	}
+	log.Info("split procedure finish", zap.Uint32("shardID", uint32(request.shardID)), zap.Uint32("newShardID", uint32(request.newShardID)))
+}
+
+func (p *SplitProcedure) persist(ctx context.Context) error {
+	meta, err := p.convertToMeta()
+	if err != nil {
+		return errors.WithMessage(err, "convert to meta")
+	}
+	err = p.storage.CreateOrUpdate(ctx, meta)
+	if err != nil {
+		return errors.WithMessage(err, "createOrUpdate procedure storage")
+	}
+	return nil
+}
+
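+// SplitProcedurePersistRawData is the payload serialized into Meta.RawData when the procedure is persisted.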
+type SplitProcedurePersistRawData struct {
+	SchemaName     string
+	TableNames     []string
+	ShardID        uint32
+	NewShardID     uint32
+	TargetNodeName string
+}
+
+func (p *SplitProcedure) convertToMeta() (Meta, error) {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	rawData := SplitProcedurePersistRawData{
+		SchemaName:     p.schemaName,
+		TableNames:     p.tableNames,
+		ShardID:        uint32(p.shardID),
+		NewShardID:     uint32(p.newShardID),
+		TargetNodeName: p.targetNodeName,
+	}
+	rawDataBytes, err := json.Marshal(rawData)
+	if err != nil {
+		return Meta{}, ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%v, err:%v", p.id, err)
+	}
+
+	meta := Meta{
+		ID:    p.id,
+		Typ:   Split,
+		State: p.state,
+
+		RawData: rawDataBytes,
+	}
+
+	return meta, nil
+}
diff --git a/server/coordinator/procedure/split_test.go b/server/coordinator/procedure/split_test.go
new file mode 100644
index 0000000..ebff9ab
--- /dev/null
+++ b/server/coordinator/procedure/split_test.go
@@ -0,0 +1,75 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package procedure
+
+import (
+	"context"
+	"testing"
+
+	"github.com/CeresDB/ceresmeta/server/cluster"
+	"github.com/CeresDB/ceresmeta/server/storage"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSplit(t *testing.T) {
+	re := require.New(t)
+	ctx := context.Background()
+	dispatch := MockDispatch{}
+	c := prepare(t)
+	s := NewTestStorage(t)
+
+	getNodeShardsResult, err := c.GetNodeShards(ctx)
+	re.NoError(err)
+
+	// Randomly select a shardNode to split.
+	targetShardNode := getNodeShardsResult.NodeShards[0].ShardNode
+
+	// Create some tables in this shard.
+	_, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName0)
+	re.NoError(err)
+	_, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName1)
+	re.NoError(err)
+
+	// Split one table from this shard.
+	newShardID, err := c.AllocShardID(ctx)
+	re.NoError(err)
+	procedure := NewSplitProcedure(1, dispatch, s, c, testSchemaName, targetShardNode.ID, storage.ShardID(newShardID), []string{testTableName0}, targetShardNode.NodeName)
+	err = procedure.Start(ctx)
+	re.NoError(err)
+
+	// Validate split result:
+	// 1. Shards on node, split shard and new shard must be all exists on node.
+	// 2. Tables mapping of split shard and new shard must be all exists.
+	// 3. Tables in table mapping must be correct, the split table only exists on the new shard.
+	getNodeShardsResult, err = c.GetNodeShards(ctx)
+	re.NoError(err)
+
+	nodeShardsMapping := make(map[storage.ShardID]cluster.ShardNodeWithVersion, 0)
+	for _, nodeShard := range getNodeShardsResult.NodeShards {
+		nodeShardsMapping[nodeShard.ShardNode.ID] = nodeShard
+	}
+	_, ok := nodeShardsMapping[targetShardNode.ID]
+	re.True(ok)
+	_, ok = nodeShardsMapping[storage.ShardID(newShardID)]
+	re.True(ok)
+
+	shardTables := c.GetShardTables([]storage.ShardID{targetShardNode.ID, storage.ShardID(newShardID)}, targetShardNode.NodeName)
+	splitShardTables := shardTables[targetShardNode.ID]
+	newShardTables := shardTables[storage.ShardID(newShardID)]
+	re.NotNil(splitShardTables)
+	re.NotNil(newShardTables)
+
+	splitShardTablesMapping := make(map[string]cluster.TableInfo, 0)
+	for _, table := range splitShardTables.Tables {
+		splitShardTablesMapping[table.Name] = table
+	}
+	_, exists := splitShardTablesMapping[testTableName0]
+	re.False(exists)
+
+	newShardTablesMapping := make(map[string]cluster.TableInfo, 0)
+	for _, table := range newShardTables.Tables {
+		newShardTablesMapping[table.Name] = table
+	}
+	_, exists = newShardTablesMapping[testTableName0]
+	re.True(exists)
+}
diff --git a/server/coordinator/procedure/transfer_leader.go b/server/coordinator/procedure/transfer_leader.go
index 046cf72..e35b280 100644
--- a/server/coordinator/procedure/transfer_leader.go
+++ b/server/coordinator/procedure/transfer_leader.go
@@ -212,19 +212,23 @@
 		return
 	}
 
-	shardNodes, err := request.cluster.GetShardNodesByShardID(request.shardID)
+	getNodeShardResult, err := request.cluster.GetNodeShards(ctx)
 	if err != nil {
 		cancelEventWithLog(event, err, "get shardNodes by shardID failed")
 		return
 	}
 
 	found := false
+	shardNodes := make([]storage.ShardNode, 0, len(getNodeShardResult.NodeShards))
 	var leaderShardNode storage.ShardNode
-	for _, shardNode := range shardNodes {
-		if shardNode.ShardRole == storage.ShardRoleLeader {
-			found = true
-			leaderShardNode = shardNode
-			leaderShardNode.NodeName = request.newLeaderNodeName
+	for _, shardNodeWithVersion := range getNodeShardResult.NodeShards {
+		if shardNodeWithVersion.ShardNode.ShardRole == storage.ShardRoleLeader {
+			leaderShardNode = shardNodeWithVersion.ShardNode
+			if leaderShardNode.ID == request.shardID {
+				found = true
+				leaderShardNode.NodeName = request.newLeaderNodeName
+			}
+			shardNodes = append(shardNodes, leaderShardNode)
 		}
 	}
 	if !found {
@@ -232,7 +236,7 @@
 		return
 	}
 
-	err = request.cluster.UpdateClusterView(ctx, storage.ClusterStateStable, []storage.ShardNode{leaderShardNode})
+	err = request.cluster.UpdateClusterView(ctx, storage.ClusterStateStable, shardNodes)
 	if err != nil {
 		cancelEventWithLog(event, storage.ErrUpdateClusterViewConflict, "update cluster view")
 		return
diff --git a/server/coordinator/procedure/util.go b/server/coordinator/procedure/util.go
index 5a2d20e..ee866f5 100644
--- a/server/coordinator/procedure/util.go
+++ b/server/coordinator/procedure/util.go
@@ -32,3 +32,24 @@
 		return *new(T), ErrGetRequest.WithCausef("event arg type must be same as return type")
 	}
 }
+
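+// IsContains reports whether target is present in slice.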
+func IsContains(slice []string, target string) bool {
+	for _, a := range slice {
+		if a == target {
+			return true
+		}
+	}
+	return false
+}
+
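+// IsSubSlice reports whether every element of subSlice is contained in slice.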
+func IsSubSlice(subSlice []string, slice []string) bool {
+	if len(subSlice) > len(slice) {
+		return false
+	}
+	for _, val := range subSlice {
+		if !IsContains(slice, val) {
+			return false
+		}
+	}
+	return true
+}
diff --git a/server/coordinator/scheduler.go b/server/coordinator/scheduler.go
index b49ed1b..108e3c4 100644
--- a/server/coordinator/scheduler.go
+++ b/server/coordinator/scheduler.go
@@ -134,6 +134,7 @@
 
 		// 1. Shard exists in metadata and not exists in node, reopen lack shards on node.
 		if !exists {
+			log.Info("shard exists in metadata but not on node, reopen missing shard on node", zap.String("node", node), zap.Uint32("shardID", uint32(expectShard.ID)))
 			if err := s.dispatch.OpenShard(ctx, node, eventdispatch.OpenShardRequest{Shard: expectShard}); err != nil {
 				return errors.WithMessagef(err, "reopen shard failed, shardInfo:%d", expectShard.ID)
 			}
@@ -146,6 +147,7 @@
 		}
 
 		// 3. Shard exists in both metadata and node, versions are inconsistent, close and reopen invalid shard on node.
+		log.Info("shard exists in both metadata and node but versions are inconsistent, close and reopen shard on node", zap.String("node", node), zap.Uint32("shardID", uint32(expectShard.ID)))
 		if err := s.dispatch.CloseShard(ctx, node, eventdispatch.CloseShardRequest{ShardID: uint32(expectShard.ID)}); err != nil {
 			return errors.WithMessagef(err, "close shard failed, shardInfo:%d", expectShard.ID)
 		}
@@ -160,6 +162,7 @@
 		if ok {
 			continue
 		}
+		log.Info("shard exists on node but not in metadata, close extra shard on node", zap.String("node", node), zap.Uint32("shardID", uint32(realShard.ID)))
 		if err := s.dispatch.CloseShard(ctx, node, eventdispatch.CloseShardRequest{ShardID: uint32(realShard.ID)}); err != nil {
 			return errors.WithMessagef(err, "close shard failed, shardInfo:%d", realShard.ID)
 		}
diff --git a/server/server.go b/server/server.go
index e9ca843..c55a1e1 100644
--- a/server/server.go
+++ b/server/server.go
@@ -181,7 +181,7 @@
 	}
 	srv.procedureManager = procedureManager
 	dispatch := eventdispatch.NewDispatchImpl()
-	procedureFactory := procedure.NewFactory(id.NewAllocatorImpl(srv.etcdCli, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage)
+	procedureFactory := procedure.NewFactory(id.NewAllocatorImpl(srv.etcdCli, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage, manager)
 	srv.procedureFactory = procedureFactory
 	srv.scheduler = coordinator.NewScheduler(manager, procedureManager, procedureFactory, dispatch)
 
diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go
index 8a199d1..b85052c 100644
--- a/server/service/grpc/service.go
+++ b/server/service/grpc/service.go
@@ -4,6 +4,7 @@
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -75,7 +76,7 @@
 		}, ShardInfos: shardInfos,
 	}
 
-	log.Info("registerNode", zap.String("name", req.Info.Endpoint), zap.String("info", req.Info.String()))
+	log.Info("registerNode", zap.String("name", req.Info.Endpoint), zap.String("info", fmt.Sprintf("%+v", registeredNode)))
 
 	err = s.h.GetClusterManager().RegisterNode(ctx, req.GetHeader().GetClusterName(), registeredNode)
 	if err != nil {
diff --git a/server/service/http/api.go b/server/service/http/api.go
index 8dc5257..9d51a78 100644
--- a/server/service/http/api.go
+++ b/server/service/http/api.go
@@ -47,6 +47,7 @@
 	// Register post API.
 	router.Post("/getShardTables", a.getShardTables)
 	router.Post("/transferLeader", a.transferLeader)
+	router.Post("/split", a.split)
 	router.Post("/route", a.route)
 	router.Post("/dropTable", a.dropTable)
 
@@ -233,3 +234,58 @@
 
 	a.respond(writer, nil)
 }
+
+type SplitRequest struct {
+	ClusterName string   `json:"clusterName"`
+	SchemaName  string   `json:"schemaName"`
+	ShardID     uint32   `json:"shardID"`
+	SplitTables []string `json:"splitTables"`
+	NodeName    string   `json:"nodeName"`
+}
+
+func (a *API) split(writer http.ResponseWriter, req *http.Request) {
+	var splitRequest SplitRequest
+	err := json.NewDecoder(req.Body).Decode(&splitRequest)
+	if err != nil {
+		log.Error("decode request body failed", zap.Error(err))
+		a.respondError(writer, ErrParseRequest, nil)
+		return
+	}
+	ctx := context.Background()
+
+	c, err := a.clusterManager.GetCluster(ctx, splitRequest.ClusterName)
+	if err != nil {
+		log.Error("cluster not found", zap.String("clusterName", splitRequest.ClusterName), zap.Error(err))
+		a.respondError(writer, cluster.ErrClusterNotFound, "cluster not found")
+		return
+	}
+
+	newShardID, err := c.AllocShardID(ctx)
+	if err != nil {
+		log.Error("alloc shard id failed", zap.Error(err))
+		a.respondError(writer, ErrAllocShardID, "alloc shard id failed")
+		return
+	}
+
+	splitProcedure, err := a.procedureFactory.CreateSplitProcedure(ctx, procedure.SplitRequest{
+		ClusterName:    splitRequest.ClusterName,
+		SchemaName:     splitRequest.SchemaName,
+		TableNames:     splitRequest.SplitTables,
+		ShardID:        storage.ShardID(splitRequest.ShardID),
+		NewShardID:     storage.ShardID(newShardID),
+		TargetNodeName: splitRequest.NodeName,
+	})
+	if err != nil {
+		log.Error("create split procedure", zap.Error(err))
+		a.respondError(writer, ErrCreateProcedure, "create split procedure")
+		return
+	}
+
+	if err := a.procedureManager.Submit(ctx, splitProcedure); err != nil {
+		log.Error("submit split procedure", zap.Error(err))
+		a.respondError(writer, ErrSubmitProcedure, "submit split procedure")
+		return
+	}
+
+	a.respond(writer, newShardID)
+}
diff --git a/server/service/http/error.go b/server/service/http/error.go
index 217eaa6..09cbbe1 100644
--- a/server/service/http/error.go
+++ b/server/service/http/error.go
@@ -11,4 +11,5 @@
 	ErrCreateProcedure = coderr.NewCodeError(coderr.Internal, "create procedure")
 	ErrSubmitProcedure = coderr.NewCodeError(coderr.Internal, "submit procedure")
 	ErrGetCluster      = coderr.NewCodeError(coderr.Internal, "get cluster")
+	ErrAllocShardID    = coderr.NewCodeError(coderr.Internal, "alloc shard id")
 )