blob: b31e4a8e9965c0acae226af882726515e679fd8b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.net;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
import static org.apache.hadoop.hdds.scm.net.NetConstants.PATH_SEPARATOR_STR;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.REGION_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.DATACENTER_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.NODEGROUP_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.runner.RunWith;
/** Test the network topology functions. */
@RunWith(Parameterized.class)
public class TestNetworkTopologyImpl {
private static final Logger LOG = LoggerFactory.getLogger(
TestNetworkTopologyImpl.class);
private NetworkTopology cluster;
private Node[] dataNodes;
private Random random = new Random();
public TestNetworkTopologyImpl(NodeSchema[] schemas, Node[] nodeArray) {
NodeSchemaManager.getInstance().init(schemas, true);
cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
dataNodes = nodeArray;
for (int i = 0; i < dataNodes.length; i++) {
cluster.add(dataNodes[i]);
}
}
@Rule
public Timeout testTimeout = new Timeout(3000000);
@Parameters
public static Collection<Object[]> setupDatanodes() {
Object[][] topologies = new Object[][]{
{new NodeSchema[] {ROOT_SCHEMA, LEAF_SCHEMA},
new Node[]{
createDatanode("1.1.1.1", "/"),
createDatanode("2.2.2.2", "/"),
createDatanode("3.3.3.3", "/"),
createDatanode("4.4.4.4", "/"),
createDatanode("5.5.5.5", "/"),
createDatanode("6.6.6.6", "/"),
createDatanode("7.7.7.7", "/"),
createDatanode("8.8.8.8", "/"),
}},
{new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA},
new Node[]{
createDatanode("1.1.1.1", "/r1"),
createDatanode("2.2.2.2", "/r1"),
createDatanode("3.3.3.3", "/r2"),
createDatanode("4.4.4.4", "/r2"),
createDatanode("5.5.5.5", "/r2"),
createDatanode("6.6.6.6", "/r3"),
createDatanode("7.7.7.7", "/r3"),
createDatanode("8.8.8.8", "/r3"),
}},
{new NodeSchema[]
{ROOT_SCHEMA, DATACENTER_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA},
new Node[]{
createDatanode("1.1.1.1", "/d1/r1"),
createDatanode("2.2.2.2", "/d1/r1"),
createDatanode("3.3.3.3", "/d1/r2"),
createDatanode("4.4.4.4", "/d1/r2"),
createDatanode("5.5.5.5", "/d1/r2"),
createDatanode("6.6.6.6", "/d2/r3"),
createDatanode("7.7.7.7", "/d2/r3"),
createDatanode("8.8.8.8", "/d2/r3"),
}},
{new NodeSchema[] {ROOT_SCHEMA, DATACENTER_SCHEMA, RACK_SCHEMA,
NODEGROUP_SCHEMA, LEAF_SCHEMA},
new Node[]{
createDatanode("1.1.1.1", "/d1/r1/ng1"),
createDatanode("2.2.2.2", "/d1/r1/ng1"),
createDatanode("3.3.3.3", "/d1/r2/ng2"),
createDatanode("4.4.4.4", "/d1/r2/ng2"),
createDatanode("5.5.5.5", "/d1/r2/ng3"),
createDatanode("6.6.6.6", "/d2/r3/ng3"),
createDatanode("7.7.7.7", "/d2/r3/ng3"),
createDatanode("8.8.8.8", "/d2/r3/ng3"),
createDatanode("9.9.9.9", "/d3/r1/ng1"),
createDatanode("10.10.10.10", "/d3/r1/ng1"),
createDatanode("11.11.11.11", "/d3/r1/ng1"),
createDatanode("12.12.12.12", "/d3/r2/ng2"),
createDatanode("13.13.13.13", "/d3/r2/ng2"),
createDatanode("14.14.14.14", "/d4/r1/ng1"),
createDatanode("15.15.15.15", "/d4/r1/ng1"),
createDatanode("16.16.16.16", "/d4/r1/ng1"),
createDatanode("17.17.17.17", "/d4/r1/ng2"),
createDatanode("18.18.18.18", "/d4/r1/ng2"),
createDatanode("19.19.19.19", "/d4/r1/ng3"),
createDatanode("20.20.20.20", "/d4/r1/ng3"),
}},
{new NodeSchema[] {ROOT_SCHEMA, REGION_SCHEMA, DATACENTER_SCHEMA,
RACK_SCHEMA, NODEGROUP_SCHEMA, LEAF_SCHEMA},
new Node[]{
createDatanode("1.1.1.1", "/d1/rg1/r1/ng1"),
createDatanode("2.2.2.2", "/d1/rg1/r1/ng1"),
createDatanode("3.3.3.3", "/d1/rg1/r1/ng2"),
createDatanode("4.4.4.4", "/d1/rg1/r1/ng1"),
createDatanode("5.5.5.5", "/d1/rg1/r1/ng1"),
createDatanode("6.6.6.6", "/d1/rg1/r1/ng2"),
createDatanode("7.7.7.7", "/d1/rg1/r1/ng2"),
createDatanode("8.8.8.8", "/d1/rg1/r1/ng2"),
createDatanode("9.9.9.9", "/d1/rg1/r1/ng2"),
createDatanode("10.10.10.10", "/d1/rg1/r1/ng2"),
createDatanode("11.11.11.11", "/d1/rg1/r2/ng1"),
createDatanode("12.12.12.12", "/d1/rg1/r2/ng1"),
createDatanode("13.13.13.13", "/d1/rg1/r2/ng1"),
createDatanode("14.14.14.14", "/d1/rg1/r2/ng1"),
createDatanode("15.15.15.15", "/d1/rg1/r2/ng1"),
createDatanode("16.16.16.16", "/d1/rg1/r2/ng2"),
createDatanode("17.17.17.17", "/d1/rg1/r2/ng2"),
createDatanode("18.18.18.18", "/d1/rg1/r2/ng2"),
createDatanode("19.19.19.19", "/d1/rg1/r2/ng2"),
createDatanode("20.20.20.20", "/d1/rg1/r2/ng2"),
createDatanode("21.21.21.21", "/d2/rg1/r2/ng1"),
createDatanode("22.22.22.22", "/d2/rg1/r2/ng1"),
createDatanode("23.23.23.23", "/d2/rg2/r2/ng1"),
createDatanode("24.24.24.24", "/d2/rg2/r2/ng1"),
createDatanode("25.25.25.25", "/d2/rg2/r2/ng1"),
}}
};
return Arrays.asList(topologies);
}
@Test
public void testContains() {
Node nodeNotInMap = createDatanode("8.8.8.8", "/d2/r4");
for (int i=0; i < dataNodes.length; i++) {
assertTrue(cluster.contains(dataNodes[i]));
}
assertFalse(cluster.contains(nodeNotInMap));
}
@Test
public void testNumOfChildren() {
assertEquals(dataNodes.length, cluster.getNumOfLeafNode(null));
assertEquals(0, cluster.getNumOfLeafNode("/switch1/node1"));
}
@Test
public void testGetNode() {
assertEquals(cluster.getNode(""), cluster.getNode(null));
assertEquals(cluster.getNode(""), cluster.getNode("/"));
assertEquals(null, cluster.getNode("/switch1/node1"));
assertEquals(null, cluster.getNode("/switch1"));
}
@Test
public void testCreateInvalidTopology() {
List<NodeSchema> schemas = new ArrayList<NodeSchema>();
schemas.add(ROOT_SCHEMA);
schemas.add(RACK_SCHEMA);
schemas.add(LEAF_SCHEMA);
NodeSchemaManager.getInstance().init(schemas.toArray(new NodeSchema[0]),
true);
NetworkTopology newCluster = new NetworkTopologyImpl(
NodeSchemaManager.getInstance());
Node[] invalidDataNodes = new Node[] {
createDatanode("1.1.1.1", "/r1"),
createDatanode("2.2.2.2", "/r2"),
createDatanode("3.3.3.3", "/d1/r2")
};
newCluster.add(invalidDataNodes[0]);
newCluster.add(invalidDataNodes[1]);
try {
newCluster.add(invalidDataNodes[2]);
fail("expected InvalidTopologyException");
} catch (NetworkTopology.InvalidTopologyException e) {
assertTrue(e.getMessage().contains("Failed to add"));
assertTrue(e.getMessage().contains("Its path depth is not " +
newCluster.getMaxLevel()));
}
}
@Test
public void testInitWithConfigFile() {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Configuration conf = new Configuration();
try {
String filePath = classLoader.getResource(
"./networkTopologyTestFiles/good.xml").getPath();
conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, filePath);
NetworkTopology newCluster = new NetworkTopologyImpl(conf);
LOG.info("network topology max level = " + newCluster.getMaxLevel());
} catch (Throwable e) {
fail("should succeed");
}
}
@Test
public void testAncestor() {
assumeTrue(cluster.getMaxLevel() > 2);
int maxLevel = cluster.getMaxLevel();
assertTrue(cluster.isSameParent(dataNodes[0], dataNodes[1]));
while(maxLevel > 1) {
assertTrue(cluster.isSameAncestor(dataNodes[0], dataNodes[1],
maxLevel - 1));
maxLevel--;
}
assertFalse(cluster.isSameParent(dataNodes[1], dataNodes[2]));
assertFalse(cluster.isSameParent(null, dataNodes[2]));
assertFalse(cluster.isSameParent(dataNodes[1], null));
assertFalse(cluster.isSameParent(null, null));
assertFalse(cluster.isSameAncestor(dataNodes[1], dataNodes[2], 0));
assertFalse(cluster.isSameAncestor(dataNodes[1], null, 1));
assertFalse(cluster.isSameAncestor(null, dataNodes[2], 1));
assertFalse(cluster.isSameAncestor(null, null, 1));
maxLevel = cluster.getMaxLevel();
assertTrue(cluster.isSameAncestor(
dataNodes[random.nextInt(cluster.getNumOfLeafNode(null))],
dataNodes[random.nextInt(cluster.getNumOfLeafNode(null))],
maxLevel - 1));
}
@Test
public void testAddRemove() {
for(int i = 0; i < dataNodes.length; i++) {
cluster.remove(dataNodes[i]);
}
for(int i = 0; i < dataNodes.length; i++) {
assertFalse(cluster.contains(dataNodes[i]));
}
// no leaf nodes
assertEquals(0, cluster.getNumOfLeafNode(null));
// no inner nodes
assertEquals(0, cluster.getNumOfNodes(2));
for(int i = 0; i < dataNodes.length; i++) {
cluster.add(dataNodes[i]);
}
// Inner nodes are created automatically
assertTrue(cluster.getNumOfNodes(2) > 0);
try {
cluster.add(cluster.chooseRandom(null).getParent());
fail("Inner node can not be added manually");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Not allowed to add an inner node"));
}
try {
cluster.remove(cluster.chooseRandom(null).getParent());
fail("Inner node can not be removed manually");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Not allowed to remove an inner node"));
}
}
@Test
public void testGetNodesWithLevel() {
int maxLevel = cluster.getMaxLevel();
try {
assertEquals(1, cluster.getNumOfNodes(0));
fail("level 0 is not supported");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().startsWith("Invalid level"));
}
try {
assertEquals(1, cluster.getNumOfNodes(0));
fail("level 0 is not supported");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().startsWith("Invalid level"));
}
try {
assertEquals(1, cluster.getNumOfNodes(maxLevel + 1));
fail("level out of scope");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().startsWith("Invalid level"));
}
try {
assertEquals(1, cluster.getNumOfNodes(maxLevel + 1));
fail("level out of scope");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().startsWith("Invalid level"));
}
// root node
assertEquals(1, cluster.getNumOfNodes(1));
assertEquals(1, cluster.getNumOfNodes(1));
// leaf nodes
assertEquals(dataNodes.length, cluster.getNumOfNodes(maxLevel));
assertEquals(dataNodes.length, cluster.getNumOfNodes(maxLevel));
}
@Test
public void testChooseRandomSimple() {
String path =
dataNodes[random.nextInt(dataNodes.length)].getNetworkFullPath();
assertEquals(path, cluster.chooseRandom(path).getNetworkFullPath());
path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
// test chooseRandom(String scope)
while (!path.equals(ROOT)) {
assertTrue(cluster.chooseRandom(path).getNetworkLocation()
.startsWith(path));
Node node = cluster.chooseRandom("~" + path);
assertTrue(!node.getNetworkLocation()
.startsWith(path));
path = path.substring(0,
path.lastIndexOf(PATH_SEPARATOR_STR));
}
assertNotNull(cluster.chooseRandom(null));
assertNotNull(cluster.chooseRandom(""));
assertNotNull(cluster.chooseRandom("/"));
assertNull(cluster.chooseRandom("~"));
assertNull(cluster.chooseRandom("~/"));
// test chooseRandom(String scope, String excludedScope)
path = dataNodes[random.nextInt(dataNodes.length)].getNetworkFullPath();
List<String> pathList = new ArrayList<>();
pathList.add(path);
assertNull(cluster.chooseRandom(path, pathList));
assertNotNull(cluster.chooseRandom(null, pathList));
assertNotNull(cluster.chooseRandom("", pathList));
// test chooseRandom(String scope, Collection<Node> excludedNodes)
assertNull(cluster.chooseRandom("", Arrays.asList(dataNodes)));
assertNull(cluster.chooseRandom("/", Arrays.asList(dataNodes)));
assertNull(cluster.chooseRandom("~", Arrays.asList(dataNodes)));
assertNull(cluster.chooseRandom("~/", Arrays.asList(dataNodes)));
assertNull(cluster.chooseRandom(null, Arrays.asList(dataNodes)));
}
/**
* Following test checks that chooseRandom works for an excluded scope.
*/
@Test
public void testChooseRandomExcludedScope() {
int[] excludedNodeIndexs = {0, dataNodes.length - 1,
random.nextInt(dataNodes.length), random.nextInt(dataNodes.length)};
String scope;
Map<Node, Integer> frequency;
for (int i : excludedNodeIndexs) {
String path = dataNodes[i].getNetworkFullPath();
while (!path.equals(ROOT)) {
scope = "~" + path;
frequency = pickNodesAtRandom(100, scope, null, 0);
for (Node key : dataNodes) {
if (key.getNetworkFullPath().startsWith(path)) {
assertTrue(frequency.get(key) == 0);
}
}
path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
}
}
// null excludedScope, every node should be chosen
frequency = pickNodes(100, null, null, null, 0);
for (Node key : dataNodes) {
assertTrue(frequency.get(key) != 0);
}
// "" excludedScope, no node will ever be chosen
List<String> pathList = new ArrayList();
pathList.add("");
frequency = pickNodes(100, pathList, null, null, 0);
for (Node key : dataNodes) {
assertTrue(frequency.get(key) == 0);
}
// "~" scope, no node will ever be chosen
scope = "~";
frequency = pickNodesAtRandom(100, scope, null, 0);
for (Node key : dataNodes) {
assertTrue(frequency.get(key) == 0);
}
// out network topology excluded scope, every node should be chosen
pathList.clear();
pathList.add("/city1");
frequency = pickNodes(
cluster.getNumOfLeafNode(null), pathList, null, null, 0);
for (Node key : dataNodes) {
assertTrue(frequency.get(key) != 0);
}
}
/**
* Following test checks that chooseRandom works for an excluded nodes.
*/
@Test
public void testChooseRandomExcludedNode() {
Node[][] excludedNodeLists = {
{},
{dataNodes[0]},
{dataNodes[dataNodes.length - 1]},
{dataNodes[random.nextInt(dataNodes.length)]},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)]
},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
}};
int leafNum = cluster.getNumOfLeafNode(null);
Map<Node, Integer> frequency;
for(Node[] list : excludedNodeLists) {
List<Node> excludedList = Arrays.asList(list);
int ancestorGen = 0;
while(ancestorGen < cluster.getMaxLevel()) {
frequency = pickNodesAtRandom(leafNum, null, excludedList, ancestorGen);
List<Node> ancestorList = NetUtils.getAncestorList(cluster,
excludedList, ancestorGen);
for (Node key : dataNodes) {
if (excludedList.contains(key) ||
(ancestorList.size() > 0 &&
ancestorList.stream()
.map(a -> (InnerNode) a)
.filter(a -> a.isAncestor(key))
.collect(Collectors.toList()).size() > 0)) {
assertTrue(frequency.get(key) == 0);
}
}
ancestorGen++;
}
}
// all nodes excluded, no node will be picked
List<Node> excludedList = Arrays.asList(dataNodes);
int ancestorGen = 0;
while(ancestorGen < cluster.getMaxLevel()) {
frequency = pickNodesAtRandom(leafNum, null, excludedList, ancestorGen);
for (Node key : dataNodes) {
assertTrue(frequency.get(key) == 0);
}
ancestorGen++;
}
// out scope excluded nodes, each node will be picked
excludedList = Arrays.asList(createDatanode("1.1.1.1.", "/city1/rack1"));
ancestorGen = 0;
while(ancestorGen < cluster.getMaxLevel()) {
frequency = pickNodes(leafNum, null, excludedList, null, ancestorGen);
for (Node key : dataNodes) {
assertTrue(frequency.get(key) != 0);
}
ancestorGen++;
}
}
/**
* Following test checks that chooseRandom works for excluded nodes and scope.
*/
@Test
public void testChooseRandomExcludedNodeAndScope() {
int[] excludedNodeIndexs = {0, dataNodes.length - 1,
random.nextInt(dataNodes.length), random.nextInt(dataNodes.length)};
Node[][] excludedNodeLists = {
{},
{dataNodes[0]},
{dataNodes[dataNodes.length - 1]},
{dataNodes[random.nextInt(dataNodes.length)]},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)]
},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
}};
int leafNum = cluster.getNumOfLeafNode(null);
Map<Node, Integer> frequency;
String scope;
for (int i : excludedNodeIndexs) {
String path = dataNodes[i].getNetworkFullPath();
while (!path.equals(ROOT)) {
scope = "~" + path;
int ancestorGen = 0;
while(ancestorGen < cluster.getMaxLevel()) {
for (Node[] list : excludedNodeLists) {
List<Node> excludedList = Arrays.asList(list);
frequency =
pickNodesAtRandom(leafNum, scope, excludedList, ancestorGen);
List<Node> ancestorList = NetUtils.getAncestorList(cluster,
excludedList, ancestorGen);
for (Node key : dataNodes) {
if (excludedList.contains(key) ||
key.getNetworkFullPath().startsWith(path) ||
(ancestorList.size() > 0 &&
ancestorList.stream()
.map(a -> (InnerNode) a)
.filter(a -> a.isAncestor(key))
.collect(Collectors.toList()).size() > 0)) {
assertTrue(frequency.get(key) == 0);
}
}
}
ancestorGen++;
}
path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
}
}
// all nodes excluded, no node will be picked
List<Node> excludedList = Arrays.asList(dataNodes);
for (int i : excludedNodeIndexs) {
String path = dataNodes[i].getNetworkFullPath();
while (!path.equals(ROOT)) {
scope = "~" + path;
int ancestorGen = 0;
while (ancestorGen < cluster.getMaxLevel()) {
frequency =
pickNodesAtRandom(leafNum, scope, excludedList, ancestorGen);
for (Node key : dataNodes) {
assertTrue(frequency.get(key) == 0);
}
ancestorGen++;
}
path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
}
}
// no node excluded and no excluded scope, each node will be picked
int ancestorGen = 0;
while (ancestorGen < cluster.getMaxLevel()) {
frequency = pickNodes(leafNum, null, null, null, ancestorGen);
for (Node key : dataNodes) {
assertTrue(frequency.get(key) != 0);
}
ancestorGen++;
}
}
/**
* Following test checks that chooseRandom works for excluded nodes, scope
* and ancestor generation.
*/
@Test
public void testChooseRandomWithAffinityNode() {
int[] excludedNodeIndexs = {0, dataNodes.length - 1,
random.nextInt(dataNodes.length), random.nextInt(dataNodes.length)};
Node[][] excludedNodeLists = {
{},
{dataNodes[0]},
{dataNodes[dataNodes.length - 1]},
{dataNodes[random.nextInt(dataNodes.length)]},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)]
},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
}};
int[] affinityNodeIndexs = {0, dataNodes.length - 1,
random.nextInt(dataNodes.length), random.nextInt(dataNodes.length)};
Node[][] excludedScopeIndexs = {{dataNodes[0]},
{dataNodes[dataNodes.length - 1]},
{dataNodes[random.nextInt(dataNodes.length)]},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)]
},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
}};
int leafNum = cluster.getNumOfLeafNode(null);
Map<Node, Integer> frequency;
List<String> pathList = new ArrayList<>();
for (int k : affinityNodeIndexs) {
for (Node[] excludedScopes : excludedScopeIndexs) {
pathList.clear();
pathList.addAll(Arrays.stream(excludedScopes)
.map(node -> node.getNetworkFullPath())
.collect(Collectors.toList()));
while (!pathList.get(0).equals(ROOT)) {
int ancestorGen = cluster.getMaxLevel() - 1;
while (ancestorGen > 0) {
for (Node[] list : excludedNodeLists) {
List<Node> excludedList = Arrays.asList(list);
frequency = pickNodes(leafNum, pathList, excludedList,
dataNodes[k], ancestorGen);
Node affinityAncestor = dataNodes[k].getAncestor(ancestorGen);
for (Node key : dataNodes) {
if (affinityAncestor != null) {
if (frequency.get(key) > 0) {
assertTrue(affinityAncestor.isAncestor(key));
} else if (!affinityAncestor.isAncestor(key)) {
continue;
} else if (excludedList != null &&
excludedList.contains(key)) {
continue;
} else if (pathList != null &&
pathList.stream().anyMatch(path ->
key.getNetworkFullPath().startsWith(path))) {
continue;
} else {
fail("Node is not picked when sequentially going " +
"through ancestor node's leaf nodes. node:" +
key.getNetworkFullPath() + ", ancestor node:" +
affinityAncestor.getNetworkFullPath() +
", excludedScope: " + pathList.toString() + ", " +
"excludedList:" + (excludedList == null ? "" :
excludedList.toString()));
}
}
}
}
ancestorGen--;
}
pathList = pathList.stream().map(path ->
path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR)))
.collect(Collectors.toList());
}
}
}
// all nodes excluded, no node will be picked
String scope;
List<Node> excludedList = Arrays.asList(dataNodes);
for (int k : affinityNodeIndexs) {
for (int i : excludedNodeIndexs) {
String path = dataNodes[i].getNetworkFullPath();
while (!path.equals(ROOT)) {
scope = "~" + path;
int ancestorGen = 0;
while (ancestorGen < cluster.getMaxLevel()) {
frequency = pickNodesAtRandom(leafNum, scope, excludedList,
dataNodes[k], ancestorGen);
for (Node key : dataNodes) {
assertTrue(frequency.get(key) == 0);
}
ancestorGen++;
}
path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
}
}
}
// no node excluded and no excluded scope, each node will be picked
int ancestorGen = cluster.getMaxLevel() - 1;
for (int k : affinityNodeIndexs) {
while (ancestorGen > 0) {
frequency =
pickNodes(leafNum, null, null, dataNodes[k], ancestorGen);
Node affinityAncestor = dataNodes[k].getAncestor(ancestorGen);
for (Node key : dataNodes) {
if (frequency.get(key) > 0) {
if (affinityAncestor != null) {
assertTrue(affinityAncestor.isAncestor(key));
}
}
}
ancestorGen--;
}
}
// check invalid ancestor generation
try {
cluster.chooseRandom(null, null, null, dataNodes[0],
cluster.getMaxLevel());
fail("ancestor generation exceeds max level, should fail");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith("ancestorGen " +
cluster.getMaxLevel() +
" exceeds this network topology acceptable level"));
}
}
@Test
public void testCost() {
// network topology with default cost
List<NodeSchema> schemas = new ArrayList<>();
schemas.add(ROOT_SCHEMA);
schemas.add(RACK_SCHEMA);
schemas.add(NODEGROUP_SCHEMA);
schemas.add(LEAF_SCHEMA);
NodeSchemaManager manager = NodeSchemaManager.getInstance();
manager.init(schemas.toArray(new NodeSchema[0]), true);
NetworkTopology newCluster =
new NetworkTopologyImpl(manager);
Node[] nodeList = new Node[] {
createDatanode("1.1.1.1", "/r1/ng1"),
createDatanode("2.2.2.2", "/r1/ng1"),
createDatanode("3.3.3.3", "/r1/ng2"),
createDatanode("4.4.4.4", "/r2/ng1"),
};
for (Node node: nodeList) {
newCluster.add(node);
}
Node outScopeNode1 = createDatanode("5.5.5.5", "/r2/ng2");
Node outScopeNode2 = createDatanode("6.6.6.6", "/r2/ng2");
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(nodeList[0], null));
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(null, nodeList[0]));
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(outScopeNode1, nodeList[0]));
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(nodeList[0], outScopeNode1));
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(outScopeNode1, outScopeNode2));
assertEquals(0, newCluster.getDistanceCost(null, null));
assertEquals(0, newCluster.getDistanceCost(nodeList[0], nodeList[0]));
assertEquals(2, newCluster.getDistanceCost(nodeList[0], nodeList[1]));
assertEquals(4, newCluster.getDistanceCost(nodeList[0], nodeList[2]));
assertEquals(6, newCluster.getDistanceCost(nodeList[0], nodeList[3]));
// network topology with customized cost
schemas.clear();
schemas.add(new NodeSchema.Builder()
.setType(NodeSchema.LayerType.ROOT).setCost(5).build());
schemas.add(new NodeSchema.Builder()
.setType(NodeSchema.LayerType.INNER_NODE).setCost(3).build());
schemas.add(new NodeSchema.Builder()
.setType(NodeSchema.LayerType.INNER_NODE).setCost(1).build());
schemas.add(new NodeSchema.Builder()
.setType(NodeSchema.LayerType.LEAF_NODE).build());
manager = NodeSchemaManager.getInstance();
manager.init(schemas.toArray(new NodeSchema[0]), true);
newCluster = new NetworkTopologyImpl(manager);
for (Node node: nodeList) {
newCluster.add(node);
}
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(nodeList[0], null));
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(null, nodeList[0]));
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(outScopeNode1, nodeList[0]));
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(nodeList[0], outScopeNode1));
assertEquals(Integer.MAX_VALUE,
newCluster.getDistanceCost(outScopeNode1, outScopeNode2));
assertEquals(0, newCluster.getDistanceCost(null, null));
assertEquals(0, newCluster.getDistanceCost(nodeList[0], nodeList[0]));
assertEquals(2, newCluster.getDistanceCost(nodeList[0], nodeList[1]));
assertEquals(8, newCluster.getDistanceCost(nodeList[0], nodeList[2]));
assertEquals(18, newCluster.getDistanceCost(nodeList[0], nodeList[3]));
}
@Test
public void testSortByDistanceCost() {
Node[][] nodes = {
{},
{dataNodes[0]},
{dataNodes[dataNodes.length - 1]},
{dataNodes[random.nextInt(dataNodes.length)]},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)]
},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
},
{dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
}};
Node[] readers = {null, dataNodes[0], dataNodes[dataNodes.length - 1],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)],
dataNodes[random.nextInt(dataNodes.length)]
};
for (Node reader : readers) {
for (Node[] nodeList : nodes) {
int length = nodeList.length;
while (length > 0) {
List<? extends Node> ret = cluster.sortByDistanceCost(reader,
Arrays.asList(nodeList), length);
for (int i = 0; i < ret.size(); i++) {
if ((i + 1) < ret.size()) {
int cost1 = cluster.getDistanceCost(reader, ret.get(i));
int cost2 = cluster.getDistanceCost(reader, ret.get(i + 1));
assertTrue("reader:" + (reader != null ?
reader.getNetworkFullPath() : "null") +
",node1:" + ret.get(i).getNetworkFullPath() +
",node2:" + ret.get(i + 1).getNetworkFullPath() +
",cost1:" + cost1 + ",cost2:" + cost2,
cost1 == Integer.MAX_VALUE || cost1 <= cost2);
}
}
length--;
}
}
}
// sort all nodes
List<Node> nodeList = Arrays.asList(dataNodes.clone());
for (Node reader : readers) {
int length = nodeList.size();
while (length >= 0) {
List<? extends Node> sortedNodeList =
cluster.sortByDistanceCost(reader, nodeList, length);
for (int i = 0; i < sortedNodeList.size(); i++) {
if ((i + 1) < sortedNodeList.size()) {
int cost1 = cluster.getDistanceCost(reader, sortedNodeList.get(i));
int cost2 = cluster.getDistanceCost(
reader, sortedNodeList.get(i + 1));
// node can be removed when called in testConcurrentAccess
assertTrue("reader:" + (reader != null ?
reader.getNetworkFullPath() : "null") +
",node1:" + sortedNodeList.get(i).getNetworkFullPath() +
",node2:" + sortedNodeList.get(i + 1).getNetworkFullPath() +
",cost1:" + cost1 + ",cost2:" + cost2,
cost1 == Integer.MAX_VALUE || cost1 <= cost2);
}
}
length--;
}
}
}
private static Node createDatanode(String name, String path) {
return new NodeImpl(name, path, NetConstants.NODE_COST_DEFAULT);
}
/**
* This picks a large number of nodes at random in order to ensure coverage.
*
* @param numNodes the number of nodes
* @param excludedScope the excluded scope
* @param excludedNodes the excluded node list
* @param ancestorGen the chosen node cannot share the same ancestor at
* this generation with excludedNodes
* @return the frequency that nodes were chosen
*/
private Map<Node, Integer> pickNodesAtRandom(int numNodes,
String excludedScope, Collection<Node> excludedNodes, int ancestorGen) {
Map<Node, Integer> frequency = new HashMap<Node, Integer>();
for (Node dnd : dataNodes) {
frequency.put(dnd, 0);
}
for (int j = 0; j < numNodes; j++) {
Node node = cluster.chooseRandom(excludedScope, excludedNodes,
ancestorGen);
if (node != null) {
frequency.put(node, frequency.get(node) + 1);
}
}
LOG.info("Result:" + frequency);
return frequency;
}
/**
* This picks a large number of nodes at random in order to ensure coverage.
*
* @param numNodes the number of nodes
* @param excludedScope the excluded scope
* @param excludedNodes the excluded node list
* @param affinityNode the chosen node should share the same ancestor at
* generation "ancestorGen" with this node
* @param ancestorGen the chosen node cannot share the same ancestor at
* this generation with excludedNodes
* @return the frequency that nodes were chosen
*/
private Map<Node, Integer> pickNodesAtRandom(int numNodes,
String excludedScope, Collection<Node> excludedNodes, Node affinityNode,
int ancestorGen) {
Map<Node, Integer> frequency = new HashMap<Node, Integer>();
for (Node dnd : dataNodes) {
frequency.put(dnd, 0);
}
List<String> pathList = new ArrayList<>();
pathList.add(excludedScope.substring(1));
for (int j = 0; j < numNodes; j++) {
Node node = cluster.chooseRandom("", pathList, excludedNodes,
affinityNode, ancestorGen);
if (node != null) {
frequency.put(node, frequency.get(node) + 1);
}
}
LOG.info("Result:" + frequency);
return frequency;
}
/**
* This picks a large amount of nodes sequentially.
*
* @param numNodes the number of nodes
* @param excludedScopes the excluded scopes, should not start with "~"
* @param excludedNodes the excluded node list
* @param affinityNode the chosen node should share the same ancestor at
* generation "ancestorGen" with this node
* @param ancestorGen the chosen node cannot share the same ancestor at
* this generation with excludedNodes
* @return the frequency that nodes were chosen
*/
private Map<Node, Integer> pickNodes(int numNodes,
List<String> excludedScopes, Collection<Node> excludedNodes,
Node affinityNode, int ancestorGen) {
Map<Node, Integer> frequency = new HashMap<>();
for (Node dnd : dataNodes) {
frequency.put(dnd, 0);
}
excludedNodes = excludedNodes == null ? null :
excludedNodes.stream().distinct().collect(Collectors.toList());
for (int j = 0; j < numNodes; j++) {
Node node = cluster.getNode(j, null, excludedScopes, excludedNodes,
affinityNode, ancestorGen);
if (node != null) {
frequency.put(node, frequency.get(node) + 1);
}
}
LOG.info("Result:" + frequency);
return frequency;
}
}