| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.hdds.scm.container.placement.algorithms; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.conf.StorageUnit; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; |
| import org.apache.hadoop.hdds.scm.ContainerPlacementStatus; |
| import org.apache.hadoop.hdds.scm.HddsTestUtils; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; |
| import org.apache.hadoop.hdds.scm.net.NetConstants; |
| import org.apache.hadoop.hdds.scm.net.NetworkTopology; |
| import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; |
| import org.apache.hadoop.hdds.scm.net.Node; |
| import org.apache.hadoop.hdds.scm.net.NodeSchema; |
| import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; |
| import org.apache.hadoop.hdds.scm.node.DatanodeInfo; |
| import org.apache.hadoop.hdds.scm.node.NodeManager; |
| import org.apache.hadoop.hdds.scm.node.NodeStatus; |
| import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.Assertions; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.MethodSource; |
| import org.junit.jupiter.params.provider.ValueSource; |
| import org.mockito.Mockito; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.stream.IntStream; |
| |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN; |
| import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA; |
| import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA; |
| import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| import static org.junit.jupiter.api.Assumptions.assumeTrue; |
| import static org.mockito.Mockito.when; |
| |
| /** |
| * Test for the SCM container rack scatter placement policy. |
| */ |
| public class TestSCMContainerPlacementRackScatter { |
| private NetworkTopology cluster; |
| private OzoneConfiguration conf; |
| private NodeManager nodeManager; |
| private final List<DatanodeDetails> datanodes = new ArrayList<>(); |
| private final List<DatanodeInfo> dnInfos = new ArrayList<>(); |
| // policy with fallback capability |
| private SCMContainerPlacementRackScatter policy; |
| // node storage capacity |
| private static final long STORAGE_CAPACITY = 100L; |
| private SCMContainerPlacementMetrics metrics; |
| private static final int NODE_PER_RACK = 5; |
| |
| private static IntStream numDatanodes() { |
| return IntStream.concat(IntStream.rangeClosed(3, 15), |
| IntStream.of(20, 25, 30)); |
| } |
| |
| private void setup(int datanodeCount) { |
| //initialize network topology instance |
| conf = new OzoneConfiguration(); |
| // We are using small units here |
| conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN, |
| 1, StorageUnit.BYTES); |
| NodeSchema[] schemas = new NodeSchema[] |
| {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}; |
| NodeSchemaManager.getInstance().init(schemas, true); |
| cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance()); |
| |
| // build datanodes, and network topology |
| String rack = "/rack"; |
| String hostname = "node"; |
| for (int i = 0; i < datanodeCount; i++) { |
| // Totally 6 racks, each has 5 datanodes |
| DatanodeDetails datanodeDetails = |
| MockDatanodeDetails.createDatanodeDetails( |
| hostname + i, rack + (i / NODE_PER_RACK)); |
| |
| datanodes.add(datanodeDetails); |
| cluster.add(datanodeDetails); |
| DatanodeInfo datanodeInfo = new DatanodeInfo( |
| datanodeDetails, NodeStatus.inServiceHealthy(), |
| UpgradeUtils.defaultLayoutVersionProto()); |
| |
| StorageReportProto storage1 = HddsTestUtils.createStorageReport( |
| datanodeInfo.getUuid(), "/data1-" + datanodeInfo.getUuidString(), |
| STORAGE_CAPACITY, 0, 100L, null); |
| MetadataStorageReportProto metaStorage1 = |
| HddsTestUtils.createMetadataStorageReport( |
| "/metadata1-" + datanodeInfo.getUuidString(), |
| STORAGE_CAPACITY, 0, 100L, null); |
| datanodeInfo.updateStorageReports( |
| new ArrayList<>(Arrays.asList(storage1))); |
| datanodeInfo.updateMetaDataStorageReports( |
| new ArrayList<>(Arrays.asList(metaStorage1))); |
| dnInfos.add(datanodeInfo); |
| } |
| |
| if (datanodeCount > 4) { |
| StorageReportProto storage2 = HddsTestUtils.createStorageReport( |
| dnInfos.get(2).getUuid(), |
| "/data1-" + datanodes.get(2).getUuidString(), |
| STORAGE_CAPACITY, 90L, 10L, null); |
| dnInfos.get(2).updateStorageReports( |
| new ArrayList<>(Arrays.asList(storage2))); |
| StorageReportProto storage3 = HddsTestUtils.createStorageReport( |
| dnInfos.get(3).getUuid(), |
| "/data1-" + dnInfos.get(3).getUuidString(), |
| STORAGE_CAPACITY, 80L, 20L, null); |
| dnInfos.get(3).updateStorageReports( |
| new ArrayList<>(Arrays.asList(storage3))); |
| StorageReportProto storage4 = HddsTestUtils.createStorageReport( |
| dnInfos.get(4).getUuid(), |
| "/data1-" + dnInfos.get(4).getUuidString(), |
| STORAGE_CAPACITY, 70L, 30L, null); |
| dnInfos.get(4).updateStorageReports( |
| new ArrayList<>(Arrays.asList(storage4))); |
| } else if (datanodeCount > 3) { |
| StorageReportProto storage2 = HddsTestUtils.createStorageReport( |
| dnInfos.get(2).getUuid(), |
| "/data1-" + dnInfos.get(2).getUuidString(), |
| STORAGE_CAPACITY, 90L, 10L, null); |
| dnInfos.get(2).updateStorageReports( |
| new ArrayList<>(Arrays.asList(storage2))); |
| StorageReportProto storage3 = HddsTestUtils.createStorageReport( |
| dnInfos.get(3).getUuid(), |
| "/data1-" + dnInfos.get(3).getUuidString(), |
| STORAGE_CAPACITY, 80L, 20L, null); |
| dnInfos.get(3).updateStorageReports( |
| new ArrayList<>(Arrays.asList(storage3))); |
| } else if (datanodeCount > 2) { |
| StorageReportProto storage2 = HddsTestUtils.createStorageReport( |
| dnInfos.get(2).getUuid(), |
| "/data1-" + dnInfos.get(2).getUuidString(), |
| STORAGE_CAPACITY, 84L, 16L, null); |
| dnInfos.get(2).updateStorageReports( |
| new ArrayList<>(Arrays.asList(storage2))); |
| } |
| |
| // create mock node manager |
| nodeManager = Mockito.mock(NodeManager.class); |
| when(nodeManager.getNodes(NodeStatus.inServiceHealthy())) |
| .thenReturn(new ArrayList<>(datanodes)); |
| for (DatanodeInfo dn: dnInfos) { |
| when(nodeManager.getNodeByUuid(dn.getUuidString())) |
| .thenReturn(dn); |
| } |
| when(nodeManager.getClusterNetworkTopologyMap()) |
| .thenReturn(cluster); |
| |
| // create placement policy instances |
| policy = new SCMContainerPlacementRackScatter( |
| nodeManager, conf, cluster, true, metrics); |
| } |
| |
| @BeforeEach |
| public void init() { |
| metrics = SCMContainerPlacementMetrics.create(); |
| } |
| |
| @AfterEach |
| public void teardown() { |
| metrics.unRegister(); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("numDatanodes") |
| public void chooseNodeWithNoExcludedNodes(int datanodeCount) |
| throws SCMException { |
| setup(datanodeCount); |
| int rackLevel = cluster.getMaxLevel() - 1; |
| int rackNum = cluster.getNumOfNodes(rackLevel); |
| |
| // test choose new datanodes for new pipeline cases |
| // 1 replica |
| int nodeNum = 1; |
| List<DatanodeDetails> datanodeDetails = |
| policy.chooseDatanodes(null, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| |
| // 2 replicas |
| nodeNum = 2; |
| datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertTrue(!cluster.isSameParent(datanodeDetails.get(0), |
| datanodeDetails.get(1)) || (datanodeCount <= NODE_PER_RACK)); |
| |
| // 3 replicas |
| nodeNum = 3; |
| if (datanodeCount > nodeNum) { |
| datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(getRackSize(datanodeDetails), |
| Math.min(nodeNum, rackNum)); |
| } |
| |
| // 5 replicas |
| nodeNum = 5; |
| if (datanodeCount > nodeNum) { |
| assumeTrue(datanodeCount >= NODE_PER_RACK); |
| datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(getRackSize(datanodeDetails), |
| Math.min(nodeNum, rackNum)); |
| } |
| |
| // 10 replicas |
| nodeNum = 10; |
| if (datanodeCount > nodeNum) { |
| assumeTrue(datanodeCount > 2 * NODE_PER_RACK); |
| datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(getRackSize(datanodeDetails), |
| Math.min(nodeNum, rackNum)); |
| } |
| } |
| |
| @ParameterizedTest |
| @MethodSource("numDatanodes") |
| public void chooseNodeWithExcludedNodes(int datanodeCount) |
| throws SCMException { |
| // test choose new datanodes for under replicated pipeline |
| // 3 replicas, two existing datanodes on same rack |
| assumeTrue(datanodeCount > NODE_PER_RACK); |
| setup(datanodeCount); |
| int rackLevel = cluster.getMaxLevel() - 1; |
| int rackNum = cluster.getNumOfNodes(rackLevel); |
| int totalNum; |
| int nodeNum = 1; |
| List<DatanodeDetails> excludedNodes = new ArrayList<>(); |
| |
| excludedNodes.add(datanodes.get(0)); |
| excludedNodes.add(datanodes.get(1)); |
| List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes( |
| excludedNodes, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(0), |
| excludedNodes.get(0))); |
| Assertions.assertFalse(cluster.isSameParent(datanodeDetails.get(0), |
| excludedNodes.get(1))); |
| |
| // 3 replicas, one existing datanode |
| nodeNum = 2; |
| totalNum = 3; |
| excludedNodes.clear(); |
| excludedNodes.add(datanodes.get(0)); |
| datanodeDetails = policy.chooseDatanodes( |
| excludedNodes, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(getRackSize(datanodeDetails, excludedNodes), |
| Math.min(totalNum, rackNum)); |
| |
| // 3 replicas, two existing datanodes on different rack |
| nodeNum = 1; |
| totalNum = 3; |
| excludedNodes.clear(); |
| excludedNodes.add(datanodes.get(0)); |
| excludedNodes.add(datanodes.get(5)); |
| datanodeDetails = policy.chooseDatanodes( |
| excludedNodes, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(getRackSize(datanodeDetails, excludedNodes), |
| Math.min(totalNum, rackNum)); |
| |
| // 5 replicas, one existing datanode |
| nodeNum = 4; |
| totalNum = 5; |
| excludedNodes.clear(); |
| excludedNodes.add(datanodes.get(0)); |
| datanodeDetails = policy.chooseDatanodes( |
| excludedNodes, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(getRackSize(datanodeDetails, excludedNodes), |
| Math.min(totalNum, rackNum)); |
| |
| // 5 replicas, two existing datanodes on different rack |
| nodeNum = 3; |
| totalNum = 5; |
| excludedNodes.clear(); |
| excludedNodes.add(datanodes.get(0)); |
| excludedNodes.add(datanodes.get(5)); |
| if (datanodeCount == 6) { |
| /* |
| * when datanodeCount is 6, the clusterMap will be |
| * /rack0/node0 |
| * /rack0/node1 |
| * /rack0/node2 |
| * /rack0/node3 |
| * /rack0/node4 |
| * /rack1/node5 |
| * if we select node0 and node5 as the excluded datanode, |
| * only datanode in rack0 will be chosen when calling |
| * `policy.chooseDatanodes` and the placement will not be |
| * met since there are two racks exist, but only one |
| * of them is chosen |
| * */ |
| SCMException e = assertThrows(SCMException.class, |
| () -> policy.chooseDatanodes(excludedNodes, null, 3, 0, 15)); |
| String message = e.getMessage(); |
| assumeTrue(message.contains("ContainerPlacementPolicy not met")); |
| } else { |
| datanodeDetails = policy.chooseDatanodes( |
| excludedNodes, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(getRackSize(datanodeDetails, excludedNodes), |
| Math.min(totalNum, rackNum)); |
| } |
| } |
| |
| @ParameterizedTest |
| @MethodSource("numDatanodes") |
| public void chooseNodeWithFavoredNodes(int datanodeCount) |
| throws SCMException { |
| setup(datanodeCount); |
| int nodeNum = 1; |
| List<DatanodeDetails> excludedNodes = new ArrayList<>(); |
| List<DatanodeDetails> favoredNodes = new ArrayList<>(); |
| |
| // no excludedNodes, only favoredNodes |
| favoredNodes.add(datanodes.get(0)); |
| List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes( |
| excludedNodes, favoredNodes, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(datanodeDetails.get(0).getNetworkFullPath(), |
| favoredNodes.get(0).getNetworkFullPath()); |
| |
| // no overlap between excludedNodes and favoredNodes, favoredNodes can be |
| // chosen. |
| excludedNodes.clear(); |
| favoredNodes.clear(); |
| excludedNodes.add(datanodes.get(0)); |
| favoredNodes.add(datanodes.get(1)); |
| datanodeDetails = policy.chooseDatanodes( |
| excludedNodes, favoredNodes, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(datanodeDetails.get(0).getNetworkFullPath(), |
| favoredNodes.get(0).getNetworkFullPath()); |
| |
| // there is overlap between excludedNodes and favoredNodes, favoredNodes |
| // should not be chosen. |
| excludedNodes.clear(); |
| favoredNodes.clear(); |
| excludedNodes.add(datanodes.get(0)); |
| favoredNodes.add(datanodes.get(0)); |
| datanodeDetails = policy.chooseDatanodes( |
| excludedNodes, favoredNodes, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertNotEquals(datanodeDetails.get(0).getNetworkFullPath(), |
| favoredNodes.get(0).getNetworkFullPath()); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("numDatanodes") |
| public void testNoInfiniteLoop(int datanodeCount) { |
| setup(datanodeCount); |
| int nodeNum = 1; |
| |
| try { |
| // request storage space larger than node capability |
| policy.chooseDatanodes(null, null, nodeNum, STORAGE_CAPACITY + 0, 15); |
| fail("Storage requested exceeds capacity, this call should fail"); |
| } catch (Exception e) { |
| assertEquals("SCMException", e.getClass().getSimpleName()); |
| } |
| |
| // get metrics |
| long totalRequest = metrics.getDatanodeRequestCount(); |
| long successCount = metrics.getDatanodeChooseSuccessCount(); |
| long tryCount = metrics.getDatanodeChooseAttemptCount(); |
| long compromiseCount = metrics.getDatanodeChooseFallbackCount(); |
| |
| Assertions.assertEquals(totalRequest, nodeNum); |
| Assertions.assertEquals(successCount, 0); |
| Assertions.assertTrue(tryCount >= nodeNum, "Not enough try"); |
| Assertions.assertEquals(compromiseCount, 0); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("numDatanodes") |
| public void testDatanodeWithDefaultNetworkLocation(int datanodeCount) |
| throws SCMException { |
| setup(datanodeCount); |
| String hostname = "node"; |
| List<DatanodeInfo> dnInfoList = new ArrayList<>(); |
| List<DatanodeDetails> dataList = new ArrayList<>(); |
| NetworkTopology clusterMap = |
| new NetworkTopologyImpl(NodeSchemaManager.getInstance()); |
| for (int i = 0; i < 30; i++) { |
| // Totally 6 racks, each has 5 datanodes |
| DatanodeDetails dn = MockDatanodeDetails.createDatanodeDetails( |
| hostname + i, null); |
| DatanodeInfo dnInfo = new DatanodeInfo( |
| dn, NodeStatus.inServiceHealthy(), |
| UpgradeUtils.defaultLayoutVersionProto()); |
| |
| StorageReportProto storage1 = HddsTestUtils.createStorageReport( |
| dnInfo.getUuid(), "/data1-" + dnInfo.getUuidString(), |
| STORAGE_CAPACITY, 0, 100L, null); |
| MetadataStorageReportProto metaStorage1 = |
| HddsTestUtils.createMetadataStorageReport( |
| "/metadata1-" + dnInfo.getUuidString(), |
| STORAGE_CAPACITY, 0, 100L, null); |
| dnInfo.updateStorageReports( |
| new ArrayList<>(Arrays.asList(storage1))); |
| dnInfo.updateMetaDataStorageReports( |
| new ArrayList<>(Arrays.asList(metaStorage1))); |
| |
| dataList.add(dn); |
| clusterMap.add(dn); |
| dnInfoList.add(dnInfo); |
| } |
| Assertions.assertEquals(dataList.size(), StringUtils.countMatches( |
| clusterMap.toString(), NetConstants.DEFAULT_RACK)); |
| for (DatanodeInfo dn: dnInfoList) { |
| when(nodeManager.getNodeByUuid(dn.getUuidString())) |
| .thenReturn(dn); |
| } |
| |
| // choose nodes to host 5 replica |
| int nodeNum = 5; |
| SCMContainerPlacementRackScatter newPolicy = |
| new SCMContainerPlacementRackScatter(nodeManager, conf, clusterMap, |
| true, metrics); |
| List<DatanodeDetails> datanodeDetails = |
| newPolicy.chooseDatanodes(null, null, nodeNum, 0, 15); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| Assertions.assertEquals(1, getRackSize(datanodeDetails)); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(ints = {15, 20, 25, 30}) |
| public void testValidateContainerPlacement(int datanodeCount) { |
| // Only run this test for the full set of DNs. 5 DNs per rack on 6 racks. |
| assumeTrue(datanodeCount >= 15); |
| setup(datanodeCount); |
| List<DatanodeDetails> dns = new ArrayList<>(); |
| // First 5 node are on the same rack |
| dns.add(datanodes.get(0)); |
| dns.add(datanodes.get(1)); |
| dns.add(datanodes.get(2)); |
| ContainerPlacementStatus stat = policy.validateContainerPlacement(dns, 3); |
| assertFalse(stat.isPolicySatisfied()); |
| assertEquals(2, stat.misReplicationCount()); |
| |
| // Pick a new list which spans 2 racks |
| dns = new ArrayList<>(); |
| dns.add(datanodes.get(0)); |
| dns.add(datanodes.get(1)); |
| dns.add(datanodes.get(5)); // This is on second rack |
| stat = policy.validateContainerPlacement(dns, 3); |
| assertFalse(stat.isPolicySatisfied()); |
| assertEquals(1, stat.misReplicationCount()); |
| |
| // Pick single DN, expecting 3 replica. Policy is not met. |
| dns = new ArrayList<>(); |
| dns.add(datanodes.get(0)); |
| stat = policy.validateContainerPlacement(dns, 3); |
| assertFalse(stat.isPolicySatisfied()); |
| assertEquals(2, stat.misReplicationCount()); |
| |
| // Pick single DN, expecting 1 replica. Policy is met. |
| dns = new ArrayList<>(); |
| dns.add(datanodes.get(0)); |
| stat = policy.validateContainerPlacement(dns, 1); |
| assertTrue(stat.isPolicySatisfied()); |
| assertEquals(0, stat.misReplicationCount()); |
| } |
| |
| @Test |
| public void testValidateContainerPlacementSingleRackCluster() { |
| final int datanodeCount = 5; |
| setup(datanodeCount); |
| |
| // All nodes are on the same rack in this test, and the cluster only has |
| // one rack. |
| List<DatanodeDetails> dns = new ArrayList<>(); |
| dns.add(datanodes.get(0)); |
| dns.add(datanodes.get(1)); |
| dns.add(datanodes.get(2)); |
| ContainerPlacementStatus stat = policy.validateContainerPlacement(dns, 3); |
| assertTrue(stat.isPolicySatisfied()); |
| assertEquals(0, stat.misReplicationCount()); |
| |
| // Single DN - policy met as cluster only has one rack. |
| dns = new ArrayList<>(); |
| dns.add(datanodes.get(0)); |
| stat = policy.validateContainerPlacement(dns, 3); |
| assertTrue(stat.isPolicySatisfied()); |
| assertEquals(0, stat.misReplicationCount()); |
| |
| // Single DN - only 1 replica expected |
| dns = new ArrayList<>(); |
| dns.add(datanodes.get(0)); |
| stat = policy.validateContainerPlacement(dns, 1); |
| assertTrue(stat.isPolicySatisfied()); |
| assertEquals(0, stat.misReplicationCount()); |
| } |
| |
| @Test |
| public void testExcludedNodesOverlapsOutOfServiceNodes() throws SCMException { |
| final int datanodeCount = 6; |
| setup(datanodeCount); |
| |
| // DN 5 is out of service |
| dnInfos.get(5).setNodeStatus(new NodeStatus(DECOMMISSIONED, HEALTHY)); |
| |
| // SCM should have detected that DN 5 is dead |
| cluster.remove(datanodes.get(5)); |
| |
| // Here we still have 5 DNs, so pick 5 should be possible |
| int nodeNum = 5; |
| List<DatanodeDetails> excludedNodes = new ArrayList<>(); |
| // The DN 5 is out of service, |
| // but the client already has it in the excludeList. |
| // So there is an overlap. |
| excludedNodes.add(datanodes.get(5)); |
| |
| List<DatanodeDetails> datanodeDetails = policy.chooseDatanodes( |
| excludedNodes, null, nodeNum, 0, 5); |
| Assertions.assertEquals(nodeNum, datanodeDetails.size()); |
| } |
| |
| private int getRackSize(List<DatanodeDetails>... datanodeDetails) { |
| Set<Node> racks = new HashSet<>(); |
| for (List<DatanodeDetails> list : datanodeDetails) { |
| for (DatanodeDetails dn : list) { |
| racks.add(cluster.getAncestor(dn, 1)); |
| } |
| } |
| return racks.size(); |
| } |
| } |