blob: b562a6d7382b5fa3cec598a77b97450412142bfa [file] [log] [blame]
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
package procedure
import (
"context"
"testing"
"time"
"github.com/CeresDB/ceresmeta/server/cluster"
"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
"github.com/CeresDB/ceresmeta/server/etcdutil"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
testTableName0 = "table0"
testTableName1 = "table1"
testSchemaName = "testSchemaName"
nodeName0 = "node0"
nodeName1 = "node1"
testRootPath = "/rootPath"
defaultIDAllocatorStep = 20
clusterName = "ceresdbCluster1"
defaultNodeCount = 2
defaultReplicationFactor = 1
defaultPartitionTableProportionOfNodes = 0.5
defaultShardTotal = 4
)
type MockDispatch struct{}
func (m MockDispatch) OpenShard(_ context.Context, _ string, _ eventdispatch.OpenShardRequest) error {
return nil
}
func (m MockDispatch) CloseShard(_ context.Context, _ string, _ eventdispatch.CloseShardRequest) error {
return nil
}
func (m MockDispatch) CreateTableOnShard(_ context.Context, _ string, _ eventdispatch.CreateTableOnShardRequest) error {
return nil
}
func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ eventdispatch.DropTableOnShardRequest) error {
return nil
}
func newTestEtcdStorage(t *testing.T) (storage.Storage, clientv3.KV, etcdutil.CloseFn) {
_, client, closeSrv := etcdutil.PrepareEtcdServerAndClient(t)
storage := storage.NewStorageWithEtcdBackend(client, testRootPath, storage.Options{
MaxScanLimit: 100, MinScanLimit: 10,
})
return storage, client, closeSrv
}
func newTestCluster(ctx context.Context, t *testing.T) (cluster.Manager, *cluster.Cluster) {
re := require.New(t)
storage, kv, _ := newTestEtcdStorage(t)
manager, err := cluster.NewManagerImpl(storage, kv, testRootPath, defaultIDAllocatorStep)
re.NoError(err)
cluster, err := manager.CreateCluster(ctx, clusterName, cluster.CreateClusterOpts{
NodeCount: defaultNodeCount,
ReplicationFactor: defaultReplicationFactor,
ShardTotal: defaultShardTotal,
})
re.NoError(err)
return manager, cluster
}
// Prepare a test cluster which has scattered shards and created test schema.
// Notice: sleep(5s) will be called in this function.
func prepare(t *testing.T) (cluster.Manager, *cluster.Cluster) {
re := require.New(t)
manager, cluster := newClusterAndRegisterNode(t)
// Wait for the cluster to be ready.
time.Sleep(time.Second * 5)
_, _, err := cluster.GetOrCreateSchema(context.Background(), testSchemaName)
re.NoError(err)
return manager, cluster
}