blob: 0aae3d0350d6cb4521bcc2eca2bd2609d226778b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class ColocateTableBalancerTest {
private ColocateTableBalancer balancer = ColocateTableBalancer.getInstance();
private Backend backend1;
private Backend backend2;
private Backend backend3;
private Backend backend4;
private Backend backend5;
private Backend backend6;
private Backend backend7;
private Backend backend8;
private Backend backend9;
private Map<Long, Double> mixLoadScores;
@Before
public void setUp() {
backend1 = new Backend(1L, "192.168.1.1", 9050);
backend2 = new Backend(2L, "192.168.1.2", 9050);
backend3 = new Backend(3L, "192.168.1.3", 9050);
backend4 = new Backend(4L, "192.168.1.4", 9050);
backend5 = new Backend(5L, "192.168.1.5", 9050);
backend6 = new Backend(6L, "192.168.1.6", 9050);
// 7,8,9 are on same host
backend7 = new Backend(7L, "192.168.1.8", 9050);
backend8 = new Backend(8L, "192.168.1.8", 9050);
backend9 = new Backend(9L, "192.168.1.8", 9050);
mixLoadScores = Maps.newHashMap();
mixLoadScores.put(1L, 0.1);
mixLoadScores.put(2L, 0.5);
mixLoadScores.put(3L, 0.4);
mixLoadScores.put(4L, 0.2);
mixLoadScores.put(5L, 0.3);
mixLoadScores.put(6L, 0.6);
mixLoadScores.put(7L, 0.8);
mixLoadScores.put(8L, 0.7);
mixLoadScores.put(9L, 0.9);
}
private ColocateTableIndex createColocateIndex(GroupId groupId, List<Long> flatList) {
ColocateTableIndex colocateTableIndex = new ColocateTableIndex();
int replicationNum = 3;
List<List<Long>> backendsPerBucketSeq = Lists.partition(flatList, replicationNum);
colocateTableIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
return colocateTableIndex;
}
@Test
public void testBalance(@Mocked SystemInfoService infoService,
@Mocked ClusterLoadStatistic statistic) {
new Expectations() {
{
infoService.getBackend(1L);
result = backend1;
minTimes = 0;
infoService.getBackend(2L);
result = backend2;
minTimes = 0;
infoService.getBackend(3L);
result = backend3;
minTimes = 0;
infoService.getBackend(4L);
result = backend4;
minTimes = 0;
infoService.getBackend(5L);
result = backend5;
minTimes = 0;
infoService.getBackend(6L);
result = backend6;
minTimes = 0;
infoService.getBackend(7L);
result = backend7;
minTimes = 0;
infoService.getBackend(8L);
result = backend8;
minTimes = 0;
infoService.getBackend(9L);
result = backend9;
minTimes = 0;
statistic.getBackendLoadStatistic(anyLong);
result = null;
minTimes = 0;
}
};
GroupId groupId = new GroupId(10000, 10001);
List<Column> distributionCols = Lists.newArrayList();
distributionCols.add(new Column("k1", PrimitiveType.INT));
ColocateGroupSchema groupSchema = new ColocateGroupSchema(groupId, distributionCols, 5, (short) 3);
Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap();
group2Schema.put(groupId, groupSchema);
// 1. balance a imbalance group
// [[1, 2, 3], [4, 1, 2], [3, 4, 1], [2, 3, 4], [1, 2, 3]]
ColocateTableIndex colocateTableIndex = createColocateIndex(groupId,
Lists.newArrayList(1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L));
Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
boolean changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
List<List<Long>> expected = Lists.partition(
Lists.newArrayList(9L, 5L, 3L, 4L, 6L, 8L, 7L, 6L, 1L, 2L, 9L, 4L, 1L, 2L, 3L), 3);
Assert.assertTrue(changed);
Assert.assertEquals(expected, balancedBackendsPerBucketSeq);
// 2. balance a already balanced group
colocateTableIndex = createColocateIndex(groupId,
Lists.newArrayList(9L, 8L, 7L, 8L, 6L, 5L, 9L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L));
Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);
balancedBackendsPerBucketSeq.clear();
changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
System.out.println(balancedBackendsPerBucketSeq);
Assert.assertFalse(changed);
Assert.assertTrue(balancedBackendsPerBucketSeq.isEmpty());
}
@Test
public void testFixBalanceEndlessLoop(@Mocked SystemInfoService infoService,
@Mocked ClusterLoadStatistic statistic) {
new Expectations() {
{
infoService.getBackend(1L);
result = backend1;
minTimes = 0;
infoService.getBackend(2L);
result = backend2;
minTimes = 0;
infoService.getBackend(3L);
result = backend3;
minTimes = 0;
infoService.getBackend(4L);
result = backend4;
minTimes = 0;
infoService.getBackend(5L);
result = backend5;
minTimes = 0;
infoService.getBackend(6L);
result = backend6;
minTimes = 0;
infoService.getBackend(7L);
result = backend7;
minTimes = 0;
infoService.getBackend(8L);
result = backend8;
minTimes = 0;
infoService.getBackend(9L);
result = backend9;
minTimes = 0;
statistic.getBackendLoadStatistic(anyLong);
result = null;
minTimes = 0;
}
};
GroupId groupId = new GroupId(10000, 10001);
List<Column> distributionCols = Lists.newArrayList();
distributionCols.add(new Column("k1", PrimitiveType.INT));
ColocateGroupSchema groupSchema = new ColocateGroupSchema(groupId, distributionCols, 5, (short) 1);
Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap();
group2Schema.put(groupId, groupSchema);
// 1. only one available backend
// [[7], [7], [7], [7], [7]]
ColocateTableIndex colocateTableIndex = createColocateIndex(groupId, Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
List<Long> allAvailBackendIds = Lists.newArrayList(7L);
boolean changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
Assert.assertFalse(changed);
// 2. all backends are checked but this round is not changed
// [[7], [7], [7], [7], [7]]
// and add new backends 8, 9 that are on the same host with 7
colocateTableIndex = createColocateIndex(groupId, Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);
balancedBackendsPerBucketSeq = Lists.newArrayList();
allAvailBackendIds = Lists.newArrayList(7L, 8L, 9L);
changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
Assert.assertFalse(changed);
}
@Test
public void testFixBalanceEndlessLoop2(@Mocked SystemInfoService infoService,
@Mocked ClusterLoadStatistic statistic) {
new Expectations() {
{
statistic.getBackendLoadStatistic(anyLong);
result = new Delegate<BackendLoadStatistic>() {
BackendLoadStatistic delegate(Long beId) {
return new FakeBackendLoadStatistic(beId, null, null, null);
}
};
minTimes = 0;
}
};
GroupId groupId = new GroupId(10000, 10001);
List<Column> distributionCols = Lists.newArrayList();
ColocateGroupSchema groupSchema = new ColocateGroupSchema(groupId, distributionCols, 5, (short) 1);
Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap();
group2Schema.put(groupId, groupSchema);
ColocateTableIndex colocateTableIndex = createColocateIndex(groupId, Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
Set<Long> unAvailBackendIds = Sets.newHashSet(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
List<Long> availBackendIds = Lists.newArrayList();
boolean changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, unAvailBackendIds, availBackendIds,
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
Assert.assertFalse(changed);
}
@Test
public void testGetSortedBackendReplicaNumPairs(@Mocked ClusterLoadStatistic statistic) {
new Expectations() {
{
statistic.getBackendLoadStatistic(anyLong);
result = new Delegate<BackendLoadStatistic>() {
BackendLoadStatistic delegate(Long beId) {
return new FakeBackendLoadStatistic(beId, null, null, null);
}
};
minTimes = 0;
}
};
// all buckets are on different be
List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
Set<Long> unavailBackendIds = Sets.newHashSet(9L);
List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
List<Map.Entry<Long, Long>> backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs",
allAvailBackendIds, unavailBackendIds, statistic, flatBackendsPerBucketSeq);
long[] backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
Assert.assertArrayEquals(new long[]{7L, 8L, 6L, 2L, 3L, 5L, 4L, 1L}, backendIds);
// 0,1 bucket on same be and 5, 6 on same be
flatBackendsPerBucketSeq = Lists.newArrayList(1L, 1L, 3L, 4L, 5L, 6L, 7L, 7L, 9L);
backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs", allAvailBackendIds, unavailBackendIds,
statistic, flatBackendsPerBucketSeq);
backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
Assert.assertArrayEquals(new long[]{7L, 1L, 6L, 3L, 5L, 4L, 8L, 2L}, backendIds);
}
public final class FakeBackendLoadStatistic extends BackendLoadStatistic {
public FakeBackendLoadStatistic(long beId, String clusterName, SystemInfoService infoService,
TabletInvertedIndex invertedIndex) {
super(beId, clusterName, infoService, invertedIndex);
}
@Override
public double getMixLoadScore() {
return mixLoadScores.get(getBeId());
}
}
@Test
public void testGetBeSeqIndexes() {
List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 2L, 3L, 4L, 2L);
List<Integer> indexes = Deencapsulation.invoke(balancer, "getBeSeqIndexes", flatBackendsPerBucketSeq, 2L);
Assert.assertArrayEquals(new int[]{1, 2, 5}, indexes.stream().mapToInt(i->i).toArray());
System.out.println("backend1 id is " + backend1.getId());
}
@Test
public void testGetUnavailableBeIdsInGroup(@Mocked ColocateTableIndex colocateTableIndex,
@Mocked SystemInfoService infoService,
@Mocked Backend myBackend2,
@Mocked Backend myBackend3,
@Mocked Backend myBackend4,
@Mocked Backend myBackend5
) {
GroupId groupId = new GroupId(10000, 10001);
Set<Long> allBackendsInGroup = Sets.newHashSet(1L, 2L, 3L, 4L, 5L);
new Expectations() {
{
infoService.getBackend(1L);
result = null;
minTimes = 0;
// backend2 is available
infoService.getBackend(2L);
result = myBackend2;
minTimes = 0;
myBackend2.isAvailable();
result = true;
minTimes = 0;
// backend3 not available, and dead for a long time
infoService.getBackend(3L);
result = myBackend3;
minTimes = 0;
myBackend3.isAvailable();
result = false;
minTimes = 0;
myBackend3.isAlive();
result = false;
minTimes = 0;
myBackend3.getLastUpdateMs();
result = System.currentTimeMillis() - Config.tablet_repair_delay_factor_second * 1000 * 20;
minTimes = 0;
// backend4 not available, and dead for a short time
infoService.getBackend(4L);
result = myBackend4;
minTimes = 0;
myBackend4.isAvailable();
result = false;
minTimes = 0;
myBackend4.isAlive();
result = false;
minTimes = 0;
myBackend4.getLastUpdateMs();
result = System.currentTimeMillis();
minTimes = 0;
// backend5 not available, and in decommission
infoService.getBackend(5L);
result = myBackend5;
minTimes = 0;
myBackend5.isAvailable();
result = false;
minTimes = 0;
myBackend5.isAlive();
result = true;
minTimes = 0;
myBackend5.isDecommissioned();
result = true;
minTimes = 0;
colocateTableIndex.getBackendsByGroup(groupId);
result = allBackendsInGroup;
minTimes = 0;
}
};
Set<Long> unavailableBeIds = Deencapsulation.invoke(balancer, "getUnavailableBeIdsInGroup", infoService, colocateTableIndex, groupId);
System.out.println(unavailableBeIds);
Assert.assertArrayEquals(new long[]{1L, 3L, 5L}, unavailableBeIds.stream().mapToLong(i->i).sorted().toArray());
}
@Test
public void testGetAvailableBeIds(@Mocked SystemInfoService infoService,
@Mocked Backend myBackend2,
@Mocked Backend myBackend3,
@Mocked Backend myBackend4,
@Mocked Backend myBackend5) {
List<Long> clusterBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L);
new Expectations(){
{
infoService.getClusterBackendIds("cluster1", false);
result = clusterBackendIds;
minTimes = 0;
infoService.getBackend(1L);
result = null;
minTimes = 0;
// backend2 is available
infoService.getBackend(2L);
result = myBackend2;
minTimes = 0;
myBackend2.isAvailable();
result = true;
minTimes = 0;
// backend3 not available, and dead for a long time
infoService.getBackend(3L);
result = myBackend3;
minTimes = 0;
myBackend3.isAvailable();
result = false;
minTimes = 0;
myBackend3.isAlive();
result = false;
minTimes = 0;
myBackend3.getLastUpdateMs();
result = System.currentTimeMillis() - Config.tablet_repair_delay_factor_second * 1000 * 20;
minTimes = 0;
// backend4 available, not alive but dead for a short time
infoService.getBackend(4L);
result = myBackend4;
minTimes = 0;
myBackend4.isAvailable();
result = false;
minTimes = 0;
myBackend4.isAlive();
result = false;
minTimes = 0;
myBackend4.getLastUpdateMs();
result = System.currentTimeMillis();
minTimes = 0;
// backend5 not available, and in decommission
infoService.getBackend(5L);
result = myBackend5;
minTimes = 0;
myBackend5.isAvailable();
result = false;
minTimes = 0;
myBackend5.isAlive();
result = true;
minTimes = 0;
myBackend5.isDecommissioned();
result = true;
minTimes = 0;
}
};
List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIds","cluster1", infoService);
Assert.assertArrayEquals(new long[]{2L, 4L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
}
}