feat: add split procedure (#111)
* feat: add split procedure
* refactor: refactor by cr
* refactor: refactor by cr
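
This change adds a shard split procedure: the coordinator creates metadata and a shard view for a new shard, moves the selected tables' mappings from the source shard to it, and opens the new shard on the target node (expected to be the node currently leading the source shard). The FSM walks CreateNewShardMetadata -> CreateNewShardView -> UpdateShardTables -> OpenNewShard -> Finish. The procedure is exposed through a new POST /split HTTP endpoint; below is a minimal sketch of invoking it from Go. The listen address, route prefix, and cluster name are assumptions for illustration, not part of this change.

    // Sketch only: posts a split request to the new /split endpoint.
    // Address, route prefix and clusterName are placeholders.
    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
        "net/http"
    )

    func main() {
        body, _ := json.Marshal(map[string]interface{}{
            "clusterName": "defaultCluster",
            "schemaName":  "public",
            "shardID":     0,
            "splitTables": []string{"table0"},
            "nodeName":    "node0",
        })
        resp, err := http.Post("http://127.0.0.1:8080/split", "application/json", bytes.NewReader(body))
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        fmt.Println(resp.Status) // the handler responds with the newly allocated shard ID
    }
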
diff --git a/config.toml b/config.toml
index 08a8dfe..a9222b1 100644
--- a/config.toml
+++ b/config.toml
@@ -7,7 +7,7 @@
data-dir = "/tmp/ceresmeta0/data"
node-name = "meta0"
initial-cluster = "meta0=http://127.0.0.1:2380"
-default-cluster-node-count = 1
+default-cluster-node-count = 2
[etcd-log]
file = "/tmp/ceresmeta0/etcd.log"
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index b8a1138..95c7692 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -7,6 +7,7 @@
"fmt"
"path"
"sync"
+ "time"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/id"
@@ -98,6 +99,13 @@
Tables: tableInfos,
}
}
+
+ for _, shardID := range shardIDs {
+ _, exists := result[shardID]
+ if !exists {
+ result[shardID] = ShardTables{}
+ }
+ }
return result
}
@@ -132,6 +140,27 @@
return ret, nil
}
+func (c *Cluster) UpdateShardTables(ctx context.Context, shardTablesArr []ShardTables) error {
+ for _, shardTables := range shardTablesArr {
+ tableIDs := make([]storage.TableID, 0, len(shardTables.Tables))
+ for _, table := range shardTables.Tables {
+ tableIDs = append(tableIDs, table.ID)
+ }
+
+ _, err := c.topologyManager.UpdateShardView(ctx, storage.ShardView{
+ ShardID: shardTables.Shard.ID,
+ Version: shardTables.Shard.Version,
+ TableIDs: tableIDs,
+ CreatedAt: uint64(time.Now().UnixMilli()),
+ })
+ if err != nil {
+ return errors.WithMessagef(err, "update shard tables")
+ }
+ }
+
+ return nil
+}
+
// GetOrCreateSchema the second output parameter bool: returns true if the schema was newly created.
func (c *Cluster) GetOrCreateSchema(ctx context.Context, schemaName string) (storage.Schema, bool, error) {
return c.tableManager.GetOrCreateSchema(ctx, schemaName)
@@ -297,6 +326,13 @@
}, nil
}
+func (c *Cluster) GetClusterViewVersion() uint64 {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+
+ return c.topologyManager.GetVersion()
+}
+
func (c *Cluster) GetClusterMinNodeCount() uint32 {
c.lock.RLock()
defer c.lock.RUnlock()
diff --git a/server/cluster/topology_manager.go b/server/cluster/topology_manager.go
index a02788f..4e0bee6 100644
--- a/server/cluster/topology_manager.go
+++ b/server/cluster/topology_manager.go
@@ -42,6 +42,8 @@
UpdateClusterView(ctx context.Context, state storage.ClusterState, shardNodes []storage.ShardNode) error
// CreateShardViews create shardViews.
CreateShardViews(ctx context.Context, shardViews []CreateShardView) error
+ // UpdateShardView updates the tables and version of the specified shardView.
+ UpdateShardView(ctx context.Context, shardView storage.ShardView) (ShardVersionUpdate, error)
}
type ShardTableIDs struct {
@@ -238,6 +240,7 @@
// Update shardView in memory.
shardView.Version = prevVersion + 1
shardView.TableIDs = tableIDs
+ delete(m.tableShardMapping, tableID)
return ShardVersionUpdate{
ShardID: shardID,
@@ -246,6 +249,31 @@
}, nil
}
+func (m *TopologyManagerImpl) UpdateShardView(ctx context.Context, shardView storage.ShardView) (ShardVersionUpdate, error) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ if err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{
+ ClusterID: m.clusterID,
+ ShardView: shardView,
+ LatestVersion: shardView.Version - 1,
+ }); err != nil {
+ return ShardVersionUpdate{}, errors.WithMessage(err, "storage update shard view")
+ }
+
+ // Update shardView in memory.
+ m.shardTablesMapping[shardView.ShardID] = &shardView
+ for _, tableID := range shardView.TableIDs {
+ m.tableShardMapping[tableID] = shardView.ShardID
+ }
+
+ return ShardVersionUpdate{
+ ShardID: shardView.ShardID,
+ CurrVersion: shardView.Version,
+ PrevVersion: shardView.Version - 1,
+ }, nil
+}
+
func (m *TopologyManagerImpl) GetShardNodesByID(shardID storage.ShardID) ([]storage.ShardNode, error) {
m.lock.RLock()
defer m.lock.RUnlock()
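
UpdateShardView persists the caller-supplied view with LatestVersion set to shardView.Version-1, so the storage layer can detect and reject a concurrent writer; the caller is expected to bump the version before calling. A minimal sketch of that calling convention via Cluster.UpdateShardTables, mirroring splitUpdateShardTablesCallback below; the helper name is illustrative only and error handling is trimmed.

    // bumpAndUpdate illustrates the expected calling convention for
    // Cluster.UpdateShardTables / TopologyManager.UpdateShardView: read the
    // current tables of a shard, bump Shard.Version by one, then update.
    package example

    import (
        "context"

        "github.com/CeresDB/ceresmeta/server/cluster"
        "github.com/CeresDB/ceresmeta/server/storage"
    )

    func bumpAndUpdate(ctx context.Context, c *cluster.Cluster, shardID storage.ShardID, node string) error {
        tables := c.GetShardTables([]storage.ShardID{shardID}, node)[shardID]
        tables.Shard.Version++ // UpdateShardView uses Version-1 as LatestVersion
        return c.UpdateShardTables(ctx, []cluster.ShardTables{tables})
    }
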
diff --git a/server/cluster/types.go b/server/cluster/types.go
index 20e22c5..7831d4f 100644
--- a/server/cluster/types.go
+++ b/server/cluster/types.go
@@ -44,6 +44,10 @@
ShardVersionUpdate ShardVersionUpdate
}
+type UpdateShardTablesResult struct {
+ ShardVersionUpdate ShardVersionUpdate
+}
+
type ShardVersionUpdate struct {
ShardID storage.ShardID
CurrVersion uint64
diff --git a/server/coordinator/procedure/common_test.go b/server/coordinator/procedure/common_test.go
index ec41e22..8a9ca42 100644
--- a/server/coordinator/procedure/common_test.go
+++ b/server/coordinator/procedure/common_test.go
@@ -16,7 +16,8 @@
)
const (
- testTableName = "testTable"
+ testTableName0 = "table0"
+ testTableName1 = "table1"
testSchemaName = "testSchemaName"
nodeName0 = "node0"
nodeName1 = "node1"
diff --git a/server/coordinator/procedure/create_drop_table_test.go b/server/coordinator/procedure/create_drop_table_test.go
index a232507..a2a5959 100644
--- a/server/coordinator/procedure/create_drop_table_test.go
+++ b/server/coordinator/procedure/create_drop_table_test.go
@@ -21,12 +21,12 @@
testTableNum := 20
// Create table.
for i := 0; i < testTableNum; i++ {
- tableName := fmt.Sprintf("%s_%d", testTableName, i)
+ tableName := fmt.Sprintf("%s_%d", testTableName0, i)
testCreateTable(t, dispatch, c, tableName)
}
// Check get table.
for i := 0; i < testTableNum; i++ {
- tableName := fmt.Sprintf("%s_%d", testTableName, i)
+ tableName := fmt.Sprintf("%s_%d", testTableName0, i)
table, b, err := c.GetTable(testSchemaName, tableName)
re.NoError(err)
re.Equal(b, true)
@@ -47,12 +47,12 @@
// Drop table.
for i := 0; i < testTableNum; i++ {
- tableName := fmt.Sprintf("%s_%d", testTableName, i)
+ tableName := fmt.Sprintf("%s_%d", testTableName0, i)
testDropTable(t, dispatch, c, tableName)
}
// Check table not exists.
for i := 0; i < testTableNum; i++ {
- tableName := fmt.Sprintf("%s_%d", testTableName, i)
+ tableName := fmt.Sprintf("%s_%d", testTableName0, i)
_, b, err := c.GetTable(testSchemaName, tableName)
re.NoError(err)
re.Equal(b, false)
diff --git a/server/coordinator/procedure/drop_table.go b/server/coordinator/procedure/drop_table.go
index 51c9d6d..8c3442b 100644
--- a/server/coordinator/procedure/drop_table.go
+++ b/server/coordinator/procedure/drop_table.go
@@ -52,11 +52,6 @@
log.Warn("drop non-existing table", zap.String("schema", request.rawReq.GetSchemaName()), zap.String("table", request.rawReq.GetName()))
return
}
- result, err := request.cluster.DropTable(request.ctx, request.rawReq.GetSchemaName(), request.rawReq.GetName())
- if err != nil {
- cancelEventWithLog(event, err, "cluster drop table")
- return
- }
shardNodesResult, err := request.cluster.GetShardNodeByTableIDs([]storage.TableID{table.ID})
if err != nil {
@@ -64,6 +59,12 @@
return
}
+ result, err := request.cluster.DropTable(request.ctx, request.rawReq.GetSchemaName(), request.rawReq.GetName())
+ if err != nil {
+ cancelEventWithLog(event, err, "cluster drop table")
+ return
+ }
+
shardNodes, ok := shardNodesResult.ShardNodes[table.ID]
if !ok {
cancelEventWithLog(event, ErrShardLeaderNotFound, fmt.Sprintf("cluster get shard by table id, table:%v", table))
diff --git a/server/coordinator/procedure/factory.go b/server/coordinator/procedure/factory.go
index e7ccabc..6cd0939 100644
--- a/server/coordinator/procedure/factory.go
+++ b/server/coordinator/procedure/factory.go
@@ -48,13 +48,25 @@
ShardID storage.ShardID
OldLeaderNodeName string
NewLeaderNodeName string
+ ClusterVersion uint64
}
-func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage) *Factory {
+type SplitRequest struct {
+ ClusterName string
+ SchemaName string
+ TableNames []string
+ ShardID storage.ShardID
+ NewShardID storage.ShardID
+ TargetNodeName string
+ ClusterVersion uint64
+}
+
+func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage, manager cluster.Manager) *Factory {
return &Factory{
- idAllocator: allocator,
- dispatch: dispatch,
- storage: storage,
+ idAllocator: allocator,
+ dispatch: dispatch,
+ storage: storage,
+ clusterManager: manager,
}
}
@@ -103,6 +115,22 @@
request.ShardID, request.OldLeaderNodeName, request.NewLeaderNodeName, id)
}
+func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest) (Procedure, error) {
+ id, err := f.allocProcedureID(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ c, err := f.clusterManager.GetCluster(ctx, request.ClusterName)
+ if err != nil {
+ log.Error("cluster not found", zap.String("clusterName", request.ClusterName))
+ return nil, cluster.ErrClusterNotFound
+ }
+
+ procedure := NewSplitProcedure(id, f.dispatch, f.storage, c, request.SchemaName, request.ShardID, request.NewShardID, request.TableNames, request.TargetNodeName)
+ return procedure, nil
+}
+
func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
id, err := f.idAllocator.Alloc(ctx)
if err != nil {
diff --git a/server/coordinator/procedure/scatter.go b/server/coordinator/procedure/scatter.go
index 93907cd..8ecc98e 100644
--- a/server/coordinator/procedure/scatter.go
+++ b/server/coordinator/procedure/scatter.go
@@ -126,6 +126,11 @@
// Allocates shard ids across the registered nodes, and caller should ensure `minNodeCount <= len(allNodes)`.
func allocNodeShards(shardTotal uint32, minNodeCount uint32, allNodes []cluster.RegisteredNode, shardIDs []storage.ShardID) ([]storage.ShardNode, error) {
+ // If more nodes are registered than required, only the first minNodeCount nodes are assigned shards.
+ if len(allNodes) > int(minNodeCount) {
+ allNodes = allNodes[:minNodeCount]
+ }
+
shards := make([]storage.ShardNode, 0, shardTotal)
perNodeShardCount := shardTotal / minNodeCount
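
The guard above means that when more nodes register than default-cluster-node-count requires, only the first minNodeCount nodes receive shards during the initial scatter. A tiny illustration with made-up numbers:

    // Illustration of the truncation above; all values are made up.
    package main

    import "fmt"

    func main() {
        shardTotal, minNodeCount := uint32(4), uint32(2)
        allNodes := []string{"node0", "node1", "node2"} // three registered, two required
        if len(allNodes) > int(minNodeCount) {
            allNodes = allNodes[:minNodeCount]
        }
        fmt.Println(allNodes, shardTotal/minNodeCount) // [node0 node1], 2 shards per node
    }
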
diff --git a/server/coordinator/procedure/scatter_test.go b/server/coordinator/procedure/scatter_test.go
index e5e16fe..512405d 100644
--- a/server/coordinator/procedure/scatter_test.go
+++ b/server/coordinator/procedure/scatter_test.go
@@ -22,7 +22,9 @@
totalShardNum := c.GetTotalShardNum()
shardIDs := make([]storage.ShardID, 0, totalShardNum)
for i := uint32(0); i < totalShardNum; i++ {
- shardIDs = append(shardIDs, storage.ShardID(i))
+ shardID, err := c.AllocShardID(ctx)
+ re.NoError(err)
+ shardIDs = append(shardIDs, storage.ShardID(shardID))
}
p := NewScatterProcedure(dispatch, c, 1, shardIDs)
go func() {
diff --git a/server/coordinator/procedure/split.go b/server/coordinator/procedure/split.go
new file mode 100644
index 0000000..644d691
--- /dev/null
+++ b/server/coordinator/procedure/split.go
@@ -0,0 +1,446 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package procedure
+
+import (
+ "context"
+ "encoding/json"
+ "strings"
+ "sync"
+
+ "github.com/CeresDB/ceresmeta/pkg/log"
+ "github.com/CeresDB/ceresmeta/server/cluster"
+ "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
+ "github.com/CeresDB/ceresmeta/server/storage"
+ "github.com/looplab/fsm"
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+)
+
+const (
+ eventSplitCreateNewShardMetadata = "EventSplitCreateNewShardMetadata"
+ eventSplitCreateNewShardView = "EventCreateNewShardView"
+ eventSplitUpdateShardTables = "EventSplitUpdateShardTables"
+ eventSplitOpenNewShard = "EventSplitOpenNewShard"
+ eventSplitFinish = "EventSplitFinish"
+
+ stateSplitBegin = "StateBegin"
+ stateSplitCreateNewShardMetadata = "StateSplitCreateNewShardMetadata"
+ stateSplitCreateNewShardView = "StateSplitCreateNewShardView"
+ stateSplitUpdateShardTables = "StateSplitUpdateShardTables"
+ stateSplitOpenNewShard = "StateOpenNewShard"
+ stateSplitFinish = "StateFinish"
+)
+
+var (
+ splitEvents = fsm.Events{
+ {Name: eventSplitCreateNewShardMetadata, Src: []string{stateSplitBegin}, Dst: stateSplitCreateNewShardMetadata},
+ {Name: eventSplitCreateNewShardView, Src: []string{stateSplitCreateNewShardMetadata}, Dst: stateSplitCreateNewShardView},
+ {Name: eventSplitUpdateShardTables, Src: []string{stateSplitCreateNewShardView}, Dst: stateSplitUpdateShardTables},
+ {Name: eventSplitOpenNewShard, Src: []string{stateSplitUpdateShardTables}, Dst: stateSplitOpenNewShard},
+ {Name: eventSplitFinish, Src: []string{stateSplitOpenNewShard}, Dst: stateSplitFinish},
+ }
+ splitCallbacks = fsm.Callbacks{
+ eventSplitCreateNewShardMetadata: splitOpenNewShardMetadataCallback,
+ eventSplitCreateNewShardView: splitCreateShardViewCallback,
+ eventSplitUpdateShardTables: splitUpdateShardTablesCallback,
+ eventSplitOpenNewShard: splitOpenShardCallback,
+ eventSplitFinish: splitFinishCallback,
+ }
+)
+
+// SplitProcedure fsm: CreateNewShardMetadata -> CreateNewShardView -> UpdateShardTables -> OpenNewShard -> Finish
+type SplitProcedure struct {
+ id uint64
+
+ fsm *fsm.FSM
+
+ cluster *cluster.Cluster
+ dispatch eventdispatch.Dispatch
+ storage Storage
+
+ shardID storage.ShardID
+ newShardID storage.ShardID
+ tableNames []string
+ targetNodeName string
+ schemaName string
+
+ // Protect the state.
+ lock sync.RWMutex
+ state State
+}
+
+func NewSplitProcedure(id uint64, dispatch eventdispatch.Dispatch, storage Storage, c *cluster.Cluster, schemaName string, shardID storage.ShardID, newShardID storage.ShardID, tableNames []string, targetNodeName string) *SplitProcedure {
+ splitFsm := fsm.NewFSM(
+ stateSplitBegin,
+ splitEvents,
+ splitCallbacks,
+ )
+
+ return &SplitProcedure{
+ fsm: splitFsm,
+ id: id,
+ cluster: c,
+ dispatch: dispatch,
+ shardID: shardID,
+ newShardID: newShardID,
+ targetNodeName: targetNodeName,
+ tableNames: tableNames,
+ schemaName: schemaName,
+ storage: storage,
+ }
+}
+
+type splitCallbackRequest struct {
+ ctx context.Context
+
+ cluster *cluster.Cluster
+ dispatch eventdispatch.Dispatch
+
+ shardID storage.ShardID
+ newShardID storage.ShardID
+ schemaName string
+ tableNames []string
+ targetNodeName string
+}
+
+func (p *SplitProcedure) ID() uint64 {
+ return p.id
+}
+
+func (p *SplitProcedure) Typ() Typ {
+ return Split
+}
+
+func (p *SplitProcedure) Start(ctx context.Context) error {
+ p.updateStateWithLock(StateRunning)
+
+ splitCallbackRequest := splitCallbackRequest{
+ ctx: ctx,
+ cluster: p.cluster,
+ dispatch: p.dispatch,
+ shardID: p.shardID,
+ newShardID: p.newShardID,
+ schemaName: p.schemaName,
+ tableNames: p.tableNames,
+ targetNodeName: p.targetNodeName,
+ }
+
+ for {
+ switch p.fsm.Current() {
+ case stateSplitBegin:
+ if err := p.persist(ctx); err != nil {
+ return errors.WithMessage(err, "split procedure persist")
+ }
+ if err := p.fsm.Event(eventSplitCreateNewShardMetadata, splitCallbackRequest); err != nil {
+ p.updateStateWithLock(StateFailed)
+ return errors.WithMessagef(err, "split procedure create new shard metadata")
+ }
+ case stateSplitCreateNewShardMetadata:
+ if err := p.persist(ctx); err != nil {
+ return errors.WithMessage(err, "split procedure persist")
+ }
+ if err := p.fsm.Event(eventSplitCreateNewShardView, splitCallbackRequest); err != nil {
+ p.updateStateWithLock(StateFailed)
+ return errors.WithMessagef(err, "split procedure create new shard view")
+ }
+ case stateSplitCreateNewShardView:
+ if err := p.persist(ctx); err != nil {
+ return errors.WithMessage(err, "split procedure persist")
+ }
+ if err := p.fsm.Event(eventSplitUpdateShardTables, splitCallbackRequest); err != nil {
+ p.updateStateWithLock(StateFailed)
+ return errors.WithMessagef(err, "split procedure create new shard")
+ }
+ case stateSplitUpdateShardTables:
+ if err := p.persist(ctx); err != nil {
+ return errors.WithMessage(err, "split procedure persist")
+ }
+ if err := p.fsm.Event(eventSplitOpenNewShard, splitCallbackRequest); err != nil {
+ p.updateStateWithLock(StateFailed)
+ return errors.WithMessagef(err, "split procedure create shard tables")
+ }
+ case stateSplitOpenNewShard:
+ if err := p.persist(ctx); err != nil {
+ return errors.WithMessage(err, "split procedure persist")
+ }
+ if err := p.fsm.Event(eventSplitFinish, splitCallbackRequest); err != nil {
+ p.updateStateWithLock(StateFailed)
+ return errors.WithMessagef(err, "split procedure delete shard tables")
+ }
+ case stateSplitFinish:
+ if err := p.persist(ctx); err != nil {
+ return errors.WithMessage(err, "split procedure persist")
+ }
+ p.updateStateWithLock(StateFinished)
+ return nil
+ }
+ }
+}
+
+func (p *SplitProcedure) Cancel(_ context.Context) error {
+ p.updateStateWithLock(StateCancelled)
+ return nil
+}
+
+func (p *SplitProcedure) State() State {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.state
+}
+
+func (p *SplitProcedure) updateStateWithLock(state State) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ p.state = state
+}
+
+// splitOpenNewShardMetadataCallback creates the new shard and updates the cluster view metadata; the table mappings are moved later in splitUpdateShardTablesCallback.
+func splitOpenNewShardMetadataCallback(event *fsm.Event) {
+ request, err := getRequestFromEvent[splitCallbackRequest](event)
+ if err != nil {
+ cancelEventWithLog(event, err, "get request from event")
+ return
+ }
+ ctx := request.ctx
+
+ // Validate cluster state.
+ curState := request.cluster.GetClusterState()
+ if curState != storage.ClusterStateStable {
+ cancelEventWithLog(event, cluster.ErrClusterStateInvalid, "cluster state must be stable")
+ return
+ }
+
+ // Validate tables.
+ shardTables := request.cluster.GetShardTables([]storage.ShardID{request.shardID}, request.targetNodeName)[request.shardID]
+ var tableNames []string
+ for _, table := range shardTables.Tables {
+ if request.schemaName == table.SchemaName {
+ tableNames = append(tableNames, table.Name)
+ }
+ }
+
+ if !IsSubSlice(request.tableNames, tableNames) {
+ cancelEventWithLog(event, cluster.ErrTableNotFound, "split tables not found in shard", zap.String("requestTableNames", strings.Join(request.tableNames, ",")), zap.String("tableNames", strings.Join(tableNames, ",")))
+ return
+ }
+
+ shardNodes, err := request.cluster.GetShardNodesByShardID(request.shardID)
+ if err != nil {
+ cancelEventWithLog(event, err, "cluster get shardNode by id")
+ return
+ }
+
+ var leaderShardNode storage.ShardNode
+ found := false
+ for _, shardNode := range shardNodes {
+ if shardNode.ShardRole == storage.ShardRoleLeader {
+ leaderShardNode = shardNode
+ found = true
+ }
+ }
+ if !found {
+ cancelEventWithLog(event, ErrShardLeaderNotFound, "shard leader not found")
+ return
+ }
+
+ // Create a new shard on origin node.
+ getNodeShardResult, err := request.cluster.GetNodeShards(ctx)
+ if err != nil {
+ cancelEventWithLog(event, err, "get node shards failed")
+ return
+ }
+
+ var updateShardNodes []storage.ShardNode
+ for _, shardNodeWithVersion := range getNodeShardResult.NodeShards {
+ updateShardNodes = append(updateShardNodes, shardNodeWithVersion.ShardNode)
+ }
+ updateShardNodes = append(updateShardNodes, storage.ShardNode{
+ ID: request.newShardID,
+ ShardRole: storage.ShardRoleLeader,
+ NodeName: leaderShardNode.NodeName,
+ })
+
+ // Update cluster view metadata.
+ if err = request.cluster.UpdateClusterView(ctx, storage.ClusterStateStable, updateShardNodes); err != nil {
+ cancelEventWithLog(event, err, "update cluster view failed")
+ return
+ }
+}
+
+func splitCreateShardViewCallback(event *fsm.Event) {
+ request, err := getRequestFromEvent[splitCallbackRequest](event)
+ if err != nil {
+ cancelEventWithLog(event, err, "get request from event")
+ return
+ }
+ ctx := request.ctx
+
+ if err := request.cluster.CreateShardViews(ctx, []cluster.CreateShardView{{
+ ShardID: request.newShardID,
+ Tables: []storage.TableID{},
+ }}); err != nil {
+ cancelEventWithLog(event, err, "create shard views")
+ return
+ }
+}
+
+func splitOpenShardCallback(event *fsm.Event) {
+ request, err := getRequestFromEvent[splitCallbackRequest](event)
+ if err != nil {
+ cancelEventWithLog(event, err, "get request from event")
+ return
+ }
+ ctx := request.ctx
+
+ // Send open new shard request to CSE.
+ if err := request.dispatch.OpenShard(ctx, request.targetNodeName, eventdispatch.OpenShardRequest{
+ Shard: cluster.ShardInfo{
+ ID: request.newShardID,
+ Role: storage.ShardRoleLeader,
+ Version: 0,
+ },
+ }); err != nil {
+ cancelEventWithLog(event, err, "open shard failed")
+ return
+ }
+}
+
+func splitUpdateShardTablesCallback(event *fsm.Event) {
+ request, err := getRequestFromEvent[splitCallbackRequest](event)
+ if err != nil {
+ cancelEventWithLog(event, err, "get request from event")
+ return
+ }
+ ctx := request.ctx
+
+ originShardTables := request.cluster.GetShardTables([]storage.ShardID{request.shardID}, request.targetNodeName)[request.shardID]
+
+ // Find remaining tables in old shard.
+ var remainingTables []cluster.TableInfo
+
+ for _, tableInfo := range originShardTables.Tables {
+ found := false
+ for _, tableName := range request.tableNames {
+ if tableInfo.Name == tableName && tableInfo.SchemaName == request.schemaName {
+ found = true
+ break
+ }
+ }
+ if !found {
+ remainingTables = append(remainingTables, tableInfo)
+ }
+ }
+
+ // Update shard tables.
+ originShardTables.Tables = remainingTables
+ originShardTables.Shard.Version++
+
+ getNodeShardsResult, err := request.cluster.GetNodeShards(ctx)
+ if err != nil {
+ cancelEventWithLog(event, err, "get node shards")
+ return
+ }
+
+ // Find new shard in metadata.
+ var newShardInfo cluster.ShardInfo
+ found := false
+ for _, shardNodeWithVersion := range getNodeShardsResult.NodeShards {
+ if shardNodeWithVersion.ShardInfo.ID == request.newShardID {
+ newShardInfo = shardNodeWithVersion.ShardInfo
+ found = true
+ break
+ }
+ }
+ if !found {
+ cancelEventWithLog(event, cluster.ErrShardNotFound, "new shard not found", zap.Uint32("shardID", uint32(request.newShardID)))
+ return
+ }
+ newShardInfo.Version++
+
+ // Find split tables in metadata.
+ var tables []cluster.TableInfo
+ for _, tableName := range request.tableNames {
+ table, exists, err := request.cluster.GetTable(request.schemaName, tableName)
+ if err != nil {
+ cancelEventWithLog(event, err, "get table", zap.String("schemaName", request.schemaName), zap.String("tableName", tableName))
+ return
+ }
+ if !exists {
+ cancelEventWithLog(event, cluster.ErrTableNotFound, "table not found", zap.String("schemaName", request.schemaName), zap.String("tableName", tableName))
+ return
+ }
+ tables = append(tables, cluster.TableInfo{
+ ID: table.ID,
+ Name: table.Name,
+ SchemaID: table.SchemaID,
+ SchemaName: request.schemaName,
+ })
+ }
+ newShardTables := cluster.ShardTables{
+ Shard: newShardInfo,
+ Tables: tables,
+ }
+
+ if err := request.cluster.UpdateShardTables(ctx, []cluster.ShardTables{originShardTables, newShardTables}); err != nil {
+ cancelEventWithLog(event, err, "update shard tables")
+ return
+ }
+}
+
+func splitFinishCallback(event *fsm.Event) {
+ request, err := getRequestFromEvent[splitCallbackRequest](event)
+ if err != nil {
+ cancelEventWithLog(event, err, "get request from event")
+ return
+ }
+ log.Info("split procedure finish", zap.Uint32("shardID", uint32(request.shardID)), zap.Uint32("newShardID", uint32(request.newShardID)))
+}
+
+func (p *SplitProcedure) persist(ctx context.Context) error {
+ meta, err := p.convertToMeta()
+ if err != nil {
+ return errors.WithMessage(err, "convert to meta")
+ }
+ err = p.storage.CreateOrUpdate(ctx, meta)
+ if err != nil {
+ return errors.WithMessage(err, "createOrUpdate procedure storage")
+ }
+ return nil
+}
+
+type SplitProcedurePersistRawData struct {
+ SchemaName string
+ TableNames []string
+ ShardID uint32
+ NewShardID uint32
+ TargetNodeName string
+}
+
+func (p *SplitProcedure) convertToMeta() (Meta, error) {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ rawData := SplitProcedurePersistRawData{
+ SchemaName: p.schemaName,
+ TableNames: p.tableNames,
+ ShardID: uint32(p.shardID),
+ NewShardID: uint32(p.newShardID),
+ TargetNodeName: p.targetNodeName,
+ }
+ rawDataBytes, err := json.Marshal(rawData)
+ if err != nil {
+ return Meta{}, ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%v, err:%v", p.id, err)
+ }
+
+ meta := Meta{
+ ID: p.id,
+ Typ: Split,
+ State: p.state,
+
+ RawData: rawDataBytes,
+ }
+
+ return meta, nil
+}
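
convertToMeta snapshots the procedure into Meta.RawData as JSON on every persist. This diff does not add a corresponding restore path, but since the payload is plain JSON, recovering it would amount to a json.Unmarshal back into SplitProcedurePersistRawData. A sketch of that round trip, not part of this change and assumed to live in package procedure next to split.go:

    // Sketch only: decoding the persisted split payload back into the
    // struct defined above; a real restore path is not implemented here.
    package procedure

    import "encoding/json"

    func decodeSplitRawData(raw []byte) (SplitProcedurePersistRawData, error) {
        var data SplitProcedurePersistRawData
        if err := json.Unmarshal(raw, &data); err != nil {
            return SplitProcedurePersistRawData{}, err
        }
        return data, nil
    }
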
diff --git a/server/coordinator/procedure/split_test.go b/server/coordinator/procedure/split_test.go
new file mode 100644
index 0000000..ebff9ab
--- /dev/null
+++ b/server/coordinator/procedure/split_test.go
@@ -0,0 +1,75 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package procedure
+
+import (
+ "context"
+ "testing"
+
+ "github.com/CeresDB/ceresmeta/server/cluster"
+ "github.com/CeresDB/ceresmeta/server/storage"
+ "github.com/stretchr/testify/require"
+)
+
+func TestSplit(t *testing.T) {
+ re := require.New(t)
+ ctx := context.Background()
+ dispatch := MockDispatch{}
+ c := prepare(t)
+ s := NewTestStorage(t)
+
+ getNodeShardsResult, err := c.GetNodeShards(ctx)
+ re.NoError(err)
+
+ // Randomly select a shardNode to split.
+ targetShardNode := getNodeShardsResult.NodeShards[0].ShardNode
+
+ // Create some tables in this shard.
+ _, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName0)
+ re.NoError(err)
+ _, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName1)
+ re.NoError(err)
+
+ // Split one table from this shard.
+ newShardID, err := c.AllocShardID(ctx)
+ re.NoError(err)
+ procedure := NewSplitProcedure(1, dispatch, s, c, testSchemaName, targetShardNode.ID, storage.ShardID(newShardID), []string{testTableName0}, targetShardNode.NodeName)
+ err = procedure.Start(ctx)
+ re.NoError(err)
+
+ // Validate split result:
+ // 1. Shards on node, split shard and new shard must be all exists on node.
+ // 2. Tables mapping of split shard and new shard must be all exists.
+ // 3. Tables in table mapping must be correct, the split table only exists on the new shard.
+ getNodeShardsResult, err = c.GetNodeShards(ctx)
+ re.NoError(err)
+
+ nodeShardsMapping := make(map[storage.ShardID]cluster.ShardNodeWithVersion, 0)
+ for _, nodeShard := range getNodeShardsResult.NodeShards {
+ nodeShardsMapping[nodeShard.ShardNode.ID] = nodeShard
+ }
+ splitNodeShard := nodeShardsMapping[targetShardNode.ID]
+ newNodeShard := nodeShardsMapping[storage.ShardID(newShardID)]
+ re.NotNil(splitNodeShard)
+ re.NotNil(newNodeShard)
+
+ shardTables := c.GetShardTables([]storage.ShardID{targetShardNode.ID, storage.ShardID(newShardID)}, targetShardNode.NodeName)
+ splitShardTables := shardTables[targetShardNode.ID]
+ newShardTables := shardTables[storage.ShardID(newShardID)]
+ re.NotNil(splitShardTables)
+ re.NotNil(newShardTables)
+
+ splitShardTablesMapping := make(map[string]cluster.TableInfo, 0)
+ for _, table := range splitShardTables.Tables {
+ splitShardTablesMapping[table.Name] = table
+ }
+ _, exists := splitShardTablesMapping[testTableName0]
+ re.False(exists)
+
+ newShardTablesMapping := make(map[string]cluster.TableInfo, 0)
+ for _, table := range newShardTables.Tables {
+ newShardTablesMapping[table.Name] = table
+ }
+ _, exists = newShardTablesMapping[testTableName0]
+ re.True(exists)
+}
diff --git a/server/coordinator/procedure/transfer_leader.go b/server/coordinator/procedure/transfer_leader.go
index 046cf72..e35b280 100644
--- a/server/coordinator/procedure/transfer_leader.go
+++ b/server/coordinator/procedure/transfer_leader.go
@@ -212,19 +212,23 @@
return
}
- shardNodes, err := request.cluster.GetShardNodesByShardID(request.shardID)
+ getNodeShardResult, err := request.cluster.GetNodeShards(ctx)
if err != nil {
cancelEventWithLog(event, err, "get shardNodes by shardID failed")
return
}
found := false
+ shardNodes := make([]storage.ShardNode, 0, len(getNodeShardResult.NodeShards))
var leaderShardNode storage.ShardNode
- for _, shardNode := range shardNodes {
- if shardNode.ShardRole == storage.ShardRoleLeader {
- found = true
- leaderShardNode = shardNode
- leaderShardNode.NodeName = request.newLeaderNodeName
+ for _, shardNodeWithVersion := range getNodeShardResult.NodeShards {
+ if shardNodeWithVersion.ShardNode.ShardRole == storage.ShardRoleLeader {
+ leaderShardNode = shardNodeWithVersion.ShardNode
+ if leaderShardNode.ID == request.shardID {
+ found = true
+ leaderShardNode.NodeName = request.newLeaderNodeName
+ }
+ shardNodes = append(shardNodes, leaderShardNode)
}
}
if !found {
@@ -232,7 +236,7 @@
return
}
- err = request.cluster.UpdateClusterView(ctx, storage.ClusterStateStable, []storage.ShardNode{leaderShardNode})
+ err = request.cluster.UpdateClusterView(ctx, storage.ClusterStateStable, shardNodes)
if err != nil {
cancelEventWithLog(event, storage.ErrUpdateClusterViewConflict, "update cluster view")
return
diff --git a/server/coordinator/procedure/util.go b/server/coordinator/procedure/util.go
index 5a2d20e..ee866f5 100644
--- a/server/coordinator/procedure/util.go
+++ b/server/coordinator/procedure/util.go
@@ -32,3 +32,24 @@
return *new(T), ErrGetRequest.WithCausef("event arg type must be same as return type")
}
}
+
+func IsContains(slice []string, target string) bool {
+ for _, a := range slice {
+ if a == target {
+ return true
+ }
+ }
+ return false
+}
+
+func IsSubSlice(subSlice []string, slice []string) bool {
+ if len(subSlice) > len(slice) {
+ return false
+ }
+ for _, val := range subSlice {
+ if !IsContains(slice, val) {
+ return false
+ }
+ }
+ return true
+}
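
IsSubSlice is a simple containment check: every requested table name must be present in the shard's table list, with order ignored. A quick sketch of the expected behaviour, written as a test that is not part of this change and assumed to sit in package procedure alongside util.go:

    // Sketch only: expected IsSubSlice semantics.
    package procedure

    import "testing"

    func TestIsSubSliceSketch(t *testing.T) {
        if !IsSubSlice([]string{"table0"}, []string{"table0", "table1"}) {
            t.Fatal("a subset should be accepted")
        }
        if IsSubSlice([]string{"table2"}, []string{"table0", "table1"}) {
            t.Fatal("a non-subset should be rejected")
        }
    }
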
diff --git a/server/coordinator/scheduler.go b/server/coordinator/scheduler.go
index b49ed1b..108e3c4 100644
--- a/server/coordinator/scheduler.go
+++ b/server/coordinator/scheduler.go
@@ -134,6 +134,7 @@
// 1. Shard exists in metadata and not exists in node, reopen lack shards on node.
if !exists {
+ log.Info("Shard exists in metadata and not exists in node, reopen lack shards on node.", zap.String("node", node), zap.Uint32("shardID", uint32(expectShard.ID)))
if err := s.dispatch.OpenShard(ctx, node, eventdispatch.OpenShardRequest{Shard: expectShard}); err != nil {
return errors.WithMessagef(err, "reopen shard failed, shardInfo:%d", expectShard.ID)
}
@@ -146,6 +147,7 @@
}
// 3. Shard exists in both metadata and node, versions are inconsistent, close and reopen invalid shard on node.
+ log.Info("Shard exists in both metadata and node, versions are inconsistent, close and reopen invalid shard on node.", zap.String("node", node), zap.Uint32("shardID", uint32(expectShard.ID)))
if err := s.dispatch.CloseShard(ctx, node, eventdispatch.CloseShardRequest{ShardID: uint32(expectShard.ID)}); err != nil {
return errors.WithMessagef(err, "close shard failed, shardInfo:%d", expectShard.ID)
}
@@ -160,6 +162,7 @@
if ok {
continue
}
+ log.Info("Shard exists in node and not exists in metadata, close extra shard on node.", zap.String("node", node), zap.Uint32("shardID", uint32(realShard.ID)))
if err := s.dispatch.CloseShard(ctx, node, eventdispatch.CloseShardRequest{ShardID: uint32(realShard.ID)}); err != nil {
return errors.WithMessagef(err, "close shard failed, shardInfo:%d", realShard.ID)
}
diff --git a/server/server.go b/server/server.go
index e9ca843..c55a1e1 100644
--- a/server/server.go
+++ b/server/server.go
@@ -181,7 +181,7 @@
}
srv.procedureManager = procedureManager
dispatch := eventdispatch.NewDispatchImpl()
- procedureFactory := procedure.NewFactory(id.NewAllocatorImpl(srv.etcdCli, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage)
+ procedureFactory := procedure.NewFactory(id.NewAllocatorImpl(srv.etcdCli, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage, manager)
srv.procedureFactory = procedureFactory
srv.scheduler = coordinator.NewScheduler(manager, procedureManager, procedureFactory, dispatch)
diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go
index 8a199d1..b85052c 100644
--- a/server/service/grpc/service.go
+++ b/server/service/grpc/service.go
@@ -4,6 +4,7 @@
import (
"context"
+ "fmt"
"sync"
"time"
@@ -75,7 +76,7 @@
}, ShardInfos: shardInfos,
}
- log.Info("registerNode", zap.String("name", req.Info.Endpoint), zap.String("info", req.Info.String()))
+ log.Info("registerNode", zap.String("name", req.Info.Endpoint), zap.String("info", fmt.Sprintf("%+v", registeredNode)))
err = s.h.GetClusterManager().RegisterNode(ctx, req.GetHeader().GetClusterName(), registeredNode)
if err != nil {
diff --git a/server/service/http/api.go b/server/service/http/api.go
index 8dc5257..9d51a78 100644
--- a/server/service/http/api.go
+++ b/server/service/http/api.go
@@ -47,6 +47,7 @@
// Register post API.
router.Post("/getShardTables", a.getShardTables)
router.Post("/transferLeader", a.transferLeader)
+ router.Post("/split", a.split)
router.Post("/route", a.route)
router.Post("/dropTable", a.dropTable)
@@ -233,3 +234,58 @@
a.respond(writer, nil)
}
+
+type SplitRequest struct {
+ ClusterName string `json:"clusterName"`
+ SchemaName string `json:"schemaName"`
+ ShardID uint32 `json:"shardID"`
+ SplitTables []string `json:"splitTables"`
+ NodeName string `json:"nodeName"`
+}
+
+func (a *API) split(writer http.ResponseWriter, req *http.Request) {
+ var splitRequest SplitRequest
+ err := json.NewDecoder(req.Body).Decode(&splitRequest)
+ if err != nil {
+ log.Error("decode request body failed", zap.Error(err))
+ a.respondError(writer, ErrParseRequest, nil)
+ return
+ }
+ ctx := context.Background()
+
+ c, err := a.clusterManager.GetCluster(ctx, splitRequest.ClusterName)
+ if err != nil {
+ log.Error("cluster not found", zap.String("clusterName", splitRequest.ClusterName), zap.Error(err))
+ a.respondError(writer, cluster.ErrClusterNotFound, "cluster not found")
+ return
+ }
+
+ newShardID, err := c.AllocShardID(ctx)
+ if err != nil {
+ log.Error("alloc shard id failed")
+ a.respondError(writer, ErrAllocShardID, "alloc shard id failed")
+ return
+ }
+
+ splitProcedure, err := a.procedureFactory.CreateSplitProcedure(ctx, procedure.SplitRequest{
+ ClusterName: splitRequest.ClusterName,
+ SchemaName: splitRequest.SchemaName,
+ TableNames: splitRequest.SplitTables,
+ ShardID: storage.ShardID(splitRequest.ShardID),
+ NewShardID: storage.ShardID(newShardID),
+ TargetNodeName: splitRequest.NodeName,
+ })
+ if err != nil {
+ log.Error("create split procedure", zap.Error(err))
+ a.respondError(writer, ErrCreateProcedure, "create split procedure")
+ return
+ }
+
+ if err := a.procedureManager.Submit(ctx, splitProcedure); err != nil {
+ log.Error("submit split procedure", zap.Error(err))
+ a.respondError(writer, ErrSubmitProcedure, "submit split procedure")
+ return
+ }
+
+ a.respond(writer, newShardID)
+}
diff --git a/server/service/http/error.go b/server/service/http/error.go
index 217eaa6..09cbbe1 100644
--- a/server/service/http/error.go
+++ b/server/service/http/error.go
@@ -11,4 +11,5 @@
ErrCreateProcedure = coderr.NewCodeError(coderr.Internal, "create procedure")
ErrSubmitProcedure = coderr.NewCodeError(coderr.Internal, "submit procedure")
ErrGetCluster = coderr.NewCodeError(coderr.Internal, "get cluster")
+ ErrAllocShardID = coderr.NewCodeError(coderr.Internal, "alloc shard id")
)