blob: 2d8b81633e7539e58b7dfcbf7aabc8ea71da438e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.when;
/**
* Test for the scm container rack aware placement.
*/
@RunWith(Parameterized.class)
public class TestSCMContainerPlacementRackAware {
private NetworkTopology cluster;
private Configuration conf;
private NodeManager nodeManager;
private Integer datanodeCount;
private List<DatanodeDetails> datanodes = new ArrayList<>();
// policy with fallback capability
private SCMContainerPlacementRackAware policy;
// policy prohibit fallback
private SCMContainerPlacementRackAware policyNoFallback;
// node storage capacity
private static final long STORAGE_CAPACITY = 100L;
private SCMContainerPlacementMetrics metrics;
private static final int NODE_PER_RACK = 5;
public TestSCMContainerPlacementRackAware(Integer count) {
this.datanodeCount = count;
}
@Parameterized.Parameters
public static Collection<Object[]> setupDatanodes() {
return Arrays.asList(new Object[][]{{3}, {4}, {5}, {6}, {7}, {8}, {9},
{10}, {11}, {12}, {13}, {14}, {15}});
}
@Before
public void setup() {
//initialize network topology instance
conf = new OzoneConfiguration();
NodeSchema[] schemas = new NodeSchema[]
{ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager.getInstance().init(schemas, true);
cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
// build datanodes, and network topology
String rack = "/rack";
String hostname = "node";
for (int i = 0; i < datanodeCount; i++) {
// Totally 3 racks, each has 5 datanodes
DatanodeDetails node = TestUtils.createDatanodeDetails(
hostname + i, rack + (i / NODE_PER_RACK));
datanodes.add(node);
cluster.add(node);
}
// create mock node manager
nodeManager = Mockito.mock(NodeManager.class);
when(nodeManager.getNodes(NodeState.HEALTHY))
.thenReturn(new ArrayList<>(datanodes));
when(nodeManager.getNodeStat(anyObject()))
.thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 0L, 100L));
if (datanodeCount > 4) {
when(nodeManager.getNodeStat(datanodes.get(2)))
.thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 90L, 10L));
when(nodeManager.getNodeStat(datanodes.get(3)))
.thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 80L, 20L));
when(nodeManager.getNodeStat(datanodes.get(4)))
.thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 70L, 30L));
} else if (datanodeCount > 3) {
when(nodeManager.getNodeStat(datanodes.get(2)))
.thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 90L, 10L));
when(nodeManager.getNodeStat(datanodes.get(3)))
.thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 80L, 20L));
} else if (datanodeCount > 2) {
when(nodeManager.getNodeStat(datanodes.get(2)))
.thenReturn(new SCMNodeMetric(STORAGE_CAPACITY, 84L, 16L));
}
// create placement policy instances
metrics = SCMContainerPlacementMetrics.create();
policy = new SCMContainerPlacementRackAware(
nodeManager, conf, cluster, true, metrics);
policyNoFallback = new SCMContainerPlacementRackAware(
nodeManager, conf, cluster, false, metrics);
}
@Test
public void chooseNodeWithNoExcludedNodes() throws SCMException {
// test choose new datanodes for new pipeline cases
// 1 replica
int nodeNum = 1;
List<DatanodeDetails> datanodeDetails =
policy.chooseDatanodes(null, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
// 2 replicas
nodeNum = 2;
datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(1)) || (datanodeCount % NODE_PER_RACK == 1));
// 3 replicas
nodeNum = 3;
datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
// requires at least 2 racks for following statement
assumeTrue(datanodeCount > NODE_PER_RACK &&
datanodeCount % NODE_PER_RACK > 1);
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(1)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
datanodeDetails.get(2)));
// 4 replicas
nodeNum = 4;
datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
// requires at least 2 racks and enough datanodes for following statement
assumeTrue(datanodeCount > NODE_PER_RACK + 1);
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(1)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
datanodeDetails.get(2)));
}
@Test
public void chooseNodeWithExcludedNodes() throws SCMException {
// test choose new datanodes for under replicated pipeline
// 3 replicas, two existing datanodes on same rack
assumeTrue(datanodeCount > NODE_PER_RACK);
int nodeNum = 1;
List<DatanodeDetails> excludedNodes = new ArrayList<>();
excludedNodes.add(datanodes.get(0));
excludedNodes.add(datanodes.get(1));
List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes(
excludedNodes, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
excludedNodes.get(0)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
excludedNodes.get(1)));
// 3 replicas, one existing datanode
nodeNum = 2;
excludedNodes.clear();
excludedNodes.add(datanodes.get(0));
datanodeDetails = policy.chooseDatanodes(
excludedNodes, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
Assert.assertTrue(cluster.isSameParent(
datanodeDetails.get(0), excludedNodes.get(0)) ||
cluster.isSameParent(datanodeDetails.get(0), excludedNodes.get(1)));
// 3 replicas, two existing datanodes on different rack
nodeNum = 1;
excludedNodes.clear();
excludedNodes.add(datanodes.get(0));
excludedNodes.add(datanodes.get(5));
datanodeDetails = policy.chooseDatanodes(
excludedNodes, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
Assert.assertTrue(cluster.isSameParent(
datanodeDetails.get(0), excludedNodes.get(0)) ||
cluster.isSameParent(datanodeDetails.get(0), excludedNodes.get(1)));
}
@Test
public void testFallback() throws SCMException {
// 5 replicas. there are only 3 racks. policy with fallback should
// allocate the 5th datanode though it will break the rack rule(first
// 2 replicas on same rack, others on different racks).
assumeTrue(datanodeCount > NODE_PER_RACK * 2 &&
(datanodeCount % NODE_PER_RACK > 1));
int nodeNum = 5;
List<DatanodeDetails> datanodeDetails =
policy.chooseDatanodes(null, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(1)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(3)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2),
datanodeDetails.get(3)));
// get metrics
long totalRequest = metrics.getDatanodeRequestCount();
long successCount = metrics.getDatanodeChooseSuccessCount();
long tryCount = metrics.getDatanodeChooseAttemptCount();
long compromiseCount = metrics.getDatanodeChooseFallbackCount();
// verify metrics
Assert.assertTrue(totalRequest == nodeNum);
Assert.assertTrue(successCount == nodeNum);
Assert.assertTrue(tryCount > nodeNum);
Assert.assertTrue(compromiseCount >= 1);
}
@Test
public void testNoFallback() throws SCMException {
assumeTrue(datanodeCount > (NODE_PER_RACK * 2) &&
(datanodeCount <= NODE_PER_RACK * 3));
// 5 replicas. there are only 3 racks. policy prohibit fallback should fail.
int nodeNum = 5;
try {
policyNoFallback.chooseDatanodes(null, null, nodeNum, 15);
fail("Fallback prohibited, this call should fail");
} catch (Exception e) {
assertTrue(e.getClass().getSimpleName().equals("SCMException"));
}
// get metrics
long totalRequest = metrics.getDatanodeRequestCount();
long successCount = metrics.getDatanodeChooseSuccessCount();
long tryCount = metrics.getDatanodeChooseAttemptCount();
long compromiseCount = metrics.getDatanodeChooseFallbackCount();
Assert.assertTrue(totalRequest == nodeNum);
Assert.assertTrue(successCount >= 3);
Assert.assertTrue(tryCount >= nodeNum);
Assert.assertTrue(compromiseCount == 0);
}
@Test
public void chooseNodeWithFavoredNodes() throws SCMException {
int nodeNum = 1;
List<DatanodeDetails> excludedNodes = new ArrayList<>();
List<DatanodeDetails> favoredNodes = new ArrayList<>();
// no excludedNodes, only favoredNodes
favoredNodes.add(datanodes.get(0));
List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes(
excludedNodes, favoredNodes, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
Assert.assertTrue(datanodeDetails.get(0).getNetworkFullPath()
.equals(favoredNodes.get(0).getNetworkFullPath()));
// no overlap between excludedNodes and favoredNodes, favoredNodes can been
// chosen.
excludedNodes.clear();
favoredNodes.clear();
excludedNodes.add(datanodes.get(0));
favoredNodes.add(datanodes.get(2));
datanodeDetails = policy.chooseDatanodes(
excludedNodes, favoredNodes, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
Assert.assertTrue(datanodeDetails.get(0).getNetworkFullPath()
.equals(favoredNodes.get(0).getNetworkFullPath()));
// there is overlap between excludedNodes and favoredNodes, favoredNodes
// should not be chosen.
excludedNodes.clear();
favoredNodes.clear();
excludedNodes.add(datanodes.get(0));
favoredNodes.add(datanodes.get(0));
datanodeDetails = policy.chooseDatanodes(
excludedNodes, favoredNodes, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
Assert.assertFalse(datanodeDetails.get(0).getNetworkFullPath()
.equals(favoredNodes.get(0).getNetworkFullPath()));
}
@Test
public void testNoInfiniteLoop() throws SCMException {
int nodeNum = 1;
try {
// request storage space larger than node capability
policy.chooseDatanodes(null, null, nodeNum, STORAGE_CAPACITY + 15);
fail("Storage requested exceeds capacity, this call should fail");
} catch (Exception e) {
assertTrue(e.getClass().getSimpleName().equals("SCMException"));
}
// get metrics
long totalRequest = metrics.getDatanodeRequestCount();
long successCount = metrics.getDatanodeChooseSuccessCount();
long tryCount = metrics.getDatanodeChooseAttemptCount();
long compromiseCount = metrics.getDatanodeChooseFallbackCount();
Assert.assertTrue(totalRequest == nodeNum);
Assert.assertTrue(successCount == 0);
Assert.assertTrue(tryCount > nodeNum);
Assert.assertTrue(compromiseCount == 0);
}
@Test
public void testDatanodeWithDefaultNetworkLocation() throws SCMException {
String hostname = "node";
List<DatanodeDetails> dataList = new ArrayList<>();
NetworkTopology clusterMap =
new NetworkTopologyImpl(NodeSchemaManager.getInstance());
for (int i = 0; i < 15; i++) {
// Totally 3 racks, each has 5 datanodes
DatanodeDetails node = TestUtils.createDatanodeDetails(
hostname + i, null);
dataList.add(node);
clusterMap.add(node);
}
Assert.assertEquals(dataList.size(), StringUtils.countMatches(
clusterMap.toString(), NetConstants.DEFAULT_RACK));
// choose nodes to host 3 replica
int nodeNum = 3;
SCMContainerPlacementRackAware newPolicy =
new SCMContainerPlacementRackAware(nodeManager, conf, clusterMap, true,
metrics);
List<DatanodeDetails> datanodeDetails =
newPolicy.chooseDatanodes(null, null, nodeNum, 15);
Assert.assertEquals(nodeNum, datanodeDetails.size());
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(1)));
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(2)));
Assert.assertTrue(cluster.isSameParent(datanodeDetails.get(1),
datanodeDetails.get(2)));
}
}