blob: a6ea1e490ca5c876465d75e0c749bc6289725d0b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.CloneTask;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletInfo;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static com.google.common.collect.MoreCollectors.onlyElement;
public class RebalanceTest {
private static final Logger LOG = LogManager.getLogger(RebalanceTest.class);
@Mocked
private Catalog catalog;
private long id = 10086;
private Database db;
private OlapTable olapTable;
private final SystemInfoService systemInfoService = new SystemInfoService();
private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
private Map<String, ClusterLoadStatistic> statisticMap;
@Before
public void setUp() throws AnalysisException {
db = new Database(1, "test db");
db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
new Expectations() {
{
catalog.getDbIds();
minTimes = 0;
result = db.getId();
catalog.getDb(anyLong);
minTimes = 0;
result = db;
Catalog.getCurrentCatalogJournalVersion();
minTimes = 0;
result = FeConstants.meta_version;
catalog.getNextId();
minTimes = 0;
result = new Delegate() {
long a() {
return id++;
}
};
Catalog.getCurrentSystemInfo();
minTimes = 0;
result = systemInfoService;
Catalog.getCurrentInvertedIndex();
minTimes = 0;
result = invertedIndex;
Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
result = 111;
Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(anyLong, anyLong, (List<Long>) any);
result = true;
}
};
// Test mock validation
Assert.assertEquals(111, Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
Assert.assertTrue(Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1, 2, Lists.newArrayList(3L)));
List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
beIds.forEach(id -> systemInfoService.addBackend(RebalancerTestUtil.createBackend(id, 2048, 0)));
olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS,
new RangePartitionInfo(), new HashDistributionInfo());
db.createTable(olapTable);
// 1 table, 3 partitions p0,p1,p2
MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null);
createPartitionsForTable(olapTable, materializedIndex, 3L);
olapTable.setIndexMeta(materializedIndex.getId(), "fake index", Lists.newArrayList(new Column()),
0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);
// Tablet distribution: we add them to olapTable & build invertedIndex manually
RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD,
50000, Lists.newArrayList(10001L, 10002L, 10003L));
RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", TStorageMedium.HDD,
60000, Lists.newArrayList(10001L, 10002L, 10003L));
RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", TStorageMedium.HDD,
70000, Lists.newArrayList(10001L, 10002L, 10003L));
// be4(10004) doesn't have any replica
generateStatisticMap();
}
private void generateStatisticMap() {
ClusterLoadStatistic loadStatistic = new ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER,
systemInfoService, invertedIndex);
loadStatistic.init();
statisticMap = Maps.newConcurrentMap();
statisticMap.put(SystemInfoService.DEFAULT_CLUSTER, loadStatistic);
}
private void createPartitionsForTable(OlapTable olapTable, MaterializedIndex index, Long partitionCount) {
// partition id start from 31
LongStream.range(0, partitionCount).forEach(idx -> {
long id = 31 + idx;
Partition partition = new Partition(id, "p" + idx, index, new HashDistributionInfo());
olapTable.addPartition(partition);
olapTable.getPartitionInfo().addPartition(id, new DataProperty(TStorageMedium.HDD), (short) 3, false);
});
}
@Test
public void testPartitionRebalancer() {
Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer", Level.DEBUG);
// Disable scheduler's rebalancer adding balance task, add balance tasks manually
Config.disable_balance = true;
// Create a new scheduler & checker for redundant tablets handling
// Call runAfterCatalogReady manually instead of starting daemon thread
TabletSchedulerStat stat = new TabletSchedulerStat();
PartitionRebalancer rebalancer = new PartitionRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
TabletScheduler tabletScheduler = new TabletScheduler(catalog, systemInfoService, invertedIndex, stat, "");
// The rebalancer inside the scheduler will use this rebalancer, for getToDeleteReplicaId
Deencapsulation.setField(tabletScheduler, "rebalancer", rebalancer);
TabletChecker tabletChecker = new TabletChecker(catalog, systemInfoService, tabletScheduler, stat);
rebalancer.updateLoadStatistic(statisticMap);
List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
// Run once for update slots info, scheduler won't select balance cuz balance is disabled
tabletScheduler.runAfterCatalogReady();
AgentBatchTask batchTask = new AgentBatchTask();
for (TabletSchedCtx tabletCtx : alternativeTablets) {
LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
try {
tabletCtx.setStorageMedium(TStorageMedium.HDD);
tabletCtx.setTablet(olapTable.getPartition(tabletCtx.getPartitionId()).getIndex(tabletCtx.getIndexId()).getTablet(tabletCtx.getTabletId()));
tabletCtx.setVersionInfo(1, 0, 1, 0);
tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first
// createCloneReplicaAndTask, create replica will change invertedIndex too.
rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots(), batchTask);
} catch (SchedException e) {
LOG.warn("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage());
}
}
// Show debug info of MoveInProgressMap detail
rebalancer.updateLoadStatistic(statisticMap);
rebalancer.selectAlternativeTablets();
// Get created tasks, and finish them manually
List<AgentTask> tasks = batchTask.getAllTasks();
List<Long> needCheckTablets = tasks.stream().map(AgentTask::getTabletId).collect(Collectors.toList());
LOG.info("created tasks for tablet: {}", needCheckTablets);
needCheckTablets.forEach(t -> Assert.assertEquals(4, invertedIndex.getReplicasByTabletId(t).size()));
// // If clone task execution is too slow, tabletChecker may want to delete the CLONE replica.
// tabletChecker.runAfterCatalogReady();
// Assert.assertTrue(tabletScheduler.containsTablet(50000));
// // tabletScheduler handle redundant
// tabletScheduler.runAfterCatalogReady();
for (Long tabletId : needCheckTablets) {
TabletSchedCtx tabletSchedCtx = alternativeTablets.stream().filter(ctx -> ctx.getTabletId() == tabletId).collect(onlyElement());
AgentTask task = tasks.stream().filter(t -> t.getTabletId() == tabletId).collect(onlyElement());
LOG.info("try to finish tabletCtx {}", tabletId);
try {
TFinishTaskRequest fakeReq = new TFinishTaskRequest();
fakeReq.task_status = new TStatus(TStatusCode.OK);
fakeReq.finish_tablet_infos = Lists.newArrayList(new TTabletInfo(tabletSchedCtx.getTabletId(), 5, 1, 0, 0, 0));
tabletSchedCtx.finishCloneTask((CloneTask) task, fakeReq);
} catch (SchedException e) {
e.printStackTrace();
}
}
// NeedCheckTablets are redundant, TabletChecker will add them to TabletScheduler
tabletChecker.runAfterCatalogReady();
needCheckTablets.forEach(t -> Assert.assertEquals(4, invertedIndex.getReplicasByTabletId(t).size()));
needCheckTablets.forEach(t -> Assert.assertTrue(tabletScheduler.containsTablet(t)));
// TabletScheduler handle redundant tablet
tabletScheduler.runAfterCatalogReady();
// One replica is set to DECOMMISSION, still 4 replicas
needCheckTablets.forEach(t -> {
List<Replica> replicas = invertedIndex.getReplicasByTabletId(t);
Assert.assertEquals(4, replicas.size());
Replica decommissionedReplica = replicas.stream().filter(r -> r.getState() == Replica.ReplicaState.DECOMMISSION).collect(onlyElement());
// expected watermarkTxnId is 111
Assert.assertEquals(111, decommissionedReplica.getWatermarkTxnId());
});
// Delete replica should change invertedIndex too
tabletScheduler.runAfterCatalogReady();
needCheckTablets.forEach(t -> Assert.assertEquals(3, invertedIndex.getReplicasByTabletId(t).size()));
// Check moves completed
rebalancer.selectAlternativeTablets();
rebalancer.updateLoadStatistic(statisticMap);
AtomicLong succeeded = Deencapsulation.getField(rebalancer, "counterBalanceMoveSucceeded");
Assert.assertEquals(needCheckTablets.size(), succeeded.get());
}
@Test
public void testMoveInProgressMap() {
Configurator.setLevel("org.apache.doris.clone.MovesInProgressCache", Level.DEBUG);
MovesCacheMap m = new MovesCacheMap();
m.updateMapping(statisticMap, 3);
m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.HDD).get().put(1L, new Pair<>(null, -1L));
m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.SSD).get().put(2L, new Pair<>(null, -1L));
m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
// Maintenance won't clean up the entries of cache
m.maintain();
Assert.assertEquals(3, m.size());
// Reset the expireAfterAccess, the whole cache map will be cleared.
m.updateMapping(statisticMap, 1);
Assert.assertEquals(0, m.size());
m.getCache(SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
try {
Thread.sleep(1000);
m.maintain();
Assert.assertEquals(0, m.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}