refactor: support opening shards concurrently (#205)
diff --git a/go.mod b/go.mod
index 3a53779..85305f7 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@
require (
github.com/AlekSi/gocov-xml v1.0.0
- github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c
+ github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c
github.com/axw/gocov v1.1.0
github.com/caarlos0/env/v6 v6.10.1
github.com/julienschmidt/httprouter v1.3.0
@@ -18,6 +18,8 @@
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.uber.org/zap v1.21.0
+ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+ golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
golang.org/x/tools v0.1.10
google.golang.org/grpc v1.47.0
google.golang.org/protobuf v1.28.0
@@ -89,11 +91,9 @@
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
- golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.7 // indirect
- golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
diff --git a/go.sum b/go.sum
index b4f80a2..adf1236 100644
--- a/go.sum
+++ b/go.sum
@@ -18,10 +18,8 @@
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c h1:7gmSQsGua+Y1g6ygsC/K75T/zK2ki7y5R5BkrN1/Ymc=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 4cd1b89..d08ac03 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -39,7 +39,7 @@
dispatch := eventdispatch.NewDispatchImpl()
procedureFactory := coordinator.NewFactory(id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage)
- schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType())
+ schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize())
return &Cluster{
logger: logger,
diff --git a/server/cluster/manager.go b/server/cluster/manager.go
index 3696aa7..fea613d 100644
--- a/server/cluster/manager.go
+++ b/server/cluster/manager.go
@@ -112,14 +112,15 @@
createTime := time.Now().UnixMilli()
clusterMetadataStorage := storage.Cluster{
- ID: clusterID,
- Name: clusterName,
- MinNodeCount: opts.NodeCount,
- ShardTotal: opts.ShardTotal,
- EnableSchedule: opts.EnableSchedule,
- TopologyType: opts.TopologyType,
- CreatedAt: uint64(createTime),
- ModifiedAt: uint64(createTime),
+ ID: clusterID,
+ Name: clusterName,
+ MinNodeCount: opts.NodeCount,
+ ShardTotal: opts.ShardTotal,
+ EnableSchedule: opts.EnableSchedule,
+ TopologyType: opts.TopologyType,
+ ProcedureExecutingBatchSize: opts.ProcedureExecutingBatchSize,
+ CreatedAt: uint64(createTime),
+ ModifiedAt: uint64(createTime),
}
err = m.storage.CreateCluster(ctx, storage.CreateClusterRequest{
Cluster: clusterMetadataStorage,
@@ -164,14 +165,15 @@
}
err = m.storage.UpdateCluster(ctx, storage.UpdateClusterRequest{Cluster: storage.Cluster{
- ID: c.GetMetadata().GetClusterID(),
- Name: c.GetMetadata().Name(),
- MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(),
- ShardTotal: c.GetMetadata().GetTotalShardNum(),
- EnableSchedule: opt.EnableSchedule,
- TopologyType: opt.TopologyType,
- CreatedAt: c.GetMetadata().GetCreateTime(),
- ModifiedAt: uint64(time.Now().UnixMilli()),
+ ID: c.GetMetadata().GetClusterID(),
+ Name: c.GetMetadata().Name(),
+ MinNodeCount: c.GetMetadata().GetClusterMinNodeCount(),
+ ShardTotal: c.GetMetadata().GetTotalShardNum(),
+ EnableSchedule: opt.EnableSchedule,
+ TopologyType: opt.TopologyType,
+ ProcedureExecutingBatchSize: opt.ProcedureExecutingBatchSize,
+ CreatedAt: c.GetMetadata().GetCreateTime(),
+ ModifiedAt: uint64(time.Now().UnixMilli()),
}})
if err != nil {
log.Error("update cluster", zap.Error(err))
@@ -333,14 +335,15 @@
if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown {
req := storage.UpdateClusterRequest{
Cluster: storage.Cluster{
- ID: metadataStorage.ID,
- Name: metadataStorage.Name,
- MinNodeCount: metadataStorage.MinNodeCount,
- ShardTotal: metadataStorage.ShardTotal,
- EnableSchedule: metadataStorage.EnableSchedule,
- TopologyType: m.topologyType,
- CreatedAt: metadataStorage.CreatedAt,
- ModifiedAt: uint64(time.Now().UnixMilli()),
+ ID: metadataStorage.ID,
+ Name: metadataStorage.Name,
+ MinNodeCount: metadataStorage.MinNodeCount,
+ ShardTotal: metadataStorage.ShardTotal,
+ EnableSchedule: metadataStorage.EnableSchedule,
+ TopologyType: m.topologyType,
+ ProcedureExecutingBatchSize: metadataStorage.ProcedureExecutingBatchSize,
+ CreatedAt: metadataStorage.CreatedAt,
+ ModifiedAt: uint64(time.Now().UnixMilli()),
},
}
if err := m.storage.UpdateCluster(ctx, req); err != nil {
diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go
index 781f8d9..112f98a 100644
--- a/server/cluster/metadata/cluster_metadata.go
+++ b/server/cluster/metadata/cluster_metadata.go
@@ -576,6 +576,13 @@
return c.metaData.TopologyType
}
+func (c *ClusterMetadata) GetProcedureExecutingBatchSize() uint32 {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+
+ return c.metaData.ProcedureExecutingBatchSize
+}
+
func (c *ClusterMetadata) GetCreateTime() uint64 {
c.lock.RLock()
defer c.lock.RUnlock()
diff --git a/server/cluster/metadata/types.go b/server/cluster/metadata/types.go
index 08bd753..bcac421 100644
--- a/server/cluster/metadata/types.go
+++ b/server/cluster/metadata/types.go
@@ -47,16 +47,18 @@
}
type CreateClusterOpts struct {
- NodeCount uint32
- ReplicationFactor uint32
- ShardTotal uint32
- EnableSchedule bool
- TopologyType storage.TopologyType
+ NodeCount uint32
+ ReplicationFactor uint32
+ ShardTotal uint32
+ EnableSchedule bool
+ TopologyType storage.TopologyType
+ ProcedureExecutingBatchSize uint32
}
type UpdateClusterOpts struct {
- EnableSchedule bool
- TopologyType storage.TopologyType
+ EnableSchedule bool
+ TopologyType storage.TopologyType
+ ProcedureExecutingBatchSize uint32
}
type CreateTableMetadataRequest struct {
diff --git a/server/config/config.go b/server/config/config.go
index 8ba4630..f728165 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -5,6 +5,7 @@
import (
"flag"
"fmt"
+ "math"
"os"
"strings"
"time"
@@ -52,7 +53,8 @@
defaultClusterShardTotal = 8
enableSchedule = true
// topologyType is used to determine the scheduling cluster strategy of CeresMeta. It should be determined according to the storage method of CeresDB. The default is static to support local storage.
- defaultTopologyType = "static"
+ defaultTopologyType = "static"
+ defaultProcedureExecutingBatchSize = math.MaxUint32
defaultHTTPPort = 8080
@@ -127,6 +129,8 @@
EnableSchedule bool `toml:"enable-schedule" env:"ENABLE_SCHEDULE"`
// TopologyType indicates the schedule type used by the CeresDB cluster, it will determine the strategy of CeresMeta scheduling cluster.
TopologyType string `toml:"topology-type" env:"TOPOLOGY_TYPE"`
+ // ProcedureExecutingBatchSize determines the maximum number of shards in a single batch when opening shards concurrently.
+ ProcedureExecutingBatchSize uint32 `toml:"procedure-executing-batch-size" env:"PROCEDURE_EXECUTING_BATCH_SIZE"`
ClientUrls string `toml:"client-urls" env:"CLIENT_URLS"`
PeerUrls string `toml:"peer-urls" env:"PEER_URLS"`
@@ -296,6 +300,7 @@
DefaultClusterShardTotal: defaultClusterShardTotal,
EnableSchedule: enableSchedule,
TopologyType: defaultTopologyType,
+ ProcedureExecutingBatchSize: defaultProcedureExecutingBatchSize,
HTTPPort: defaultHTTPPort,
}
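The default batch size is math.MaxUint32, so batching is effectively unbounded until an operator sets procedure-executing-batch-size (or the PROCEDURE_EXECUTING_BATCH_SIZE env var) to something smaller. A minimal sketch of the capping rule the schedulers below apply; effectiveBatchSize is a hypothetical helper, not part of this patch:

    package main

    import (
        "fmt"
        "math"
    )

    // effectiveBatchSize mirrors how each scheduler bounds one round's work:
    // it stops collecting procedures once the configured size is reached.
    func effectiveBatchSize(pendingShards int, batchSize uint32) int {
        if uint64(pendingShards) > uint64(batchSize) {
            return int(batchSize)
        }
        return pendingShards
    }

    func main() {
        fmt.Println(effectiveBatchSize(100, 4))              // explicit cap: 4
        fmt.Println(effectiveBatchSize(100, math.MaxUint32)) // default: 100, i.e. uncapped in practice
    }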
diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go
index 8e46d87..c5f0ec0 100644
--- a/server/coordinator/factory.go
+++ b/server/coordinator/factory.go
@@ -81,6 +81,11 @@
OnFailed func(error) error
}
+type BatchRequest struct {
+ Batch []procedure.Procedure
+ BatchType procedure.Typ
+}
+
func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory {
return &Factory{
idAllocator: allocator,
@@ -253,6 +258,15 @@
)
}
+func (f *Factory) CreateBatchTransferLeaderProcedure(ctx context.Context, request BatchRequest) (procedure.Procedure, error) {
+ id, err := f.allocProcedureID(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ return transferleader.NewBatchTransferLeaderProcedure(id, request.Batch)
+}
+
func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
id, err := f.idAllocator.Alloc(ctx)
if err != nil {
diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go
index 44c2f5b..29a8864 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -22,4 +22,6 @@
ErrQueueFull = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data")
ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure")
ErrShardNumberNotEnough = coderr.NewCodeError(coderr.Internal, "shard number not enough")
+ ErrEmptyBatchProcedure = coderr.NewCodeError(coderr.Internal, "procedure batch is empty")
+ ErrMergeBatchProcedure = coderr.NewCodeError(coderr.Internal, "failed to merge procedures batch")
)
diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go
index ff37a82..d60437d 100644
--- a/server/coordinator/procedure/manager_impl.go
+++ b/server/coordinator/procedure/manager_impl.go
@@ -75,6 +75,7 @@
return nil
}
+// TODO: Filter out duplicated submissions of the same procedure.
func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error {
if err := m.waitingProcedures.Push(procedure, 0); err != nil {
return err
diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go
new file mode 100644
index 0000000..8d2f5fa
--- /dev/null
+++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go
@@ -0,0 +1,133 @@
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package transferleader
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/CeresDB/ceresmeta/pkg/log"
+ "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+ "github.com/CeresDB/ceresmeta/server/storage"
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+ "golang.org/x/sync/errgroup"
+)
+
+// BatchTransferLeaderProcedure is a proxy procedure that contains a batch of TransferLeaderProcedures.
+// It is used to support concurrent execution of a batch of TransferLeaderProcedures that share consistent related versions.
+type BatchTransferLeaderProcedure struct {
+ id uint64
+ batch []procedure.Procedure
+ relatedVersionInfo procedure.RelatedVersionInfo
+
+ // Protect the state.
+ lock sync.RWMutex
+ state procedure.State
+}
+
+func NewBatchTransferLeaderProcedure(id uint64, batch []procedure.Procedure) (procedure.Procedure, error) {
+ if len(batch) == 0 {
+ return nil, procedure.ErrEmptyBatchProcedure
+ }
+
+ relatedVersionInfo, err := buildBatchRelatedVersionInfo(batch)
+ if err != nil {
+ return nil, err
+ }
+
+ return &BatchTransferLeaderProcedure{id: id, batch: batch, state: procedure.StateInit, relatedVersionInfo: relatedVersionInfo}, nil
+}
+
+func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.RelatedVersionInfo, error) {
+ if len(batch) == 0 {
+ return procedure.RelatedVersionInfo{}, nil
+ }
+
+ result := procedure.RelatedVersionInfo{
+ ClusterID: batch[0].RelatedVersionInfo().ClusterID,
+ ShardWithVersion: map[storage.ShardID]uint64{},
+ ClusterVersion: batch[0].RelatedVersionInfo().ClusterVersion,
+ }
+
+ // All procedures in the batch must share the same cluster ID and cluster version.
+ for _, p := range batch {
+ if p.RelatedVersionInfo().ClusterID != result.ClusterID {
+ return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterID in the same batch is inconsistent")
+ }
+ if p.RelatedVersionInfo().ClusterVersion != result.ClusterVersion {
+ return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterVersion in the same batch is inconsistent")
+ }
+ // The ShardVersion of the same shard must be consistent.
+ for shardID, version := range p.RelatedVersionInfo().ShardWithVersion {
+ if resultVersion, exists := result.ShardWithVersion[shardID]; exists {
+ if version != resultVersion {
+ return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID:%d, expectedShardVersion:%d, shardVersion:%d", shardID, version, resultVersion))
+ }
+ } else {
+ result.ShardWithVersion[shardID] = version
+ }
+ }
+ }
+
+ return result, nil
+}
+
+func (p *BatchTransferLeaderProcedure) ID() uint64 {
+ return p.id
+}
+
+func (p *BatchTransferLeaderProcedure) Typ() procedure.Typ {
+ return procedure.TransferLeader
+}
+
+func (p *BatchTransferLeaderProcedure) Start(ctx context.Context) error {
+ // Start the sub-procedures concurrently, one goroutine per procedure.
+ g, _ := errgroup.WithContext(ctx)
+ for _, p := range p.batch {
+ p := p
+ g.Go(func() error {
+ err := p.Start(ctx)
+ if err != nil {
+ log.Error("procedure start failed", zap.Error(err), zap.String("procedure", fmt.Sprintf("%v", p)))
+ }
+ return err
+ })
+ }
+
+ if err := g.Wait(); err != nil {
+ p.updateStateWithLock(procedure.StateFailed)
+ return err
+ }
+
+ p.updateStateWithLock(procedure.StateFinished)
+ return nil
+}
+
+func (p *BatchTransferLeaderProcedure) Cancel(_ context.Context) error {
+ p.updateStateWithLock(procedure.StateCancelled)
+ return nil
+}
+
+func (p *BatchTransferLeaderProcedure) State() procedure.State {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ return p.state
+}
+
+func (p *BatchTransferLeaderProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo {
+ return p.relatedVersionInfo
+}
+
+func (p *BatchTransferLeaderProcedure) Priority() procedure.Priority {
+ return p.batch[0].Priority()
+}
+
+func (p *BatchTransferLeaderProcedure) updateStateWithLock(state procedure.State) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ p.state = state
+}
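The batch's Start relies on errgroup from golang.org/x/sync (promoted to a direct dependency in go.mod above) to fan out its sub-procedures and surface the first failure. A self-contained sketch of that pattern, with toy tasks standing in for the real procedures:

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        tasks := []func(context.Context) error{
            func(ctx context.Context) error { return nil },
            func(ctx context.Context) error { return fmt.Errorf("open shard 2 failed") },
            func(ctx context.Context) error { return nil },
        }

        g, gctx := errgroup.WithContext(context.Background())
        for _, t := range tasks {
            t := t // capture the loop variable (pre-Go 1.22 semantics)
            g.Go(func() error { return t(gctx) })
        }

        // Wait returns the first non-nil error; BatchTransferLeaderProcedure
        // maps that outcome to StateFailed, otherwise to StateFinished.
        if err := g.Wait(); err != nil {
            fmt.Println("batch failed:", err)
            return
        }
        fmt.Println("batch finished")
    }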
diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go
new file mode 100644
index 0000000..4838ccb
--- /dev/null
+++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go
@@ -0,0 +1,106 @@
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package transferleader_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+ "github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transferleader"
+ "github.com/CeresDB/ceresmeta/server/storage"
+ "github.com/stretchr/testify/require"
+)
+
+type mockProcedure struct {
+ ClusterID storage.ClusterID
+ clusterVersion uint64
+ typ procedure.Typ
+ ShardWithVersion map[storage.ShardID]uint64
+}
+
+func (m mockProcedure) ID() uint64 {
+ return 0
+}
+
+func (m mockProcedure) Typ() procedure.Typ {
+ return m.typ
+}
+
+func (m mockProcedure) Start(_ context.Context) error {
+ return nil
+}
+
+func (m mockProcedure) Cancel(_ context.Context) error {
+ return nil
+}
+
+func (m mockProcedure) State() procedure.State {
+ return procedure.StateInit
+}
+
+func (m mockProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo {
+ return procedure.RelatedVersionInfo{
+ ClusterID: m.ClusterID,
+ ShardWithVersion: m.ShardWithVersion,
+ ClusterVersion: m.clusterVersion,
+ }
+}
+
+func (m mockProcedure) Priority() procedure.Priority {
+ return procedure.PriorityLow
+}
+
+func TestBatchProcedure(t *testing.T) {
+ re := require.New(t)
+ var procedures []procedure.Procedure
+
+ // Procedures with same type and version.
+ for i := 0; i < 3; i++ {
+ shardWithVersion := map[storage.ShardID]uint64{}
+ shardWithVersion[storage.ShardID(i)] = 0
+ p := CreateMockProcedure(storage.ClusterID(0), 0, 0, shardWithVersion)
+ procedures = append(procedures, p)
+ }
+ _, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures)
+ re.NoError(err)
+
+ // Procedures with different clusterIDs, appended to the valid batch above.
+ for i := 0; i < 3; i++ {
+ shardWithVersion := map[storage.ShardID]uint64{}
+ shardWithVersion[storage.ShardID(i)] = 0
+ p := CreateMockProcedure(storage.ClusterID(i), 0, procedure.TransferLeader, shardWithVersion)
+ procedures = append(procedures, p)
+ }
+ _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
+ re.Error(err)
+
+ // Procedures with different types.
+ for i := 0; i < 3; i++ {
+ shardWithVersion := map[storage.ShardID]uint64{}
+ shardWithVersion[storage.ShardID(i)] = 0
+ p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
+ procedures = append(procedures, p)
+ }
+ _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
+ re.Error(err)
+
+ // Procedures with different shard versions.
+ for i := 0; i < 3; i++ {
+ shardWithVersion := map[storage.ShardID]uint64{}
+ shardWithVersion[storage.ShardID(0)] = uint64(i)
+ p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
+ procedures = append(procedures, p)
+ }
+ _, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
+ re.Error(err)
+}
+
+func CreateMockProcedure(clusterID storage.ClusterID, clusterVersion uint64, typ procedure.Typ, shardWithVersion map[storage.ShardID]uint64) procedure.Procedure {
+ return mockProcedure{
+ ClusterID: clusterID,
+ clusterVersion: clusterVersion,
+ typ: typ,
+ ShardWithVersion: shardWithVersion,
+ }
+}
diff --git a/server/coordinator/procedure/operation/transferleader/transfer_leader.go b/server/coordinator/procedure/operation/transferleader/transfer_leader.go
index ab32098..ce0672e 100644
--- a/server/coordinator/procedure/operation/transferleader/transfer_leader.go
+++ b/server/coordinator/procedure/operation/transferleader/transfer_leader.go
@@ -279,14 +279,14 @@
Shard: metadata.ShardInfo{ID: req.p.params.ShardID, Role: storage.ShardRoleLeader, Version: preVersion + 1},
}
- log.Info("try to open shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName))
+ log.Info("try to open shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(req.p.params.ShardID)), zap.String("newLeader", req.p.params.NewLeaderNodeName))
if err := req.p.params.Dispatch.OpenShard(ctx, req.p.params.NewLeaderNodeName, openShardRequest); err != nil {
procedure.CancelEventWithLog(event, err, "open shard", zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("newLeaderNode", req.p.params.NewLeaderNodeName))
return
}
- log.Info("open shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName))
+ log.Info("open shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(req.p.params.ShardID)), zap.String("newLeader", req.p.params.NewLeaderNodeName))
}
func finishCallback(event *fsm.Event) {
diff --git a/server/coordinator/procedure/test/common.go b/server/coordinator/procedure/test/common.go
index dc120e3..b267d84 100644
--- a/server/coordinator/procedure/test/common.go
+++ b/server/coordinator/procedure/test/common.go
@@ -6,6 +6,7 @@
"context"
"crypto/rand"
"fmt"
+ "math"
"math/big"
"testing"
"time"
@@ -21,16 +22,17 @@
)
const (
- TestTableName0 = "table0"
- TestTableName1 = "table1"
- TestSchemaName = "TestSchemaName"
- TestRootPath = "/rootPath"
- DefaultIDAllocatorStep = 20
- ClusterName = "ceresdbCluster1"
- DefaultNodeCount = 2
- DefaultShardTotal = 4
- DefaultSchedulerOperator = true
- DefaultTopologyType = "static"
+ TestTableName0 = "table0"
+ TestTableName1 = "table1"
+ TestSchemaName = "TestSchemaName"
+ TestRootPath = "/rootPath"
+ DefaultIDAllocatorStep = 20
+ ClusterName = "ceresdbCluster1"
+ DefaultNodeCount = 2
+ DefaultShardTotal = 4
+ DefaultSchedulerOperator = true
+ DefaultTopologyType = "static"
+ DefaultProcedureExecutingBatchSize = math.MaxUint32
)
type MockDispatch struct{}
@@ -99,13 +101,14 @@
logger := zap.NewNop()
clusterMetadata := metadata.NewClusterMetadata(logger, storage.Cluster{
- ID: 0,
- Name: ClusterName,
- MinNodeCount: DefaultNodeCount,
- ShardTotal: DefaultShardTotal,
- EnableSchedule: DefaultSchedulerOperator,
- TopologyType: DefaultTopologyType,
- CreatedAt: 0,
+ ID: 0,
+ Name: ClusterName,
+ MinNodeCount: DefaultNodeCount,
+ ShardTotal: DefaultShardTotal,
+ EnableSchedule: DefaultSchedulerOperator,
+ TopologyType: DefaultTopologyType,
+ ProcedureExecutingBatchSize: DefaultProcedureExecutingBatchSize,
+ CreatedAt: 0,
}, clusterStorage, client, TestRootPath, DefaultIDAllocatorStep)
err := clusterMetadata.Init(ctx)
diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go
index 26f756b..9d1ebde 100644
--- a/server/coordinator/scheduler/assign_shard_scheduler.go
+++ b/server/coordinator/scheduler/assign_shard_scheduler.go
@@ -5,26 +5,26 @@
import (
"context"
"fmt"
+ "strings"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator"
+ "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
)
-const (
- AssignReason = "ShardView exists in metadata but shardNode not exists, assign shard to node"
-)
-
// AssignShardScheduler is used to assign shards that have no node mapping.
type AssignShardScheduler struct {
- factory *coordinator.Factory
- nodePicker coordinator.NodePicker
+ factory *coordinator.Factory
+ nodePicker coordinator.NodePicker
+ procedureExecutingBatchSize uint32
}
-func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
+func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
return &AssignShardScheduler{
- factory: factory,
- nodePicker: nodePicker,
+ factory: factory,
+ nodePicker: nodePicker,
+ procedureExecutingBatchSize: procedureExecutingBatchSize,
}
}
@@ -33,6 +33,8 @@
return ScheduleResult{}, nil
}
+ var procedures []procedure.Procedure
+ var reasons strings.Builder
// Check whether there is a shard without node mapping.
for _, shardView := range clusterSnapshot.Topology.ShardViewsMapping {
_, exists := findNodeByShard(shardView.ShardID, clusterSnapshot.Topology.ClusterView.ShardNodes)
@@ -53,12 +55,30 @@
if err != nil {
return ScheduleResult{}, err
}
- return ScheduleResult{
- Procedure: p,
- Reason: fmt.Sprintf("try to assign shard:%d to node:%s ,reason:%v", shardView.ShardID, newLeaderNode.Node.Name, AssignReason),
- }, nil
+
+ procedures = append(procedures, p)
+ reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to a node, shardID:%d, node:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
+ if len(procedures) >= int(a.procedureExecutingBatchSize) {
+ break
+ }
}
- return ScheduleResult{}, nil
+
+ if len(procedures) == 0 {
+ return ScheduleResult{}, nil
+ }
+
+ batchProcedure, err := a.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
+ Batch: procedures,
+ BatchType: procedure.TransferLeader,
+ })
+ if err != nil {
+ return ScheduleResult{}, err
+ }
+
+ return ScheduleResult{
+ Procedure: batchProcedure,
+ Reason:    reasons.String(),
+ }, nil
}
func findNodeByShard(shardID storage.ShardID, shardNodes []storage.ShardNode) (storage.ShardNode, bool) {
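All three schedulers now share the same shape: accumulate one transfer-leader procedure per qualifying shard, stop at procedureExecutingBatchSize, and merge the result into a single batch procedure. The core loop in isolation, with needsAssign as a stand-in for the real topology check:

    package main

    import "fmt"

    // needsAssign stands in for the check that a shard lacks a node mapping.
    func needsAssign(shard int) bool { return shard%2 == 0 }

    // collect mirrors the schedulers' accumulate-and-cap loop.
    func collect(totalShards int, batchSize uint32) []int {
        var picked []int
        for shard := 0; shard < totalShards; shard++ {
            if !needsAssign(shard) {
                continue
            }
            picked = append(picked, shard)
            if len(picked) >= int(batchSize) {
                break // bound the work done in a single scheduling round
            }
        }
        return picked
    }

    func main() {
        fmt.Println(collect(10, 3)) // [0 2 4]
    }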
diff --git a/server/coordinator/scheduler/assign_shard_scheduler_test.go b/server/coordinator/scheduler/assign_shard_scheduler_test.go
index 7493ace..1fd2aed 100644
--- a/server/coordinator/scheduler/assign_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/assign_shard_scheduler_test.go
@@ -19,23 +19,23 @@
procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
- s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
+ s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)
// EmptyCluster would be scheduled an empty procedure.
emptyCluster := test.InitEmptyCluster(ctx, t)
result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
- re.Nil(result.Procedure)
+ re.Empty(result)
// PrepareCluster would be scheduled a transfer leader procedure.
prepareCluster := test.InitPrepareCluster(ctx, t)
result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
- re.NotNil(result.Procedure)
+ re.NotEmpty(result)
// StableCluster with all shards assigned would be scheduled an empty procedure.
stableCluster := test.InitStableCluster(ctx, t)
result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
- re.Nil(result.Procedure)
+ re.Empty(result)
}
diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go
index 7517273..63b70fe 100644
--- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go
+++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go
@@ -5,23 +5,27 @@
import (
"context"
"fmt"
+ "strings"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator"
+ "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"go.uber.org/zap"
)
type RebalancedShardScheduler struct {
- logger *zap.Logger
- factory *coordinator.Factory
- nodePicker coordinator.NodePicker
+ logger *zap.Logger
+ factory *coordinator.Factory
+ nodePicker coordinator.NodePicker
+ procedureExecutingBatchSize uint32
}
-func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
+func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
return &RebalancedShardScheduler{
- logger: logger,
- factory: factory,
- nodePicker: nodePicker,
+ logger: logger,
+ factory: factory,
+ nodePicker: nodePicker,
+ procedureExecutingBatchSize: procedureExecutingBatchSize,
}
}
@@ -31,6 +35,8 @@
return ScheduleResult{}, nil
}
+ var procedures []procedure.Procedure
+ var reasons strings.Builder
// TODO: Improve scheduling efficiency and verify whether the topology changes.
for _, shardNode := range clusterSnapshot.Topology.ClusterView.ShardNodes {
node, err := r.nodePicker.PickNode(ctx, shardNode.ID, clusterSnapshot.RegisteredNodes)
@@ -48,12 +54,25 @@
if err != nil {
return ScheduleResult{}, err
}
- return ScheduleResult{
- Procedure: p,
- Reason: fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s", shardNode.ID, shardNode.NodeName, node.Node.Name),
- }, nil
+ procedures = append(procedures, p)
+ reasons.WriteString(fmt.Sprintf("the shard does not meet the balance requirements, it should be assigned to another node, shardID:%d, oldNode:%s, newNode:%s. ", shardNode.ID, shardNode.NodeName, node.Node.Name))
+ if len(procedures) >= int(r.procedureExecutingBatchSize) {
+ break
+ }
}
}
- return ScheduleResult{}, nil
+ if len(procedures) == 0 {
+ return ScheduleResult{}, nil
+ }
+
+ batchProcedure, err := r.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
+ Batch: procedures,
+ BatchType: procedure.TransferLeader,
+ })
+ if err != nil {
+ return ScheduleResult{}, err
+ }
+
+ return ScheduleResult{Procedure: batchProcedure, Reason: reasons.String()}, nil
}
diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
index 89c2f80..ce92628 100644
--- a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
@@ -19,22 +19,22 @@
procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
- s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
+ s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)
// EmptyCluster would be scheduled an empty procedure.
emptyCluster := test.InitEmptyCluster(ctx, t)
result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
- re.Nil(result.Procedure)
+ re.Empty(result)
// PrepareCluster would be scheduled an empty procedure.
prepareCluster := test.InitPrepareCluster(ctx, t)
result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
- re.Nil(result.Procedure)
+ re.Empty(result)
// StableCluster with all shards assigned would be scheduled a load balance procedure.
stableCluster := test.InitStableCluster(ctx, t)
- result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
+ _, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
}
diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go
index c3312c6..2b78eec 100644
--- a/server/coordinator/scheduler/scheduler_manager.go
+++ b/server/coordinator/scheduler/scheduler_manager.go
@@ -52,15 +52,16 @@
rootPath string
// This lock is used to protect the following field.
- lock sync.RWMutex
- registerSchedulers []Scheduler
- shardWatch watch.ShardWatch
- isRunning atomic.Bool
- enableSchedule bool
- topologyType storage.TopologyType
+ lock sync.RWMutex
+ registerSchedulers []Scheduler
+ shardWatch watch.ShardWatch
+ isRunning atomic.Bool
+ enableSchedule bool
+ topologyType storage.TopologyType
+ procedureExecutingBatchSize uint32
}
-func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType) Manager {
+func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) Manager {
var shardWatch watch.ShardWatch
switch topologyType {
case storage.TopologyTypeDynamic:
@@ -71,17 +72,18 @@
}
return &ManagerImpl{
- procedureManager: procedureManager,
- registerSchedulers: []Scheduler{},
- factory: factory,
- nodePicker: coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas),
- clusterMetadata: clusterMetadata,
- client: client,
- shardWatch: shardWatch,
- rootPath: rootPath,
- enableSchedule: enableSchedule,
- topologyType: topologyType,
- logger: logger,
+ procedureManager: procedureManager,
+ registerSchedulers: []Scheduler{},
+ factory: factory,
+ nodePicker: coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas),
+ clusterMetadata: clusterMetadata,
+ client: client,
+ shardWatch: shardWatch,
+ rootPath: rootPath,
+ enableSchedule: enableSchedule,
+ topologyType: topologyType,
+ procedureExecutingBatchSize: procedureExecutingBatchSize,
+ logger: logger,
}
}
@@ -189,13 +191,13 @@
}
func (m *ManagerImpl) createStaticTopologySchedulers() []Scheduler {
- staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker)
+ staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize)
return []Scheduler{staticTopologyShardScheduler}
}
func (m *ManagerImpl) createDynamicTopologySchedulers() []Scheduler {
- assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker)
- rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker)
+ assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize)
+ rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker, m.procedureExecutingBatchSize)
return []Scheduler{assignShardScheduler, rebalancedShardScheduler}
}
diff --git a/server/coordinator/scheduler/scheduler_manager_test.go b/server/coordinator/scheduler/scheduler_manager_test.go
index d2dec5c..a9df691 100644
--- a/server/coordinator/scheduler/scheduler_manager_test.go
+++ b/server/coordinator/scheduler/scheduler_manager_test.go
@@ -31,14 +31,14 @@
_, client, _ := etcdutil.PrepareEtcdServerAndClient(t)
// Create scheduler manager with enableScheduler equal to false.
- schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic)
+ schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic, 1)
err = schedulerManager.Start(ctx)
re.NoError(err)
err = schedulerManager.Stop(ctx)
re.NoError(err)
// Create scheduler manager with static topology.
- schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic)
+ schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic, 1)
err = schedulerManager.Start(ctx)
re.NoError(err)
schedulers := schedulerManager.ListScheduler()
@@ -47,7 +47,7 @@
re.NoError(err)
// Create scheduler manager with dynamic topology.
- schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic)
+ schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic, 1)
err = schedulerManager.Start(ctx)
re.NoError(err)
schedulers = schedulerManager.ListScheduler()
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static_topology_shard_scheduler.go
index ceb67a3..ff4b08b 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler.go
@@ -5,24 +5,30 @@
import (
"context"
"fmt"
+ "strings"
"time"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator"
+ "github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
)
type StaticTopologyShardScheduler struct {
- factory *coordinator.Factory
- nodePicker coordinator.NodePicker
+ factory *coordinator.Factory
+ nodePicker coordinator.NodePicker
+ procedureExecutingBatchSize uint32
}
-func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
- return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker}
+func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
+ return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker, procedureExecutingBatchSize: procedureExecutingBatchSize}
}
func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
+ var procedures []procedure.Procedure
+ var reasons strings.Builder
+
switch clusterSnapshot.Topology.ClusterView.State {
case storage.ClusterStateEmpty:
return ScheduleResult{}, nil
@@ -47,10 +53,11 @@
if err != nil {
return ScheduleResult{}, err
}
- return ScheduleResult{
- Procedure: p,
- Reason: fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s", shardView.ShardID, newLeaderNode.Node.Name),
- }, nil
+ procedures = append(procedures, p)
+ reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
+ if len(procedures) >= int(s.procedureExecutingBatchSize) {
+ break
+ }
}
case storage.ClusterStateStable:
for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ {
@@ -70,15 +77,28 @@
if err != nil {
return ScheduleResult{}, err
}
- return ScheduleResult{
- Procedure: p,
- Reason: fmt.Sprintf("Cluster state is stable, shard:%d is reopened in node:%s", shardNode.ID, node.Node.Name),
- }, nil
+ procedures = append(procedures, p)
+ reasons.WriteString(fmt.Sprintf("Cluster state is stable, reopen shard on node, shardID:%d, nodeName:%s. ", shardNode.ID, node.Node.Name))
+ if len(procedures) >= int(s.procedureExecutingBatchSize) {
+ break
+ }
}
}
}
- return ScheduleResult{}, nil
+ if len(procedures) == 0 {
+ return ScheduleResult{}, nil
+ }
+
+ batchProcedure, err := s.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
+ Batch: procedures,
+ BatchType: procedure.TransferLeader,
+ })
+ if err != nil {
+ return ScheduleResult{}, err
+ }
+
+ return ScheduleResult{Procedure: batchProcedure, Reason: reasons.String()}, nil
}
func findOnlineNodeByName(nodeName string, nodes []metadata.RegisteredNode) (metadata.RegisteredNode, error) {
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
index 153d555..c76c997 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
@@ -19,23 +19,23 @@
procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
- s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
+ s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)
// EmptyCluster would be scheduled an empty procedure.
emptyCluster := test.InitEmptyCluster(ctx, t)
result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
- re.Nil(result.Procedure)
+ re.Empty(result)
// PrepareCluster would be scheduled a transfer leader procedure.
prepareCluster := test.InitPrepareCluster(ctx, t)
result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
- re.NotNil(result.Procedure)
+ re.NotEmpty(result)
// StableCluster with all shards assigned would be scheduled a transfer leader procedure by hash rule.
stableCluster := test.InitStableCluster(ctx, t)
result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
re.NoError(err)
- re.NotNil(result.Procedure)
+ re.NotEmpty(result)
}
diff --git a/server/server.go b/server/server.go
index c77bf62..051661c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -237,11 +237,12 @@
}
defaultCluster, err := srv.clusterManager.CreateCluster(ctx, srv.cfg.DefaultClusterName,
metadata.CreateClusterOpts{
- NodeCount: uint32(srv.cfg.DefaultClusterNodeCount),
- ReplicationFactor: uint32(srv.cfg.DefaultClusterReplicationFactor),
- ShardTotal: uint32(srv.cfg.DefaultClusterShardTotal),
- EnableSchedule: srv.cfg.EnableSchedule,
- TopologyType: topologyType,
+ NodeCount: uint32(srv.cfg.DefaultClusterNodeCount),
+ ReplicationFactor: uint32(srv.cfg.DefaultClusterReplicationFactor),
+ ShardTotal: uint32(srv.cfg.DefaultClusterShardTotal),
+ EnableSchedule: srv.cfg.EnableSchedule,
+ TopologyType: topologyType,
+ ProcedureExecutingBatchSize: srv.cfg.ProcedureExecutingBatchSize,
})
if err != nil {
log.Warn("create default cluster failed", zap.Error(err))
diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go
index 94945f1..692ffd6 100644
--- a/server/service/grpc/service.go
+++ b/server/service/grpc/service.go
@@ -297,7 +297,7 @@
return &metaservicepb.RouteTablesResponse{Header: responseHeader(err, "grpc routeTables")}, nil
}
- log.Info("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ",")))
+ log.Debug("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ",")))
// Forward request to the leader.
if ceresmetaClient != nil {
diff --git a/server/service/http/api.go b/server/service/http/api.go
index 6eb4aaa..cd2ed99 100644
--- a/server/service/http/api.go
+++ b/server/service/http/api.go
@@ -542,10 +542,11 @@
}
type UpdateClusterRequest struct {
- NodeCount uint32 `json:"NodeCount"`
- ShardTotal uint32 `json:"ShardTotal"`
- EnableSchedule bool `json:"enableSchedule"`
- TopologyType string `json:"topologyType"`
+ NodeCount uint32 `json:"nodeCount"`
+ ShardTotal uint32 `json:"shardTotal"`
+ EnableSchedule bool `json:"enableSchedule"`
+ TopologyType string `json:"topologyType"`
+ ProcedureExecutingBatchSize uint32 `json:"procedureExecutingBatchSize"`
}
func (a *API) updateCluster(writer http.ResponseWriter, req *http.Request) {
@@ -590,8 +591,9 @@
}
if err := a.clusterManager.UpdateCluster(req.Context(), clusterName, metadata.UpdateClusterOpts{
- EnableSchedule: updateClusterRequest.EnableSchedule,
- TopologyType: topologyType,
+ EnableSchedule: updateClusterRequest.EnableSchedule,
+ TopologyType: topologyType,
+ ProcedureExecutingBatchSize: updateClusterRequest.ProcedureExecutingBatchSize,
}); err != nil {
log.Error("update cluster failed", zap.Error(err))
a.respondError(writer, metadata.ErrUpdateCluster, fmt.Sprintf("err: %s", err.Error()))
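With the JSON tags normalized to lowerCamelCase, a request body for the update endpoint now looks as below; the sketch simply marshals the struct defined above with illustrative values:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Mirror of the UpdateClusterRequest defined in api.go above.
    type UpdateClusterRequest struct {
        NodeCount                   uint32 `json:"nodeCount"`
        ShardTotal                  uint32 `json:"shardTotal"`
        EnableSchedule              bool   `json:"enableSchedule"`
        TopologyType                string `json:"topologyType"`
        ProcedureExecutingBatchSize uint32 `json:"procedureExecutingBatchSize"`
    }

    func main() {
        body, _ := json.MarshalIndent(UpdateClusterRequest{
            NodeCount:                   2,
            ShardTotal:                  8,
            EnableSchedule:              true,
            TopologyType:                "dynamic",
            ProcedureExecutingBatchSize: 4, // illustrative cap
        }, "", "  ")
        fmt.Println(string(body))
    }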
diff --git a/server/storage/types.go b/server/storage/types.go
index a80cb82..02ef785 100644
--- a/server/storage/types.go
+++ b/server/storage/types.go
@@ -152,12 +152,13 @@
Name string
MinNodeCount uint32
// Deprecated: ReplicationFactor is deprecated after CeresMeta v1.2.0
- ReplicationFactor uint32
- ShardTotal uint32
- EnableSchedule bool
- TopologyType TopologyType
- CreatedAt uint64
- ModifiedAt uint64
+ ReplicationFactor uint32
+ ShardTotal uint32
+ EnableSchedule bool
+ TopologyType TopologyType
+ ProcedureExecutingBatchSize uint32
+ CreatedAt uint64
+ ModifiedAt uint64
}
type ShardNode struct {
@@ -258,27 +259,29 @@
func convertClusterPB(cluster *clusterpb.Cluster) Cluster {
return Cluster{
- ID: ClusterID(cluster.Id),
- Name: cluster.Name,
- MinNodeCount: cluster.MinNodeCount,
- ShardTotal: cluster.ShardTotal,
- EnableSchedule: cluster.EnableSchedule,
- TopologyType: convertTopologyTypePB(cluster.TopologyType),
- CreatedAt: cluster.CreatedAt,
- ModifiedAt: cluster.ModifiedAt,
+ ID: ClusterID(cluster.Id),
+ Name: cluster.Name,
+ MinNodeCount: cluster.MinNodeCount,
+ ShardTotal: cluster.ShardTotal,
+ EnableSchedule: cluster.EnableSchedule,
+ TopologyType: convertTopologyTypePB(cluster.TopologyType),
+ ProcedureExecutingBatchSize: cluster.ProcedureExecutingBatchSize,
+ CreatedAt: cluster.CreatedAt,
+ ModifiedAt: cluster.ModifiedAt,
}
}
func convertClusterToPB(cluster Cluster) clusterpb.Cluster {
return clusterpb.Cluster{
- Id: uint32(cluster.ID),
- Name: cluster.Name,
- MinNodeCount: cluster.MinNodeCount,
- ShardTotal: cluster.ShardTotal,
- EnableSchedule: cluster.EnableSchedule,
- TopologyType: convertTopologyTypeToPB(cluster.TopologyType),
- CreatedAt: cluster.CreatedAt,
- ModifiedAt: cluster.ModifiedAt,
+ Id: uint32(cluster.ID),
+ Name: cluster.Name,
+ MinNodeCount: cluster.MinNodeCount,
+ ShardTotal: cluster.ShardTotal,
+ EnableSchedule: cluster.EnableSchedule,
+ TopologyType: convertTopologyTypeToPB(cluster.TopologyType),
+ ProcedureExecutingBatchSize: cluster.ProcedureExecutingBatchSize,
+ CreatedAt: cluster.CreatedAt,
+ ModifiedAt: cluster.ModifiedAt,
}
}
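Since ProcedureExecutingBatchSize now flows through both protobuf converters, the cap survives storage round trips. A hypothetical regression test (not part of this patch) that would verify it from inside package storage:

    package storage

    import "testing"

    func TestProcedureExecutingBatchSizeRoundTrip(t *testing.T) {
        c := Cluster{
            ID:                          ClusterID(1),
            Name:                        "defaultCluster",
            MinNodeCount:                2,
            ShardTotal:                  8,
            EnableSchedule:              true,
            TopologyType:                TopologyTypeDynamic,
            ProcedureExecutingBatchSize: 4,
        }
        pb := convertClusterToPB(c)
        got := convertClusterPB(&pb)
        if got.ProcedureExecutingBatchSize != c.ProcedureExecutingBatchSize {
            t.Fatalf("round trip lost the batch size: got %d", got.ProcedureExecutingBatchSize)
        }
    }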