blob: 4838ccbd45568d9c7def5f7635e0fb9c50c4f28b [file] [log] [blame]
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
package transferleader_test
import (
"context"
"testing"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transferleader"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/stretchr/testify/require"
)
type mockProcedure struct {
ClusterID storage.ClusterID
clusterVersion uint64
typ procedure.Typ
ShardWithVersion map[storage.ShardID]uint64
}
func (m mockProcedure) ID() uint64 {
return 0
}
func (m mockProcedure) Typ() procedure.Typ {
return m.typ
}
func (m mockProcedure) Start(_ context.Context) error {
return nil
}
func (m mockProcedure) Cancel(_ context.Context) error {
return nil
}
func (m mockProcedure) State() procedure.State {
return procedure.StateInit
}
func (m mockProcedure) RelatedVersionInfo() procedure.RelatedVersionInfo {
return procedure.RelatedVersionInfo{
ClusterID: m.ClusterID,
ShardWithVersion: m.ShardWithVersion,
ClusterVersion: m.clusterVersion,
}
}
func (m mockProcedure) Priority() procedure.Priority {
return procedure.PriorityLow
}
func TestBatchProcedure(t *testing.T) {
re := require.New(t)
var procedures []procedure.Procedure
// Procedures with same type and version.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
p := CreateMockProcedure(storage.ClusterID(0), 0, 0, shardWithVersion)
procedures = append(procedures, p)
}
_, err := transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.NoError(err)
// Procedure with different clusterID.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
p := CreateMockProcedure(storage.ClusterID(i), 0, procedure.TransferLeader, shardWithVersion)
procedures = append(procedures, p)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.Error(err)
// Procedures with different type.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(i)] = 0
p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
procedures = append(procedures, p)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.Error(err)
// Procedures with different version.
for i := 0; i < 3; i++ {
shardWithVersion := map[storage.ShardID]uint64{}
shardWithVersion[storage.ShardID(0)] = uint64(i)
p := CreateMockProcedure(0, 0, procedure.Typ(i), shardWithVersion)
procedures = append(procedures, p)
}
_, err = transferleader.NewBatchTransferLeaderProcedure(0, procedures)
re.Error(err)
}
func CreateMockProcedure(clusterID storage.ClusterID, clusterVersion uint64, typ procedure.Typ, shardWithVersion map[storage.ShardID]uint64) procedure.Procedure {
return mockProcedure{
ClusterID: clusterID,
clusterVersion: clusterVersion,
typ: typ,
ShardWithVersion: shardWithVersion,
}
}