blob: ef040a18ae6ce29ceccc0611555950b535904b0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.load.balancer.router;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.node.BaseNodeCache;
import org.apache.iotdb.confignode.manager.node.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.RegionGroupCache;
import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class LeaderRouterTest {
@Test
public void testGenRealTimeRoutingPolicy() {
// Build TDataNodeLocations
List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
for (int i = 0; i < 6; i++) {
dataNodeLocations.add(
new TDataNodeLocation(
i,
new TEndPoint("0.0.0.0", 6667 + i),
new TEndPoint("0.0.0.0", 9003 + i),
new TEndPoint("0.0.0.0", 8777 + i),
new TEndPoint("0.0.0.0", 40010 + i),
new TEndPoint("0.0.0.0", 50010 + i)));
}
// Build nodeCacheMap
long currentTimeMillis = System.currentTimeMillis();
Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>();
for (int i = 0; i < 6; i++) {
nodeCacheMap.put(i, new DataNodeHeartbeatCache());
// Simulate that the DataNode-i returned a heartbeat at (currentTime - i * 1000) ms
nodeCacheMap
.get(i)
.cacheHeartbeatSample(
new NodeHeartbeatSample(
new THeartbeatResp(currentTimeMillis - i * 1000, NodeStatus.Running.getStatus()),
currentTimeMillis - i * 1000));
}
nodeCacheMap.values().forEach(BaseNodeCache::updateNodeStatus);
// Get the loadScoreMap
Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
nodeCacheMap.forEach(
(dataNodeId, heartbeatCache) ->
loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
// Build TRegionReplicaSet
TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
TRegionReplicaSet regionReplicaSet1 =
new TRegionReplicaSet(
groupId1,
Arrays.asList(
dataNodeLocations.get(2), dataNodeLocations.get(1), dataNodeLocations.get(0)));
TConsensusGroupId groupId2 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 2);
TRegionReplicaSet regionReplicaSet2 =
new TRegionReplicaSet(
groupId2,
Arrays.asList(
dataNodeLocations.get(5), dataNodeLocations.get(4), dataNodeLocations.get(3)));
List<TRegionReplicaSet> regionReplicaSets = Arrays.asList(regionReplicaSet1, regionReplicaSet2);
// Build regionGroupCacheMap
Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap = new HashMap<>();
regionGroupCacheMap.put(groupId1, new RegionGroupCache(groupId1));
regionGroupCacheMap.put(groupId2, new RegionGroupCache(groupId2));
/* Simulate ratis consensus protocol(only one leader) */
regionGroupCacheMap
.get(groupId1)
.cacheHeartbeatSample(new RegionHeartbeatSample(10, 10, 0, false, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
.cacheHeartbeatSample(new RegionHeartbeatSample(11, 11, 1, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
.cacheHeartbeatSample(new RegionHeartbeatSample(12, 12, 2, false, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
.cacheHeartbeatSample(new RegionHeartbeatSample(13, 13, 3, false, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
.cacheHeartbeatSample(new RegionHeartbeatSample(14, 14, 4, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
.cacheHeartbeatSample(new RegionHeartbeatSample(15, 15, 5, false, RegionStatus.Running));
// Get leaderMap
Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
regionGroupCacheMap
.values()
.forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateRegionStatistics()));
regionGroupCacheMap.forEach(
(groupId, regionGroupCache) ->
leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
// Check result
Map<TConsensusGroupId, TRegionReplicaSet> result =
new LeaderRouter(leaderMap, loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
TRegionReplicaSet result1 = result.get(groupId1);
// Leader first
Assert.assertEquals(dataNodeLocations.get(1), result1.getDataNodeLocations().get(0));
// The others will be sorted by loadScore
Assert.assertEquals(dataNodeLocations.get(0), result1.getDataNodeLocations().get(1));
Assert.assertEquals(dataNodeLocations.get(2), result1.getDataNodeLocations().get(2));
TRegionReplicaSet result2 = result.get(groupId2);
// Leader first
Assert.assertEquals(dataNodeLocations.get(4), result2.getDataNodeLocations().get(0));
// The others will be sorted by loadScore
Assert.assertEquals(dataNodeLocations.get(3), result2.getDataNodeLocations().get(1));
Assert.assertEquals(dataNodeLocations.get(5), result2.getDataNodeLocations().get(2));
/* Simulate multiLeader consensus protocol(Each Region believes it is the leader) */
for (int i = 2; i <= 1000; i++) {
regionGroupCacheMap
.get(groupId1)
.cacheHeartbeatSample(
new RegionHeartbeatSample(i * 10, i * 10, 0, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
.cacheHeartbeatSample(
new RegionHeartbeatSample(i * 10 + 1, i * 10 + 1, 1, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
.cacheHeartbeatSample(
new RegionHeartbeatSample(i * 10 + 2, i * 10 + 2, 2, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
.cacheHeartbeatSample(
new RegionHeartbeatSample(i * 10 + 3, i * 10 + 3, 3, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
.cacheHeartbeatSample(
new RegionHeartbeatSample(i * 10 + 4, i * 10 + 4, 4, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
.cacheHeartbeatSample(
new RegionHeartbeatSample(i * 10 + 5, i * 10 + 5, 5, true, RegionStatus.Running));
// Get leaderMap
leaderMap.clear();
regionGroupCacheMap.values().forEach(RegionGroupCache::updateRegionStatistics);
regionGroupCacheMap.forEach(
(groupId, regionGroupCache) ->
leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
// Check result
result = new LeaderRouter(leaderMap, loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
result1 = result.get(groupId1);
// Leader first
Assert.assertEquals(dataNodeLocations.get(2), result1.getDataNodeLocations().get(0));
// The others will be sorted by loadScore
Assert.assertEquals(dataNodeLocations.get(0), result1.getDataNodeLocations().get(1));
Assert.assertEquals(dataNodeLocations.get(1), result1.getDataNodeLocations().get(2));
result2 = result.get(groupId2);
// Leader first
Assert.assertEquals(dataNodeLocations.get(5), result2.getDataNodeLocations().get(0));
// The others will be sorted by loadScore
Assert.assertEquals(dataNodeLocations.get(3), result2.getDataNodeLocations().get(1));
Assert.assertEquals(dataNodeLocations.get(4), result2.getDataNodeLocations().get(2));
}
/* Simulate multiLeader consensus protocol with a DataNode fails down */
regionGroupCacheMap
.get(groupId1)
.cacheHeartbeatSample(
new RegionHeartbeatSample(10030, 10030, 0, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
.cacheHeartbeatSample(
new RegionHeartbeatSample(10031, 10031, 1, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
.cacheHeartbeatSample(
new RegionHeartbeatSample(10033, 10033, 3, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
.cacheHeartbeatSample(
new RegionHeartbeatSample(10034, 10034, 4, true, RegionStatus.Running));
// Get leaderMap
leaderMap.clear();
regionGroupCacheMap
.values()
.forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateRegionStatistics()));
regionGroupCacheMap.forEach(
(groupId, regionGroupCache) ->
leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
// Check result
result = new LeaderRouter(leaderMap, loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
result1 = result.get(groupId1);
// Leader first
Assert.assertEquals(dataNodeLocations.get(1), result1.getDataNodeLocations().get(0));
// The others will be sorted by loadScore
Assert.assertEquals(dataNodeLocations.get(0), result1.getDataNodeLocations().get(1));
Assert.assertEquals(dataNodeLocations.get(2), result1.getDataNodeLocations().get(2));
result2 = result.get(groupId2);
// Leader first
Assert.assertEquals(dataNodeLocations.get(4), result2.getDataNodeLocations().get(0));
// The others will be sorted by loadScore
Assert.assertEquals(dataNodeLocations.get(3), result2.getDataNodeLocations().get(1));
Assert.assertEquals(dataNodeLocations.get(5), result2.getDataNodeLocations().get(2));
}
}