/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.junit.After;
import org.junit.Before;
/**
 * This class provides utilities for testing admin operations
 * (decommission and maintenance) on DataNodes.
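 * <p>
 * A minimal usage sketch from a subclass test (the test class and method
 * names below are hypothetical; only members defined in this class are used):
 * <pre>
 * public class TestDecommissionExample extends AdminStatesBaseTest {
 *   // JUnit test method (annotate with JUnit's Test annotation).
 *   public void testDecommissionOneNode() throws Exception {
 *     startCluster(1, 3);
 *     // Decommission a randomly picked DataNode and wait for DECOMMISSIONED.
 *     DatanodeInfo dn = takeNodeOutofService(0, null, 0, null,
 *         AdminStates.DECOMMISSIONED);
 *     // Recommission it and wait until it returns to NORMAL.
 *     putNodeInService(0, dn);
 *   }
 * }
 * </pre>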
*/
public class AdminStatesBaseTest {
public static final Log LOG = LogFactory.getLog(AdminStatesBaseTest.class);
static final long seed = 0xDEADBEEFL;
static final int blockSize = 8192;
static final int fileSize = 16384;
static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec
static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval
final private Random myrand = new Random();
private HostsFileWriter hostsFileWriter;
private Configuration conf;
private MiniDFSCluster cluster = null;
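  // If true, setup() plugs in CombinedHostFileManager (a single JSON hosts
  // file) instead of the legacy separate include/exclude host files.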
private boolean useCombinedHostFileManager = false;
protected void setUseCombinedHostFileManager() {
useCombinedHostFileManager = true;
}
protected Configuration getConf() {
return conf;
}
protected MiniDFSCluster getCluster() {
return cluster;
}
@Before
public void setup() throws IOException {
// Set up the hosts/exclude files.
hostsFileWriter = new HostsFileWriter();
conf = new HdfsConfiguration();
if (useCombinedHostFileManager) {
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
CombinedHostFileManager.class, HostConfigManager.class);
}
    // Disable load-based block placement and shorten the various intervals
    // so that admin-state transitions complete quickly in tests.
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
200);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
BLOCKREPORT_INTERVAL_MSEC);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
NAMENODE_REPLICATION_INTERVAL);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
hostsFileWriter.initialize(conf, "temp/admin");
}
@After
public void teardown() throws IOException {
hostsFileWriter.cleanup();
shutdownCluster();
}
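  /**
   * Create a file but leave it open (under construction); the caller is
   * responsible for closing the returned stream.
   */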
static public FSDataOutputStream writeIncompleteFile(FileSystem fileSys,
Path name, short repl, short numOfBlocks) throws IOException {
return writeFile(fileSys, name, repl, numOfBlocks, false);
}
static protected void writeFile(FileSystem fileSys, Path name, int repl)
throws IOException {
writeFile(fileSys, name, repl, 2);
}
static protected void writeFile(FileSystem fileSys, Path name, int repl,
int numOfBlocks) throws IOException {
writeFile(fileSys, name, repl, numOfBlocks, true);
}
static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
int repl, int numOfBlocks, boolean completeFile)
throws IOException {
    // Create and write a file containing numOfBlocks blocks of data.
FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
(short) repl, blockSize);
byte[] buffer = new byte[blockSize*numOfBlocks];
Random rand = new Random(seed);
rand.nextBytes(buffer);
stm.write(buffer);
LOG.info("Created file " + name + " with " + repl + " replicas.");
if (completeFile) {
stm.close();
return null;
} else {
stm.flush();
      // Do not close the stream; return it so the caller keeps a live
      // reference and the under-construction file stays open.
return stm;
}
}
  /**
   * Decommission a DataNode, or put it into maintenance, and wait for it to
   * reach the expected admin state.
   *
   * @param nnIndex NameNode index
   * @param datanodeUuid UUID of the DataNode to decommission or put into
   *                     maintenance, or null to pick a random DataNode
   * @param maintenanceExpirationInMS maintenance expiration time in ms
   * @param decommissionedNodes list of DataNodes already decommissioned
   * @param waitForState admin state to wait for on the chosen DataNode
   * @return DatanodeInfo of the DataNode taken out of service
   * @throws IOException
   */
protected DatanodeInfo takeNodeOutofService(int nnIndex,
String datanodeUuid, long maintenanceExpirationInMS,
ArrayList<DatanodeInfo> decommissionedNodes,
AdminStates waitForState) throws IOException {
return takeNodeOutofService(nnIndex, datanodeUuid,
maintenanceExpirationInMS, decommissionedNodes, null, waitForState);
}
  /**
   * Decommission a DataNode, or put it into maintenance, and wait for it to
   * reach the expected admin state.
   *
   * @param nnIndex NameNode index
   * @param datanodeUuid UUID of the DataNode to decommission or put into
   *                     maintenance, or null to pick a random DataNode
   * @param maintenanceExpirationInMS maintenance expiration time in ms
   * @param decommissionedNodes list of DataNodes already decommissioned
   * @param inMaintenanceNodes map of DataNodes already entering or in
   *                           maintenance
   * @param waitForState admin state to wait for on the chosen DataNode
   * @return DatanodeInfo of the DataNode taken out of service
   * @throws IOException
   */
protected DatanodeInfo takeNodeOutofService(int nnIndex,
String datanodeUuid, long maintenanceExpirationInMS,
List<DatanodeInfo> decommissionedNodes,
Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
throws IOException {
return takeNodeOutofService(nnIndex, (datanodeUuid != null ?
Lists.newArrayList(datanodeUuid) : null),
maintenanceExpirationInMS, decommissionedNodes, inMaintenanceNodes,
waitForState).get(0);
}
  /**
   * Decommission DataNodes, or put them into maintenance, and wait for them
   * to reach the expected admin state.
   *
   * @param nnIndex NameNode index
   * @param dataNodeUuids UUIDs of the DataNodes to decommission or put into
   *                      maintenance, or null to pick a random DataNode
   * @param maintenanceExpirationInMS maintenance expiration time in ms
   * @param decommissionedNodes list of DataNodes already decommissioned
   * @param inMaintenanceNodes map of DataNodes already entering or in
   *                           maintenance
   * @param waitForState admin state to wait for on the chosen DataNodes
   * @return list of DatanodeInfo for the DataNodes taken out of service
   * @throws IOException
   */
protected List<DatanodeInfo> takeNodeOutofService(int nnIndex,
List<String> dataNodeUuids, long maintenanceExpirationInMS,
List<DatanodeInfo> decommissionedNodes,
Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
throws IOException {
DFSClient client = getDfsClient(nnIndex);
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
boolean isDecommissionRequest =
waitForState == AdminStates.DECOMMISSION_INPROGRESS ||
waitForState == AdminStates.DECOMMISSIONED;
List<String> dataNodeNames = new ArrayList<>();
List<DatanodeInfo> datanodeInfos = new ArrayList<>();
// pick one DataNode randomly unless the caller specifies one.
if (dataNodeUuids == null) {
boolean found = false;
while (!found) {
int index = myrand.nextInt(info.length);
if ((isDecommissionRequest && !info[index].isDecommissioned()) ||
(!isDecommissionRequest && !info[index].isInMaintenance())) {
dataNodeNames.add(info[index].getXferAddr());
datanodeInfos.add(NameNodeAdapter.getDatanode(
cluster.getNamesystem(nnIndex), info[index]));
found = true;
}
}
} else {
      // The caller specified one or more DataNodes; look each one up by UUID.
for (String datanodeUuid : dataNodeUuids) {
boolean found = false;
for (int index = 0; index < info.length; index++) {
if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
dataNodeNames.add(info[index].getXferAddr());
datanodeInfos.add(NameNodeAdapter.getDatanode(
cluster.getNamesystem(nnIndex), info[index]));
found = true;
break;
}
}
if (!found) {
throw new IOException("invalid datanodeUuid " + datanodeUuid);
}
}
}
LOG.info("Taking node: " + Arrays.toString(dataNodeNames.toArray())
+ " out of service");
    ArrayList<String> decommissionNodes = new ArrayList<>();
if (decommissionedNodes != null) {
for (DatanodeInfo dn : decommissionedNodes) {
decommissionNodes.add(dn.getName());
}
}
Map<String, Long> maintenanceNodes = new HashMap<>();
if (inMaintenanceNodes != null) {
for (Map.Entry<DatanodeInfo, Long> dn :
inMaintenanceNodes.entrySet()) {
maintenanceNodes.put(dn.getKey().getName(), dn.getValue());
}
}
    if (isDecommissionRequest) {
      decommissionNodes.addAll(dataNodeNames);
    } else {
      for (String dataNodeName : dataNodeNames) {
        maintenanceNodes.put(dataNodeName, maintenanceExpirationInMS);
      }
    }
    // Write the node names into the hosts file (the combined JSON file, or
    // the legacy exclude file, depending on the configured manager).
hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
refreshNodes(nnIndex);
waitNodeState(datanodeInfos, waitForState);
return datanodeInfos;
}
  /**
   * Ask a specific NameNode to put the DataNode back in service and wait for
   * it to reach the NORMAL state.
   */
protected void putNodeInService(int nnIndex,
DatanodeInfo outOfServiceNode) throws IOException {
LOG.info("Putting node: " + outOfServiceNode + " in service");
ArrayList<String> decommissionNodes = new ArrayList<>();
Map<String, Long> maintenanceNodes = new HashMap<>();
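    // Rebuild the current out-of-service sets from the DatanodeManager's
    // view, then drop the target node so that refreshNodes() readmits it.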
DatanodeManager dm =
cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager();
List<DatanodeDescriptor> nodes =
dm.getDatanodeListForReport(DatanodeReportType.ALL);
for (DatanodeDescriptor node : nodes) {
if (node.isMaintenance()) {
maintenanceNodes.put(node.getName(),
node.getMaintenanceExpireTimeInMS());
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
decommissionNodes.add(node.getName());
}
}
decommissionNodes.remove(outOfServiceNode.getName());
maintenanceNodes.remove(outOfServiceNode.getName());
hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
refreshNodes(nnIndex);
waitNodeState(outOfServiceNode, AdminStates.NORMAL);
}
protected void putNodeInService(int nnIndex,
String datanodeUuid) throws IOException {
    DatanodeInfo datanodeInfo =
        getDatanodeDescriptor(cluster.getNamesystem(nnIndex), datanodeUuid);
putNodeInService(nnIndex, datanodeInfo);
}
/**
   * Wait until the DataNode transitions to the expected admin state.
*/
protected void waitNodeState(DatanodeInfo node, AdminStates state) {
waitNodeState(Lists.newArrayList(node), state);
}
/**
   * Wait until all DataNodes transition to the expected admin state.
*/
protected void waitNodeState(List<DatanodeInfo> nodes, AdminStates state) {
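    // Poll each node's cached admin state, sleeping half a heartbeat
    // interval between checks, until it matches the expected state.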
for (DatanodeInfo node : nodes) {
boolean done = (state == node.getAdminState());
while (!done) {
LOG.info("Waiting for node " + node + " to change state to "
+ state + " current state: " + node.getAdminState());
try {
Thread.sleep(HEARTBEAT_INTERVAL * 500);
} catch (InterruptedException e) {
        // Ignore the interrupt and keep polling.
}
done = (state == node.getAdminState());
}
LOG.info("node " + node + " reached the state " + state);
}
}
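  // Thin wrappers around HostsFileWriter for managing the include/exclude
  // host files used by the NameNode.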
protected void initIncludeHost(String hostNameAndPort) throws IOException {
hostsFileWriter.initIncludeHost(hostNameAndPort);
}
protected void initIncludeHosts(String[] hostNameAndPorts)
throws IOException {
hostsFileWriter.initIncludeHosts(hostNameAndPorts);
}
protected void initExcludeHost(String hostNameAndPort) throws IOException {
hostsFileWriter.initExcludeHost(hostNameAndPort);
}
protected void initExcludeHosts(List<String> hostNameAndPorts)
throws IOException {
hostsFileWriter.initExcludeHosts(hostNameAndPorts);
}
  /** Get a DFSClient connected to the specified NameNode. */
protected DFSClient getDfsClient(final int nnIndex) throws IOException {
return new DFSClient(cluster.getNameNode(nnIndex).getNameNodeAddress(),
conf);
}
  /** Validate that the cluster has the expected number of live DataNodes. */
protected static void validateCluster(DFSClient client, int numDNs)
throws IOException {
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
assertEquals("Number of Datanodes ", numDNs, info.length);
}
  /**
   * Start a MiniDFSCluster with a simple federated topology.
   * @throws IOException
   */
protected void startCluster(int numNameNodes, int numDatanodes,
boolean setupHostsFile, long[] nodesCapacity,
boolean checkDataNodeHostConfig) throws IOException {
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
.numDataNodes(numDatanodes);
if (setupHostsFile) {
builder.setupHostsFile(setupHostsFile);
}
if (nodesCapacity != null) {
builder.simulatedCapacities(nodesCapacity);
}
if (checkDataNodeHostConfig) {
builder.checkDataNodeHostConfig(checkDataNodeHostConfig);
}
cluster = builder.build();
cluster.waitActive();
for (int i = 0; i < numNameNodes; i++) {
DFSClient client = getDfsClient(i);
validateCluster(client, numDatanodes);
}
}
protected void startCluster(int numNameNodes, int numDatanodes)
throws IOException {
startCluster(numNameNodes, numDatanodes, false, null, false);
}
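  /**
   * Start a MiniDFSCluster with a simple HA topology (two NameNodes sharing
   * one nameservice) and transition the first NameNode to active.
   */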
protected void startSimpleHACluster(int numDatanodes) throws IOException {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(
numDatanodes).build();
cluster.transitionToActive(0);
cluster.waitActive();
}
protected void shutdownCluster() {
if (cluster != null) {
cluster.shutdown(true);
}
}
protected void refreshNodes(final int nnIndex) throws IOException {
cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager().
refreshNodes(conf);
}
  private static DatanodeDescriptor getDatanodeDescriptor(
      final FSNamesystem ns, final String datanodeUuid) {
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
}
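  /** Delete the given file, asserting it existed before and is gone after. */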
static public void cleanupFile(FileSystem fileSys, Path name)
throws IOException {
assertTrue(fileSys.exists(name));
fileSys.delete(name, true);
    assertFalse(fileSys.exists(name));
}
}