blob: 13b93bffd294acf3c8cc432b6ce0e0ca0bddc6df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package client
import (
"fmt"
"testing"
"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/session"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/rand"
)
type fakeNode struct {
primaries map[base.Gpid]bool
secondaries map[base.Gpid]bool
n *util.PegasusNode
}
type fakeCluster struct {
nodes []*fakeNode
meta *fakeMeta
}
func newFakeCluster(num int) *fakeCluster {
c := &fakeCluster{
nodes: make([]*fakeNode, num),
meta: &fakeMeta{},
}
for i := range c.nodes { // 127.0.0.1:34800, 127.0.0.1:34801, 127.0.0.1:34802 and so on
ipPort := fmt.Sprintf("127.0.0.1:3480%d", i)
c.nodes[i] = &fakeNode{
n: util.NewNodeFromTCPAddr(ipPort, session.NodeTypeReplica),
primaries: map[base.Gpid]bool{},
secondaries: map[base.Gpid]bool{},
}
}
return c
}
func rand3Nodes() []*fakeNode {
nodeNum := len(fakePegasusCluster.nodes)
chosenSet := map[int]bool{}
for len(chosenSet) < 3 { // loop until we find 3 distinct nodes
chosenSet[rand.Intn(nodeNum)] = true
}
var ret []*fakeNode
for nidx := range chosenSet {
ret = append(ret, fakePegasusCluster.nodes[nidx])
}
return ret
}
func createFakeTable(tbName string, partitionNum int) {
appID, _ := fakePegasusCluster.meta.CreateApp(tbName, map[string]string{}, partitionNum)
// evenly assign replicas to nodes
for i := 0; i < partitionNum; i++ {
nodes := rand3Nodes()
nodes[0].primaries[base.Gpid{Appid: appID, PartitionIndex: int32(i)}] = true
nodes[1].secondaries[base.Gpid{Appid: appID, PartitionIndex: int32(i)}] = true
nodes[2].secondaries[base.Gpid{Appid: appID, PartitionIndex: int32(i)}] = true
}
}
var fakePegasusCluster *fakeCluster
func TestFakeClusterRand3Nodes(t *testing.T) {
fakePegasusCluster = newFakeCluster(4)
assert.Equal(t, len(fakePegasusCluster.nodes), 4)
nodes := rand3Nodes()
assert.Equal(t, len(nodes), 3)
}
// ensure we are testing upon a correct foundation
func TestFakeCluster(t *testing.T) {
fakePegasusCluster = newFakeCluster(4)
testCases := []struct {
tbName string
partitionNum int
}{
{tbName: "test1", partitionNum: 16},
{tbName: "test2", partitionNum: 32},
{tbName: "test3", partitionNum: 64},
{tbName: "test4", partitionNum: 128},
}
for _, tt := range testCases {
createFakeTable(tt.tbName, tt.partitionNum)
}
assertReplicasNotOnSameNode(t)
expectedTotalPartitions := 16 + 32 + 64 + 128
assertNoMissingReplicaInCluster(t, expectedTotalPartitions)
}
func TestFakeQueryConfigNeverReturnNilPrimary(t *testing.T) {
fakePegasusCluster = newFakeCluster(4)
createFakeTable("test", 128)
meta := fakePegasusCluster.meta
resp, _ := meta.QueryConfig("test")
part := resp.Partitions[0]
_ = shutdownReplica(meta, part.GetPid(), part.Primary)
resp, _ = meta.QueryConfig("test")
part = resp.Partitions[0]
assert.NotNil(t, part.Primary) // never nil, since this field is non-optional
assert.Equal(t, part.MaxReplicaCount, int32(3))
}
// ensure 3-replica for every partitions are not distributed on the same node.
func assertReplicasNotOnSameNode(t *testing.T) {
tables, _ := fakePegasusCluster.meta.ListAvailableApps()
for _, tb := range tables {
resp, _ := fakePegasusCluster.meta.QueryConfig(tb.AppName)
assert.Equal(t, len(resp.Partitions), int(tb.PartitionCount))
for _, p := range resp.Partitions {
for _, sec := range p.Secondaries {
assert.NotEqual(t, p.Primary.GetAddress(), sec.GetAddress())
}
if len(p.Secondaries) >= 2 {
assert.NotEqual(t, p.Secondaries[0].GetAddress(), p.Secondaries[1].GetAddress())
}
}
}
}
func assertNoMissingReplicaInCluster(t *testing.T, expectedTotalPartitions int) {
actualPri := 0
actualSec := 0
for _, n := range fakePegasusCluster.nodes {
actualPri += len(n.primaries)
actualSec += len(n.secondaries)
}
assert.Equal(t, actualPri, expectedTotalPartitions)
assert.Equal(t, actualSec, expectedTotalPartitions*2)
}