/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package cluster_test

import (
	"context"
	"crypto/rand"
	"fmt"
	"math/big"
	"testing"
	"time"

	"github.com/apache/incubator-horaedb-meta/server/cluster"
	"github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
	"github.com/apache/incubator-horaedb-meta/server/etcdutil"
	"github.com/apache/incubator-horaedb-meta/server/storage"
	"github.com/stretchr/testify/require"
	clientv3 "go.etcd.io/etcd/client/v3"
)
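
// Shared fixtures for the cluster manager tests: timeout, test cluster/schema
// names, node addresses, and default sizing parameters for shards and ID allocation.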
const (
	defaultTimeout                     = time.Second * 20
	cluster1                           = "testCluster1"
	defaultSchema                      = "testSchema"
	defaultNodeCount                   = 2
	defaultShardTotal                  = 8
	defaultProcedureExecutingBatchSize = 100
	defaultTopologyType                = storage.TopologyTypeStatic
	node1                              = "127.0.0.1:8081"
	node2                              = "127.0.0.2:8081"
	defaultSchemaID                    = 0
	testRootPath                       = "/rootPath"
	defaultIDAllocatorStep             = 20
)
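
// newTestStorage starts an embedded etcd server and returns an etcd-backed
// metadata storage plus a cleanup function. The same etcd client is returned
// both as the KV interface and as the raw client.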
func newTestStorage(t *testing.T) (storage.Storage, clientv3.KV, *clientv3.Client, etcdutil.CloseFn) {
	_, client, closeSrv := etcdutil.PrepareEtcdServerAndClient(t)
	s := storage.NewStorageWithEtcdBackend(client, testRootPath, storage.Options{
		MaxScanLimit: 100, MinScanLimit: 10, MaxOpsPerTxn: 32,
	})
	return s, client, client, closeSrv
}
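
// newClusterManagerWithStorage builds a cluster.Manager on top of the given
// storage and etcd client, using the default ID allocator step and topology type.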
func newClusterManagerWithStorage(storage storage.Storage, kv clientv3.KV, client *clientv3.Client) (cluster.Manager, error) {
	return cluster.NewManagerImpl(storage, kv, client, testRootPath, defaultIDAllocatorStep, defaultTopologyType)
}
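
// TestClusterManager walks through the full cluster manager lifecycle:
// creating a cluster, registering nodes, initializing the shard view,
// allocating a schema ID, and creating, routing, and dropping tables.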
func TestClusterManager(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
	defer cancel()

	s, kv, client, closeSrv := newTestStorage(t)
	defer closeSrv()
	manager, err := newClusterManagerWithStorage(s, kv, client)
	re.NoError(err)
	re.NoError(manager.Start(ctx))

	testCreateCluster(ctx, re, manager, cluster1)

	testRegisterNode(ctx, re, manager, cluster1, node1)
	testRegisterNode(ctx, re, manager, cluster1, node2)

	testInitShardView(ctx, re, manager, cluster1)

	testGetNodeAndShard(ctx, re, manager, cluster1)

	// No tables exist yet.
	testGetTables(re, manager, node1, cluster1, 0)

	// Allocating the same schema twice should be idempotent and return the same ID.
	testAllocSchemaID(ctx, re, manager, cluster1, defaultSchema, defaultSchemaID)
	testAllocSchemaID(ctx, re, manager, cluster1, defaultSchema, defaultSchemaID)

	var testTableNames []string
	for i := uint64(0); i < 5; i++ {
		testTableName := fmt.Sprintf("testTable%d", i)
		testTableNames = append(testTableNames, testTableName)
		testCreateTable(ctx, re, manager, cluster1, defaultSchema, testTableName, storage.ShardID(i))
	}

	testRouteTables(ctx, re, manager, cluster1, defaultSchema, testTableNames)

	for _, tableName := range testTableNames {
		testDropTable(ctx, re, manager, cluster1, defaultSchema, tableName)
	}

	re.NoError(manager.Stop(ctx))
}
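
// testGetNodeAndShard checks that both registered nodes are visible, that a
// single node can be looked up by name, and that every shard has a node assigned.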
func testGetNodeAndShard(ctx context.Context, re *require.Assertions, manager cluster.Manager, clusterName string) {
	c, err := manager.GetCluster(ctx, clusterName)
	re.NoError(err)

	nodes, err := manager.ListRegisteredNodes(ctx, clusterName)
	re.NoError(err)
	re.Equal(2, len(nodes))

	node, err := manager.GetRegisteredNode(ctx, clusterName, node1)
	re.NoError(err)
	re.Equal(node1, node.Node.Name)

	nodeShards, err := manager.GetNodeShards(ctx, clusterName)
	re.NoError(err)
	re.Equal(int(c.GetMetadata().GetTotalShardNum()), len(nodeShards.NodeShards))
}
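
// testInitShardView assigns a randomly chosen registered node as the leader of
// every shard and marks the cluster view as stable.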
func testInitShardView(ctx context.Context, re *require.Assertions, manager cluster.Manager, clusterName string) {
	c, err := manager.GetCluster(ctx, clusterName)
	re.NoError(err)

	snapshot := c.GetMetadata().GetClusterSnapshot()
	shardNodes := make([]storage.ShardNode, 0, c.GetMetadata().GetTotalShardNum())
	for _, shardView := range snapshot.Topology.ShardViewsMapping {
		selectNodeIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(snapshot.RegisteredNodes))))
		re.NoError(err)
		shardNodes = append(shardNodes, storage.ShardNode{
			ID:        shardView.ShardID,
			ShardRole: storage.ShardRoleLeader,
			NodeName:  snapshot.RegisteredNodes[selectNodeIdx.Int64()].Node.Name,
		})
	}

	err = c.GetMetadata().UpdateClusterView(ctx, storage.ClusterStateStable, shardNodes)
	re.NoError(err)
}
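
// testCreateCluster creates a cluster with the default node count, shard total,
// and topology type.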
func testCreateCluster(ctx context.Context, re *require.Assertions, manager cluster.Manager, clusterName string) {
	_, err := manager.CreateCluster(ctx, clusterName, metadata.CreateClusterOpts{
		NodeCount:                   defaultNodeCount,
		EnableSchedule:              false,
		ShardTotal:                  defaultShardTotal,
		TopologyType:                defaultTopologyType,
		ProcedureExecutingBatchSize: defaultProcedureExecutingBatchSize,
	})
	re.NoError(err)
}
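
// testRegisterNode registers an online node with an empty shard list under the
// given cluster.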
func testRegisterNode(ctx context.Context, re *require.Assertions, manager cluster.Manager,
	clusterName, nodeName string,
) {
	node := metadata.RegisteredNode{
		Node: storage.Node{
			Name:          nodeName,
			LastTouchTime: uint64(time.Now().UnixMilli()),
			State:         storage.NodeStateOnline,
			NodeStats:     storage.NewEmptyNodeStats(),
		}, ShardInfos: []metadata.ShardInfo{},
	}
	err := manager.RegisterNode(ctx, clusterName, node)
	re.NoError(err)
}
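
// testAllocSchemaID allocates a schema ID for the given schema name and checks
// that it matches the expected value.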
func testAllocSchemaID(ctx context.Context, re *require.Assertions, manager cluster.Manager,
	cluster, schema string, schemaID uint32,
) {
	id, _, err := manager.AllocSchemaID(ctx, cluster, schema)
	re.NoError(err)
	re.Equal(storage.SchemaID(schemaID), id)
}
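
// testCreateTable creates a table on the given shard through the cluster metadata.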
func testCreateTable(ctx context.Context, re *require.Assertions, manager cluster.Manager,
	clusterName, schema, tableName string, shardID storage.ShardID,
) {
	c, err := manager.GetCluster(ctx, clusterName)
	re.NoError(err)
	_, err = c.GetMetadata().CreateTable(ctx, metadata.CreateTableRequest{
		ShardID:       shardID,
		LatestVersion: 0,
		SchemaName:    schema,
		TableName:     tableName,
		PartitionInfo: storage.PartitionInfo{Info: nil},
	})
	re.NoError(err)
}
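
// testGetTables fetches the tables on every shard owned by the given node and
// verifies the shard roles and the total number of tables.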
func testGetTables(re *require.Assertions, manager cluster.Manager, node, cluster string, num int) {
	shardIDs := make([]storage.ShardID, 0, defaultShardTotal)
	for i := 0; i < defaultShardTotal; i++ {
		shardIDs = append(shardIDs, storage.ShardID(i))
	}

	shardTables, err := manager.GetTablesByShardIDs(cluster, node, shardIDs)
	re.NoError(err)
	re.Equal(defaultShardTotal, len(shardTables))

	tableNum := 0
	for _, tables := range shardTables {
		re.Equal(storage.ShardRoleLeader, tables.Shard.Role)
		tableNum += len(tables.Tables)
	}
	re.Equal(num, tableNum)
}
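
// testRouteTables routes the given tables and verifies that each one resolves
// to exactly one leader shard.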
func testRouteTables(ctx context.Context, re *require.Assertions, manager cluster.Manager, cluster, schema string, tableNames []string) {
	ret, err := manager.RouteTables(ctx, cluster, schema, tableNames)
	re.NoError(err)
	re.Equal(len(tableNames), len(ret.RouteEntries))
	for _, entry := range ret.RouteEntries {
		re.Equal(1, len(entry.NodeShards))
		re.Equal(storage.ShardRoleLeader, entry.NodeShards[0].ShardNode.ShardRole)
	}
}
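
// testDropTable drops the given table and expects the operation to succeed.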
func testDropTable(ctx context.Context, re *require.Assertions, manager cluster.Manager, clusterName string, schemaName string, tableName string) {
	err := manager.DropTable(ctx, clusterName, schemaName, tableName)
	re.NoError(err)
}