refactor: support open shard concurrently (#205)

diff --git a/go.mod b/go.mod
index 3a53779..85305f7 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@
 
 require (
 	github.com/AlekSi/gocov-xml v1.0.0
-	github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c
+	github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c
 	github.com/axw/gocov v1.1.0
 	github.com/caarlos0/env/v6 v6.10.1
 	github.com/julienschmidt/httprouter v1.3.0
@@ -18,6 +18,8 @@
 	go.etcd.io/etcd/client/v3 v3.5.4
 	go.etcd.io/etcd/server/v3 v3.5.4
 	go.uber.org/zap v1.21.0
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
 	golang.org/x/tools v0.1.10
 	google.golang.org/grpc v1.47.0
 	google.golang.org/protobuf v1.28.0
@@ -89,11 +91,9 @@
 	golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
 	golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
 	golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
-	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
 	golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
 	golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
 	golang.org/x/text v0.3.7 // indirect
-	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
 	golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
 	google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
diff --git a/go.sum b/go.sum
index b4f80a2..adf1236 100644
--- a/go.sum
+++ b/go.sum
@@ -18,10 +18,8 @@
 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY=
-github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c h1:7gmSQsGua+Y1g6ygsC/K75T/zK2ki7y5R5BkrN1/Ymc=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20230707021308-fba75ce6409c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 4cd1b89..d08ac03 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -39,7 +39,7 @@
 	dispatch := eventdispatch.NewDispatchImpl()
 	procedureFactory := coordinator.NewFactory(id.NewAllocatorImpl(logger, client, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage)
 
-	schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType())
+	schedulerManager := scheduler.NewManager(logger, procedureManager, procedureFactory, metadata, client, rootPath, metadata.GetEnableSchedule(), metadata.GetTopologyType(), metadata.GetProcedureExecutingBatchSize())
 
 	return &Cluster{
 		logger:           logger,
diff --git a/server/cluster/manager.go b/server/cluster/manager.go
index 3696aa7..fea613d 100644
--- a/server/cluster/manager.go
+++ b/server/cluster/manager.go
@@ -112,14 +112,15 @@
 
 	createTime := time.Now().UnixMilli()
 	clusterMetadataStorage := storage.Cluster{
-		ID:             clusterID,
-		Name:           clusterName,
-		MinNodeCount:   opts.NodeCount,
-		ShardTotal:     opts.ShardTotal,
-		EnableSchedule: opts.EnableSchedule,
-		TopologyType:   opts.TopologyType,
-		CreatedAt:      uint64(createTime),
-		ModifiedAt:     uint64(createTime),
+		ID:                          clusterID,
+		Name:                        clusterName,
+		MinNodeCount:                opts.NodeCount,
+		ShardTotal:                  opts.ShardTotal,
+		EnableSchedule:              opts.EnableSchedule,
+		TopologyType:                opts.TopologyType,
+		ProcedureExecutingBatchSize: opts.ProcedureExecutingBatchSize,
+		CreatedAt:                   uint64(createTime),
+		ModifiedAt:                  uint64(createTime),
 	}
 	err = m.storage.CreateCluster(ctx, storage.CreateClusterRequest{
 		Cluster: clusterMetadataStorage,
@@ -164,14 +165,15 @@
 	}
 
 	err = m.storage.UpdateCluster(ctx, storage.UpdateClusterRequest{Cluster: storage.Cluster{
-		ID:             c.GetMetadata().GetClusterID(),
-		Name:           c.GetMetadata().Name(),
-		MinNodeCount:   c.GetMetadata().GetClusterMinNodeCount(),
-		ShardTotal:     c.GetMetadata().GetTotalShardNum(),
-		EnableSchedule: opt.EnableSchedule,
-		TopologyType:   opt.TopologyType,
-		CreatedAt:      c.GetMetadata().GetCreateTime(),
-		ModifiedAt:     uint64(time.Now().UnixMilli()),
+		ID:                          c.GetMetadata().GetClusterID(),
+		Name:                        c.GetMetadata().Name(),
+		MinNodeCount:                c.GetMetadata().GetClusterMinNodeCount(),
+		ShardTotal:                  c.GetMetadata().GetTotalShardNum(),
+		EnableSchedule:              opt.EnableSchedule,
+		TopologyType:                opt.TopologyType,
+		ProcedureExecutingBatchSize: opt.ProcedureExecutingBatchSize,
+		CreatedAt:                   c.GetMetadata().GetCreateTime(),
+		ModifiedAt:                  uint64(time.Now().UnixMilli()),
 	}})
 	if err != nil {
 		log.Error("update cluster", zap.Error(err))
@@ -333,14 +335,15 @@
 		if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown {
 			req := storage.UpdateClusterRequest{
 				Cluster: storage.Cluster{
-					ID:             metadataStorage.ID,
-					Name:           metadataStorage.Name,
-					MinNodeCount:   metadataStorage.MinNodeCount,
-					ShardTotal:     metadataStorage.ShardTotal,
-					EnableSchedule: metadataStorage.EnableSchedule,
-					TopologyType:   m.topologyType,
-					CreatedAt:      metadataStorage.CreatedAt,
-					ModifiedAt:     uint64(time.Now().UnixMilli()),
+					ID:                          metadataStorage.ID,
+					Name:                        metadataStorage.Name,
+					MinNodeCount:                metadataStorage.MinNodeCount,
+					ShardTotal:                  metadataStorage.ShardTotal,
+					EnableSchedule:              metadataStorage.EnableSchedule,
+					TopologyType:                m.topologyType,
+					ProcedureExecutingBatchSize: metadataStorage.ProcedureExecutingBatchSize,
+					CreatedAt:                   metadataStorage.CreatedAt,
+					ModifiedAt:                  uint64(time.Now().UnixMilli()),
 				},
 			}
 			if err := m.storage.UpdateCluster(ctx, req); err != nil {
diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go
index 781f8d9..112f98a 100644
--- a/server/cluster/metadata/cluster_metadata.go
+++ b/server/cluster/metadata/cluster_metadata.go
@@ -576,6 +576,13 @@
 	return c.metaData.TopologyType
 }
 
+func (c *ClusterMetadata) GetProcedureExecutingBatchSize() uint32 {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+
+	return c.metaData.ProcedureExecutingBatchSize
+}
+
 func (c *ClusterMetadata) GetCreateTime() uint64 {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
diff --git a/server/cluster/metadata/types.go b/server/cluster/metadata/types.go
index 08bd753..bcac421 100644
--- a/server/cluster/metadata/types.go
+++ b/server/cluster/metadata/types.go
@@ -47,16 +47,18 @@
 }
 
 type CreateClusterOpts struct {
-	NodeCount         uint32
-	ReplicationFactor uint32
-	ShardTotal        uint32
-	EnableSchedule    bool
-	TopologyType      storage.TopologyType
+	NodeCount                   uint32
+	ReplicationFactor           uint32
+	ShardTotal                  uint32
+	EnableSchedule              bool
+	TopologyType                storage.TopologyType
+	ProcedureExecutingBatchSize uint32
 }
 
 type UpdateClusterOpts struct {
-	EnableSchedule bool
-	TopologyType   storage.TopologyType
+	EnableSchedule              bool
+	TopologyType                storage.TopologyType
+	ProcedureExecutingBatchSize uint32
 }
 
 type CreateTableMetadataRequest struct {
diff --git a/server/config/config.go b/server/config/config.go
index 8ba4630..f728165 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -5,6 +5,7 @@
 import (
 	"flag"
 	"fmt"
+	"math"
 	"os"
 	"strings"
 	"time"
@@ -52,7 +53,8 @@
 	defaultClusterShardTotal        = 8
 	enableSchedule                  = true
 	// topologyType is used to determine the scheduling cluster strategy of CeresMeta. It should be determined according to the storage method of CeresDB. The default is static to support local storage.
-	defaultTopologyType = "static"
+	defaultTopologyType                = "static"
+	defaultProcedureExecutingBatchSize = math.MaxUint32
 
 	defaultHTTPPort = 8080
 
@@ -127,6 +129,8 @@
 	EnableSchedule bool `toml:"enable-schedule" env:"ENABLE_SCHEDULE"`
 	// TopologyType indicates the schedule type used by the CeresDB cluster, it will determine the strategy of CeresMeta scheduling cluster.
 	TopologyType string `toml:"topology-type" env:"TOPOLOGY_TYPE"`
+	// ProcedureExecutingBatchSize determines the maximum number of shards in a single batch when opening shards concurrently.
+	ProcedureExecutingBatchSize uint32 `toml:"procedure-executing-batch-size" env:"PROCEDURE_EXECUTING_BATCH_SIZE"`
 
 	ClientUrls          string `toml:"client-urls" env:"CLIENT_URLS"`
 	PeerUrls            string `toml:"peer-urls" env:"PEER_URLS"`
@@ -296,6 +300,7 @@
 		DefaultClusterShardTotal:        defaultClusterShardTotal,
 		EnableSchedule:                  enableSchedule,
 		TopologyType:                    defaultTopologyType,
+		ProcedureExecutingBatchSize:     defaultProcedureExecutingBatchSize,
 
 		HTTPPort: defaultHTTPPort,
 	}
diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go
index 8e46d87..c5f0ec0 100644
--- a/server/coordinator/factory.go
+++ b/server/coordinator/factory.go
@@ -81,6 +81,11 @@
 	OnFailed    func(error) error
 }
 
+type BatchRequest struct {
+	Batch     []procedure.Procedure
+	BatchType procedure.Typ
+}
+
 func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory {
 	return &Factory{
 		idAllocator: allocator,
@@ -253,6 +258,15 @@
 	)
 }
 
+func (f *Factory) CreateBatchTransferLeaderProcedure(ctx context.Context, request BatchRequest) (procedure.Procedure, error) {
+	id, err := f.allocProcedureID(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	return transferleader.NewBatchTransferLeaderProcedure(id, request.Batch)
+}
+
 func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
 	id, err := f.idAllocator.Alloc(ctx)
 	if err != nil {
diff --git a/server/coordinator/procedure/error.go b/server/coordinator/procedure/error.go
index 44c2f5b..29a8864 100644
--- a/server/coordinator/procedure/error.go
+++ b/server/coordinator/procedure/error.go
@@ -22,4 +22,6 @@
 	ErrQueueFull               = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data")
 	ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure")
 	ErrShardNumberNotEnough    = coderr.NewCodeError(coderr.Internal, "shard number not enough")
+	ErrEmptyBatchProcedure     = coderr.NewCodeError(coderr.Internal, "procedure batch is empty")
+	ErrMergeBatchProcedure     = coderr.NewCodeError(coderr.Internal, "failed to merge procedures batch")
 )
diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go
index ff37a82..d60437d 100644
--- a/server/coordinator/procedure/manager_impl.go
+++ b/server/coordinator/procedure/manager_impl.go
@@ -75,6 +75,7 @@
 	return nil
 }
 
+// TODO: Filter duplicate submitted Procedure.
 func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error {
 	if err := m.waitingProcedures.Push(procedure, 0); err != nil {
 		return err
diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go
new file mode 100644
index 0000000..8d2f5fa
--- /dev/null
+++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader.go
@@ -0,0 +1,130 @@
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package transferleader
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"github.com/CeresDB/ceresmeta/pkg/log"
+	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+	"github.com/CeresDB/ceresmeta/server/storage"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+)
+
+// BatchTransferLeaderProcedure is a proxy procedure contains a batch of TransferLeaderProcedure.
+// It is used to support concurrent execution of a batch of TransferLeaderProcedure with same version.
+type BatchTransferLeaderProcedure struct {
+	id                 uint64
+	batch              []procedure.Procedure
+	relatedVersionInfo procedure.RelatedVersionInfo
+
+	// Protect the state.
+	lock  sync.RWMutex
+	state procedure.State
+}
+
+func NewBatchTransferLeaderProcedure(id uint64, batch []procedure.Procedure) (procedure.Procedure, error) {
+	if len(batch) == 0 {
+		return nil, procedure.ErrEmptyBatchProcedure
+	}
+
+	relateVersionInfo, err := buildBatchRelatedVersionInfo(batch)
+	if err != nil {
+		return nil, err
+	}
+
+	return &BatchTransferLeaderProcedure{id: id, batch: batch, state: procedure.StateInit, relatedVersionInfo: relateVersionInfo}, nil
+}
+
+func buildBatchRelatedVersionInfo(batch []procedure.Procedure) (procedure.RelatedVersionInfo, error) {
+	if len(batch) == 0 {
+		return procedure.RelatedVersionInfo{}, nil
+	}
+
+	result := procedure.RelatedVersionInfo{
+		ClusterID:        batch[0].RelatedVersionInfo().ClusterID,
+		ShardWithVersion: map[storage.ShardID]uint64{},
+		ClusterVersion:   batch[0].RelatedVersionInfo().ClusterVersion,
+	}
+
+	// The version of this batch of procedures must be the same.
+	for _, p := range batch {
+		if p.RelatedVersionInfo().ClusterID != result.ClusterID {
+			return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterID in the same batch is inconsistent")
+		}
+		if p.RelatedVersionInfo().ClusterVersion != result.ClusterVersion {
+			return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, "procedure clusterVersion in the same batch is inconsistent")
+		}
+		// The ShardVersion of the same shard must be consistent.
+		for shardID, version := range p.RelatedVersionInfo().ShardWithVersion {
+			if resultVersion, exists := result.ShardWithVersion[shardID]; exists {
+				if version != resultVersion {
+					return procedure.RelatedVersionInfo{}, errors.WithMessage(procedure.ErrMergeBatchProcedure, fmt.Sprintf("procedure shardVersion in the same batch is inconsistent, shardID:%d, expetcdShardVersion:%d, shardVersion:%d", shardID, version, resultVersion))
+				}
+			} else {
+				result.ShardWithVersion[shardID] = version
+			}
+		}
+	}
+
+	return result, nil
+}
+
+func (p *BatchTransferLeaderProcedure) ID() uint64 {
+	return p.id
+}
+
+func (p *BatchTransferLeaderProcedure) Typ() procedure.Typ {
+	return procedure.TransferLeader
+}
+
+func (p *BatchTransferLeaderProcedure) Start(ctx context.Context) error {
+	// Start procedures with multiple goroutine.
+	g, _ := errgroup.WithContext(ctx)
+	for _, p := range p.batch {
+		p := p
+		g.Go(func() error {
+			err := p.Start(ctx)
+			if err != nil {
+				log.Error("procedure start failed", zap.Error(err), zap.String("procedure", fmt.Sprintf("%v", p)), zap.Error(err))
+			}
+			return err
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		p.updateStateWithLock(procedure.StateFailed)
+		return err
+	}
+
+	p.updateStateWithLock(procedure.StateFinished)
+	return nil
+}
+
+func (p *BatchTransferLeaderProcedure) Cancel(_ context.Context) error {
+	p.updateStateWithLock(procedure.StateCancelled)
+	return nil
+}
+
+func (p *BatchTransferLeaderProcedure) State() procedure.State {
+	return p.state
+}
+
+func (p *BatchTransferLeaderProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo {
+	return p.relatedVersionInfo
+}
+
+func (p *BatchTransferLeaderProcedure) Priority() procedure.Priority {
+	return p.batch[0].Priority()
+}
+
+func (p *BatchTransferLeaderProcedure) updateStateWithLock(state procedure.State) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	p.state = state
+}
diff --git a/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go
new file mode 100644
index 0000000..4838ccb
--- /dev/null
+++ b/server/coordinator/procedure/operation/transferleader/batch_transfer_leader_test.go
@@ -0,0 +1,106 @@
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package transferleader_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
+	"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transferleader"
+	"github.com/CeresDB/ceresmeta/server/storage"
+	"github.com/stretchr/testify/require"
+)
+
+type mockProcedure struct {
+	ClusterID        storage.ClusterID
+	clusterVersion   uint64
+	typ              procedure.Typ
+	ShardWithVersion map[storage.ShardID]uint64
+}
+
+func (m mockProcedure) ID() uint64 {
+	return 0
+}
+
+func (m mockProcedure) Typ() procedure.Typ {
+	return m.typ
+}
+
+func (m mockProcedure) Start(_ context.Context) error {
+	return nil
+}
+
+func (m mockProcedure) Cancel(_ context.Context) error {
+	return nil
+}
+
+func (m mockProcedure) State() procedure.State {
+	return procedure.StateInit
+}
+
+func (m mockProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo {
+	return procedure.RelatedVersionInfo{
+		ClusterID:        m.ClusterID,
+		ShardWithVersion: m.ShardWithVersion,
+		ClusterVersion:   m.clusterVersion,
+	}
+}
+
+func (m mockProcedure) Priority() procedure.Priority {
+	return procedure.PriorityLow
+}
+
+func TestBatchProcedure(t *testing.T) {
+	re := require.New(t)
+	var procedures []procedure.Procedure
+
+	// Procedures with same type and version.
+	for i := 0; i < 3; i++ {
+		shardWithVersion := map[storage.ShardID]uint64{}
+		shardWithVersion[storage.ShardID(i)] = 0
+		p := CreateMockProcedure(storage.ClusterID(0), 0, 0, shardWithVersion)
+		procedures = append(procedures, p)
+	}
+	_, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures)
+	re.NoError(err)
+
+	// Procedure with different clusterID.
+	for i := 0; i < 3; i++ {
+		shardWithVersion := map[storage.ShardID]uint64{}
+		shardWithVersion[storage.ShardID(i)] = 0
+		p := CreateMockProcedure(storage.ClusterID(i), 0, procedure.TransferLeader, shardWithVersion)
+		procedures = append(procedures, p)
+	}
+	_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
+	re.Error(err)
+
+	// Procedures with different type.
+	for i := 0; i < 3; i++ {
+		shardWithVersion := map[storage.ShardID]uint64{}
+		shardWithVersion[storage.ShardID(i)] = 0
+		p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
+		procedures = append(procedures, p)
+	}
+	_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
+	re.Error(err)
+
+	// Procedures with different version.
+	for i := 0; i < 3; i++ {
+		shardWithVersion := map[storage.ShardID]uint64{}
+		shardWithVersion[storage.ShardID(0)] = uint64(i)
+		p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
+		procedures = append(procedures, p)
+	}
+	_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
+	re.Error(err)
+}
+
+func CreateMockProcedure(clusterID storage.ClusterID, clusterVersion uint64, typ procedure.Typ, shardWithVersion map[storage.ShardID]uint64) procedure.Procedure {
+	return mockProcedure{
+		ClusterID:        clusterID,
+		clusterVersion:   clusterVersion,
+		typ:              typ,
+		ShardWithVersion: shardWithVersion,
+	}
+}
diff --git a/server/coordinator/procedure/operation/transferleader/transfer_leader.go b/server/coordinator/procedure/operation/transferleader/transfer_leader.go
index ab32098..ce0672e 100644
--- a/server/coordinator/procedure/operation/transferleader/transfer_leader.go
+++ b/server/coordinator/procedure/operation/transferleader/transfer_leader.go
@@ -279,14 +279,14 @@
 		Shard: metadata.ShardInfo{ID: req.p.params.ShardID, Role: storage.ShardRoleLeader, Version: preVersion + 1},
 	}
 
-	log.Info("try to open shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName))
+	log.Info("try to open shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(req.p.params.ShardID)), zap.String("newLeader", req.p.params.NewLeaderNodeName))
 
 	if err := req.p.params.Dispatch.OpenShard(ctx, req.p.params.NewLeaderNodeName, openShardRequest); err != nil {
 		procedure.CancelEventWithLog(event, err, "open shard", zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("newLeaderNode", req.p.params.NewLeaderNodeName))
 		return
 	}
 
-	log.Info("open shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName))
+	log.Info("open shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", uint64(req.p.params.ShardID)), zap.String("newLeader", req.p.params.NewLeaderNodeName))
 }
 
 func finishCallback(event *fsm.Event) {
diff --git a/server/coordinator/procedure/test/common.go b/server/coordinator/procedure/test/common.go
index dc120e3..b267d84 100644
--- a/server/coordinator/procedure/test/common.go
+++ b/server/coordinator/procedure/test/common.go
@@ -6,6 +6,7 @@
 	"context"
 	"crypto/rand"
 	"fmt"
+	"math"
 	"math/big"
 	"testing"
 	"time"
@@ -21,16 +22,17 @@
 )
 
 const (
-	TestTableName0           = "table0"
-	TestTableName1           = "table1"
-	TestSchemaName           = "TestSchemaName"
-	TestRootPath             = "/rootPath"
-	DefaultIDAllocatorStep   = 20
-	ClusterName              = "ceresdbCluster1"
-	DefaultNodeCount         = 2
-	DefaultShardTotal        = 4
-	DefaultSchedulerOperator = true
-	DefaultTopologyType      = "static"
+	TestTableName0                     = "table0"
+	TestTableName1                     = "table1"
+	TestSchemaName                     = "TestSchemaName"
+	TestRootPath                       = "/rootPath"
+	DefaultIDAllocatorStep             = 20
+	ClusterName                        = "ceresdbCluster1"
+	DefaultNodeCount                   = 2
+	DefaultShardTotal                  = 4
+	DefaultSchedulerOperator           = true
+	DefaultTopologyType                = "static"
+	DefaultProcedureExecutingBatchSize = math.MaxUint32
 )
 
 type MockDispatch struct{}
@@ -99,13 +101,14 @@
 	logger := zap.NewNop()
 
 	clusterMetadata := metadata.NewClusterMetadata(logger, storage.Cluster{
-		ID:             0,
-		Name:           ClusterName,
-		MinNodeCount:   DefaultNodeCount,
-		ShardTotal:     DefaultShardTotal,
-		EnableSchedule: DefaultSchedulerOperator,
-		TopologyType:   DefaultTopologyType,
-		CreatedAt:      0,
+		ID:                          0,
+		Name:                        ClusterName,
+		MinNodeCount:                DefaultNodeCount,
+		ShardTotal:                  DefaultShardTotal,
+		EnableSchedule:              DefaultSchedulerOperator,
+		TopologyType:                DefaultTopologyType,
+		ProcedureExecutingBatchSize: DefaultProcedureExecutingBatchSize,
+		CreatedAt:                   0,
 	}, clusterStorage, client, TestRootPath, DefaultIDAllocatorStep)
 
 	err := clusterMetadata.Init(ctx)
diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go
index 26f756b..9d1ebde 100644
--- a/server/coordinator/scheduler/assign_shard_scheduler.go
+++ b/server/coordinator/scheduler/assign_shard_scheduler.go
@@ -5,26 +5,26 @@
 import (
 	"context"
 	"fmt"
+	"strings"
 
 	"github.com/CeresDB/ceresmeta/server/cluster/metadata"
 	"github.com/CeresDB/ceresmeta/server/coordinator"
+	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
 	"github.com/CeresDB/ceresmeta/server/storage"
 )
 
-const (
-	AssignReason = "ShardView exists in metadata but shardNode not exists, assign shard to node"
-)
-
 // AssignShardScheduler used to assigning shards without nodes.
 type AssignShardScheduler struct {
-	factory    *coordinator.Factory
-	nodePicker coordinator.NodePicker
+	factory                     *coordinator.Factory
+	nodePicker                  coordinator.NodePicker
+	procedureExecutingBatchSize uint32
 }
 
-func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
+func NewAssignShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
 	return &AssignShardScheduler{
-		factory:    factory,
-		nodePicker: nodePicker,
+		factory:                     factory,
+		nodePicker:                  nodePicker,
+		procedureExecutingBatchSize: procedureExecutingBatchSize,
 	}
 }
 
@@ -33,6 +33,8 @@
 		return ScheduleResult{}, nil
 	}
 
+	var procedures []procedure.Procedure
+	var reasons strings.Builder
 	// Check whether there is a shard without node mapping.
 	for _, shardView := range clusterSnapshot.Topology.ShardViewsMapping {
 		_, exists := findNodeByShard(shardView.ShardID, clusterSnapshot.Topology.ClusterView.ShardNodes)
@@ -53,12 +55,30 @@
 		if err != nil {
 			return ScheduleResult{}, err
 		}
-		return ScheduleResult{
-			Procedure: p,
-			Reason:    fmt.Sprintf("try to assign shard:%d to node:%s ,reason:%v", shardView.ShardID, newLeaderNode.Node.Name, AssignReason),
-		}, nil
+
+		procedures = append(procedures, p)
+		reasons.WriteString(fmt.Sprintf("the shard is not assigned to any node, try to assign it to node, shardID:%d, node:%s.", shardView.ShardID, newLeaderNode.Node.Name))
+		if len(procedures) >= int(a.procedureExecutingBatchSize) {
+			break
+		}
 	}
-	return ScheduleResult{}, nil
+
+	if len(procedures) == 0 {
+		return ScheduleResult{}, nil
+	}
+
+	batchProcedure, err := a.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
+		Batch:     procedures,
+		BatchType: procedure.TransferLeader,
+	})
+	if err != nil {
+		return ScheduleResult{}, err
+	}
+
+	return ScheduleResult{
+		batchProcedure,
+		reasons.String(),
+	}, nil
 }
 
 func findNodeByShard(shardID storage.ShardID, shardNodes []storage.ShardNode) (storage.ShardNode, bool) {
diff --git a/server/coordinator/scheduler/assign_shard_scheduler_test.go b/server/coordinator/scheduler/assign_shard_scheduler_test.go
index 7493ace..1fd2aed 100644
--- a/server/coordinator/scheduler/assign_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/assign_shard_scheduler_test.go
@@ -19,23 +19,23 @@
 
 	procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
 
-	s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
+	s := scheduler.NewAssignShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)
 
 	// EmptyCluster would be scheduled an empty procedure.
 	emptyCluster := test.InitEmptyCluster(ctx, t)
 	result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
-	re.Nil(result.Procedure)
+	re.Empty(result)
 
 	// PrepareCluster would be scheduled a transfer leader procedure.
 	prepareCluster := test.InitPrepareCluster(ctx, t)
 	result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
-	re.NotNil(result.Procedure)
+	re.NotEmpty(result)
 
 	// StableCluster with all shards assigned would be scheduled an empty procedure.
 	stableCluster := test.InitStableCluster(ctx, t)
 	result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
-	re.Nil(result.Procedure)
+	re.Empty(result)
 }
diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler.go b/server/coordinator/scheduler/rebalanced_shard_scheduler.go
index 7517273..63b70fe 100644
--- a/server/coordinator/scheduler/rebalanced_shard_scheduler.go
+++ b/server/coordinator/scheduler/rebalanced_shard_scheduler.go
@@ -5,23 +5,27 @@
 import (
 	"context"
 	"fmt"
+	"strings"
 
 	"github.com/CeresDB/ceresmeta/server/cluster/metadata"
 	"github.com/CeresDB/ceresmeta/server/coordinator"
+	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
 	"go.uber.org/zap"
 )
 
 type RebalancedShardScheduler struct {
-	logger     *zap.Logger
-	factory    *coordinator.Factory
-	nodePicker coordinator.NodePicker
+	logger                      *zap.Logger
+	factory                     *coordinator.Factory
+	nodePicker                  coordinator.NodePicker
+	procedureExecutingBatchSize uint32
 }
 
-func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
+func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
 	return &RebalancedShardScheduler{
-		logger:     logger,
-		factory:    factory,
-		nodePicker: nodePicker,
+		logger:                      logger,
+		factory:                     factory,
+		nodePicker:                  nodePicker,
+		procedureExecutingBatchSize: procedureExecutingBatchSize,
 	}
 }
 
@@ -31,6 +35,8 @@
 		return ScheduleResult{}, nil
 	}
 
+	var procedures []procedure.Procedure
+	var reasons strings.Builder
 	// TODO: Improve scheduling efficiency and verify whether the topology changes.
 	for _, shardNode := range clusterSnapshot.Topology.ClusterView.ShardNodes {
 		node, err := r.nodePicker.PickNode(ctx, shardNode.ID, clusterSnapshot.RegisteredNodes)
@@ -48,12 +54,25 @@
 			if err != nil {
 				return ScheduleResult{}, err
 			}
-			return ScheduleResult{
-				Procedure: p,
-				Reason:    fmt.Sprintf("the shard:%d on the node:%s does not meet the balance requirements,it should be assigned to node:%s", shardNode.ID, shardNode.NodeName, node.Node.Name),
-			}, nil
+			procedures = append(procedures, p)
+			reasons.WriteString(fmt.Sprintf("the shard does not meet the balance requirements,it should be assigned to node, shardID:%d, oldNode:%s, newNode:%s.", shardNode.ID, shardNode.NodeName, node.Node.Name))
+			if len(procedures) >= int(r.procedureExecutingBatchSize) {
+				break
+			}
 		}
 	}
 
-	return ScheduleResult{}, nil
+	if len(procedures) == 0 {
+		return ScheduleResult{}, nil
+	}
+
+	batchProcedure, err := r.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
+		Batch:     procedures,
+		BatchType: procedure.TransferLeader,
+	})
+	if err != nil {
+		return ScheduleResult{}, err
+	}
+
+	return ScheduleResult{batchProcedure, reasons.String()}, nil
 }
diff --git a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
index 89c2f80..ce92628 100644
--- a/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/rebalanced_shard_scheduler_test.go
@@ -19,22 +19,22 @@
 
 	procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
 
-	s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
+	s := scheduler.NewRebalancedShardScheduler(zap.NewNop(), procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)
 
 	// EmptyCluster would be scheduled an empty procedure.
 	emptyCluster := test.InitEmptyCluster(ctx, t)
 	result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
-	re.Nil(result.Procedure)
+	re.Empty(result)
 
 	// PrepareCluster would be scheduled an empty procedure.
 	prepareCluster := test.InitPrepareCluster(ctx, t)
 	result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
-	re.Nil(result.Procedure)
+	re.Empty(result)
 
 	// StableCluster with all shards assigned would be scheduled a load balance procedure.
 	stableCluster := test.InitStableCluster(ctx, t)
-	result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
+	_, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
 }
diff --git a/server/coordinator/scheduler/scheduler_manager.go b/server/coordinator/scheduler/scheduler_manager.go
index c3312c6..2b78eec 100644
--- a/server/coordinator/scheduler/scheduler_manager.go
+++ b/server/coordinator/scheduler/scheduler_manager.go
@@ -52,15 +52,16 @@
 	rootPath         string
 
 	// This lock is used to protect the following field.
-	lock               sync.RWMutex
-	registerSchedulers []Scheduler
-	shardWatch         watch.ShardWatch
-	isRunning          atomic.Bool
-	enableSchedule     bool
-	topologyType       storage.TopologyType
+	lock                        sync.RWMutex
+	registerSchedulers          []Scheduler
+	shardWatch                  watch.ShardWatch
+	isRunning                   atomic.Bool
+	enableSchedule              bool
+	topologyType                storage.TopologyType
+	procedureExecutingBatchSize uint32
 }
 
-func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType) Manager {
+func NewManager(logger *zap.Logger, procedureManager procedure.Manager, factory *coordinator.Factory, clusterMetadata *metadata.ClusterMetadata, client *clientv3.Client, rootPath string, enableSchedule bool, topologyType storage.TopologyType, procedureExecutingBatchSize uint32) Manager {
 	var shardWatch watch.ShardWatch
 	switch topologyType {
 	case storage.TopologyTypeDynamic:
@@ -71,17 +72,18 @@
 	}
 
 	return &ManagerImpl{
-		procedureManager:   procedureManager,
-		registerSchedulers: []Scheduler{},
-		factory:            factory,
-		nodePicker:         coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas),
-		clusterMetadata:    clusterMetadata,
-		client:             client,
-		shardWatch:         shardWatch,
-		rootPath:           rootPath,
-		enableSchedule:     enableSchedule,
-		topologyType:       topologyType,
-		logger:             logger,
+		procedureManager:            procedureManager,
+		registerSchedulers:          []Scheduler{},
+		factory:                     factory,
+		nodePicker:                  coordinator.NewConsistentHashNodePicker(logger, defaultHashReplicas),
+		clusterMetadata:             clusterMetadata,
+		client:                      client,
+		shardWatch:                  shardWatch,
+		rootPath:                    rootPath,
+		enableSchedule:              enableSchedule,
+		topologyType:                topologyType,
+		procedureExecutingBatchSize: procedureExecutingBatchSize,
+		logger:                      logger,
 	}
 }
 
@@ -189,13 +191,13 @@
 }
 
 func (m *ManagerImpl) createStaticTopologySchedulers() []Scheduler {
-	staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker)
+	staticTopologyShardScheduler := NewStaticTopologyShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize)
 	return []Scheduler{staticTopologyShardScheduler}
 }
 
 func (m *ManagerImpl) createDynamicTopologySchedulers() []Scheduler {
-	assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker)
-	rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker)
+	assignShardScheduler := NewAssignShardScheduler(m.factory, m.nodePicker, m.procedureExecutingBatchSize)
+	rebalancedShardScheduler := NewRebalancedShardScheduler(m.logger, m.factory, m.nodePicker, m.procedureExecutingBatchSize)
 	return []Scheduler{assignShardScheduler, rebalancedShardScheduler}
 }
 
diff --git a/server/coordinator/scheduler/scheduler_manager_test.go b/server/coordinator/scheduler/scheduler_manager_test.go
index d2dec5c..a9df691 100644
--- a/server/coordinator/scheduler/scheduler_manager_test.go
+++ b/server/coordinator/scheduler/scheduler_manager_test.go
@@ -31,14 +31,14 @@
 	_, client, _ := etcdutil.PrepareEtcdServerAndClient(t)
 
 	// Create scheduler manager with enableScheduler equal to false.
-	schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic)
+	schedulerManager := scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", false, storage.TopologyTypeStatic, 1)
 	err = schedulerManager.Start(ctx)
 	re.NoError(err)
 	err = schedulerManager.Stop(ctx)
 	re.NoError(err)
 
 	// Create scheduler manager with static topology.
-	schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic)
+	schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeStatic, 1)
 	err = schedulerManager.Start(ctx)
 	re.NoError(err)
 	schedulers := schedulerManager.ListScheduler()
@@ -47,7 +47,7 @@
 	re.NoError(err)
 
 	// Create scheduler manager with dynamic topology.
-	schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic)
+	schedulerManager = scheduler.NewManager(zap.NewNop(), procedureManager, f, c.GetMetadata(), client, "/rootPath", true, storage.TopologyTypeDynamic, 1)
 	err = schedulerManager.Start(ctx)
 	re.NoError(err)
 	schedulers = schedulerManager.ListScheduler()
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler.go b/server/coordinator/scheduler/static_topology_shard_scheduler.go
index ceb67a3..ff4b08b 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler.go
@@ -5,24 +5,30 @@
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/CeresDB/ceresmeta/server/cluster/metadata"
 	"github.com/CeresDB/ceresmeta/server/coordinator"
+	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
 	"github.com/CeresDB/ceresmeta/server/storage"
 	"github.com/pkg/errors"
 )
 
 type StaticTopologyShardScheduler struct {
-	factory    *coordinator.Factory
-	nodePicker coordinator.NodePicker
+	factory                     *coordinator.Factory
+	nodePicker                  coordinator.NodePicker
+	procedureExecutingBatchSize uint32
 }
 
-func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker) Scheduler {
-	return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker}
+func NewStaticTopologyShardScheduler(factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
+	return &StaticTopologyShardScheduler{factory: factory, nodePicker: nodePicker, procedureExecutingBatchSize: procedureExecutingBatchSize}
 }
 
 func (s *StaticTopologyShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
+	var procedures []procedure.Procedure
+	var reasons strings.Builder
+
 	switch clusterSnapshot.Topology.ClusterView.State {
 	case storage.ClusterStateEmpty:
 		return ScheduleResult{}, nil
@@ -47,10 +53,11 @@
 			if err != nil {
 				return ScheduleResult{}, err
 			}
-			return ScheduleResult{
-				Procedure: p,
-				Reason:    fmt.Sprintf("Cluster initialization, shard:%d is assigned to node:%s", shardView.ShardID, newLeaderNode.Node.Name),
-			}, nil
+			procedures = append(procedures, p)
+			reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardView.ShardID, newLeaderNode.Node.Name))
+			if len(procedures) >= int(s.procedureExecutingBatchSize) {
+				break
+			}
 		}
 	case storage.ClusterStateStable:
 		for i := 0; i < len(clusterSnapshot.Topology.ClusterView.ShardNodes); i++ {
@@ -70,15 +77,28 @@
 				if err != nil {
 					return ScheduleResult{}, err
 				}
-				return ScheduleResult{
-					Procedure: p,
-					Reason:    fmt.Sprintf("Cluster state is stable, shard:%d is reopened in node:%s", shardNode.ID, node.Node.Name),
-				}, nil
+				procedures = append(procedures, p)
+				reasons.WriteString(fmt.Sprintf("Cluster initialization, assign shard to node, shardID:%d, nodeName:%s. ", shardNode.ID, node.Node.Name))
+				if len(procedures) >= int(s.procedureExecutingBatchSize) {
+					break
+				}
 			}
 		}
 	}
 
-	return ScheduleResult{}, nil
+	if len(procedures) == 0 {
+		return ScheduleResult{}, nil
+	}
+
+	batchProcedure, err := s.factory.CreateBatchTransferLeaderProcedure(ctx, coordinator.BatchRequest{
+		Batch:     procedures,
+		BatchType: procedure.TransferLeader,
+	})
+	if err != nil {
+		return ScheduleResult{}, err
+	}
+
+	return ScheduleResult{batchProcedure, reasons.String()}, nil
 }
 
 func findOnlineNodeByName(nodeName string, nodes []metadata.RegisteredNode) (metadata.RegisteredNode, error) {
diff --git a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
index 153d555..c76c997 100644
--- a/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
+++ b/server/coordinator/scheduler/static_topology_shard_scheduler_test.go
@@ -19,23 +19,23 @@
 
 	procedureFactory := coordinator.NewFactory(test.MockIDAllocator{}, test.MockDispatch{}, test.NewTestStorage(t))
 
-	s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50))
+	s := scheduler.NewStaticTopologyShardScheduler(procedureFactory, coordinator.NewConsistentHashNodePicker(zap.NewNop(), 50), 1)
 
 	// EmptyCluster would be scheduled an empty procedure.
 	emptyCluster := test.InitEmptyCluster(ctx, t)
 	result, err := s.Schedule(ctx, emptyCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
-	re.Nil(result.Procedure)
+	re.Empty(result)
 
 	// PrepareCluster would be scheduled a transfer leader procedure.
 	prepareCluster := test.InitPrepareCluster(ctx, t)
 	result, err = s.Schedule(ctx, prepareCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
-	re.NotNil(result.Procedure)
+	re.NotEmpty(result)
 
 	// StableCluster with all shards assigned would be scheduled a transfer leader procedure by hash rule.
 	stableCluster := test.InitStableCluster(ctx, t)
 	result, err = s.Schedule(ctx, stableCluster.GetMetadata().GetClusterSnapshot())
 	re.NoError(err)
-	re.NotNil(result.Procedure)
+	re.NotEmpty(result)
 }
diff --git a/server/server.go b/server/server.go
index c77bf62..051661c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -237,11 +237,12 @@
 		}
 		defaultCluster, err := srv.clusterManager.CreateCluster(ctx, srv.cfg.DefaultClusterName,
 			metadata.CreateClusterOpts{
-				NodeCount:         uint32(srv.cfg.DefaultClusterNodeCount),
-				ReplicationFactor: uint32(srv.cfg.DefaultClusterReplicationFactor),
-				ShardTotal:        uint32(srv.cfg.DefaultClusterShardTotal),
-				EnableSchedule:    srv.cfg.EnableSchedule,
-				TopologyType:      topologyType,
+				NodeCount:                   uint32(srv.cfg.DefaultClusterNodeCount),
+				ReplicationFactor:           uint32(srv.cfg.DefaultClusterReplicationFactor),
+				ShardTotal:                  uint32(srv.cfg.DefaultClusterShardTotal),
+				EnableSchedule:              srv.cfg.EnableSchedule,
+				TopologyType:                topologyType,
+				ProcedureExecutingBatchSize: srv.cfg.ProcedureExecutingBatchSize,
 			})
 		if err != nil {
 			log.Warn("create default cluster failed", zap.Error(err))
diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go
index 94945f1..692ffd6 100644
--- a/server/service/grpc/service.go
+++ b/server/service/grpc/service.go
@@ -297,7 +297,7 @@
 		return &metaservicepb.RouteTablesResponse{Header: responseHeader(err, "grpc routeTables")}, nil
 	}
 
-	log.Info("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ",")))
+	log.Debug("[RouteTable]", zap.String("schemaName", req.SchemaName), zap.String("clusterName", req.GetHeader().ClusterName), zap.String("tableNames", strings.Join(req.TableNames, ",")))
 
 	// Forward request to the leader.
 	if ceresmetaClient != nil {
diff --git a/server/service/http/api.go b/server/service/http/api.go
index 6eb4aaa..cd2ed99 100644
--- a/server/service/http/api.go
+++ b/server/service/http/api.go
@@ -542,10 +542,11 @@
 }
 
 type UpdateClusterRequest struct {
-	NodeCount      uint32 `json:"NodeCount"`
-	ShardTotal     uint32 `json:"ShardTotal"`
-	EnableSchedule bool   `json:"enableSchedule"`
-	TopologyType   string `json:"topologyType"`
+	NodeCount                   uint32 `json:"nodeCount"`
+	ShardTotal                  uint32 `json:"shardTotal"`
+	EnableSchedule              bool   `json:"enableSchedule"`
+	TopologyType                string `json:"topologyType"`
+	ProcedureExecutingBatchSize uint32 `json:"procedureExecutingBatchSize"`
 }
 
 func (a *API) updateCluster(writer http.ResponseWriter, req *http.Request) {
@@ -590,8 +591,9 @@
 	}
 
 	if err := a.clusterManager.UpdateCluster(req.Context(), clusterName, metadata.UpdateClusterOpts{
-		EnableSchedule: updateClusterRequest.EnableSchedule,
-		TopologyType:   topologyType,
+		EnableSchedule:              updateClusterRequest.EnableSchedule,
+		TopologyType:                topologyType,
+		ProcedureExecutingBatchSize: updateClusterRequest.ProcedureExecutingBatchSize,
 	}); err != nil {
 		log.Error("update cluster failed", zap.Error(err))
 		a.respondError(writer, metadata.ErrUpdateCluster, fmt.Sprintf("err: %s", err.Error()))
diff --git a/server/storage/types.go b/server/storage/types.go
index a80cb82..02ef785 100644
--- a/server/storage/types.go
+++ b/server/storage/types.go
@@ -152,12 +152,13 @@
 	Name         string
 	MinNodeCount uint32
 	// Deprecated: ReplicationFactor is deprecated after CeresMeta v1.2.0
-	ReplicationFactor uint32
-	ShardTotal        uint32
-	EnableSchedule    bool
-	TopologyType      TopologyType
-	CreatedAt         uint64
-	ModifiedAt        uint64
+	ReplicationFactor           uint32
+	ShardTotal                  uint32
+	EnableSchedule              bool
+	TopologyType                TopologyType
+	ProcedureExecutingBatchSize uint32
+	CreatedAt                   uint64
+	ModifiedAt                  uint64
 }
 
 type ShardNode struct {
@@ -258,27 +259,29 @@
 
 func convertClusterPB(cluster *clusterpb.Cluster) Cluster {
 	return Cluster{
-		ID:             ClusterID(cluster.Id),
-		Name:           cluster.Name,
-		MinNodeCount:   cluster.MinNodeCount,
-		ShardTotal:     cluster.ShardTotal,
-		EnableSchedule: cluster.EnableSchedule,
-		TopologyType:   convertTopologyTypePB(cluster.TopologyType),
-		CreatedAt:      cluster.CreatedAt,
-		ModifiedAt:     cluster.ModifiedAt,
+		ID:                          ClusterID(cluster.Id),
+		Name:                        cluster.Name,
+		MinNodeCount:                cluster.MinNodeCount,
+		ShardTotal:                  cluster.ShardTotal,
+		EnableSchedule:              cluster.EnableSchedule,
+		TopologyType:                convertTopologyTypePB(cluster.TopologyType),
+		ProcedureExecutingBatchSize: cluster.ProcedureExecutingBatchSize,
+		CreatedAt:                   cluster.CreatedAt,
+		ModifiedAt:                  cluster.ModifiedAt,
 	}
 }
 
 func convertClusterToPB(cluster Cluster) clusterpb.Cluster {
 	return clusterpb.Cluster{
-		Id:             uint32(cluster.ID),
-		Name:           cluster.Name,
-		MinNodeCount:   cluster.MinNodeCount,
-		ShardTotal:     cluster.ShardTotal,
-		EnableSchedule: cluster.EnableSchedule,
-		TopologyType:   convertTopologyTypeToPB(cluster.TopologyType),
-		CreatedAt:      cluster.CreatedAt,
-		ModifiedAt:     cluster.ModifiedAt,
+		Id:                          uint32(cluster.ID),
+		Name:                        cluster.Name,
+		MinNodeCount:                cluster.MinNodeCount,
+		ShardTotal:                  cluster.ShardTotal,
+		EnableSchedule:              cluster.EnableSchedule,
+		TopologyType:                convertTopologyTypeToPB(cluster.TopologyType),
+		ProcedureExecutingBatchSize: cluster.ProcedureExecutingBatchSize,
+		CreatedAt:                   cluster.CreatedAt,
+		ModifiedAt:                  cluster.ModifiedAt,
 	}
 }