feat: support alloc shard on same node (#125)
* docs: fix example config
* feat: support alloc shard on duplicate node
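The key API change is a new `enableDuplicateNode` flag on `ShardPicker.PickShards`: when the requested shard count exceeds the number of nodes, callers can now opt into reusing nodes instead of failing. Below is a minimal sketch of how the two call sites in `factory.go` use the flag; the function name, import paths, and surrounding wiring are assumptions for illustration, not part of this patch:

```go
package example

import (
	"context"

	"github.com/CeresDB/ceresmeta/server/cluster" // import paths assumed for illustration
	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
	"github.com/pkg/errors"
)

// pickTableShards sketches the intended usage: partition (root) tables must
// stay on distinct nodes, while data (sub) tables may share a node when the
// sub tables outnumber the nodes.
func pickTableShards(ctx context.Context, picker procedure.ShardPicker, clusterName string, partitionTableNum int, subTableNames []string) ([]cluster.ShardNodeWithVersion, []cluster.ShardNodeWithVersion, error) {
	// enableDuplicateNode == false: fails if there are fewer nodes than
	// requested shards (ErrNodeNumberNotEnough in RandomShardPicker).
	partitionTableShards, err := picker.PickShards(ctx, clusterName, partitionTableNum, false)
	if err != nil {
		return nil, nil, errors.WithMessage(err, "pick partition table shards")
	}
	// enableDuplicateNode == true: only fails if there are fewer shards than
	// requested (ErrShardNumberNotEnough in RandomShardPicker).
	dataTableShards, err := picker.PickShards(ctx, clusterName, len(subTableNames), true)
	if err != nil {
		return nil, nil, errors.WithMessage(err, "pick data table shards")
	}
	return partitionTableShards, dataTableShards, nil
}
```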
diff --git a/config/example-cluster1.toml b/config/example-cluster1.toml
index 90c2d9a..c1235b0 100644
--- a/config/example-cluster1.toml
+++ b/config/example-cluster1.toml
@@ -1,5 +1,5 @@
etcd-start-timeout-ms = 30000
-peer-urls = "http://{HostIP1}:2380"
+peer-urls = "http://{HostIP1}:12380"
advertise-client-urls = "http://{HostIP1}:12379"
advertise-peer-urls = "http://{HostIP1}:12380"
client-urls = "http://{HostIP1}:12379"
diff --git a/config/example-cluster2.toml b/config/example-cluster2.toml
index 9aaa11e..3bc0966 100644
--- a/config/example-cluster2.toml
+++ b/config/example-cluster2.toml
@@ -1,8 +1,8 @@
etcd-start-timeout-ms = 30000
-peer-urls = "http://{HostIP2}:2380"
-advertise-client-urls = "http://{HostIP2}:12379"
-advertise-peer-urls = "http://{HostIP2}:12380"
-client-urls = "http://{HostIP2}:12379"
+peer-urls = "http://{HostIP2}:22380"
+advertise-client-urls = "http://{HostIP2}:22379"
+advertise-peer-urls = "http://{HostIP2}:22380"
+client-urls = "http://{HostIP2}:22379"
wal-dir = "/tmp/ceresmeta2/wal"
data-dir = "/tmp/ceresmeta2/data"
node-name = "meta2"
diff --git a/server/coordinator/procedure/common_test.go b/server/coordinator/procedure/common_test.go
index ec25dcd..b562a6d 100644
--- a/server/coordinator/procedure/common_test.go
+++ b/server/coordinator/procedure/common_test.go
@@ -27,7 +27,7 @@
defaultNodeCount = 2
defaultReplicationFactor = 1
defaultPartitionTableProportionOfNodes = 0.5
- defaultShardTotal = 2
+ defaultShardTotal = 4
)
type MockDispatch struct{}
diff --git a/server/coordinator/procedure/create_partition_table_test.go b/server/coordinator/procedure/create_partition_table_test.go
index bd7bdf8..b73cfd6 100644
--- a/server/coordinator/procedure/create_partition_table_test.go
+++ b/server/coordinator/procedure/create_partition_table_test.go
@@ -42,9 +42,9 @@
partitionTableNum := Max(1, int(float32(len(nodeNames))*defaultPartitionTableProportionOfNodes))
- partitionTableShards, err := shardPicker.PickShards(ctx, c.Name(), partitionTableNum)
+ partitionTableShards, err := shardPicker.PickShards(ctx, c.Name(), partitionTableNum, false)
re.NoError(err)
- dataTableShards, err := shardPicker.PickShards(ctx, c.Name(), len(request.GetPartitionTableInfo().SubTableNames))
+ dataTableShards, err := shardPicker.PickShards(ctx, c.Name(), len(request.GetPartitionTableInfo().SubTableNames), true)
re.NoError(err)
procedure := NewCreatePartitionTableProcedure(CreatePartitionTableProcedureRequest{
diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go
index f18add7..474f45f 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -15,4 +15,5 @@
ErrGetRequest = coderr.NewCodeError(coderr.Internal, "get request from event")
ErrNodeNumberNotEnough = coderr.NewCodeError(coderr.Internal, "node number not enough")
ErrEmptyPartitionNames = coderr.NewCodeError(coderr.Internal, "partition names is empty")
+ ErrShardNumberNotEnough = coderr.NewCodeError(coderr.Internal, "shard number not enough")
)
diff --git a/server/coordinator/procedure/factory.go b/server/coordinator/procedure/factory.go
index 2cc131e..8f837a9 100644
--- a/server/coordinator/procedure/factory.go
+++ b/server/coordinator/procedure/factory.go
@@ -139,12 +139,12 @@
partitionTableNum := Max(1, int(float32(len(nodeNames))*request.PartitionTableRatioOfNodes))
- partitionTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, partitionTableNum)
+ partitionTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, partitionTableNum, false)
if err != nil {
return nil, errors.WithMessage(err, "pick partition table shards")
}
- dataTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, len(request.SourceReq.PartitionTableInfo.SubTableNames))
+ dataTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, len(request.SourceReq.PartitionTableInfo.SubTableNames), true)
if err != nil {
return nil, errors.WithMessage(err, "pick data table shards")
}
diff --git a/server/coordinator/procedure/shard_picker.go b/server/coordinator/procedure/shard_picker.go
index af8993f..edf74a9 100644
--- a/server/coordinator/procedure/shard_picker.go
+++ b/server/coordinator/procedure/shard_picker.go
@@ -13,8 +13,12 @@
)
// ShardPicker is used to pick up the shards suitable for scheduling in the cluster.
+// If expectShardNum is bigger than the number of nodes in the cluster, the result depends on enableDuplicateNode:
+// If enableDuplicateNode is false, PickShards fails and returns an error.
+// If enableDuplicateNode is true, PickShards may return multiple shards on the same node.
+// TODO: Consider refactoring this interface to abstract the parameters of PickShards into a PickStrategy.
type ShardPicker interface {
- PickShards(ctx context.Context, clusterName string, expectShardNum int) ([]cluster.ShardNodeWithVersion, error)
+ PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]cluster.ShardNodeWithVersion, error)
}
// RandomShardPicker randomly pick up shards that are not on the same node in the current cluster.
@@ -29,46 +33,63 @@
}
// PickShards will pick a specified number of shards as expectShardNum.
-func (p *RandomShardPicker) PickShards(ctx context.Context, clusterName string, expectShardNum int) ([]cluster.ShardNodeWithVersion, error) {
+func (p *RandomShardPicker) PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]cluster.ShardNodeWithVersion, error) {
getNodeShardResult, err := p.clusterManager.GetNodeShards(ctx, clusterName)
if err != nil {
return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "get node shards")
}
+
+ if expectShardNum > len(getNodeShardResult.NodeShards) {
+ return []cluster.ShardNodeWithVersion{}, errors.WithMessage(ErrShardNumberNotEnough, fmt.Sprintf("number of shards is:%d, expected number of shards is:%d", len(getNodeShardResult.NodeShards), expectShardNum))
+ }
+
nodeShardsMapping := make(map[string][]cluster.ShardNodeWithVersion, 0)
- var nodeNames []string
for _, nodeShard := range getNodeShardResult.NodeShards {
_, exists := nodeShardsMapping[nodeShard.ShardNode.NodeName]
if !exists {
nodeShards := []cluster.ShardNodeWithVersion{}
- nodeNames = append(nodeNames, nodeShard.ShardNode.NodeName)
nodeShardsMapping[nodeShard.ShardNode.NodeName] = nodeShards
}
nodeShardsMapping[nodeShard.ShardNode.NodeName] = append(nodeShardsMapping[nodeShard.ShardNode.NodeName], nodeShard)
}
- if len(nodeShardsMapping) < expectShardNum {
- return []cluster.ShardNodeWithVersion{}, errors.WithMessage(ErrNodeNumberNotEnough, fmt.Sprintf("number of nodes is:%d, expecet number of shards is:%d", len(nodeShardsMapping), expectShardNum))
- }
- var selectNodeNames []string
- for i := 0; i < expectShardNum; i++ {
- selectNodeIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeNames))))
- if err != nil {
- return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
+ if !enableDuplicateNode {
+ if len(nodeShardsMapping) < expectShardNum {
+ return []cluster.ShardNodeWithVersion{}, errors.WithMessage(ErrNodeNumberNotEnough, fmt.Sprintf("number of nodes is:%d, expected number of shards is:%d", len(nodeShardsMapping), expectShardNum))
}
- selectNodeNames = append(selectNodeNames, nodeNames[selectNodeIndex.Int64()])
- nodeNames[selectNodeIndex.Int64()] = nodeNames[len(nodeNames)-1]
- nodeNames = nodeNames[:len(nodeNames)-1]
}
+ // Try to pick shards from different nodes first; a node is only reused after every node has contributed a shard.
result := []cluster.ShardNodeWithVersion{}
- for _, nodeName := range selectNodeNames {
- nodeShards := nodeShardsMapping[nodeName]
- selectShardIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeShards))))
- if err != nil {
- return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
+ for {
+ nodeNames := []string{}
+ for nodeName := range nodeShardsMapping {
+ nodeNames = append(nodeNames, nodeName)
}
- result = append(result, nodeShards[selectShardIndex.Int64()])
- }
- return result, nil
+ for len(nodeNames) > 0 {
+ if len(result) >= expectShardNum {
+ return result, nil
+ }
+
+ selectNodeIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeNames))))
+ if err != nil {
+ return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
+ }
+
+ nodeShards := nodeShardsMapping[nodeNames[selectNodeIndex.Int64()]]
+
+ if len(nodeShards) > 0 {
+ result = append(result, nodeShards[0])
+
+ // Remove the selected shard from this node's candidates.
+ nodeShards[0] = nodeShards[len(nodeShards)-1]
+ nodeShardsMapping[nodeNames[selectNodeIndex.Int64()]] = nodeShards[:len(nodeShards)-1]
+ }
+
+ // Remove the selected node from this round.
+ nodeNames[selectNodeIndex.Int64()] = nodeNames[len(nodeNames)-1]
+ nodeNames = nodeNames[:len(nodeNames)-1]
+ }
+ }
}
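When duplicates are allowed, the new selection loop in `RandomShardPicker.PickShards` works in rounds: each round it visits every node in random order and takes at most one remaining shard from it, so a node is only reused after every node has contributed, and the up-front check against the total shard count guarantees termination. Below is a simplified, standalone sketch of that round-robin idea, using plain strings instead of `cluster.ShardNodeWithVersion` and `math/rand` instead of `crypto/rand`; it is an illustration of the algorithm, not the patch code:

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickShards is a simplified model of RandomShardPicker.PickShards with
// enableDuplicateNode == true: sweep the nodes in random order, taking one
// remaining shard per node per round, until enough shards are picked.
func pickShards(nodeShards map[string][]string, expect int) []string {
	total := 0
	for _, shards := range nodeShards {
		total += len(shards)
	}
	if expect > total {
		return nil // the real code returns ErrShardNumberNotEnough here
	}

	result := make([]string, 0, expect)
	for len(result) < expect {
		nodes := make([]string, 0, len(nodeShards))
		for node := range nodeShards {
			nodes = append(nodes, node)
		}
		for len(nodes) > 0 && len(result) < expect {
			i := rand.Intn(len(nodes))
			node := nodes[i]
			if shards := nodeShards[node]; len(shards) > 0 {
				// Take one shard from this node and drop it from the pool.
				result = append(result, shards[0])
				nodeShards[node] = shards[1:]
			}
			// Remove the visited node for the rest of this round.
			nodes[i] = nodes[len(nodes)-1]
			nodes = nodes[:len(nodes)-1]
		}
	}
	return result
}

func main() {
	// Two nodes with two shards each, mirroring the test fixture
	// (defaultNodeCount = 2, defaultShardTotal = 4).
	nodeShards := map[string][]string{
		"node0": {"shard0", "shard1"},
		"node1": {"shard2", "shard3"},
	}
	// Picking 3: the first two picks land on different nodes, the third
	// necessarily reuses one of them.
	fmt.Println(pickShards(nodeShards, 3))
}
```

With `enableDuplicateNode` set to false, `RandomShardPicker` instead returns `ErrNodeNumberNotEnough` up front when `expectShardNum` exceeds the node count, so the loop never needs to reuse a node.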
diff --git a/server/coordinator/procedure/shard_picker_test.go b/server/coordinator/procedure/shard_picker_test.go
index de223aa..aa8e6eb 100644
--- a/server/coordinator/procedure/shard_picker_test.go
+++ b/server/coordinator/procedure/shard_picker_test.go
@@ -15,10 +15,25 @@
manager, _ := prepare(t)
randomShardPicker := NewRandomShardPicker(manager)
- nodeShards, err := randomShardPicker.PickShards(ctx, clusterName, 2)
+ nodeShards, err := randomShardPicker.PickShards(ctx, clusterName, 2, false)
re.NoError(err)
// Verify the number of shards and ensure that they are not on the same node.
re.Equal(len(nodeShards), 2)
re.NotEqual(nodeShards[0].ShardNode.NodeName, nodeShards[1].ShardNode.NodeName)
+
+ // expectShardNum is bigger than the node number and enableDuplicateNode is false, so it should return an error.
+ _, err = randomShardPicker.PickShards(ctx, clusterName, 3, false)
+ re.Error(err)
+
+ // expectShardNum is bigger than the node number and enableDuplicateNode is true, so it should return the expected number of shards.
+ nodeShards, err = randomShardPicker.PickShards(ctx, clusterName, 3, true)
+ re.NoError(err)
+ re.Equal(len(nodeShards), 3)
+ nodeShards, err = randomShardPicker.PickShards(ctx, clusterName, 4, true)
+ re.NoError(err)
+ re.Equal(len(nodeShards), 4)
+ // expectShardNum is bigger than the total shard number, so it should return an error.
+ _, err = randomShardPicker.PickShards(ctx, clusterName, 5, true)
+ re.Error(err)
}
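For reference, the thresholds exercised above follow directly from the fixture constants in `common_test.go` (`defaultNodeCount = 2`, and `defaultShardTotal` raised to 4 in this change). A trivial illustrative restatement of those bounds:

```go
package main

import "fmt"

func main() {
	const (
		nodeCount  = 2 // defaultNodeCount in common_test.go
		shardTotal = 4 // defaultShardTotal after this change
	)
	// enableDuplicateNode == false: bounded by the node count,
	// so picking 3 shards fails.
	fmt.Println("max picks without duplicate nodes:", nodeCount)
	// enableDuplicateNode == true: bounded by the shard count,
	// so picking 4 succeeds and picking 5 fails.
	fmt.Println("max picks with duplicate nodes:", shardTotal)
}
```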
diff --git a/server/coordinator/procedure/split_test.go b/server/coordinator/procedure/split_test.go
index e2101f6..b8ba789 100644
--- a/server/coordinator/procedure/split_test.go
+++ b/server/coordinator/procedure/split_test.go
@@ -22,15 +22,18 @@
re.NoError(err)
// Randomly select a shardNode to split.
- targetShardNode := getNodeShardsResult.NodeShards[0].ShardNode
+ createTableNodeShard := getNodeShardsResult.NodeShards[0].ShardNode
// Create some tables in this shard.
- _, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName0, false)
+ createTableResult, err := c.CreateTable(ctx, createTableNodeShard.NodeName, testSchemaName, testTableName0, false)
re.NoError(err)
- _, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName1, false)
+ _, err = c.CreateTable(ctx, createTableNodeShard.NodeName, testSchemaName, testTableName1, false)
re.NoError(err)
// Split one table from this shard.
+ getNodeShardResult, err := c.GetShardNodeByTableIDs([]storage.TableID{createTableResult.Table.ID})
+ re.NoError(err)
+ targetShardNode := getNodeShardResult.ShardNodes[createTableResult.Table.ID][0]
newShardID, err := c.AllocShardID(ctx)
re.NoError(err)
procedure := NewSplitProcedure(1, dispatch, s, c, testSchemaName, targetShardNode.ID, storage.ShardID(newShardID), []string{testTableName0}, targetShardNode.NodeName)