/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class tests the decommissioning of nodes.
*/
public class TestDecommission extends AdminStatesBaseTest {
public static final Logger LOG = LoggerFactory.getLogger(TestDecommission
.class);
/**
* Verify that the number of replicas is as expected for each block in
* the given file.
* For blocks with a decommissioned node, verify that their replication
* is 1 more than what is specified.
* For blocks without decommissioned nodes, verify their replication is
* equal to what is specified.
*
* @param downnode - if null, there is no decommissioned node for this file.
* @return - null if no failure found, else an error message string.
*/
private static String checkFile(FileSystem fileSys, Path name, int repl,
String downnode, int numDatanodes) throws IOException {
boolean isNodeDown = (downnode != null);
// need a raw stream
assertTrue("Not HDFS:"+fileSys.getUri(),
fileSys instanceof DistributedFileSystem);
HdfsDataInputStream dis = (HdfsDataInputStream)
fileSys.open(name);
Collection<LocatedBlock> dinfo = dis.getAllBlocks();
for (LocatedBlock blk : dinfo) { // for each block
int hasdown = 0;
DatanodeInfo[] nodes = blk.getLocations();
for (int j = 0; j < nodes.length; j++) { // for each replica
if (isNodeDown && nodes[j].getXferAddr().equals(downnode)) {
hasdown++;
//Downnode must actually be decommissioned
if (!nodes[j].isDecommissioned()) {
return "For block " + blk.getBlock() + " replica on " +
nodes[j] + " is given as downnode, " +
"but is not decommissioned";
}
//Decommissioned node (if any) should only be last node in list.
if (j != nodes.length - 1) {
return "For block " + blk.getBlock() + " decommissioned node "
+ nodes[j] + " was not last node in list: "
+ (j + 1) + " of " + nodes.length;
}
LOG.info("Block " + blk.getBlock() + " replica on " +
nodes[j] + " is decommissioned.");
} else {
//Non-downnodes must not be decommissioned
if (nodes[j].isDecommissioned()) {
return "For block " + blk.getBlock() + " replica on " +
nodes[j] + " is unexpectedly decommissioned";
}
}
}
LOG.info("Block " + blk.getBlock() + " has " + hasdown
+ " decommissioned replica.");
if (Math.min(numDatanodes, repl + hasdown) != nodes.length) {
return "Wrong number of replicas for block " + blk.getBlock() +
": " + nodes.length + ", expected " +
Math.min(numDatanodes, repl + hasdown);
}
}
return null;
}
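/**
* Check the namenode's aggregate stats against a single datanode's view
* over several heartbeats. getStats() returns total capacity, used space
* and remaining space as its first three entries; for a decommissioning
* node these are expected to drop out of the cluster totals.
*/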
private void verifyStats(NameNode namenode, FSNamesystem fsn,
DatanodeInfo info, DataNode node, boolean decommissioning)
throws InterruptedException, IOException {
// Do the stats check over 10 heartbeats
for (int i = 0; i < 10; i++) {
long[] newStats = namenode.getRpcServer().getStats();
// For decommissioning nodes, ensure capacity of the DN and dfsUsed
// is no longer counted towards total
assertEquals(newStats[0],
decommissioning ? 0 : info.getCapacity());
// Ensure cluster used capacity is counted for normal nodes only
assertEquals(newStats[1], decommissioning ? 0 : info.getDfsUsed());
// For decommissioning nodes, remaining space from the DN is not counted
assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining());
// Ensure the transceiver count is the same as that of the DN
assertEquals(fsn.getTotalLoad(), info.getXceiverCount());
DataNodeTestUtils.triggerHeartbeat(node);
}
}
/**
* Tests decommission for a non-federated cluster.
*/
@Test(timeout=360000)
public void testDecommission() throws IOException {
testDecommission(1, 6);
}
/**
* Tests decommission when replicas on the target datanode cannot be
* migrated to other datanodes to satisfy the replication factor. Make sure
* the datanode won't get stuck in the decommissioning state.
*/
@Test(timeout = 360000)
public void testDecommission2() throws IOException {
LOG.info("Starting test testDecommission");
int numNamenodes = 1;
int numDatanodes = 4;
getConf().setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
startCluster(numNamenodes, numDatanodes);
ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(
numNamenodes);
namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(numDatanodes));
Path file1 = new Path("testDecommission2.dat");
int replicas = 4;
// Start decommissioning one namenode at a time
ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(0);
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file1, replicas);
int deadDecommissioned = ns.getNumDecomDeadDataNodes();
int liveDecommissioned = ns.getNumDecomLiveDataNodes();
// Decommission one node. Verify that node is decommissioned.
DatanodeInfo decomNode = takeNodeOutofService(0, null, 0,
decommissionedNodes, AdminStates.DECOMMISSIONED);
decommissionedNodes.add(decomNode);
assertEquals(deadDecommissioned, ns.getNumDecomDeadDataNodes());
assertEquals(liveDecommissioned + 1, ns.getNumDecomLiveDataNodes());
// Ensure the decommissioned datanode is not automatically shut down
DFSClient client = getDfsClient(0);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
numDatanodes));
cleanupFile(fileSys, file1);
// Restart the cluster and ensure recommissioned datanodes
// are allowed to register with the namenode
shutdownCluster();
startCluster(1, 4);
}
/**
* Tests decommission for a federated cluster.
*/
@Test(timeout=360000)
public void testDecommissionFederation() throws IOException {
testDecommission(2, 2);
}
/**
* Test the decommission process on a standby NN.
* Verify that admins can run "dfsadmin -refreshNodes" on the SBN and that
* the decommission process can finish as long as admins also run
* "dfsadmin -refreshNodes" on the active NN.
* The SBN used to mark an excess replica upon recommission. The SBN's pick
* for the excess replica could be different from the one picked by the
* ANN. That creates inconsistent state and prevents the SBN from finishing
* decommission.
*/
@Test(timeout=360000)
public void testDecommissionOnStandby() throws Exception {
getConf().setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
30000);
getConf().setInt(
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 2);
// The time to wait so that the slow DN's heartbeat is considered old
// by BlockPlacementPolicyDefault, which will then choose that DN for
// the excess replica.
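// (The wait below works out to the heartbeat interval in ms times the
// tolerated heartbeat multiplier + 1.)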
long slowHeartbeatDNwaitTime =
getConf().getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000 * (getConf().
getInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT) + 1);
startSimpleHACluster(3);
// Step 1, create a cluster with 4 DNs. Blocks are stored on the
// first 3 DNs. The last DN is empty. Also configure the last DN to have
// slow heartbeat so that it will be chosen as excess replica candidate
// during recommission.
// Step 1.a, copy blocks to the first 3 DNs. Given the replica count is the
// same as # of DNs, each DN will have a replica for any block.
Path file1 = new Path("testDecommissionHA.dat");
int replicas = 3;
FileSystem activeFileSys = getCluster().getFileSystem(0);
writeFile(activeFileSys, file1, replicas);
HATestUtil.waitForStandbyToCatchUp(getCluster().getNameNode(0),
getCluster().getNameNode(1));
// Step 1.b, start a DN with slow heartbeat, so that we can know for sure it
// will be chosen as the target of excess replica during recommission.
getConf().setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
getCluster().startDataNodes(getConf(), 1, true, null, null, null);
DataNode lastDN = getCluster().getDataNodes().get(3);
lastDN.getDatanodeUuid();
// Step 2, decommission the first DN at both ANN and SBN.
DataNode firstDN = getCluster().getDataNodes().get(0);
// Step 2.a, ask ANN to decomm the first DN
DatanodeInfo decommissionedNodeFromANN = takeNodeOutofService(
0, firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED);
// Step 2.b, ask SBN to decomm the first DN
DatanodeInfo decomNodeFromSBN = takeNodeOutofService(1,
firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED);
// Step 3, recommission the first DN on SBN and ANN to create excess replica
// It recommissions the node on SBN first to create a potentially
// inconsistent state. In a production cluster, such inconsistent state can
// happen even if the recommission command was issued on the ANN first,
// given the async nature of the system.
// Step 3.a, ask SBN to recomm the first DN.
// SBN has been fixed so that it no longer invalidates excess replica during
// recommission.
// Before the fix, SBN could get into the following state.
// 1. the last DN would have been chosen as excess replica, given its
// heartbeat is considered old.
// Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
// 2. After recommissionNode finishes, SBN has 3 live replicas (0, 1, 2)
// and one excess replica ( 3 )
// After the fix,
// After recommissionNode finishes, SBN has 4 live replicas (0, 1, 2, 3)
Thread.sleep(slowHeartbeatDNwaitTime);
putNodeInService(1, decomNodeFromSBN);
// Step 3.b, ask ANN to recommission the first DN.
// To verify the fix, the test makes sure the excess replica picked by ANN
// is different from the one picked by SBN before the fix.
// To achieve that, we make sure the next-to-last DN is chosen as the
// excess replica by the ANN:
// 1. Restore the last DN's heartbeat interval.
// 2. Make the next-to-last DN's heartbeat slow.
MiniDFSCluster.DataNodeProperties lastDNprop =
getCluster().stopDataNode(3);
lastDNprop.conf.setLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
getCluster().restartDataNode(lastDNprop);
MiniDFSCluster.DataNodeProperties nextToLastDNprop =
getCluster().stopDataNode(2);
nextToLastDNprop.conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
30);
getCluster().restartDataNode(nextToLastDNprop);
getCluster().waitActive();
Thread.sleep(slowHeartbeatDNwaitTime);
putNodeInService(0, decommissionedNodeFromANN);
// Step 3.c, make sure the DN has deleted the block and reported to the NNs
getCluster().triggerHeartbeats();
HATestUtil.waitForDNDeletions(getCluster());
getCluster().triggerDeletionReports();
// Step 4, decommission the first DN on both ANN and SBN
// With the fix to make sure SBN no longer marks excess replica
// during recommission, SBN's decommission can finish properly
takeNodeOutofService(0, firstDN.getDatanodeUuid(), 0, null,
AdminStates.DECOMMISSIONED);
// Ask SBN to decomm the first DN
takeNodeOutofService(1, firstDN.getDatanodeUuid(), 0, null,
AdminStates.DECOMMISSIONED);
}
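/**
* Runs the decommission workflow against a cluster with the given number
* of namenodes and datanodes: one node is decommissioned per namenode at a
* time and block replication is verified after each step.
*/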
private void testDecommission(int numNamenodes, int numDatanodes)
throws IOException {
LOG.info("Starting test testDecommission");
startCluster(numNamenodes, numDatanodes);
ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
for(int i = 0; i < numNamenodes; i++) {
namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
}
Path file1 = new Path("testDecommission.dat");
for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
int replicas = numDatanodes - iteration - 1;
// Start decommissioning one namenode at a time
for (int i = 0; i < numNamenodes; i++) {
ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
FileSystem fileSys = getCluster().getFileSystem(i);
FSNamesystem ns = getCluster().getNamesystem(i);
writeFile(fileSys, file1, replicas);
int deadDecommissioned = ns.getNumDecomDeadDataNodes();
int liveDecommissioned = ns.getNumDecomLiveDataNodes();
// Decommission one node. Verify that node is decommissioned.
DatanodeInfo decomNode = takeNodeOutofService(i, null, 0,
decommissionedNodes, AdminStates.DECOMMISSIONED);
decommissionedNodes.add(decomNode);
assertEquals(deadDecommissioned, ns.getNumDecomDeadDataNodes());
assertEquals(liveDecommissioned + 1, ns.getNumDecomLiveDataNodes());
// Ensure the decommissioned datanode is not automatically shut down
DFSClient client = getDfsClient(i);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
// wait for the block to be replicated
int tries = 0;
while (tries++ < 20) {
try {
Thread.sleep(1000);
if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
numDatanodes) == null) {
break;
}
} catch (InterruptedException ie) {
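// Ignored; the loop simply retries.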
}
}
assertTrue("Checked if block was replicated after decommission, tried "
+ tries + " times.", tries < 20);
cleanupFile(fileSys, file1);
}
}
// Restart the cluster and ensure decommissioned datanodes
// are allowed to register with the namenode
shutdownCluster();
startCluster(numNamenodes, numDatanodes);
}
/**
* Test that over-replicated blocks are deleted on recommission.
*/
@Test(timeout=120000)
public void testRecommission() throws Exception {
final int numDatanodes = 6;
try {
LOG.info("Starting test testRecommission");
startCluster(1, numDatanodes);
final Path file1 = new Path("testDecommission.dat");
final int replicas = numDatanodes - 1;
ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
final FileSystem fileSys = getCluster().getFileSystem();
// Write a file to n-1 datanodes
writeFile(fileSys, file1, replicas);
// Decommission one of the datanodes with a replica
BlockLocation loc = fileSys.getFileBlockLocations(file1, 0, 1)[0];
assertEquals("Unexpected number of replicas from getFileBlockLocations",
replicas, loc.getHosts().length);
final String toDecomHost = loc.getNames()[0];
String toDecomUuid = null;
for (DataNode d : getCluster().getDataNodes()) {
if (d.getDatanodeId().getXferAddr().equals(toDecomHost)) {
toDecomUuid = d.getDatanodeId().getDatanodeUuid();
break;
}
}
assertNotNull("Could not find a dn with the block!", toDecomUuid);
final DatanodeInfo decomNode = takeNodeOutofService(0, toDecomUuid,
0, decommissionedNodes, AdminStates.DECOMMISSIONED);
decommissionedNodes.add(decomNode);
final BlockManager blockManager =
getCluster().getNamesystem().getBlockManager();
final DatanodeManager datanodeManager =
blockManager.getDatanodeManager();
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
// Ensure the decommissioned datanode is not automatically shut down
DFSClient client = getDfsClient(0);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
// wait for the block to be replicated
final ExtendedBlock b = DFSTestUtil.getFirstBlock(fileSys, file1);
final String uuid = toDecomUuid;
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
BlockInfo info =
blockManager.getStoredBlock(b.getLocalBlock());
int count = 0;
StringBuilder sb = new StringBuilder("Replica locations: ");
for (int i = 0; i < info.numNodes(); i++) {
DatanodeDescriptor dn = info.getDatanode(i);
sb.append(dn + ", ");
if (!dn.getDatanodeUuid().equals(uuid)) {
count++;
}
}
LOG.info(sb.toString());
LOG.info("Count: " + count);
return count == replicas;
}
}, 500, 30000);
// Recommission and wait for over-replication to be fixed
putNodeInService(0, decomNode);
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
DFSTestUtil.waitForReplication(getCluster(), b, 1, replicas, 0);
cleanupFile(fileSys, file1);
} finally {
shutdownCluster();
}
}
/**
* Tests cluster storage statistics during decommissioning for a
* non-federated cluster.
*/
@Test(timeout=360000)
public void testClusterStats() throws Exception {
testClusterStats(1);
}
/**
* Tests cluster storage statistics during decommissioning for a
* federated cluster.
*/
@Test(timeout=360000)
public void testClusterStatsFederation() throws Exception {
testClusterStats(3);
}
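/**
* Starts a cluster with the given number of namenodes, then for each
* namenode moves one datanode into DECOMMISSION_INPROGRESS and verifies
* the namenode stats before and after the node is put back in service.
*/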
public void testClusterStats(int numNameNodes) throws IOException,
InterruptedException {
LOG.info("Starting test testClusterStats");
int numDatanodes = 1;
startCluster(numNameNodes, numDatanodes);
for (int i = 0; i < numNameNodes; i++) {
FileSystem fileSys = getCluster().getFileSystem(i);
Path file = new Path("testClusterStats.dat");
writeFile(fileSys, file, 1);
FSNamesystem fsn = getCluster().getNamesystem(i);
NameNode namenode = getCluster().getNameNode(i);
DatanodeInfo decomInfo = takeNodeOutofService(i, null, 0, null,
AdminStates.DECOMMISSION_INPROGRESS);
DataNode decomNode = getDataNode(decomInfo);
// Check namenode stats for multiple datanode heartbeats
verifyStats(namenode, fsn, decomInfo, decomNode, true);
// Stop decommissioning and verify stats
DatanodeInfo retInfo = NameNodeAdapter.getDatanode(fsn, decomInfo);
putNodeInService(i, retInfo);
DataNode retNode = getDataNode(decomInfo);
verifyStats(namenode, fsn, retInfo, retNode, false);
}
}
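/**
* Find the DataNode instance in the mini cluster matching the given
* DatanodeInfo; fails the test if no such node exists.
*/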
private DataNode getDataNode(DatanodeInfo decomInfo) {
DataNode decomNode = null;
for (DataNode dn: getCluster().getDataNodes()) {
if (decomInfo.equals(dn.getDatanodeId())) {
decomNode = dn;
break;
}
}
assertNotNull("Could not find decomNode in cluster!", decomNode);
return decomNode;
}
/**
* Test host/include file functionality. Only datanodes
* in the include file are allowed to connect to the namenode in a
* non-federated cluster.
*/
@Test(timeout=360000)
public void testHostsFile() throws IOException, InterruptedException {
// Test for a single namenode cluster
testHostsFile(1);
}
/**
* Test host/include file functionality. Only datanodes
* in the include file are allowed to connect to the namenode in a
* federated cluster.
*/
@Test(timeout=360000)
public void testHostsFileFederation()
throws IOException, InterruptedException {
// Test for 3 namenode federated cluster
testHostsFile(3);
}
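/**
* Starts a cluster with an include file, then points the include file at
* a bogus IP and verifies that the original datanode is marked dead while
* the bogus entry shows up in the dead report.
*/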
public void testHostsFile(int numNameNodes) throws IOException,
InterruptedException {
int numDatanodes = 1;
startCluster(numNameNodes, numDatanodes, true, null, false);
// Now point the include file at a bogus IP and ensure the datanode is
// disallowed from talking to the namenode, resulting in its shutdown.
final String bogusIp = "127.0.30.1";
initIncludeHost(bogusIp);
for (int j = 0; j < numNameNodes; j++) {
refreshNodes(j);
DFSClient client = getDfsClient(j);
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
for (int i = 0; i < 5 && info.length != 0; i++) {
LOG.info("Waiting for datanode to be marked dead");
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
info = client.datanodeReport(DatanodeReportType.LIVE);
}
assertEquals("Number of live nodes should be 0", 0, info.length);
// Test that bogus hostnames are considered "dead".
// The dead report should have an entry for the bogus entry in the hosts
// file. The original datanode is excluded from the report because it
// is no longer in the included list.
info = client.datanodeReport(DatanodeReportType.DEAD);
assertEquals("There should be 1 dead node", 1, info.length);
assertEquals(bogusIp, info[0].getHostName());
}
}
@Test(timeout=120000)
public void testDecommissionWithOpenfile()
throws IOException, InterruptedException {
LOG.info("Starting test testDecommissionWithOpenfile");
//At most 4 nodes will be decommissioned
startCluster(1, 7);
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
String openFile = "/testDecommissionWithOpenfile.dat";
writeFile(fileSys, new Path(openFile), (short)3);
// Make sure the file is open for write
FSDataOutputStream fdos = fileSys.append(new Path(openFile));
LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
getCluster().getNameNode(0), openFile, 0, fileSize);
DatanodeInfo[] dnInfos4LastBlock = lbs.getLastLocatedBlock().getLocations();
DatanodeInfo[] dnInfos4FirstBlock = lbs.get(0).getLocations();
ArrayList<String> nodes = new ArrayList<String>();
ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>();
DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) {
DatanodeInfo found = datanodeInfo;
for (DatanodeInfo dif: dnInfos4LastBlock) {
if (datanodeInfo.equals(dif)) {
found = null;
}
}
if (found != null) {
nodes.add(found.getXferAddr());
dnInfos.add(dm.getDatanode(found));
}
}
// Decommission one of the 3 nodes which have the last block
nodes.add(dnInfos4LastBlock[0].getXferAddr());
dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0]));
initExcludeHosts(nodes);
refreshNodes(0);
for (DatanodeInfo dn : dnInfos) {
waitNodeState(dn, AdminStates.DECOMMISSIONED);
}
fdos.close();
}
@Test(timeout = 360000)
public void testDecommissionWithOpenFileAndBlockRecovery()
throws IOException, InterruptedException {
startCluster(1, 6);
getCluster().waitActive();
Path file = new Path("/testRecoveryDecommission");
// Create a file and never close the output stream to trigger recovery
DistributedFileSystem dfs = getCluster().getFileSystem();
FSDataOutputStream out = dfs.create(file, true,
getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
(short) 3, blockSize);
// Write data to the file
long writtenBytes = 0;
while (writtenBytes < fileSize) {
out.writeLong(writtenBytes);
writtenBytes += 8;
}
out.hsync();
DatanodeInfo[] lastBlockLocations = NameNodeAdapter.getBlockLocations(
getCluster().getNameNode(), "/testRecoveryDecommission", 0, fileSize)
.getLastLocatedBlock().getLocations();
// Decommission all nodes of the last block
ArrayList<String> toDecom = new ArrayList<>();
for (DatanodeInfo dnDecom : lastBlockLocations) {
toDecom.add(dnDecom.getXferAddr());
}
initExcludeHosts(toDecom);
refreshNodes(0);
// Make sure hard lease expires to trigger replica recovery
getCluster().setLeasePeriod(300L, 300L);
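// Wait a couple of block report intervals for the recovery to be reported.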
Thread.sleep(2 * BLOCKREPORT_INTERVAL_MSEC);
for (DatanodeInfo dnDecom : lastBlockLocations) {
DatanodeInfo datanode = NameNodeAdapter.getDatanode(
getCluster().getNamesystem(), dnDecom);
waitNodeState(datanode, AdminStates.DECOMMISSIONED);
}
assertEquals(dfs.getFileStatus(file).getLen(), writtenBytes);
}
@Test(timeout=120000)
public void testCloseWhileDecommission() throws IOException,
ExecutionException, InterruptedException {
LOG.info("Starting test testCloseWhileDecommission");
// min replication = 2
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
startCluster(1, 3);
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
String openFile = "/testDecommissionWithOpenfile.dat";
writeFile(fileSys, new Path(openFile), (short)3);
// Make sure the file is open for write
FSDataOutputStream fdos = fileSys.append(new Path(openFile));
byte[] bytes = new byte[1];
fdos.write(bytes);
fdos.hsync();
LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
getCluster().getNameNode(0), openFile, 0, fileSize);
DatanodeInfo[] dnInfos4LastBlock = lbs.getLastLocatedBlock().getLocations();
ArrayList<String> nodes = new ArrayList<String>();
ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>();
DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
// Decommission 2 of the 3 nodes which have the last block
nodes.add(dnInfos4LastBlock[0].getXferAddr());
dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0]));
nodes.add(dnInfos4LastBlock[1].getXferAddr());
dnInfos.add(dm.getDatanode(dnInfos4LastBlock[1]));
// Because the cluster has only 3 nodes, 2 of which are decomm'ed,
// the last block will remain under-replicated.
initExcludeHosts(nodes);
refreshNodes(0);
// The close() should not fail even though the number of live replicas of
// the last block drops to one.
fdos.close();
// Make sure the two datanodes remain in decommission-in-progress state
BlockManagerTestUtil.recheckDecommissionState(dm);
assertTrackedAndPending(dm.getDatanodeAdminManager(), 2, 0);
}
/**
* Simulate the following scenario:
* A client writes block bk1 to three datanodes (dn1/dn2/dn3). bk1 is
* completely written to all three datanodes, and each datanode finalizes
* the block, queues it for its incremental block report (IBR) and waits to
* report it to the NameNode. The client commits bk1 after receiving the
* ACK. Before the IBRs are delivered, all three nodes dn1/dn2/dn3 enter
* Decommissioning, and only then do the DNs send their IBRs.
*/
@Test(timeout = 360000)
public void testAllocAndIBRWhileDecommission() throws IOException {
LOG.info("Starting test testAllocAndIBRWhileDecommission");
// Simulate delayed IBRs by using a long report interval.
getConf().setLong(
DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, 10000);
startCluster(1, 6);
getCluster().waitActive();
FSNamesystem ns = getCluster().getNamesystem(0);
DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
Path file = new Path("/testAllocAndIBRWhileDecommission");
DistributedFileSystem dfs = getCluster().getFileSystem();
FSDataOutputStream out = dfs.create(file, true,
getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY,
4096), (short) 3, blockSize);
// Write the first block's data to the file; writing one more long after
// this will commit the first block and allocate the second block.
long writtenBytes = 0;
while (writtenBytes + 8 < blockSize) {
out.writeLong(writtenBytes);
writtenBytes += 8;
}
out.hsync();
// Get first block information
LocatedBlock firstLocatedBlock =
NameNodeAdapter.getBlockLocations(getCluster().getNameNode(),
"/testAllocAndIBRWhileDecommission", 0, fileSize)
.getLastLocatedBlock();
DatanodeInfo[] firstBlockLocations = firstLocatedBlock.getLocations();
// Disable heartbeats and IBRs on the first block's datanodes.
ArrayList<String> toDecom = new ArrayList<>();
ArrayList<DatanodeInfo> decomDNInfos = new ArrayList<>();
for (DatanodeInfo datanodeInfo : firstBlockLocations) {
toDecom.add(datanodeInfo.getXferAddr());
decomDNInfos.add(dm.getDatanode(datanodeInfo));
DataNode dn = getDataNode(datanodeInfo);
DataNodeTestUtils.triggerHeartbeat(dn);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
// Write past the first block boundary, which commits the first block and
// allocates the second block.
while (writtenBytes <= blockSize) {
out.writeLong(writtenBytes);
writtenBytes += 8;
}
out.hsync();
// Heartbeats and IBRs are disabled, so the first block's UC state is
// COMMITTED, not COMPLETE.
assertEquals(BlockUCState.COMMITTED,
((BlockInfo) firstLocatedBlock.getBlock().getLocalBlock())
.getBlockUCState());
// Decommission all nodes of the first block
initExcludeHosts(toDecom);
refreshNodes(0);
// Wait for the nodes to reach DECOMMISSION_INPROGRESS state.
for (DatanodeInfo dnDecom : decomDNInfos) {
waitNodeState(dnDecom, AdminStates.DECOMMISSION_INPROGRESS);
DataNode dn = getDataNode(dnDecom);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
}
// Re-enable the first block's datanode heartbeats and IBRs so the first
// block's state is reported to the NN.
for (DataNode dn : getCluster().getDataNodes()) {
DataNodeTestUtils.triggerHeartbeat(dn);
}
// The NN receives the first block's IBRs and transitions the block state
// from COMMITTED to COMPLETE.
assertEquals(BlockUCState.COMPLETE,
((BlockInfo) firstLocatedBlock.getBlock().getLocalBlock())
.getBlockUCState());
out.close();
shutdownCluster();
}
/**
* Tests restart of namenode while datanode hosts are added to exclude file
**/
@Test(timeout=360000)
public void testDecommissionWithNamenodeRestart()
throws IOException, InterruptedException {
LOG.info("Starting test testDecommissionWithNamenodeRestart");
int numNamenodes = 1;
int numDatanodes = 1;
int replicas = 1;
getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 5);
startCluster(numNamenodes, numDatanodes);
Path file1 = new Path("testDecommissionWithNamenodeRestart.dat");
FileSystem fileSys = getCluster().getFileSystem();
writeFile(fileSys, file1, replicas);
DFSClient client = getDfsClient(0);
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
DatanodeID excludedDatanodeID = info[0];
String excludedDatanodeName = info[0].getXferAddr();
initExcludeHost(excludedDatanodeName);
//Add a new datanode to cluster
getCluster().startDataNodes(getConf(), 1, true, null, null, null, null);
numDatanodes += 1;
assertEquals("Number of datanodes should be 2", 2,
getCluster().getDataNodes().size());
//Restart the namenode
getCluster().restartNameNode();
DatanodeInfo datanodeInfo = NameNodeAdapter.getDatanode(
getCluster().getNamesystem(), excludedDatanodeID);
waitNodeState(datanodeInfo, AdminStates.DECOMMISSIONED);
// Ensure the decommissioned datanode is not automatically shut down
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
assertTrue("Checked if block was replicated after decommission.",
checkFile(fileSys, file1, replicas, datanodeInfo.getXferAddr(),
numDatanodes) == null);
cleanupFile(fileSys, file1);
// Restart the cluster and ensure recommissioned datanodes
// are allowed to register with the namenode
shutdownCluster();
startCluster(numNamenodes, numDatanodes);
}
/**
* Tests dead node count after restart of namenode
**/
@Test(timeout=360000)
public void testDeadNodeCountAfterNamenodeRestart() throws Exception {
LOG.info("Starting test testDeadNodeCountAfterNamenodeRestart");
int numNamenodes = 1;
int numDatanodes = 2;
startCluster(numNamenodes, numDatanodes);
DFSClient client = getDfsClient(0);
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
DatanodeInfo excludedDatanode = info[0];
String excludedDatanodeName = info[0].getXferAddr();
List<String> hosts = new ArrayList<String>(Arrays.asList(
excludedDatanodeName, info[1].getXferAddr()));
initIncludeHosts(hosts.toArray(new String[hosts.size()]));
takeNodeOutofService(0, excludedDatanode.getDatanodeUuid(), 0, null,
AdminStates.DECOMMISSIONED);
getCluster().stopDataNode(excludedDatanodeName);
DFSTestUtil.waitForDatanodeState(
getCluster(), excludedDatanode.getDatanodeUuid(), false, 20000);
//Restart the namenode
getCluster().restartNameNode();
assertEquals("There should be one node alive", 1,
client.datanodeReport(DatanodeReportType.LIVE).length);
assertEquals("There should be one node dead", 1,
client.datanodeReport(DatanodeReportType.DEAD).length);
}
/**
* Test using a "registration name" in a host include file.
*
* Registration names are DataNode names specified in the configuration by
* dfs.datanode.hostname. The DataNode will send this name to the NameNode
* as part of its registration. Registration names are helpful when you
* want to override the normal first result of DNS resolution on the
* NameNode. For example, a given datanode IP may map to two hostnames,
* and you may want to choose which hostname is used internally in the
* cluster.
*
* It is not recommended to use a registration name which is not also a
* valid DNS hostname for the DataNode. See HDFS-5237 for background.
*/
@Ignore
@Test(timeout=360000)
public void testIncludeByRegistrationName() throws Exception {
// Any IPv4 address starting with 127 functions as a "loopback" address
// which is connected to the current host. So by choosing 127.0.0.100
// as our registration name, we have chosen a name which is also a valid
// way of reaching the local DataNode we're going to start.
// Typically, a registration name would be a hostname, but we don't want
// to deal with DNS in this test.
final String registrationName = "127.0.0.100";
final String nonExistentDn = "127.0.0.10";
getConf().set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName);
startCluster(1, 1, false, null, true);
// Set up an includes file that doesn't have our datanode.
initIncludeHost(nonExistentDn);
refreshNodes(0);
// Wait for the DN to be marked dead.
LOG.info("Waiting for DN to be marked as dead.");
final DFSClient client = getDfsClient(0);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
BlockManagerTestUtil
.checkHeartbeat(getCluster().getNamesystem().getBlockManager());
try {
DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
return info.length == 1;
} catch (IOException e) {
LOG.warn("Failed to check dead DNs", e);
return false;
}
}
}, 500, 5000);
// Use a non-empty include file with our registration name.
// It should work.
int dnPort = getCluster().getDataNodes().get(0).getXferPort();
initIncludeHost(registrationName + ":" + dnPort);
refreshNodes(0);
getCluster().restartDataNode(0);
getCluster().triggerHeartbeats();
// Wait for the DN to come back.
LOG.info("Waiting for DN to come back.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
BlockManagerTestUtil
.checkHeartbeat(getCluster().getNamesystem().getBlockManager());
try {
DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
if (info.length == 1) {
Assert.assertFalse(info[0].isDecommissioned());
Assert.assertFalse(info[0].isDecommissionInProgress());
assertEquals(registrationName, info[0].getHostName());
return true;
}
} catch (IOException e) {
LOG.warn("Failed to check dead DNs", e);
}
return false;
}
}, 500, 5000);
}
@Test(timeout=120000)
public void testBlocksPerInterval() throws Exception {
org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)
.setLevel(Level.TRACE);
// Turn the blocks per interval way down
getConf().setInt(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
3);
// Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
Integer.MAX_VALUE);
startCluster(1, 3);
final FileSystem fs = getCluster().getFileSystem();
final DatanodeManager datanodeManager =
getCluster().getNamesystem().getBlockManager().getDatanodeManager();
final DatanodeAdminManager decomManager =
datanodeManager.getDatanodeAdminManager();
// Write a 3 block file, so each node has one block. Should scan 3 nodes.
DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
doDecomCheck(datanodeManager, decomManager, 3);
// Write another file, should only scan two
DFSTestUtil.createFile(fs, new Path("/file2"), 64, (short)3, 0xBAD1DEA);
doDecomCheck(datanodeManager, decomManager, 2);
// One more file, should only scan 1
DFSTestUtil.createFile(fs, new Path("/file3"), 64, (short)3, 0xBAD1DEA);
doDecomCheck(datanodeManager, decomManager, 1);
// Blocks on each DN now exceed the limit; still scan at least one node
DFSTestUtil.createFile(fs, new Path("/file4"), 64, (short)3, 0xBAD1DEA);
doDecomCheck(datanodeManager, decomManager, 1);
}
@Deprecated
@Test(timeout=120000)
public void testNodesPerInterval() throws Exception {
org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)
.setLevel(Level.TRACE);
// Set the deprecated configuration key which limits the # of nodes per
// interval
getConf().setInt("dfs.namenode.decommission.nodes.per.interval", 1);
// Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
Integer.MAX_VALUE);
startCluster(1, 3);
final FileSystem fs = getCluster().getFileSystem();
final DatanodeManager datanodeManager =
getCluster().getNamesystem().getBlockManager().getDatanodeManager();
final DatanodeAdminManager decomManager =
datanodeManager.getDatanodeAdminManager();
// Write a 3 block file, so each node has one block. Should scan 1 node
// each time.
DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
for (int i=0; i<3; i++) {
doDecomCheck(datanodeManager, decomManager, 1);
}
}
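/**
* Decommission every datanode, run one pass of the admin monitor, assert
* how many nodes that pass checked, then recommission all nodes.
*/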
private void doDecomCheck(DatanodeManager datanodeManager,
DatanodeAdminManager decomManager, int expectedNumCheckedNodes)
throws IOException, ExecutionException, InterruptedException {
// Decom all nodes
ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
for (DataNode d: getCluster().getDataNodes()) {
DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0,
decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(dn);
}
// Run decom scan and check
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals("Unexpected # of nodes checked", expectedNumCheckedNodes,
decomManager.getNumNodesChecked());
// Recommission all nodes
for (DatanodeInfo dn : decommissionedNodes) {
putNodeInService(0, dn);
}
}
/**
* Test that DatanodeAdminManager#monitor swallows any exception by default.
*/
@Test(timeout=120000)
public void testPendingNodeButDecommissioned() throws Exception {
// Only allow one node to be decom'd at a time
getConf().setInt(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
1);
// Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
Integer.MAX_VALUE);
startCluster(1, 2);
final DatanodeManager datanodeManager =
getCluster().getNamesystem().getBlockManager().getDatanodeManager();
final DatanodeAdminManager decomManager =
datanodeManager.getDatanodeAdminManager();
ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
List<DataNode> dns = getCluster().getDataNodes();
// Try to decommission 2 datanodes
for (int i = 0; i < 2; i++) {
DataNode d = dns.get(i);
DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0,
decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(dn);
}
assertEquals(2, decomManager.getNumPendingNodes());
// Set one datanode state to Decommissioned after decommission ops.
DatanodeDescriptor dn = datanodeManager.getDatanode(dns.get(0)
.getDatanodeId());
dn.setDecommissioned();
try {
// Trigger DatanodeAdminManager#monitor
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
// Wait for OutOfServiceNodeBlocks to be 0
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return decomManager.getNumTrackedNodes() == 0;
}
}, 500, 30000);
assertTrue(GenericTestUtils.anyThreadMatching(
Pattern.compile("DatanodeAdminMonitor-.*")));
} catch (ExecutionException e) {
GenericTestUtils.assertExceptionContains("in an invalid state!", e);
fail("DatanodeAdminManager#monitor does not swallow exceptions.");
}
}
@Test(timeout=120000)
public void testPendingNodes() throws Exception {
org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)
.setLevel(Level.TRACE);
// Only allow one node to be decom'd at a time
getConf().setInt(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
1);
// Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
Integer.MAX_VALUE);
startCluster(1, 3);
final FileSystem fs = getCluster().getFileSystem();
final DatanodeManager datanodeManager =
getCluster().getNamesystem().getBlockManager().getDatanodeManager();
final DatanodeAdminManager decomManager =
datanodeManager.getDatanodeAdminManager();
// Keep a file open to prevent decom from progressing
HdfsDataOutputStream open1 =
(HdfsDataOutputStream) fs.create(new Path("/openFile1"), (short)3);
// Flush and trigger block reports so the block definitely shows up on NN
open1.write(123);
open1.hflush();
for (DataNode d: getCluster().getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(d);
}
// Decom two nodes, so one is still alive
ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
for (int i=0; i<2; i++) {
final DataNode d = getCluster().getDataNodes().get(i);
DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0,
decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(dn);
}
for (int i=2; i>=0; i--) {
assertTrackedAndPending(decomManager, 0, i);
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
}
// Close file, try to decom the last node, should get stuck in tracked
open1.close();
final DataNode d = getCluster().getDataNodes().get(2);
DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0,
decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(dn);
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertTrackedAndPending(decomManager, 1, 0);
}
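/**
* Assert the number of nodes the DatanodeAdminManager is currently
* tracking and the number still pending.
*/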
private void assertTrackedAndPending(DatanodeAdminManager decomManager,
int tracked, int pending) {
assertEquals("Unexpected number of tracked nodes", tracked,
decomManager.getNumTrackedNodes());
assertEquals("Unexpected number of pending nodes", pending,
decomManager.getNumPendingNodes());
}
/**
* Fetching live DataNodes by passing removeDecommissionedNode as:
* false - returns the live node list including nodes in Decommissioned state
* true - returns the live node list without nodes in Decommissioned state
* @throws IOException
*/
@Test
public void testCountOnDecommissionedNodeList() throws IOException {
getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1);
try {
startCluster(1, 1);
ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
new ArrayList<ArrayList<DatanodeInfo>>(1);
namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(1));
// Move datanode1 to Decommissioned state
ArrayList<DatanodeInfo> decommissionedNode = namenodeDecomList.get(0);
takeNodeOutofService(0, null, 0, decommissionedNode,
AdminStates.DECOMMISSIONED);
FSNamesystem ns = getCluster().getNamesystem(0);
DatanodeManager datanodeManager =
ns.getBlockManager().getDatanodeManager();
List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
// fetchDatanodes with false should return the live decommissioned node
datanodeManager.fetchDatanodes(live, null, false);
assertEquals(1, live.size());
// fetchDatanodes with true should not return the live decommissioned node
datanodeManager.fetchDatanodes(live, null, true);
assertEquals(0, live.size());
} finally {
shutdownCluster();
}
}
/**
* Decommissioned node should not be considered while calculating node usage
* @throws InterruptedException
*/
@Test
public void testNodeUsageAfterDecommissioned()
throws IOException, InterruptedException {
nodeUsageVerification(2, new long[] { 26384L, 26384L },
AdminStates.DECOMMISSIONED);
}
/**
* DECOMMISSION_INPROGRESS node should not be considered
* while calculating node usage
* @throws InterruptedException
*/
@Test
public void testNodeUsageWhileDecommissioning()
throws IOException, InterruptedException {
nodeUsageVerification(1, new long[] { 26384L },
AdminStates.DECOMMISSION_INPROGRESS);
}
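/**
* Starts a cluster of simulated datanodes with the given capacities,
* moves one node into the given admin state, and checks the minimum node
* usage reported by FSNamesystem#getNodeUsage before and after the node
* is put back in service.
*/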
@SuppressWarnings({ "unchecked" })
public void nodeUsageVerification(int numDatanodes, long[] nodesCapacity,
AdminStates decommissionState) throws IOException, InterruptedException {
Map<String, Map<String, String>> usage = null;
DatanodeInfo decommissionedNodeInfo = null;
String zeroNodeUsage = "0.00%";
getConf().setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1);
FileSystem fileSys = null;
Path file1 = new Path("testNodeUsage.dat");
try {
SimulatedFSDataset.setFactory(getConf());
startCluster(1, numDatanodes, false, nodesCapacity, false);
ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
new ArrayList<ArrayList<DatanodeInfo>>(1);
namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(numDatanodes));
if (decommissionState == AdminStates.DECOMMISSIONED) {
// Move datanode1 to Decommissioned state
ArrayList<DatanodeInfo> decommissionedNode = namenodeDecomList.get(0);
decommissionedNodeInfo = takeNodeOutofService(0, null, 0,
decommissionedNode, decommissionState);
}
// Write a file (replication 1). Hence it will be written to only one
// live node.
fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file1, 1);
Thread.sleep(2000);
// min NodeUsage should not be 0.00%
usage = (Map<String, Map<String, String>>) JSON.parse(ns.getNodeUsage());
String minUsageBeforeDecom = usage.get("nodeUsage").get("min");
assertTrue(!minUsageBeforeDecom.equalsIgnoreCase(zeroNodeUsage));
if (decommissionState == AdminStates.DECOMMISSION_INPROGRESS) {
// Start decommissioning datanode
ArrayList<DatanodeInfo> decommissioningNodes = namenodeDecomList.
get(0);
decommissionedNodeInfo = takeNodeOutofService(0, null, 0,
decommissioningNodes, decommissionState);
// NodeUsage should not include DECOMMISSION_INPROGRESS node
// (minUsage should be 0.00%)
usage = (Map<String, Map<String, String>>)
JSON.parse(ns.getNodeUsage());
assertTrue(usage.get("nodeUsage").get("min").
equalsIgnoreCase(zeroNodeUsage));
}
// Recommission node
putNodeInService(0, decommissionedNodeInfo);
usage = (Map<String, Map<String, String>>) JSON.parse(ns.getNodeUsage());
String nodeUsageAfterRecommission =
decommissionState == AdminStates.DECOMMISSION_INPROGRESS
? minUsageBeforeDecom
: zeroNodeUsage;
assertTrue(usage.get("nodeUsage").get("min").
equalsIgnoreCase(nodeUsageAfterRecommission));
} finally {
cleanupFile(fileSys, file1);
}
}
@Test
public void testUsedCapacity() throws Exception {
int numNamenodes = 1;
int numDatanodes = 2;
startCluster(numNamenodes, numDatanodes);
FSNamesystem ns = getCluster().getNamesystem(0);
BlockManager blockManager = ns.getBlockManager();
DatanodeStatistics datanodeStatistics = blockManager.getDatanodeManager()
.getDatanodeStatistics();
long initialUsedCapacity = datanodeStatistics.getCapacityUsed();
long initialTotalCapacity = datanodeStatistics.getCapacityTotal();
long initialBlockPoolUsed = datanodeStatistics.getBlockPoolUsed();
ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(numDatanodes));
ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(0);
// Decommission one node
DatanodeInfo decomNode = takeNodeOutofService(0, null, 0,
decommissionedNodes, AdminStates.DECOMMISSIONED);
decommissionedNodes.add(decomNode);
long newUsedCapacity = datanodeStatistics.getCapacityUsed();
long newTotalCapacity = datanodeStatistics.getCapacityTotal();
long newBlockPoolUsed = datanodeStatistics.getBlockPoolUsed();
assertTrue("DfsUsedCapacity should not be the same after a node has " +
"been decommissioned!", initialUsedCapacity != newUsedCapacity);
assertTrue("TotalCapacity should not be the same after a node has " +
"been decommissioned!", initialTotalCapacity != newTotalCapacity);
assertTrue("BlockPoolUsed should not be the same after a node has " +
"been decommissioned!",initialBlockPoolUsed != newBlockPoolUsed);
}
/**
* Verify that multiple DataNodes can be decommissioned at the same time.
*/
@Test(timeout = 360000)
public void testMultipleNodesDecommission() throws Exception {
startCluster(1, 5);
final Path file = new Path("/testMultipleNodesDecommission.dat");
final FileSystem fileSys = getCluster().getFileSystem(0);
final FSNamesystem ns = getCluster().getNamesystem(0);
final int repl = 3;
writeFile(fileSys, file, repl, 1);
// Request Decommission for DataNodes 1 and 2.
final List<DatanodeInfo> decomDataNodes = takeNodeOutofService(0,
Lists.newArrayList(getCluster().getDataNodes().get(0).getDatanodeUuid(),
getCluster().getDataNodes().get(1).getDatanodeUuid()),
Long.MAX_VALUE, null, null, AdminStates.DECOMMISSIONED);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
String errMsg = checkFile(fileSys, file, repl,
decomDataNodes.get(0).getXferAddr(), 5);
if (errMsg != null) {
LOG.warn("Check file: " + errMsg);
}
return true;
} catch (IOException e) {
LOG.warn("Check file: " + e);
return false;
}
}
}, 500, 30000);
// Put the decommissioned nodes back in service.
for (DatanodeInfo datanodeInfo : decomDataNodes) {
putNodeInService(0, datanodeInfo);
}
cleanupFile(fileSys, file);
}
}