/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestReplicationPolicy extends BaseReplicationPolicyTest {
private static final String filename = "/dummyfile.txt";
// The interval for marking a datanode as stale.
private static final long staleInterval =
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
private static AtomicLong mockINodeId = new AtomicLong(0);
@Rule
public ExpectedException exception = ExpectedException.none();
public TestReplicationPolicy(String blockPlacementPolicyClassName) {
this.blockPlacementPolicy = blockPlacementPolicyClassName;
}
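/**
 * Run each test case against both the default placement policy and the
 * upgrade-domain-aware placement policy.
 */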
@Parameterized.Parameters
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{ BlockPlacementPolicyDefault.class.getName() },
{ BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
}
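/**
 * Update the utilization of the extra (second) storage on dataNodes[5] and
 * send a heartbeat for that datanode.
 */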
private void updateHeartbeatForExtraStorage(long capacity,
long dfsUsed, long remaining, long blockPoolUsed) {
DatanodeDescriptor dn = dataNodes[5];
dn.getStorageInfos()[1].setUtilizationForTesting(
capacity, dfsUsed, remaining, blockPoolUsed);
dn.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
0L, 0L, 0, 0, null);
}
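/**
 * Reset every datanode to report ample free space, and mark the extra
 * storage on dataNodes[5] as having no available space.
 */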
private void resetHeartbeatForStorages() {
for (int i=0; i < dataNodes.length; i++) {
updateHeartbeatWithUsage(dataNodes[i],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L,
0, 0);
}
// No available space in the extra storage of dataNodes[5]
updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L);
}
@Override
DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
final String[] racks = {
"/d1/r1",
"/d1/r1",
"/d1/r2",
"/d1/r2",
"/d2/r3",
"/d2/r3"};
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
// create an extra storage for dn5.
DatanodeStorage extraStorage = new DatanodeStorage(
storages[5].getStorageID() + "-extra", DatanodeStorage.State.NORMAL,
StorageType.DEFAULT);
BlockManagerTestUtil.updateStorage(storages[5].getDatanodeDescriptor(),
extraStorage);
return DFSTestUtil.toDatanodeDescriptor(storages);
}
/**
* Test whether the remaining space per storage is individually
* considered.
*/
@Test
public void testChooseNodeWithMultipleStorages1() throws Exception {
updateHeartbeatWithUsage(dataNodes[5],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
0L, 0L, 0, 0);
updateHeartbeatForExtraStorage(
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L);
DatanodeStorageInfo[] targets;
targets = chooseTarget (1, dataNodes[5],
new ArrayList<DatanodeStorageInfo>(), null);
assertEquals(1, targets.length);
assertEquals(storages[4], targets[0]);
resetHeartbeatForStorages();
}
/**
* Test whether all storages on the datanode are considered while
* choosing target to place block.
*/
@Test
public void testChooseNodeWithMultipleStorages2() throws Exception {
updateHeartbeatWithUsage(dataNodes[5],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
0L, 0L, 0, 0);
updateHeartbeatForExtraStorage(
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
DatanodeStorageInfo[] targets;
targets = chooseTarget (1, dataNodes[5],
new ArrayList<DatanodeStorageInfo>(), null);
assertEquals(1, targets.length);
assertEquals(dataNodes[5], targets[0].getDatanodeDescriptor());
resetHeartbeatForStorages();
}
/**
* In this testcase, the client is dataNodes[0]. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on a
* different rack, and the 3rd should be placed on a different node
* on the rack chosen for the 2nd replica.
* The only exception is when <i>numOfReplicas</i> is 2:
* the 1st is on dataNodes[0] and the 2nd is on a different rack.
* @throws Exception
*/
@Test
public void testChooseTarget1() throws Exception {
updateHeartbeatWithUsage(dataNodes[0],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
0L, 0L, 4, 0); // overloaded
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(storages[0], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(targets[1], targets[2]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(storages[0], targets[0]);
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[0], targets[2]));
resetHeartbeatForStorages();
}
/**
* In this testcase, the client is dataNodes[0], but dataNodes[1] is
* not allowed to be chosen. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on a different
* rack, the 3rd should be on the same rack as the 2nd replica, and the rest
* should be placed on a third rack.
* @throws Exception
*/
@Test
public void testChooseTarget2() throws Exception {
Set<Node> excludedNodes;
DatanodeStorageInfo[] targets;
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
excludedNodes = new HashSet<>();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(0, chosenNodes, excludedNodes);
assertEquals(targets.length, 0);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
assertEquals(storages[0], targets[0]);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(2, chosenNodes, excludedNodes);
assertEquals(targets.length, 2);
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(targets.length, 3);
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(targets[1], targets[2]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(4, chosenNodes, excludedNodes);
assertEquals(targets.length, 4);
assertEquals(storages[0], targets[0]);
for(int i=1; i<4; i++) {
assertFalse(isOnSameRack(targets[0], targets[i]));
}
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[1], targets[3]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
chosenNodes.add(storages[2]);
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
null);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
for (; i < targets.length && !storages[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
/**
* In this testcase, the client is dataNodes[0], but dataNodes[0] is not
* qualified to be chosen. So the 1st replica should be placed on dataNodes[1],
* the 2nd replica should be placed on a different rack,
* the 3rd replica should be placed on the same rack as the 2nd replica,
* and the rest should be placed on the third rack.
* @throws Exception
*/
@Test
public void testChooseTarget3() throws Exception {
// make data node 0 to be not qualified to choose
updateHeartbeatWithUsage(dataNodes[0],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
0L, 0L, 0, 0); // no space
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(storages[1], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(storages[1], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(storages[1], targets[0]);
assertTrue(isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(storages[1], targets[0]);
for(int i=1; i<4; i++) {
assertFalse(isOnSameRack(targets[0], targets[i]));
}
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[1], targets[3]));
resetHeartbeatForStorages();
}
/**
* In this testcase, the client is dataNodes[0], but none of the nodes on
* rack 1 is qualified to be chosen. So the 1st replica should be placed on
* either rack 2 or rack 3,
* the 2nd replica should be placed on a different rack,
* and the 3rd replica should be placed on the same rack as the 1st replica.
* @throws Exception
*/
@Test
public void testChooseTarget4() throws Exception {
// make data node 0 & 1 to be not qualified to choose: not enough disk space
for(int i=0; i<2; i++) {
updateHeartbeatWithUsage(dataNodes[i],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
assertFalse(isOnSameRack(targets[i], dataNodes[0]));
}
assertTrue(isOnSameRack(targets[0], targets[1]) ||
isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[2]));
resetHeartbeatForStorages();
}
/**
* In this testcase, the client is a node outside of the file system.
* So the 1st replica can be placed on any node,
* the 2nd replica should be placed on a different rack,
* and the 3rd replica should be placed on the same rack as the 2nd replica.
* @throws Exception
*/
@Test
public void testChooseTarget5() throws Exception {
DatanodeDescriptor writerDesc =
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4");
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, writerDesc);
assertEquals(targets.length, 0);
targets = chooseTarget(1, writerDesc);
assertEquals(targets.length, 1);
targets = chooseTarget(2, writerDesc);
assertEquals(targets.length, 2);
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, writerDesc);
assertEquals(targets.length, 3);
assertTrue(isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[1]));
}
/**
* In this testcase, there are enough total number of nodes, but only
* one rack is actually available.
* @throws Exception
*/
@Test
public void testChooseTarget6() throws Exception {
DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo(
"DS-xxxx", "7.7.7.7", "/d2/r3", "host7");
DatanodeDescriptor newDn = storage.getDatanodeDescriptor();
Set<Node> excludedNodes;
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
excludedNodes = new HashSet<>();
excludedNodes.add(dataNodes[0]);
excludedNodes.add(dataNodes[1]);
excludedNodes.add(dataNodes[2]);
excludedNodes.add(dataNodes[3]);
DatanodeStorageInfo[] targets;
// Only two nodes available in a rack. Try picking two nodes. Only one
// should return.
targets = chooseTarget(2, chosenNodes, excludedNodes);
assertEquals(1, targets.length);
// Make three nodes available in a rack.
final BlockManager bm = namenode.getNamesystem().getBlockManager();
bm.getDatanodeManager().getNetworkTopology().add(newDn);
bm.getDatanodeManager().getHeartbeatManager().addDatanode(newDn);
updateHeartbeatWithUsage(newDn,
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
// Try picking three nodes. Only two should return.
excludedNodes.clear();
excludedNodes.add(dataNodes[0]);
excludedNodes.add(dataNodes[1]);
excludedNodes.add(dataNodes[2]);
excludedNodes.add(dataNodes[3]);
chosenNodes.clear();
try {
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(2, targets.length);
} finally {
bm.getDatanodeManager().getNetworkTopology().remove(newDn);
}
resetHeartbeatForStorages();
}
/**
* This testcase tries to choose more targets than there are available nodes
* and checks the result, with stale node avoidance on the write path enabled.
* @throws Exception
*/
@Test
public void testChooseTargetWithMoreThanAvailableNodesWithStaleness()
throws Exception {
try {
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setNumStaleNodes(dataNodes.length);
testChooseTargetWithMoreThanAvailableNodes();
} finally {
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setNumStaleNodes(0);
}
}
/**
* This testcase tries to choose more targets than there are available nodes
* and checks the result.
* @throws Exception
*/
@Test
public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
// make data node 0 & 1 to be not qualified to choose: not enough disk space
for(int i=0; i<2; i++) {
updateHeartbeatWithUsage(dataNodes[i],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
final LogVerificationAppender appender = new LogVerificationAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
// try to choose dataNodes.length targets, which is more than the number of
// actually available nodes.
DatanodeStorageInfo[] targets = chooseTarget(dataNodes.length);
assertEquals(targets.length, dataNodes.length - 2);
final List<LoggingEvent> log = appender.getLog();
assertNotNull(log);
assertFalse(log.size() == 0);
final LoggingEvent lastLogEntry = log.get(log.size() - 1);
assertTrue(Level.WARN.isGreaterOrEqual(lastLogEntry.getLevel()));
// Supposed to place replicas on each node, but two data nodes are not
// available for placing replicas, so here we expect a shortage of 2
assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
resetHeartbeatForStorages();
}
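/**
 * Returns true if the target storage's datanode is one of
 * nodes[startIndex..endIndex] (both inclusive).
 */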
private boolean containsWithinRange(DatanodeStorageInfo target,
DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length;
assert endIndex >= startIndex && endIndex < nodes.length;
for (int i = startIndex; i <= endIndex; i++) {
if (nodes[i].equals(target.getDatanodeDescriptor())) {
return true;
}
}
return false;
}
private boolean containsWithinRange(DatanodeDescriptor target,
DatanodeStorageInfo[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length;
assert endIndex >= startIndex && endIndex < nodes.length;
for (int i = startIndex; i <= endIndex; i++) {
if (nodes[i].getDatanodeDescriptor().equals(target)) {
return true;
}
}
return false;
}
@Test
public void testChooseTargetWithStaleNodes() throws Exception {
// Set dataNodes[0] as stale
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[0], -(staleInterval + 1));
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
assertTrue(namenode.getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
DatanodeStorageInfo[] targets;
// We set dataNodes[0] as stale, thus dataNodes[1] should be chosen since
// it is on the same rack as dataNodes[0] (the writer)
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(storages[1], targets[0]);
Set<Node> excludedNodes = new HashSet<>();
excludedNodes.add(dataNodes[1]);
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
// reset
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[0], 0);
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
}
/**
* In this testcase, we set 3 nodes (dataNodes[0] ~ dataNodes[2]) as stale,
* and when the number of replicas is less than or equal to 3, all the healthy
* datanodes should be returned by the chooseTarget method. When the number
* of replicas is 4, a stale node should be included.
*
* @throws Exception
*/
@Test
public void testChooseTargetWithHalfStaleNodes() throws Exception {
// Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
for (int i = 0; i < 3; i++) {
DFSTestUtil
.resetLastUpdatesWithOffset(dataNodes[i], -(staleInterval + 1));
}
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
DatanodeStorageInfo[] targets = chooseTarget(0);
assertEquals(targets.length, 0);
// Since we have 6 datanodes total, stale nodes should
// not be returned until we ask for more than 3 targets
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5));
assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5));
assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3));
assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
for (int i = 0; i < dataNodes.length; i++) {
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
}
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
}
@Test
public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
String[] hosts = new String[]{"host1", "host2", "host3",
"host4", "host5", "host6"};
String[] racks = new String[]{"/d1/r1", "/d1/r1", "/d1/r2",
"/d1/r2", "/d2/r3", "/d2/r3"};
MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(conf).racks(racks)
.hosts(hosts).numDataNodes(hosts.length).build();
miniCluster.waitActive();
try {
// Step 1. Make two datanodes as stale, check whether the
// avoidStaleDataNodesForWrite calculation is correct.
// First stop the heartbeat of host1 and host2
for (int i = 0; i < 2; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
DatanodeDescriptor dnDes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));
}
// Instead of waiting, explicitly call heartbeatCheck to
// let the heartbeat manager detect the stale nodes
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
int numStaleNodes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager().getNumStaleNodes();
assertEquals(numStaleNodes, 2);
assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
// Call chooseTarget
DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getDatanodeManager()
.getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
BlockPlacementPolicy replicator = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
null);
assertEquals(targets.length, 3);
assertFalse(isOnSameRack(targets[0], staleNodeInfo));
// Step 2. Set more than half of the datanodes as stale
for (int i = 0; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
DatanodeDescriptor dnDesc = miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, -(staleInterval + 1));
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
numStaleNodes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager().getNumStaleNodes();
assertEquals(numStaleNodes, 4);
// According to our strategy, stale datanodes will be included for writing
// to avoid hotspots
assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
assertEquals(targets.length, 3);
assertTrue(isOnSameRack(targets[0], staleNodeInfo));
// Step 3. Set 2 stale datanodes back to healthy nodes,
// still have 2 stale nodes
for (int i = 2; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
DatanodeDescriptor dnDesc = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, 0);
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
numStaleNodes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager().getNumStaleNodes();
assertEquals(numStaleNodes, 2);
assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
// Call chooseTarget
targets = chooseTarget(3, staleNodeInfo);
assertEquals(targets.length, 3);
assertFalse(isOnSameRack(targets[0], staleNodeInfo));
} finally {
miniCluster.shutdown();
}
}
/**
* This testcase tests re-replication when dataNodes[0] is already chosen.
* So the 1st replica can be placed on a random rack,
* the 2nd replica should be placed on a different node on the same rack as
* the 1st replica, and the 3rd replica can be placed randomly.
* @throws Exception
*/
@Test
public void testRereplicate1() throws Exception {
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
chosenNodes.add(storages[0]);
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, chosenNodes);
assertEquals(targets.length, 3);
assertTrue(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], targets[2]));
}
/**
* This testcase tests re-replication
* when dataNodes[0] and dataNodes[1] are already chosen.
* So the 1st replica should be placed on a different rack than rack 1;
* the remaining replicas can be placed randomly.
* @throws Exception
*/
@Test
public void testRereplicate2() throws Exception {
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[1]);
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[1], dataNodes[0]));
}
/**
* This testcase tests re-replication
* when dataNodes[0] and dataNodes[2] are already chosen.
* So the 1st replica should be placed on the rack where the writer resides;
* the remaining replicas can be placed randomly.
* @throws Exception
*/
@Test
public void testRereplicate3() throws Exception {
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[2]);
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertTrue(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], dataNodes[2]));
targets = chooseTarget(1, dataNodes[2], chosenNodes);
assertEquals(targets.length, 1);
assertTrue(isOnSameRack(targets[0], dataNodes[2]));
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, dataNodes[2], chosenNodes);
assertEquals(targets.length, 2);
assertTrue(isOnSameRack(targets[0], dataNodes[2]));
}
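/**
 * Create a BlockInfo with replication factor 3; unless the block is marked as
 * corrupted, assign it a unique mock block collection id.
 */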
private BlockInfo genBlockInfo(long id) {
return genBlockInfo(id, false);
}
private BlockInfo genBlockInfo(long id, boolean isBlockCorrupted) {
BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
if (!isBlockCorrupted) {
bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
}
return bInfo;
}
/**
* Test that high priority blocks are processed before low priority
* blocks.
*/
@Test(timeout = 60000)
public void testReplicationWithPriority() throws Exception {
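// Sleep interval in milliseconds between priority checks; this matches the
// 1 second redundancy interval configured below.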
int DFS_NAMENODE_REPLICATION_INTERVAL = 1000;
int HIGH_PRIORITY = 0;
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.format(true).build();
try {
cluster.waitActive();
final LowRedundancyBlocks neededReconstruction = cluster.getNameNode()
.getNamesystem().getBlockManager().neededReconstruction;
for (int i = 0; i < 100; i++) {
// Adding the blocks directly to normal priority
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
nextLong(), true), 2, 0, 0, 3);
}
// Let's wait for the replication interval, so that processing of normal
// priority blocks starts
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
// Adding the block directly to high priority list
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
nextLong(), true), 1, 0, 0, 3);
// Let's wait for the replication interval
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
// Check that replication completed successfully. No need to wait until it
// processes all 100 normal blocks.
assertFalse("Not able to clear the element from high priority list",
neededReconstruction.iterator(HIGH_PRIORITY).hasNext());
} finally {
cluster.shutdown();
}
}
/**
* Test that chooseLowRedundancyBlocks processes blocks based on priority.
*/
@Test
public void testChooseLowRedundancyBlocks() throws Exception {
LowRedundancyBlocks lowRedundancyBlocks = new LowRedundancyBlocks();
for (int i = 0; i < 5; i++) {
// Adding QUEUE_HIGHEST_PRIORITY block
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 0, 3);
// Adding QUEUE_VERY_LOW_REDUNDANCY block
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 2, 0, 0, 7);
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 6, 0, 0, 6);
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 5, 0, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 0, 0, 0, 3);
}
// Choose 6 blocks from lowRedundancyBlocks. Then it should pick 5 blocks
// from QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_LOW_REDUNDANCY.
List<List<BlockInfo>> chosenBlocks =
lowRedundancyBlocks.chooseLowRedundancyBlocks(6);
assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
// Choose 10 blocks from lowRedundancyBlocks. Then it should pick 4 blocks
// from QUEUE_VERY_LOW_REDUNDANCY, 5 blocks from QUEUE_LOW_REDUNDANCY and 1
// block from QUEUE_REPLICAS_BADLY_DISTRIBUTED.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(10);
assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
// Adding QUEUE_HIGHEST_PRIORITY
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 0, 1, 0, 3);
// Choose 10 blocks from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_HIGHEST_PRIORITY, 4 blocks from
// QUEUE_REPLICAS_BADLY_DISTRIBUTED
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(10);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4);
// Since it has reached the end of all lists,
// it should start picking blocks from the beginning again.
// Choose 7 blocks from lowRedundancyBlocks. Then it should pick 6 blocks
// from QUEUE_HIGHEST_PRIORITY, 1 block from QUEUE_VERY_LOW_REDUNDANCY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(7);
assertTheChosenBlocks(chosenBlocks, 6, 1, 0, 0, 0);
}
/** asserts the chosen blocks with expected priority blocks */
private void assertTheChosenBlocks(
List<List<BlockInfo>> chosenBlocks, int... expectedSizes) {
int i = 0;
for(; i < chosenBlocks.size(); i++) {
assertEquals("Not returned the expected number for i=" + i,
expectedSizes[i], chosenBlocks.get(i).size());
}
for(; i < expectedSizes.length; i++) {
assertEquals("Expected size is non-zero for i=" + i, 0, expectedSizes[i]);
}
}
/**
* Test that chooseReplicaToDelete selects a replica based on
* block locality and free space.
*/
@Test
public void testChooseReplicaToDelete() throws Exception {
List<DatanodeStorageInfo> replicaList = new ArrayList<>();
final Map<String, List<DatanodeStorageInfo>> rackMap
= new HashMap<String, List<DatanodeStorageInfo>>();
storages[0].setRemainingForTests(4*1024*1024);
dataNodes[0].setRemaining(calculateRemaining(dataNodes[0]));
replicaList.add(storages[0]);
storages[1].setRemainingForTests(3*1024*1024);
dataNodes[1].setRemaining(calculateRemaining(dataNodes[1]));
replicaList.add(storages[1]);
storages[2].setRemainingForTests(2*1024*1024);
dataNodes[2].setRemaining(calculateRemaining(dataNodes[2]));
replicaList.add(storages[2]);
// Even if this node has the most space, storages[5] has the lowest
// remaining space, so it should be chosen in case of block deletion.
storages[4].setRemainingForTests(100 * 1024 * 1024);
storages[5].setRemainingForTests(512 * 1024);
dataNodes[5].setRemaining(calculateRemaining(dataNodes[5]));
replicaList.add(storages[5]);
// Refresh the last update time for all the datanodes
for (int i = 0; i < dataNodes.length; i++) {
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
}
List<DatanodeStorageInfo> first = new ArrayList<>();
List<DatanodeStorageInfo> second = new ArrayList<>();
replicator.splitNodesWithRack(replicaList, replicaList, rackMap, first,
second);
// storages[0] and storages[1] are in the first set as their rack has two
// replica nodes, while storages[2] and storages[5] are in the second set.
assertEquals(2, first.size());
assertEquals(2, second.size());
List<StorageType> excessTypes = new ArrayList<>();
{
// test returning null
excessTypes.add(StorageType.SSD);
assertNull(((BlockPlacementPolicyDefault) replicator)
.chooseReplicaToDelete(first, second, excessTypes, rackMap));
}
excessTypes.add(StorageType.DEFAULT);
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
.chooseReplicaToDelete(first, second, excessTypes, rackMap);
// Among all storages, storages[5] has the least free space
assertEquals(chosen, storages[5]);
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
assertEquals(2, first.size());
assertEquals(1, second.size());
// Within the first set, storages[1] has less free space
excessTypes.add(StorageType.DEFAULT);
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
first, second, excessTypes, rackMap);
assertEquals(chosen, storages[1]);
}
/**
* Test that chooseReplicaToDelete works with the
* erasure coding (STRIPED) placement policy.
*/
@Test
public void testStripedChooseReplicaToDelete() throws Exception {
List<DatanodeStorageInfo> replicaList = new ArrayList<>();
List<DatanodeStorageInfo> candidate = new ArrayList<>();
final Map<String, List<DatanodeStorageInfo>> rackMap
= new HashMap<String, List<DatanodeStorageInfo>>();
replicaList.add(storages[0]);
replicaList.add(storages[1]);
replicaList.add(storages[2]);
replicaList.add(storages[4]);
candidate.add(storages[0]);
candidate.add(storages[2]);
candidate.add(storages[4]);
// Refresh the last update time for all the datanodes
for (int i = 0; i < dataNodes.length; i++) {
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
}
List<DatanodeStorageInfo> first = new ArrayList<>();
List<DatanodeStorageInfo> second = new ArrayList<>();
BlockPlacementPolicy policy = getStriptedPolicy();
policy.splitNodesWithRack(replicaList, candidate, rackMap, first,
second);
// storages[0] is in the first set as its rack has two replica nodes,
// while storages[2] and storages[4] are in the second set.
assertEquals(1, first.size());
assertEquals(2, second.size());
List<StorageType> excessTypes = new ArrayList<>();
excessTypes.add(StorageType.DEFAULT);
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) policy)
.chooseReplicaToDelete(first, second, excessTypes, rackMap);
// Within all storages, storages[0] is in the rack that has two replica blocks
assertEquals(chosen, storages[0]);
policy.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
assertEquals(0, first.size());
assertEquals(2, second.size());
// Within second set, storages[2] should be next to be deleted in order.
excessTypes.add(StorageType.DEFAULT);
chosen = ((BlockPlacementPolicyDefault) policy).chooseReplicaToDelete(
first, second, excessTypes, rackMap);
assertEquals(chosen, storages[2]);
policy.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
assertEquals(0, first.size());
assertEquals(1, second.size());
chosen = ((BlockPlacementPolicyDefault) policy).chooseReplicaToDelete(
first, second, excessTypes, rackMap);
assertEquals(chosen, null);
}
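/** Sum the remaining space across all storages of the given datanode. */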
private long calculateRemaining(DatanodeDescriptor dataNode) {
long sum = 0;
for (DatanodeStorageInfo storageInfo: dataNode.getStorageInfos()){
sum += storageInfo.getRemaining();
}
return sum;
}
@Test
public void testChooseReplicasToDelete() throws Exception {
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
nonExcess.add(storages[0]);
nonExcess.add(storages[1]);
nonExcess.add(storages[2]);
nonExcess.add(storages[3]);
List<DatanodeStorageInfo> excessReplicas;
BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
.createDefaultSuite();
BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo(
"Storage-excess-SSD-ID", "localhost",
storages[0].getDatanodeDescriptor().getNetworkLocation(),
"foo.com", StorageType.SSD, null);
updateHeartbeatWithUsage(excessSSD.getDatanodeDescriptor(),
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0,
0);
// use delete hint case.
DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[0]));
// Excess type deletion
DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
"Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
"foo.com", StorageType.ARCHIVE, null);
nonExcess.add(excessStorage);
excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), null);
assertTrue(excessReplicas.contains(excessStorage));
// The block was initially created on excessSSD(rack r1),
// storages[4](rack r3) and storages[5](rack r3) with
// ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 3.
// Right after balancer moves the block from storages[5] to
// storages[3](rack r2), the application changes the storage policy from
// ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case,
// we should be able to delete excessSSD since the remaining
// storages ({storages[3]}, {storages[4], storages[5]})
// are on different racks (r2, r3).
nonExcess.clear();
nonExcess.add(excessSSD);
nonExcess.add(storages[3]);
nonExcess.add(storages[4]);
nonExcess.add(storages[5]);
excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(),
storages[5].getDatanodeDescriptor());
assertEquals(1, excessReplicas.size());
assertTrue(excessReplicas.contains(excessSSD));
// Similar to above, but after policy change and before deletion,
// the replicas are located on excessSSD(rack r1), storages[1](rack r1),
// storages[2](rack r2) and storages[3](rack r2). Replication factor = 3.
// In this case, we should be able to delete excessSSD since the remaining
// storages ({storages[1]} , {storages[2], storages[3]})
// are on different racks (r1, r2).
nonExcess.clear();
nonExcess.add(excessSSD);
nonExcess.add(storages[1]);
nonExcess.add(storages[2]);
nonExcess.add(storages[3]);
excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
excessTypes, storages[1].getDatanodeDescriptor(),
storages[3].getDatanodeDescriptor());
assertEquals(1, excessReplicas.size());
assertTrue(excessReplicas.contains(excessSSD));
// Similar to above, but after policy change and before deletion,
// the replicas are located on excessSSD(rack r1), storages[2](rack r2)
// Replication factor = 1. We should be able to delete excessSSD.
nonExcess.clear();
nonExcess.add(excessSSD);
nonExcess.add(storages[2]);
excessTypes = storagePolicy.chooseExcess((short) 1,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 1,
excessTypes, storages[2].getDatanodeDescriptor(), null);
assertEquals(1, excessReplicas.size());
assertTrue(excessReplicas.contains(excessSSD));
// The block was initially created on excessSSD(rack r1),
// storages[4](rack r3) and storages[5](rack r3) with
// ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 2.
// In this case, no replica can be chosen as the excessive replica by
// chooseReplicasToDelete because if the SSD storage is deleted,
// the remaining storages[4] and storages[5] are the same rack (r3),
// violating block placement policy (i.e. the number of racks >= 2).
// TODO BlockPlacementPolicyDefault should be able to rebalance the replicas
// and then delete excessSSD.
nonExcess.clear();
nonExcess.add(excessSSD);
nonExcess.add(storages[4]);
nonExcess.add(storages[5]);
excessTypes = storagePolicy.chooseExcess((short) 2,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 2,
excessTypes, null, null);
assertEquals(0, excessReplicas.size());
}
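/**
 * useDelHint should honor the delete hint only when its storage type is an
 * excess type and removing it does not reduce the number of racks.
 */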
@Test
public void testUseDelHint() throws Exception {
List<StorageType> excessTypes = new ArrayList<>();
excessTypes.add(StorageType.ARCHIVE);
BlockPlacementPolicyDefault policyDefault =
(BlockPlacementPolicyDefault) replicator;
// no delHint
assertFalse(policyDefault.useDelHint(null, null, null, null, null));
// delHint storage type is not an excess type
assertFalse(policyDefault.useDelHint(storages[0], null, null, null,
excessTypes));
// check if removing delHint reduces the number of racks
List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
moreThanOne.add(storages[0]);
moreThanOne.add(storages[1]);
List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
exactlyOne.add(storages[3]);
exactlyOne.add(storages[5]);
excessTypes.add(StorageType.DEFAULT);
assertTrue(policyDefault.useDelHint(storages[0], null, moreThanOne,
exactlyOne, excessTypes));
// the added node adds a new rack
assertTrue(policyDefault.useDelHint(storages[3], storages[5], moreThanOne,
exactlyOne, excessTypes));
// removing delHint reduces the number of racks;
assertFalse(policyDefault.useDelHint(storages[3], storages[0], moreThanOne,
exactlyOne, excessTypes));
assertFalse(policyDefault.useDelHint(storages[3], null, moreThanOne,
exactlyOne, excessTypes));
}
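/**
 * isMovable should allow a replica move only when the number of racks covered
 * by the replicas does not decrease.
 */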
@Test
public void testIsMovable() throws Exception {
List<DatanodeInfo> candidates = new ArrayList<>();
// after the move, the number of racks remains 2.
candidates.add(dataNodes[0]);
candidates.add(dataNodes[1]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[3]);
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
// after the move, the number of racks remains 3.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[1]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[4]);
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[1]));
// after the move, the number of racks changes from 2 to 3.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[1]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[4]);
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
// the move would have reduced the number of racks from 3 to 2.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[3]);
candidates.add(dataNodes[4]);
assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
}
/**
* This testcase tests whether the default value returned by
* DFSUtil.getInvalidateWorkPctPerIteration() is positive,
* and whether an IllegalArgumentException will be thrown
* when 0.0f is retrieved
*/
@Test
public void testGetInvalidateWorkPctPerIteration() {
Configuration conf = new Configuration();
float blocksInvalidateWorkPct = DFSUtil
.getInvalidateWorkPctPerIteration(conf);
assertTrue(blocksInvalidateWorkPct > 0);
conf.set(DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION,
"0.5f");
blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
assertEquals(blocksInvalidateWorkPct, 0.5f, blocksInvalidateWorkPct * 1e-7);
conf.set(DFSConfigKeys.
DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, "1.0f");
blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
assertEquals(blocksInvalidateWorkPct, 1.0f, blocksInvalidateWorkPct * 1e-7);
conf.set(DFSConfigKeys.
DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, "0.0f");
exception.expect(IllegalArgumentException.class);
blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
}
/**
* This testcase tests whether an IllegalArgumentException
* will be thrown when a negative value is retrieved by
* DFSUtil#getInvalidateWorkPctPerIteration
*/
@Test
public void testGetInvalidateWorkPctPerIteration_NegativeValue() {
Configuration conf = new Configuration();
float blocksInvalidateWorkPct = DFSUtil
.getInvalidateWorkPctPerIteration(conf);
assertTrue(blocksInvalidateWorkPct > 0);
conf.set(DFSConfigKeys.
DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, "-0.5f");
exception.expect(IllegalArgumentException.class);
blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
}
/**
* This testcase tests whether an IllegalArgumentException
* will be thrown when a value greater than 1 is retrieved by
* DFSUtil#getInvalidateWorkPctPerIteration
*/
@Test
public void testGetInvalidateWorkPctPerIteration_GreaterThanOne() {
Configuration conf = new Configuration();
float blocksInvalidateWorkPct = DFSUtil
.getInvalidateWorkPctPerIteration(conf);
assertTrue(blocksInvalidateWorkPct > 0);
conf.set(DFSConfigKeys.
DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, "1.5f");
exception.expect(IllegalArgumentException.class);
blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
}
/**
* This testcase tests whether the value returned by
* DFSUtil.getReplWorkMultiplier() is positive,
* and whether an IllegalArgumentException will be thrown
* when a non-positive value is retrieved
*/
@Test
public void testGetReplWorkMultiplier() {
Configuration conf = new Configuration();
int blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
assertTrue(blocksReplWorkMultiplier > 0);
conf.set(DFSConfigKeys.
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,"3");
blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
assertEquals(blocksReplWorkMultiplier, 3);
conf.set(DFSConfigKeys.
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,"-1");
exception.expect(IllegalArgumentException.class);
blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
}
@Test(timeout = 60000)
public void testUpdateDoesNotCauseSkippedReplication() {
LowRedundancyBlocks lowRedundancyBlocks = new LowRedundancyBlocks();
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
BlockInfo block3 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_VERY_LOW_REDUNDANCY block
final int block1CurReplicas = 2;
final int block1ExpectedReplicas = 7;
lowRedundancyBlocks.add(block1, block1CurReplicas, 0, 0,
block1ExpectedReplicas);
// Adding QUEUE_VERY_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block2, 2, 0, 0, 7);
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block3, 2, 0, 0, 6);
List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_VERY_LOW_REDUNDANCY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
// Increasing the replications will move the block down a
// priority. This simulates a replica being completed in between checks.
lowRedundancyBlocks.update(block1, block1CurReplicas+1, 0, 0,
block1ExpectedReplicas, 1, 0);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_VERY_LOW_REDUNDANCY.
// This block was moved up a priority and should not be skipped over.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_LOW_REDUNDANCY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 0, 1, 0, 0);
}
@Test(timeout = 60000)
public void testAddStoredBlockDoesNotCauseSkippedReplication()
throws IOException {
FSNamesystem mockNS = mock(FSNamesystem.class);
when(mockNS.hasWriteLock()).thenReturn(true);
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_HIGHEST_PRIORITY block
lowRedundancyBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_HIGHEST_PRIORITY block
lowRedundancyBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_HIGHEST_PRIORITY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
// Adding this block collection to the BlockManager, so that when we add
// block under construction, the BlockManager will realize the expected
// replication has been achieved and remove it from the low redundancy
// queue.
BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, null);
info.setBlockCollectionId(1000L);
final INodeFile file = TestINodeFile.createINodeFile(1000L);
when(mockNS.getBlockCollection(1000L)).thenReturn(file);
bm.addBlockCollection(info, file);
// Adding this block will increase its current replication, and that will
// remove it from the queue.
bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
ReplicaState.FINALIZED), storages[0]);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_HIGHEST_PRIORITY.
// This block remains and should not be skipped over.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
@Test(timeout = 60000)
public void
testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.hasWriteLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;
long blkID1 = ThreadLocalRandom.current().nextLong();
if (blkID1 < 0) {
blkID1 *= -1;
}
long blkID2 = ThreadLocalRandom.current().nextLong();
if (blkID2 < 0) {
blkID2 *= -1;
}
BlockInfo block1 = genBlockInfo(blkID1);
BlockInfo block2 = genBlockInfo(blkID2);
// Adding QUEUE_HIGHEST_PRIORITY block
lowRedundancyBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_HIGHEST_PRIORITY block
lowRedundancyBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_HIGHEST_PRIORITY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
final BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
final BlockCollection mbc = mock(BlockCollection.class);
when(mbc.getId()).thenReturn(1000L);
when(mbc.getLastBlock()).thenReturn(info);
when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
when(mbc.isUnderConstruction()).thenReturn(true);
ContentSummary cs = mock(ContentSummary.class);
when(cs.getLength()).thenReturn((long)1);
when(mbc.computeContentSummary(bm.getStoragePolicySuite())).thenReturn(cs);
info.setBlockCollectionId(1000);
bm.addBlockCollection(info, mbc);
DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo(
dataNodes[0], new DatanodeStorage("s1"))};
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
storageAry);
DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class);
DatanodeDescriptor dn = mock(DatanodeDescriptor.class);
when(dn.isDecommissioned()).thenReturn(true);
when(storage.getState()).thenReturn(DatanodeStorage.State.NORMAL);
when(storage.getDatanodeDescriptor()).thenReturn(dn);
when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
when(storage.addBlock(any(BlockInfo.class))).thenReturn
(DatanodeStorageInfo.AddBlockResult.ADDED);
info.addStorage(storage, info);
BlockInfo lastBlk = mbc.getLastBlock();
when(mbc.getLastBlock()).thenReturn(lastBlk, info);
bm.convertLastBlockToUnderConstruction(mbc, 0L);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_HIGHEST_PRIORITY.
// This block remains and should not be skipped over.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
@Test(timeout = 60000)
public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_HIGHEST_PRIORITY block
lowRedundancyBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_HIGHEST_PRIORITY block
lowRedundancyBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_HIGHEST_PRIORITY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
bm.setReplication((short)0, (short)1, block1);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_HIGHEST_PRIORITY.
// This block remains and should not be skipped over.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
/**
* In this testcase, 2 favored nodes, dataNodes[0] and dataNodes[1], are passed.
*
* Both favored nodes should be chosen as targets for placing replicas, and
* the remaining targets should then be chosen by the block placement policy:
* i.e. the third target on the local writer's rack, the fourth target on a
* remote rack, and the fifth on the same rack as the second.
*
* @throws Exception
*/
@Test
public void testChooseExcessReplicaApartFromFavoredNodes() throws Exception {
DatanodeStorageInfo[] targets;
List<DatanodeDescriptor> expectedTargets =
new ArrayList<DatanodeDescriptor>();
expectedTargets.add(dataNodes[0]);
expectedTargets.add(dataNodes[1]);
expectedTargets.add(dataNodes[2]);
expectedTargets.add(dataNodes[4]);
expectedTargets.add(dataNodes[5]);
List<DatanodeDescriptor> favouredNodes =
new ArrayList<DatanodeDescriptor>();
favouredNodes.add(dataNodes[0]);
favouredNodes.add(dataNodes[1]);
targets = chooseTarget(5, dataNodes[2], null, favouredNodes);
assertEquals(targets.length, 5);
for (int i = 0; i < targets.length; i++) {
assertTrue("Target should be a part of Expected Targets",
expectedTargets.contains(targets[i].getDatanodeDescriptor()));
}
}
@Test
public void testChooseFromFavoredNodesWhenPreferLocalSetToFalse() {
((BlockPlacementPolicyDefault) replicator).setPreferLocalNode(false);
try {
DatanodeStorageInfo[] targets;
List<DatanodeDescriptor> expectedTargets = new ArrayList<>();
expectedTargets.add(dataNodes[0]);
expectedTargets.add(dataNodes[2]);
List<DatanodeDescriptor> favouredNodes = new ArrayList<>();
favouredNodes.add(dataNodes[0]);
favouredNodes.add(dataNodes[2]);
targets = chooseTarget(2, dataNodes[3], null,
favouredNodes);
assertEquals(targets.length, 2);
for (int i = 0; i < targets.length; i++) {
assertTrue("Target should be a part of Expected Targets",
expectedTargets.contains(targets[i].getDatanodeDescriptor()));
}
} finally {
((BlockPlacementPolicyDefault) replicator).setPreferLocalNode(true);
}
}
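/**
 * Convenience wrapper around the placement policy's chooseTarget that passes
 * the given favored nodes and no additional block flags.
 */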
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, Set<Node> excludedNodes,
List<DatanodeDescriptor> favoredNodes) {
return chooseTarget(numOfReplicas, writer, excludedNodes,
favoredNodes, null);
}
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, Set<Node> excludedNodes,
List<DatanodeDescriptor> favoredNodes, EnumSet<AddBlockFlag> flags) {
return replicator.chooseTarget(filename, numOfReplicas, writer,
excludedNodes, BLOCK_SIZE, favoredNodes,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, flags);
}
@Test
public void testAvoidLocalWrite() throws IOException {
DatanodeDescriptor writer = dataNodes[2];
EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE);
DatanodeStorageInfo[] targets;
targets = chooseTarget(5, writer, null, null, flags);
for (DatanodeStorageInfo info : targets) {
assertNotEquals(info.getDatanodeDescriptor(), writer);
}
}
@Test
public void testAvoidLocalWriteNoEnoughNodes() throws IOException {
DatanodeDescriptor writer = dataNodes[2];
EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE);
DatanodeStorageInfo[] targets;
targets = chooseTarget(6, writer, null, null, flags);
assertEquals(6, targets.length);
boolean found = false;
for (DatanodeStorageInfo info : targets) {
if (info.getDatanodeDescriptor().equals(writer)) {
found = true;
}
}
assertTrue(found);
}
@Test
public void testMaxLoad() {
FSClusterStats statistics = mock(FSClusterStats.class);
DatanodeDescriptor node = mock(DatanodeDescriptor.class);
when(statistics.getInServiceXceiverAverage()).thenReturn(0.0);
when(node.getXceiverCount()).thenReturn(1);
final Configuration conf = new Configuration();
final Class<? extends BlockPlacementPolicy> replicatorClass = conf
.getClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
BlockPlacementPolicy.class);
BlockPlacementPolicy bpp = ReflectionUtils.
newInstance(replicatorClass, conf);
assertTrue(bpp instanceof BlockPlacementPolicyDefault);
BlockPlacementPolicyDefault bppd = (BlockPlacementPolicyDefault) bpp;
bppd.initialize(conf, statistics, null, null);
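// With an in-service xceiver average of 0.0, the load check does not exclude
// the node; a node is excluded only when its xceiver count exceeds the
// configured load factor times the cluster average (the factor is assumed to
// default to 2.0 here).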
assertFalse(bppd.excludeNodeByLoad(node));
when(statistics.getInServiceXceiverAverage()).thenReturn(1.0);
when(node.getXceiverCount()).thenReturn(10);
assertTrue(bppd.excludeNodeByLoad(node));
// Enable load check per storage type.
conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
true);
bppd.initialize(conf, statistics, null, null);
Map<StorageType, StorageTypeStats> storageStats = new HashMap<>();
StorageTypeStats diskStorageTypeStats =
new StorageTypeStats(StorageType.DISK);
// Set xceiver count as 500 for DISK.
diskStorageTypeStats.setDataNodesInServiceXceiverCount(50, 10);
storageStats.put(StorageType.DISK, diskStorageTypeStats);
// Set xceiver count as 900 for ARCHIVE.
StorageTypeStats archiveStorageTypeStats =
new StorageTypeStats(StorageType.ARCHIVE);
archiveStorageTypeStats.setDataNodesInServiceXceiverCount(10, 90);
storageStats.put(StorageType.ARCHIVE, archiveStorageTypeStats);
when(statistics.getStorageTypeStats()).thenReturn(storageStats);
when(node.getXceiverCount()).thenReturn(29);
when(node.getStorageTypes()).thenReturn(EnumSet.of(StorageType.DISK));
when(statistics.getInServiceXceiverAverage()).thenReturn(0.0);
// Added for sanity: the total number of datanodes is 100, so the average
// xceiver count shall be (500 + 900) / 100 = 14
when(statistics.getInServiceXceiverAverage()).thenReturn(14.0);
when(node.getXceiverCount()).thenReturn(100);
assertFalse(bppd.excludeNodeByLoad(node));
}
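/**
 * The COLD storage policy places all replicas on ARCHIVE storage. Since none
 * of the test datanodes provide ARCHIVE storage, target selection should fail
 * and log a NO_REQUIRED_STORAGE_TYPE reason.
 */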
@Test
public void testChosenFailureForStorageType() {
final LogVerificationAppender appender = new LogVerificationAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 1,
dataNodes[0], new ArrayList<DatanodeStorageInfo>(), false, null,
BLOCK_SIZE, TestBlockStoragePolicy.POLICY_SUITE.getPolicy(
HdfsConstants.StoragePolicy.COLD.value()), null);
assertEquals(0, targets.length);
assertNotEquals(0,
appender.countLinesWithMessage("NO_REQUIRED_STORAGE_TYPE"));
}
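/**
 * All datanodes are left short on disk space, but none of them is stale, so
 * stale node avoidance for writes should remain disabled.
 */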
@Test
public void testReduceChooseTimesIfNOStaleNode() {
for(int i = 0; i < 6; i++) {
updateHeartbeatWithUsage(dataNodes[i],
2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
(HdfsServerConstants.MIN_BLOCKS_FOR_WRITE - 1) * BLOCK_SIZE,
0L, 0L, 0L, 0, 0);
}
assertFalse(dnManager.shouldAvoidStaleDataNodesForWrite());
resetHeartbeatForStorages();
}
}