blob: e38c5985e5e8875f2ef347dc52b06e3c5694c3c2 [file] [log] [blame]
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
package coordinator_test
import (
"context"
"testing"
"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func setupFactory(t *testing.T) (*coordinator.Factory, *metadata.ClusterMetadata) {
ctx := context.Background()
c := test.InitStableCluster(ctx, t)
dispatch := test.MockDispatch{}
allocator := test.MockIDAllocator{}
storage := test.NewTestStorage(t)
f := coordinator.NewFactory(zap.NewNop(), allocator, dispatch, storage)
return f, c.GetMetadata()
}
func TestCreateTable(t *testing.T) {
re := require.New(t)
ctx := context.Background()
f, m := setupFactory(t)
// Create normal table procedure.
p, err := f.MakeCreateTableProcedure(ctx, coordinator.CreateTableRequest{
ClusterMetadata: m,
SourceReq: &metaservicepb.CreateTableRequest{
Header: nil,
SchemaName: test.TestSchemaName,
Name: "test1",
EncodedSchema: nil,
Engine: "",
CreateIfNotExist: false,
Options: nil,
PartitionTableInfo: nil,
},
OnSucceeded: nil,
OnFailed: nil,
})
re.NoError(err)
re.Equal(procedure.CreateTable, p.Typ())
re.Equal(procedure.StateInit, string(p.State()))
// Create partition table procedure.
p, err = f.MakeCreateTableProcedure(ctx, coordinator.CreateTableRequest{
ClusterMetadata: m,
SourceReq: &metaservicepb.CreateTableRequest{
Header: nil,
SchemaName: test.TestSchemaName,
Name: "test2",
EncodedSchema: nil,
Engine: "",
CreateIfNotExist: false,
Options: nil,
PartitionTableInfo: &metaservicepb.PartitionTableInfo{
PartitionInfo: nil,
SubTableNames: []string{"test2-0,test2-1"},
},
},
OnSucceeded: nil,
OnFailed: nil,
})
re.NoError(err)
re.Equal(procedure.CreatePartitionTable, p.Typ())
re.Equal(procedure.StateInit, string(p.State()))
}
func TestDropTable(t *testing.T) {
re := require.New(t)
ctx := context.Background()
f, m := setupFactory(t)
// Drop normal table procedure.
p, ok, err := f.CreateDropTableProcedure(ctx, coordinator.DropTableRequest{
ClusterMetadata: m,
ClusterSnapshot: m.GetClusterSnapshot(),
SourceReq: &metaservicepb.DropTableRequest{
Header: nil,
SchemaName: test.TestSchemaName,
Name: "test1",
PartitionTableInfo: nil,
},
OnSucceeded: nil,
OnFailed: nil,
})
re.NoError(err)
re.False(ok)
re.Nil(p)
// Create partition table procedure.
p, ok, err = f.CreateDropTableProcedure(ctx, coordinator.DropTableRequest{
ClusterMetadata: m,
ClusterSnapshot: m.GetClusterSnapshot(),
SourceReq: &metaservicepb.DropTableRequest{
Header: nil,
SchemaName: test.TestSchemaName,
Name: "test2",
PartitionTableInfo: &metaservicepb.PartitionTableInfo{
PartitionInfo: nil,
SubTableNames: []string{"test2-0,test2-1"},
},
},
OnSucceeded: nil,
OnFailed: nil,
})
// Drop non-existing partition table.
re.NoError(err)
re.True(ok)
re.NotNil(p)
}
func TestTransferLeader(t *testing.T) {
re := require.New(t)
ctx := context.Background()
f, m := setupFactory(t)
snapshot := m.GetClusterSnapshot()
p, err := f.CreateTransferLeaderProcedure(ctx, coordinator.TransferLeaderRequest{
Snapshot: snapshot,
ShardID: 0,
OldLeaderNodeName: "",
NewLeaderNodeName: snapshot.RegisteredNodes[0].Node.Name,
})
re.NoError(err)
re.Equal(procedure.TransferLeader, p.Typ())
re.Equal(procedure.StateInit, string(p.State()))
}
func TestSplit(t *testing.T) {
re := require.New(t)
ctx := context.Background()
f, m := setupFactory(t)
snapshot := m.GetClusterSnapshot()
p, err := f.CreateSplitProcedure(ctx, coordinator.SplitRequest{
ClusterMetadata: nil,
SchemaName: "",
TableNames: nil,
Snapshot: snapshot,
ShardID: snapshot.Topology.ClusterView.ShardNodes[0].ID,
NewShardID: 100,
TargetNodeName: snapshot.Topology.ClusterView.ShardNodes[0].NodeName,
})
re.NoError(err)
re.Equal(procedure.Split, p.Typ())
re.Equal(procedure.StateInit, string(p.State()))
}