feat: support allocating shards on the same node (#125)

* docs: fix example config

* feat: support allocating shards on duplicate nodes
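
The caller-facing change is the new enableDuplicateNode flag on ShardPicker.PickShards.
A minimal sketch of the intended call pattern, with variable names simplified from the
factory.go hunk below (error wrapping follows the existing style):

    // Shards for the partition table itself must sit on distinct nodes, so
    // duplicates are disallowed and picking fails when there are fewer nodes
    // than requested shards.
    partitionTableShards, err := shardPicker.PickShards(ctx, clusterName, partitionTableNum, false)
    if err != nil {
        return nil, errors.WithMessage(err, "pick partition table shards")
    }

    // Shards for the data sub-tables may share a node, so duplicates are allowed
    // and picking only fails when the cluster has fewer shards than requested.
    dataTableShards, err := shardPicker.PickShards(ctx, clusterName, len(subTableNames), true)
    if err != nil {
        return nil, errors.WithMessage(err, "pick data table shards")
    }
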
diff --git a/config/example-cluster1.toml b/config/example-cluster1.toml
index 90c2d9a..c1235b0 100644
--- a/config/example-cluster1.toml
+++ b/config/example-cluster1.toml
@@ -1,5 +1,5 @@
 etcd-start-timeout-ms = 30000
-peer-urls = "http://{HostIP1}:2380"
+peer-urls = "http://{HostIP1}:12380"
 advertise-client-urls = "http://{HostIP1}:12379"
 advertise-peer-urls = "http://{HostIP1}:12380"
 client-urls = "http://{HostIP1}:12379"
diff --git a/config/example-cluster2.toml b/config/example-cluster2.toml
index 9aaa11e..3bc0966 100644
--- a/config/example-cluster2.toml
+++ b/config/example-cluster2.toml
@@ -1,8 +1,8 @@
 etcd-start-timeout-ms = 30000
-peer-urls = "http://{HostIP2}:2380"
-advertise-client-urls = "http://{HostIP2}:12379"
-advertise-peer-urls = "http://{HostIP2}:12380"
-client-urls = "http://{HostIP2}:12379"
+peer-urls = "http://{HostIP2}:22380"
+advertise-client-urls = "http://{HostIP2}:22379"
+advertise-peer-urls = "http://{HostIP2}:22380"
+client-urls = "http://{HostIP2}:22379"
 wal-dir = "/tmp/ceresmeta2/wal"
 data-dir = "/tmp/ceresmeta2/data"
 node-name = "meta2"
diff --git a/server/coordinator/procedure/common_test.go b/server/coordinator/procedure/common_test.go
index ec25dcd..b562a6d 100644
--- a/server/coordinator/procedure/common_test.go
+++ b/server/coordinator/procedure/common_test.go
@@ -27,7 +27,7 @@
 	defaultNodeCount                       = 2
 	defaultReplicationFactor               = 1
 	defaultPartitionTableProportionOfNodes = 0.5
-	defaultShardTotal                      = 2
+	defaultShardTotal                      = 4
 )
 
 type MockDispatch struct{}
diff --git a/server/coordinator/procedure/create_partition_table_test.go b/server/coordinator/procedure/create_partition_table_test.go
index bd7bdf8..b73cfd6 100644
--- a/server/coordinator/procedure/create_partition_table_test.go
+++ b/server/coordinator/procedure/create_partition_table_test.go
@@ -42,9 +42,9 @@
 
 	partitionTableNum := Max(1, int(float32(len(nodeNames))*defaultPartitionTableProportionOfNodes))
 
-	partitionTableShards, err := shardPicker.PickShards(ctx, c.Name(), partitionTableNum)
+	partitionTableShards, err := shardPicker.PickShards(ctx, c.Name(), partitionTableNum, false)
 	re.NoError(err)
-	dataTableShards, err := shardPicker.PickShards(ctx, c.Name(), len(request.GetPartitionTableInfo().SubTableNames))
+	dataTableShards, err := shardPicker.PickShards(ctx, c.Name(), len(request.GetPartitionTableInfo().SubTableNames), true)
 	re.NoError(err)
 
 	procedure := NewCreatePartitionTableProcedure(CreatePartitionTableProcedureRequest{
diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go
index f18add7..474f45f 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -15,4 +15,5 @@
 	ErrGetRequest            = coderr.NewCodeError(coderr.Internal, "get request from event")
 	ErrNodeNumberNotEnough   = coderr.NewCodeError(coderr.Internal, "node number not enough")
 	ErrEmptyPartitionNames   = coderr.NewCodeError(coderr.Internal, "partition names is empty")
+	ErrShardNumberNotEnough  = coderr.NewCodeError(coderr.Internal, "shard number not enough")
 )
diff --git a/server/coordinator/procedure/factory.go b/server/coordinator/procedure/factory.go
index 2cc131e..8f837a9 100644
--- a/server/coordinator/procedure/factory.go
+++ b/server/coordinator/procedure/factory.go
@@ -139,12 +139,12 @@
 
 	partitionTableNum := Max(1, int(float32(len(nodeNames))*request.PartitionTableRatioOfNodes))
 
-	partitionTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, partitionTableNum)
+	partitionTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, partitionTableNum, false)
 	if err != nil {
 		return nil, errors.WithMessage(err, "pick partition table shards")
 	}
 
-	dataTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, len(request.SourceReq.PartitionTableInfo.SubTableNames))
+	dataTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, len(request.SourceReq.PartitionTableInfo.SubTableNames), true)
 	if err != nil {
 		return nil, errors.WithMessage(err, "pick data table shards")
 	}
diff --git a/server/coordinator/procedure/shard_picker.go b/server/coordinator/procedure/shard_picker.go
index af8993f..edf74a9 100644
--- a/server/coordinator/procedure/shard_picker.go
+++ b/server/coordinator/procedure/shard_picker.go
@@ -13,8 +13,12 @@
 )
 
 // ShardPicker is used to pick up the shards suitable for scheduling in the cluster.
+// If expectShardNum is bigger than the number of nodes in the cluster, the result depends on enableDuplicateNode:
+// If enableDuplicateNode is false, PickShards fails and returns an error.
+// If enableDuplicateNode is true, PickShards may return multiple shards on the same node.
+// TODO: Consider refactoring this interface and abstracting the parameters of PickShards into a PickStrategy.
 type ShardPicker interface {
-	PickShards(ctx context.Context, clusterName string, expectShardNum int) ([]cluster.ShardNodeWithVersion, error)
+	PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]cluster.ShardNodeWithVersion, error)
 }
 
 // RandomShardPicker randomly pick up shards that are not on the same node in the current cluster.
@@ -29,46 +33,63 @@
 }
 
 // PickShards will pick a specified number of shards as expectShardNum.
-func (p *RandomShardPicker) PickShards(ctx context.Context, clusterName string, expectShardNum int) ([]cluster.ShardNodeWithVersion, error) {
+func (p *RandomShardPicker) PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]cluster.ShardNodeWithVersion, error) {
 	getNodeShardResult, err := p.clusterManager.GetNodeShards(ctx, clusterName)
 	if err != nil {
 		return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "get node shards")
 	}
+
+	if expectShardNum > len(getNodeShardResult.NodeShards) {
+		return []cluster.ShardNodeWithVersion{}, errors.WithMessage(ErrShardNumberNotEnough, fmt.Sprintf("number of shards is:%d, expected number of shards is:%d", len(getNodeShardResult.NodeShards), expectShardNum))
+	}
+
 	nodeShardsMapping := make(map[string][]cluster.ShardNodeWithVersion, 0)
-	var nodeNames []string
 	for _, nodeShard := range getNodeShardResult.NodeShards {
 		_, exists := nodeShardsMapping[nodeShard.ShardNode.NodeName]
 		if !exists {
 			nodeShards := []cluster.ShardNodeWithVersion{}
-			nodeNames = append(nodeNames, nodeShard.ShardNode.NodeName)
 			nodeShardsMapping[nodeShard.ShardNode.NodeName] = nodeShards
 		}
 		nodeShardsMapping[nodeShard.ShardNode.NodeName] = append(nodeShardsMapping[nodeShard.ShardNode.NodeName], nodeShard)
 	}
-	if len(nodeShardsMapping) < expectShardNum {
-		return []cluster.ShardNodeWithVersion{}, errors.WithMessage(ErrNodeNumberNotEnough, fmt.Sprintf("number of nodes is:%d, expecet number of shards is:%d", len(nodeShardsMapping), expectShardNum))
-	}
 
-	var selectNodeNames []string
-	for i := 0; i < expectShardNum; i++ {
-		selectNodeIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeNames))))
-		if err != nil {
-			return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
+	if !enableDuplicateNode {
+		if len(nodeShardsMapping) < expectShardNum {
+			return []cluster.ShardNodeWithVersion{}, errors.WithMessage(ErrNodeNumberNotEnough, fmt.Sprintf("number of nodes is:%d, expected number of shards is:%d", len(nodeShardsMapping), expectShardNum))
 		}
-		selectNodeNames = append(selectNodeNames, nodeNames[selectNodeIndex.Int64()])
-		nodeNames[selectNodeIndex.Int64()] = nodeNames[len(nodeNames)-1]
-		nodeNames = nodeNames[:len(nodeNames)-1]
 	}
 
+	// Try to pick shards on different nodes.
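+	// The outer loop rebuilds the candidate node list from every node that may
+	// still hold unpicked shards; the inner loop takes at most one shard per node
+	// per pass, so duplicates on a node only appear on later passes, which are
+	// reached only when enableDuplicateNode is true.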
 	result := []cluster.ShardNodeWithVersion{}
-	for _, nodeName := range selectNodeNames {
-		nodeShards := nodeShardsMapping[nodeName]
-		selectShardIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeShards))))
-		if err != nil {
-			return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
+	for {
+		nodeNames := []string{}
+		for nodeName := range nodeShardsMapping {
+			nodeNames = append(nodeNames, nodeName)
 		}
-		result = append(result, nodeShards[selectShardIndex.Int64()])
-	}
 
-	return result, nil
+		for len(nodeNames) > 0 {
+			if len(result) >= expectShardNum {
+				return result, nil
+			}
+
+			selectNodeIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeNames))))
+			if err != nil {
+				return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
+			}
+
+			nodeShards := nodeShardsMapping[nodeNames[selectNodeIndex.Int64()]]
+
+			if len(nodeShards) > 0 {
+				result = append(result, nodeShards[0])
+
+				// Remove the selected shard.
+				nodeShards[0] = nodeShards[len(nodeShards)-1]
+				nodeShardsMapping[nodeNames[selectNodeIndex.Int64()]] = nodeShards[:len(nodeShards)-1]
+			}
+
+			// Remove the selected node.
+			nodeNames[selectNodeIndex.Int64()] = nodeNames[len(nodeNames)-1]
+			nodeNames = nodeNames[:len(nodeNames)-1]
+		}
+	}
 }
diff --git a/server/coordinator/procedure/shard_picker_test.go b/server/coordinator/procedure/shard_picker_test.go
index de223aa..aa8e6eb 100644
--- a/server/coordinator/procedure/shard_picker_test.go
+++ b/server/coordinator/procedure/shard_picker_test.go
@@ -15,10 +15,25 @@
 	manager, _ := prepare(t)
 
 	randomShardPicker := NewRandomShardPicker(manager)
-	nodeShards, err := randomShardPicker.PickShards(ctx, clusterName, 2)
+	nodeShards, err := randomShardPicker.PickShards(ctx, clusterName, 2, false)
 	re.NoError(err)
 
 	// Verify the number of shards and ensure that they are not on the same node.
 	re.Equal(len(nodeShards), 2)
 	re.NotEqual(nodeShards[0].ShardNode.NodeName, nodeShards[1].ShardNode.NodeName)
+
+	// When expectShardNum is bigger than the node number and enableDuplicateNode is false, PickShards should return an error.
+	_, err = randomShardPicker.PickShards(ctx, clusterName, 3, false)
+	re.Error(err)
+
+	// When expectShardNum is bigger than the node number and enableDuplicateNode is true, PickShards should return the expected number of shards.
+	nodeShards, err = randomShardPicker.PickShards(ctx, clusterName, 3, true)
+	re.NoError(err)
+	re.Equal(len(nodeShards), 3)
+	nodeShards, err = randomShardPicker.PickShards(ctx, clusterName, 4, true)
+	re.NoError(err)
+	re.Equal(len(nodeShards), 4)
+	// When expectShardNum is bigger than the total shard number, PickShards should return an error.
+	_, err = randomShardPicker.PickShards(ctx, clusterName, 5, true)
+	re.Error(err)
 }
diff --git a/server/coordinator/procedure/split_test.go b/server/coordinator/procedure/split_test.go
index e2101f6..b8ba789 100644
--- a/server/coordinator/procedure/split_test.go
+++ b/server/coordinator/procedure/split_test.go
@@ -22,15 +22,18 @@
 	re.NoError(err)
 
 	// Randomly select a shardNode to split.
-	targetShardNode := getNodeShardsResult.NodeShards[0].ShardNode
+	createTableNodeShard := getNodeShardsResult.NodeShards[0].ShardNode
 
 	// Create some tables in this shard.
-	_, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName0, false)
+	createTableResult, err := c.CreateTable(ctx, createTableNodeShard.NodeName, testSchemaName, testTableName0, false)
 	re.NoError(err)
-	_, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName1, false)
+	_, err = c.CreateTable(ctx, createTableNodeShard.NodeName, testSchemaName, testTableName1, false)
 	re.NoError(err)
 
 	// Split one table from this shard.
+	getNodeShardResult, err := c.GetShardNodeByTableIDs([]storage.TableID{createTableResult.Table.ID})
+	re.NoError(err)
+	targetShardNode := getNodeShardResult.ShardNodes[createTableResult.Table.ID][0]
 	newShardID, err := c.AllocShardID(ctx)
 	re.NoError(err)
 	procedure := NewSplitProcedure(1, dispatch, s, c, testSchemaName, targetShardNode.ID, storage.ShardID(newShardID), []string{testTableName0}, targetShardNode.NodeName)