blob: 0781218e0bf240d9cc5b8e1f058d240adb758b82 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.util.Time.now;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.InetAddresses;
/**
* Manages datanodes, including decommissioning and other activities.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DatanodeManager {
/** Logger shared by this class. */
static final Log LOG = LogFactory.getLog(DatanodeManager.class);
/** Namesystem whose read/write lock guards several operations in this class. */
private final Namesystem namesystem;
/** Block manager notified when datanodes are removed or (de)commissioned. */
private final BlockManager blockManager;
/**
 * Heartbeat manager created in the constructor; it is also handed out as
 * the {@link DatanodeStatistics} view.
 */
private final HeartbeatManager heartbeatManager;
/**
 * Stores the datanode -> block map.
 * <p>
 * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
 * storage id. In order to keep the storage map consistent it tracks
 * all storages ever registered with the namenode.
 * A descriptor corresponding to a specific storage id can be
 * <ul>
 * <li>added to the map if it is a new storage id;</li>
 * <li>updated with a new datanode started as a replacement for the old one
 * with the same storage id; and </li>
 * <li>removed if and only if an existing datanode is restarted to serve a
 * different storage id.</li>
 * </ul>
 * <p>
 * Mapping: StorageID -> DatanodeDescriptor
 * <p>
 * Most accesses in this class synchronize on the map itself.
 */
private final NavigableMap<String, DatanodeDescriptor> datanodeMap
= new TreeMap<String, DatanodeDescriptor>();
/** Cluster network topology */
private final NetworkTopology networktopology;
/** Host names to datanode descriptors mapping. */
private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
/** Resolves host names or IP addresses to network locations. */
private final DNSToSwitchMapping dnsToSwitchMapping;
/** Reads the include/exclude hosts files. */
private final HostsFileReader hostsReader;
/**
 * The period to wait for a datanode heartbeat before the node is treated as
 * dead; compared against {@code Time.now()} in {@code isDatanodeDead}.
 */
private final long heartbeatExpireInterval;
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
/**
 * Whether or not this cluster has ever consisted of more than 1 rack,
 * according to the NetworkTopology.
 */
private boolean hasClusterEverBeenMultiRack = false;
/** Whether or not to check the stale datanodes */
private volatile boolean checkForStaleNodes;
/**
 * The time interval for detecting stale datanodes. Only read from the
 * configuration when {@link #checkForStaleNodes} is enabled.
 */
private volatile long staleInterval;
/**
 * Creates the datanode manager and reads its settings from configuration.
 * <p>
 * Instantiates the network topology, the heartbeat manager, the
 * include/exclude hosts reader and the DNS-to-switch mapping, then derives
 * the heartbeat expiry interval, the block invalidation limit and the
 * stale-datanode settings.
 *
 * @param blockManager the block manager this instance cooperates with
 * @param namesystem the namesystem this instance cooperates with
 * @param conf configuration the settings are read from
 * @throws IOException declared for callers; NOTE(review): presumably raised
 *         while reading the hosts files -- confirm against HostsFileReader
 */
DatanodeManager(final BlockManager blockManager,
    final Namesystem namesystem, final Configuration conf
    ) throws IOException {
  this.namesystem = namesystem;
  this.blockManager = blockManager;

  Class<? extends NetworkTopology> networkTopologyClass =
      conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
          NetworkTopology.class, NetworkTopology.class);
  networktopology = (NetworkTopology) ReflectionUtils.newInstance(
      networkTopologyClass, conf);

  this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);

  this.hostsReader = new HostsFileReader(
      conf.get(DFSConfigKeys.DFS_HOSTS, ""),
      conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));

  this.dnsToSwitchMapping = ReflectionUtils.newInstance(
      conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);

  // If the dns to switch mapping supports cache, resolve network
  // locations of those hosts in the include list and store the mapping
  // in the cache; so future calls to resolve will be fast.
  if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
    dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
  }

  final long heartbeatIntervalSeconds = conf.getLong(
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  final int heartbeatRecheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
  // Multiply in long arithmetic: with an int operand the doubling silently
  // wraps around once the recheck interval exceeds Integer.MAX_VALUE / 2,
  // which would yield a far too small (or negative) expiry interval.
  this.heartbeatExpireInterval = 2L * heartbeatRecheckInterval
      + 10 * 1000 * heartbeatIntervalSeconds;

  // The default limit scales with the heartbeat interval but never drops
  // below the configured default constant.
  final int defaultInvalidateLimit = Math.max(
      20 * (int) (heartbeatIntervalSeconds),
      DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
  this.blockInvalidateLimit = conf.getInt(
      DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, defaultInvalidateLimit);
  LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
      + "=" + this.blockInvalidateLimit);

  // set the value of stale interval based on configuration
  this.checkForStaleNodes = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
  if (this.checkForStaleNodes) {
    this.staleInterval = conf.getLong(
        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
    if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
      LOG.warn("The given interval for marking stale datanode = "
          + this.staleInterval + ", which is smaller than the default value "
          + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
          + ".");
    }
  }
}
/** Thread running the decommission monitor; stays null until activated. */
private Daemon decommissionthread = null;

/**
 * Starts the decommission monitor thread and activates the heartbeat
 * manager.
 *
 * @param conf configuration supplying the decommission interval and the
 *        number of nodes handled per interval
 */
void activate(final Configuration conf) {
  final DecommissionManager decomManager =
      new DecommissionManager(namesystem, blockManager);
  final int recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT);
  final int nodesPerInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT);

  this.decommissionthread = new Daemon(
      decomManager.new Monitor(recheckInterval, nodesPerInterval));
  this.decommissionthread.start();

  heartbeatManager.activate(conf);
}
/**
 * Stops the decommission monitor thread, if one was started, waiting up to
 * three seconds for it to finish, and then closes the heartbeat manager.
 * <p>
 * If the calling thread is interrupted while waiting, shutdown still runs
 * to completion and the interrupt status is restored afterwards so that
 * callers can observe it.
 */
void close() {
  boolean interrupted = false;
  if (decommissionthread != null) {
    decommissionthread.interrupt();
    try {
      decommissionthread.join(3000);
    } catch (InterruptedException e) {
      // Do not abort the shutdown; remember the interrupt and re-assert it
      // once the heartbeat manager has been closed as well.
      interrupted = true;
    }
  }
  try {
    heartbeatManager.close();
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Gives access to the cluster network topology held by this manager.
 *
 * @return the network topology
 */
public NetworkTopology getNetworkTopology() {
  return this.networktopology;
}
/**
 * Gives access to the heartbeat manager created by this manager.
 *
 * @return the heartbeat manager
 */
HeartbeatManager getHeartbeatManager() {
  return this.heartbeatManager;
}
/**
 * Exposes the heartbeat manager through its statistics interface.
 *
 * @return the datanode statistics
 */
public DatanodeStatistics getDatanodeStatistics() {
  return this.heartbeatManager;
}
/**
 * Sort the locations of each located block by their distance to the target
 * host, then move decommissioned (and, when stale checking is enabled,
 * stale) datanodes to the end of each location array.
 *
 * @param targethost host the locations are sorted relative to
 * @param locatedblocks blocks whose location arrays are sorted in place
 */
public void sortLocatedBlocks(final String targethost,
    final List<LocatedBlock> locatedblocks) {
  // The target does not have to be a datanode, so work with a generic
  // topology Node: use the registered datanode if there is one, otherwise
  // build a node from the resolved network location of the host.
  Node client = getDatanodeByHost(targethost);
  if (client == null) {
    final List<String> hosts = new ArrayList<String>(1);
    hosts.add(targethost);
    // resolve() may return null (see resolveNetworkLocation) or an empty
    // list; guard both instead of failing with an exception here.
    final List<String> resolved = dnsToSwitchMapping.resolve(hosts);
    if (resolved != null && !resolved.isEmpty()) {
      final String rName = resolved.get(0);
      if (rName != null) {
        client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
      }
    } else {
      LOG.error("Could not resolve the network location of host "
          + targethost + "; sorting its blocks without a client node.");
    }
  }

  final Comparator<DatanodeInfo> comparator = checkForStaleNodes ?
      new DFSUtil.DecomStaleComparator(staleInterval) :
      DFSUtil.DECOM_COMPARATOR;

  for (LocatedBlock b : locatedblocks) {
    networktopology.pseudoSortByDistance(client, b.getLocations());
    // Move decommissioned/stale datanodes to the bottom
    Arrays.sort(b.getLocations(), comparator);
  }
}
/**
 * Creates a cyclic iteration over the datanode map.
 *
 * @param firstkey key handed to the {@link CyclicIteration} together with
 *        the datanode map
 * @return a new cyclic iteration over the storage id to descriptor map
 */
CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
    final String firstkey) {
  final CyclicIteration<String, DatanodeDescriptor> iteration =
      new CyclicIteration<String, DatanodeDescriptor>(datanodeMap, firstkey);
  return iteration;
}
/**
 * Looks a datanode up in the host to datanode map.
 *
 * @param host the host to look up
 * @return the datanode descriptor for the host
 */
public DatanodeDescriptor getDatanodeByHost(final String host) {
  final DatanodeDescriptor descriptor = host2DatanodeMap.getDatanodeByHost(host);
  return descriptor;
}
/**
 * Looks a datanode up in the storage id to descriptor map.
 *
 * @param storageID storage id used as the map key
 * @return the descriptor stored under that id, or null if there is none
 */
DatanodeDescriptor getDatanode(final String storageID) {
  final DatanodeDescriptor descriptor = datanodeMap.get(storageID);
  return descriptor;
}
/**
 * Get a datanode by the storage ID of the given datanode ID.
 *
 * @param nodeID identifies the datanode; its storage ID selects the
 *        descriptor and its transfer address is verified against it
 * @return DatanodeDescriptor or null if the node is not found.
 * @throws UnregisteredNodeException if a descriptor exists for the storage
 *         ID but its transfer address differs from that of {@code nodeID}
 */
public DatanodeDescriptor getDatanode(DatanodeID nodeID
    ) throws UnregisteredNodeException {
  final DatanodeDescriptor registered = getDatanode(nodeID.getStorageID());
  if (registered != null
      && !registered.getXferAddr().equals(nodeID.getXferAddr())) {
    // Same storage ID, different transfer address: reject the caller.
    final UnregisteredNodeException mismatch =
        new UnregisteredNodeException(nodeID, registered);
    NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
        + mismatch.getLocalizedMessage());
    throw mismatch;
  }
  return registered;
}
/**
 * Writes the number of known datanodes followed by one dump entry per
 * datanode, holding the datanode map lock while doing so.
 *
 * @param out writer receiving the dump
 */
void datanodeDump(final PrintWriter out) {
  synchronized (datanodeMap) {
    out.println("Metasave: Number of datanodes: " + datanodeMap.size());
    for (DatanodeDescriptor descriptor : datanodeMap.values()) {
      out.println(descriptor.dumpDatanode());
    }
  }
}
/**
 * Removes a datanode from the heartbeat manager, the block manager and the
 * network topology, then re-checks safe mode. The caller must hold the
 * namesystem write lock (asserted).
 *
 * @param nodeInfo descriptor of the datanode to remove
 */
private void removeDatanode(DatanodeDescriptor nodeInfo) {
  assert namesystem.hasWriteLock();

  heartbeatManager.removeDatanode(nodeInfo);
  blockManager.removeBlocksAssociatedTo(nodeInfo);
  networktopology.remove(nodeInfo);

  if (LOG.isDebugEnabled()) {
    final String debugMessage = "remove datanode " + nodeInfo;
    LOG.debug(debugMessage);
  }

  namesystem.checkSafeMode();
}
/**
 * Removes the datanode identified by the given ID while holding the
 * namesystem write lock. A warning is logged when no such datanode exists.
 *
 * @param node ID of the datanode to remove
 * @throws UnregisteredNodeException if the lookup of {@code node} rejects it
 */
public void removeDatanode(final DatanodeID node
    ) throws UnregisteredNodeException {
  namesystem.writeLock();
  try {
    final DatanodeDescriptor found = getDatanode(node);
    if (found == null) {
      NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
          + node + " does not exist");
      return;
    }
    removeDatanode(found);
  } finally {
    namesystem.writeUnlock();
  }
}
/**
 * Removes the given datanode if it is known and its heartbeat has expired.
 * A failed lookup is treated the same as an unknown datanode.
 *
 * @param nodeID ID of the datanode to check and possibly remove
 */
void removeDeadDatanode(final DatanodeID nodeID) {
  synchronized (datanodeMap) {
    DatanodeDescriptor candidate = null;
    try {
      candidate = getDatanode(nodeID);
    } catch (IOException e) {
      // Lookup failed: leave candidate null so nothing is removed.
    }
    if (candidate == null || !isDatanodeDead(candidate)) {
      return;
    }
    NameNode.stateChangeLog.info(
        "BLOCK* removeDeadDatanode: lost heartbeat from " + candidate);
    removeDatanode(candidate);
  }
}
/**
 * Tells whether the last update of the datanode is older than the
 * heartbeat expiry interval.
 *
 * @param node the datanode to check
 * @return true if the node's last update lies before now minus the expiry
 *         interval
 */
boolean isDatanodeDead(DatanodeDescriptor node) {
  final long lastUpdate = node.getLastUpdate();
  final long oldestAliveUpdate = Time.now() - heartbeatExpireInterval;
  return lastUpdate < oldestAliveUpdate;
}
/**
 * Adds a datanode to the datanode map, the host map and the network
 * topology, and checks whether the cluster became multi-rack.
 *
 * @param node descriptor of the datanode to add
 */
private void addDatanode(final DatanodeDescriptor node) {
  // Whatever descriptor the new one displaces in datanodeMap must leave
  // host2DatanodeMap first, so that both maps stay consistent.
  synchronized (datanodeMap) {
    final DatanodeDescriptor displaced =
        datanodeMap.put(node.getStorageID(), node);
    host2DatanodeMap.remove(displaced);
  }

  host2DatanodeMap.add(node);
  networktopology.add(node);
  checkIfClusterIsNowMultiRack(node);

  if (LOG.isDebugEnabled()) {
    final String className = getClass().getSimpleName();
    LOG.debug(className + ".addDatanode: "
        + "node " + node + " is added to datanodeMap.");
  }
}
/**
 * Drops the entry for the node's storage ID from the datanode map and
 * removes the dropped descriptor from the host map.
 *
 * @param node ID whose storage ID selects the entry to drop
 */
private void wipeDatanode(final DatanodeID node) {
  final String key = node.getStorageID();
  synchronized (datanodeMap) {
    final DatanodeDescriptor dropped = datanodeMap.remove(key);
    host2DatanodeMap.remove(dropped);
  }
  if (LOG.isDebugEnabled()) {
    final String className = getClass().getSimpleName();
    LOG.debug(className + ".wipeDatanode("
        + node + "): storage " + key
        + " is removed from datanodeMap.");
  }
}
/**
 * Resolves the network location of a node and stores it on the node.
 * The IP address is used for the lookup when the mapping is a caching one,
 * the host name otherwise. When the mapping returns null, the default rack
 * is used.
 *
 * @param node descriptor whose network location is set
 */
private void resolveNetworkLocation (DatanodeDescriptor node) {
  final String lookupName =
      (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping)
          ? node.getIpAddr()
          : node.getHostName();
  final List<String> names = new ArrayList<String>(1);
  names.add(lookupName);

  final List<String> resolved = dnsToSwitchMapping.resolve(names);
  if (resolved == null) {
    LOG.error("The resolve call returned null! Using " +
        NetworkTopology.DEFAULT_RACK + " for host " + names);
    node.setNetworkLocation(NetworkTopology.DEFAULT_RACK);
    return;
  }
  node.setNetworkLocation(resolved.get(0));
}
/**
 * Checks the node against the include list.
 *
 * @param node the datanode to check
 * @return true if the node counts as being in the include list
 */
private boolean inHostsList(DatanodeID node) {
  final boolean isExcludeList = false;
  return checkInList(node, hostsReader.getHosts(), isExcludeList);
}
/**
 * Checks the node against the exclude list.
 *
 * @param node the datanode to check
 * @return true if the node counts as being in the exclude list
 */
private boolean inExcludedHostsList(DatanodeID node) {
  final boolean isExcludeList = true;
  return checkInList(node, hostsReader.getExcludedHosts(), isExcludeList);
}
/**
 * Removes from the given list every decommissioned datanode that appears in
 * neither the include nor the exclude hosts list. This keeps already
 * decommissioned datanodes from being displayed to operators.
 * The operating procedure for hiding a decommissioned datanode is:
 * <ol>
 * <li>
 * The host must have been in the include hosts list, and the include hosts
 * list must not be empty.
 * </li>
 * <li>
 * The host is decommissioned by keeping it in the include hosts list and
 * adding it to the exclude hosts list. The name node picks up the change
 * through the dfsadmin -refreshNodes command.
 * </li>
 * <li>
 * The host is removed from both the include and the exclude hosts lists.
 * The name node picks up the change through the dfsadmin -refreshNodes
 * command.
 * </li>
 * </ol>
 *
 * @param nodeList list of live or dead nodes; modified in place
 */
private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
  // With an empty include list every node is welcome, so hiding nodes
  // makes no sense: leave the list untouched.
  if (hostsReader.getHosts().isEmpty()) {
    return;
  }

  final Iterator<DatanodeDescriptor> nodes = nodeList.iterator();
  while (nodes.hasNext()) {
    final DatanodeDescriptor node = nodes.next();
    final boolean listed = inHostsList(node) || inExcludedHostsList(node);
    // Hide a node only when no hosts file mentions it any more and its
    // decommission has completed.
    if (!listed && node.isDecommissioned()) {
      nodes.remove();
    }
  }
}
/**
 * Checks whether the given DatanodeID is in the given (include or exclude)
 * list. An entry matches by IP address, by IP address and transfer port, by
 * host name, by host name and transfer port, or, for a
 * {@link DatanodeInfo}, by the host name the node carries.
 * <p>
 * If the node's IP address cannot be resolved, the result equals
 * {@code isExcludeList}. An empty include list contains every host.
 *
 * @param node the DatanodeID to check
 * @param hostsList the list of hosts in the include/exclude file
 * @param isExcludeList true if this is the exclude list
 * @return true if the node is in the list, false otherwise
 */
private static boolean checkInList(final DatanodeID node,
    final Set<String> hostsList,
    final boolean isExcludeList) {
  final InetAddress iaddr;
  try {
    iaddr = InetAddress.getByName(node.getIpAddr());
  } catch (UnknownHostException e) {
    LOG.warn("Unknown IP: " + node.getIpAddr(), e);
    return isExcludeList;
  }

  // if include list is empty, host is in include list
  if (!isExcludeList && hostsList.isEmpty()) {
    return true;
  }

  // compare ipaddress(:port)
  final String address = iaddr.getHostAddress();
  if (hostsList.contains(address)
      || hostsList.contains(address + ":" + node.getXferPort())) {
    return true;
  }

  // compare hostname(:port)
  if (hostsList.contains(iaddr.getHostName())
      || hostsList.contains(iaddr.getHostName() + ":" + node.getXferPort())) {
    return true;
  }

  // finally fall back to the host name carried by the node itself
  if (node instanceof DatanodeInfo) {
    return hostsList.contains(((DatanodeInfo) node).getHostName());
  }
  return false;
}
/**
 * Starts decommissioning the node when it is in the exclude list.
 *
 * @param nodeReg the registered datanode to check
 * @param ipAddr not used by this method
 */
private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) {
  final boolean excluded = inExcludedHostsList(nodeReg);
  if (excluded) {
    startDecommission(nodeReg);
  }
}
/**
 * Marks a node whose decommission is in progress as decommissioned once
 * the block manager reports no replication in progress for it.
 *
 * @param node the datanode to check
 * @return true if the node is decommissioned after the check
 */
boolean checkDecommissionState(DatanodeDescriptor node) {
  final boolean decommissionFinished = node.isDecommissionInProgress()
      && !blockManager.isReplicationInProgress(node);
  if (decommissionFinished) {
    node.setDecommissioned();
    LOG.info("Decommission complete for node " + node);
  }
  return node.isDecommissioned();
}
/**
 * Starts decommissioning the given datanode unless its decommission is
 * already in progress or complete.
 *
 * @param node the datanode to decommission
 */
private void startDecommission(DatanodeDescriptor node) {
  if (node.isDecommissionInProgress() || node.isDecommissioned()) {
    return;
  }
  LOG.info("Start Decommissioning node " + node + " with " +
      node.numBlocks() + " blocks.");
  heartbeatManager.startDecommission(node);
  node.decommissioningStatus.setStartTime(now());

  // The node may already satisfy the completion check, so run it at once.
  checkDecommissionState(node);
}
/**
 * Stops decommissioning the given datanode if its decommission is in
 * progress or complete, and lets the block manager process blocks that are
 * over-replicated because of the recommission.
 *
 * @param node the datanode to recommission
 */
void stopDecommission(DatanodeDescriptor node) {
  if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
    return;
  }
  LOG.info("Stop Decommissioning node " + node);
  heartbeatManager.stopDecommission(node);
  blockManager.processOverReplicatedBlocksOnReCommission(node);
}
/**
 * Generates a storage ID that is not yet present in the datanode map.
 * <p>
 * Note that collisions are still possible if somebody brings in a data
 * storage from a different cluster.
 *
 * @return a storage ID unused in the datanode map
 */
private String newStorageID() {
  String candidate;
  do {
    candidate = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
  } while (datanodeMap.get(candidate) != null);
  return candidate;
}
/**
 * Register the given datanode with the namenode. NB: the given
 * registration is mutated and given back to the datanode: its IP address
 * is replaced by the remote address of the RPC (when there is one), the
 * exported block keys are set, and a new storage ID is assigned when the
 * registration carries an empty one.
 * <p>
 * Three situations are distinguished, based on the descriptor found by
 * storage ID (nodeS) and the descriptor found by transfer address (nodeN):
 * <ul>
 * <li>nodeN exists and differs from nodeS: nodeN is removed and wiped
 * before anything else happens;</li>
 * <li>nodeS exists: its registration info and network location are
 * updated in place and the registration counts as a heartbeat;</li>
 * <li>nodeS does not exist: a new descriptor is created and added.</li>
 * </ul>
 *
 * @param nodeReg the datanode registration
 * @throws DisallowedDatanodeException if the registration request is
 * denied because the datanode does not match includes/excludes
 */
public void registerDatanode(DatanodeRegistration nodeReg)
throws DisallowedDatanodeException {
String dnAddress = Server.getRemoteAddress();
if (dnAddress == null) {
// Mostly called inside an RPC.
// But if not, use address passed by the data-node.
dnAddress = nodeReg.getIpAddr();
}
// Update the IP to the address of the RPC request that is
// registering this datanode.
nodeReg.setIpAddr(dnAddress);
nodeReg.setExportedKeys(blockManager.getBlockKeys());
// Checks if the node is not on the hosts list. If it is not, then
// it will be disallowed from registering.
if (!inHostsList(nodeReg)) {
throw new DisallowedDatanodeException(nodeReg);
}
NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
+ "node registration from " + nodeReg
+ " storage " + nodeReg.getStorageID());
// nodeS: descriptor currently registered under this storage ID.
// nodeN: descriptor currently registered for this transfer address.
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getXferAddr());
if (nodeN != null && nodeN != nodeS) {
NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
+ "node from name: " + nodeN);
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
// NOTE(review): removeDatanode(DatanodeDescriptor) asserts that the
// namesystem write lock is held -- confirm that callers take it.
removeDatanode(nodeN);
// physically remove node from datanodeMap
wipeDatanode(nodeN);
nodeN = null;
}
if (nodeS != null) {
if (nodeN == nodeS) {
// The same datanode has been just restarted to serve the same data
// storage. We do not need to remove old data blocks, the delta will
// be calculated on the next block report from the datanode
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
+ "node restarted.");
}
} else {
// nodeS is found
/* The registering datanode is a replacement node for the existing
data storage, which from now on will be served by a new node.
If this message repeats, both nodes might have same storageID
by (insanely rare) random chance. User needs to restart one of the
nodes with its data cleared (or user can just remove the StorageID
value in "VERSION" file under the data directory of the datanode,
but this might not work if the VERSION file format has changed).
*/
NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
+ "node " + nodeS
+ " is replaced by " + nodeReg +
" with the same storageID " +
nodeReg.getStorageID());
}
// update cluster map: the node is taken out of the topology while its
// registration info and network location change, then put back
getNetworkTopology().remove(nodeS);
nodeS.updateRegInfo(nodeReg);
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
resolveNetworkLocation(nodeS);
getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
checkDecommissioning(nodeS, dnAddress);
return;
}
// this is a new datanode serving a new data storage
if ("".equals(nodeReg.getStorageID())) {
// this data storage has never been registered
// it is either empty or was created by pre-storageID version of DFS
nodeReg.setStorageID(newStorageID());
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.registerDatanode: "
+ "new storageID " + nodeReg.getStorageID() + " assigned.");
}
}
// register new datanode; it starts on the default rack until its network
// location has been resolved
DatanodeDescriptor nodeDescr
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
resolveNetworkLocation(nodeDescr);
addDatanode(nodeDescr);
checkDecommissioning(nodeDescr, dnAddress);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because it is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr);
}
/**
 * Re-reads the include and exclude hosts files named in the configuration
 * and then, under the namesystem write lock, applies the refreshed lists
 * to all known datanodes.
 *
 * @param conf configuration naming the hosts files; may be null, in which
 *        case a fresh HdfsConfiguration is used
 * @throws IOException if re-reading the hosts files or refreshing the
 *         datanodes fails
 */
public void refreshNodes(final Configuration conf) throws IOException {
  // The files are re-read before the lock is taken.
  refreshHostsReader(conf);

  namesystem.writeLock();
  try {
    refreshDatanodes();
  } finally {
    namesystem.writeUnlock();
  }
}
/**
 * Re-reads the include/exclude files. The file names are taken from the
 * dfs.hosts and dfs.hosts.exclude settings of the configuration.
 *
 * @param conf configuration to read the file names from; a new
 *        HdfsConfiguration is used when this is null
 * @throws IOException if updating the file names or refreshing fails
 */
private void refreshHostsReader(Configuration conf) throws IOException {
  final Configuration effectiveConf =
      (conf != null) ? conf : new HdfsConfiguration();

  final String includeFile = effectiveConf.get(DFSConfigKeys.DFS_HOSTS, "");
  final String excludeFile =
      effectiveConf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, "");

  hostsReader.updateFileNames(includeFile, excludeFile);
  hostsReader.refresh();
}
/**
 * Applies the current include/exclude lists to every known datanode:
 * <ol>
 * <li>Added to hosts --> no further work needed here.</li>
 * <li>Not in hosts --> mark the node as disallowed.</li>
 * <li>In hosts and in exclude --> start decommission.</li>
 * <li>In hosts and not in exclude --> stop decommission.</li>
 * </ol>
 *
 * @throws IOException declared by the signature; nothing in this method
 *         visibly throws it
 */
private void refreshDatanodes() throws IOException {
  for (DatanodeDescriptor node : datanodeMap.values()) {
    if (!inHostsList(node)) {
      node.setDisallowed(true); // case 2.
      continue;
    }
    if (inExcludedHostsList(node)) {
      startDecommission(node); // case 3.
    } else {
      stopDecommission(node); // case 4.
    }
  }
}
/**
 * Counts the datanodes in the datanode map whose heartbeat has not expired.
 *
 * @return the number of live datanodes
 */
public int getNumLiveDataNodes() {
  int liveCount = 0;
  synchronized (datanodeMap) {
    for (DatanodeDescriptor descriptor : datanodeMap.values()) {
      liveCount += isDatanodeDead(descriptor) ? 0 : 1;
    }
  }
  return liveCount;
}
/**
 * Counts the datanodes in the datanode map whose heartbeat has expired.
 *
 * @return the number of dead datanodes
 */
public int getNumDeadDataNodes() {
  int deadCount = 0;
  synchronized (datanodeMap) {
    for (DatanodeDescriptor descriptor : datanodeMap.values()) {
      deadCount += isDatanodeDead(descriptor) ? 1 : 0;
    }
  }
  return deadCount;
}
/**
 * Collects, under the namesystem read lock, the live datanodes whose
 * decommission is in progress.
 *
 * @return list of datanodes where decommissioning is in progress
 */
public List<DatanodeDescriptor> getDecommissioningNodes() {
  namesystem.readLock();
  try {
    final List<DatanodeDescriptor> inProgress =
        new ArrayList<DatanodeDescriptor>();
    final List<DatanodeDescriptor> liveNodes =
        getDatanodeListForReport(DatanodeReportType.LIVE);
    for (DatanodeDescriptor candidate : liveNodes) {
      if (!candidate.isDecommissionInProgress()) {
        continue;
      }
      inProgress.add(candidate);
    }
    return inProgress;
  } finally {
    namesystem.readUnlock();
  }
}
/**
 * Sorts all reportable datanodes into the given live and dead lists.
 *
 * @param live receives the live datanodes; may be null to skip them
 * @param dead receives the dead datanodes; may be null to skip them
 * @param removeDecommissionNode when true, decommissioned nodes that are in
 *        neither hosts file are dropped from the filled lists
 * @throws HadoopIllegalArgumentException if both lists are null
 */
public void fetchDatanodes(final List<DatanodeDescriptor> live,
    final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
  if (live == null && dead == null) {
    throw new HadoopIllegalArgumentException("Both live and dead lists are null");
  }

  namesystem.readLock();
  try {
    final List<DatanodeDescriptor> allNodes =
        getDatanodeListForReport(DatanodeReportType.ALL);
    for (DatanodeDescriptor node : allNodes) {
      // Pick the destination by liveness; a null destination is skipped.
      final List<DatanodeDescriptor> destination =
          isDatanodeDead(node) ? dead : live;
      if (destination != null) {
        destination.add(node);
      }
    }
  } finally {
    namesystem.readUnlock();
  }

  if (!removeDecommissionNode) {
    return;
  }
  if (live != null) {
    removeDecomNodeFromList(live);
  }
  if (dead != null) {
    removeDecomNodeFromList(dead);
  }
}
/**
 * Reports the multi-rack flag, which is set once and never cleared by this
 * class.
 *
 * @return true if this cluster has ever consisted of multiple racks, even if
 *         it is not now a multi-rack cluster.
 */
boolean hasClusterEverBeenMultiRack() {
  return this.hasClusterEverBeenMultiRack;
}
/**
 * Checks whether the cluster now spans multiple racks. The first time that
 * is the case, the multi-rack flag is set and, if the namesystem is
 * populating replication queues, blocks that may now be mis-replicated are
 * processed.
 *
 * @param node DN which caused cluster to become multi-rack. Used for logging.
 */
@VisibleForTesting
void checkIfClusterIsNowMultiRack(DatanodeDescriptor node) {
  if (hasClusterEverBeenMultiRack || networktopology.getNumOfRacks() <= 1) {
    return;
  }

  final String prefix = "DN " + node + " joining cluster has expanded a formerly " +
      "single-rack cluster to be multi-rack. ";
  if (namesystem.isPopulatingReplQueues()) {
    LOG.info(prefix + "Re-checking all blocks for replication, since they should " +
        "now be replicated cross-rack");
  } else {
    LOG.debug(prefix + "Not checking for mis-replicated blocks because this NN is " +
        "not yet processing repl queues.");
  }

  hasClusterEverBeenMultiRack = true;
  if (namesystem.isPopulatingReplQueues()) {
    blockManager.processMisReplicatedBlocks();
  }
}
/**
 * Parse a DatanodeID from a hosts file entry.
 * <p>
 * The part before the first ':' is the host and the part after it is the
 * transfer port; without a ':' the default datanode port is used. The HTTP
 * and IPC ports always take their defaults.
 *
 * @param hostLine of form [hostname|ip][:port]?
 * @return DatanodeID constructed from the given string; if a host name
 *         cannot be resolved, the ID carries an empty IP address
 * @throws NumberFormatException if the text after the ':' is not a number
 */
private DatanodeID parseDNFromHostsEntry(String hostLine) {
  final String hostStr;
  final int port;
  final int idx = hostLine.indexOf(':');

  if (-1 == idx) {
    hostStr = hostLine;
    port = DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT;
  } else {
    hostStr = hostLine.substring(0, idx);
    // Skip the ':' itself: substring(idx) would start with the separator
    // and make the number parsing fail for every "host:port" entry.
    port = Integer.parseInt(hostLine.substring(idx + 1));
  }

  if (InetAddresses.isInetAddress(hostStr)) {
    // The IP:port is sufficient for listing in a report
    return new DatanodeID(hostStr, "", "", port,
        DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
        DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
  }

  String ipAddr = "";
  try {
    ipAddr = InetAddress.getByName(hostStr).getHostAddress();
  } catch (UnknownHostException e) {
    // Keep the empty IP address so the entry can still be reported.
    LOG.warn("Invalid hostname " + hostStr + " in hosts file");
  }
  return new DatanodeID(ipAddr, hostStr, "", port,
      DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
      DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
}
/**
 * Builds the list of datanodes for a report of the given type.
 * <p>
 * Known datanodes are selected by liveness. When dead nodes are requested,
 * every entry of the include/exclude files that matches no known datanode
 * is added as a descriptor with a last update of 0, so that it is reported
 * as dead.
 *
 * @param type which datanodes to report: live, dead or all
 * @return the datanodes matching the report type
 */
public List<DatanodeDescriptor> getDatanodeListForReport(
    final DatanodeReportType type) {
  final boolean listAll = type == DatanodeReportType.ALL;
  final boolean listLiveNodes = listAll || type == DatanodeReportType.LIVE;
  final boolean listDeadNodes = listAll || type == DatanodeReportType.DEAD;

  // Hosts file entries not yet matched to a known datanode; only the keys
  // of this map are used.
  final HashMap<String, String> mustList = new HashMap<String, String>();
  if (listDeadNodes) {
    for (String included : hostsReader.getHosts()) {
      mustList.put(included, "");
    }
    for (String excluded : hostsReader.getExcludedHosts()) {
      mustList.put(excluded, "");
    }
  }

  final ArrayList<DatanodeDescriptor> nodes;
  synchronized (datanodeMap) {
    nodes = new ArrayList<DatanodeDescriptor>(
        datanodeMap.size() + mustList.size());
    for (DatanodeDescriptor dn : datanodeMap.values()) {
      final boolean wanted =
          isDatanodeDead(dn) ? listDeadNodes : listLiveNodes;
      if (wanted) {
        nodes.add(dn);
      }
      // Every known node is struck off the hosts file entries, in each
      // spelling an entry may use.
      try {
        final InetAddress inet = InetAddress.getByName(dn.getIpAddr());
        // hostname(:port)
        final String hostName = inet.getHostName();
        mustList.remove(hostName);
        mustList.remove(hostName + ":" + dn.getXferPort());
        // ipaddress(:port)
        final String hostAddress = inet.getHostAddress();
        mustList.remove(hostAddress);
        mustList.remove(hostAddress + ":" + dn.getXferPort());
      } catch (UnknownHostException e) {
        mustList.remove(dn.getName());
        mustList.remove(dn.getIpAddr());
        LOG.warn(e);
      }
    }
  }

  if (listDeadNodes) {
    // What is left was named in the hosts files but has never been heard
    // from, e.g. a host that left the cluster or a bogus entry.
    for (String hostEntry : mustList.keySet()) {
      final DatanodeDescriptor unknown =
          new DatanodeDescriptor(parseDNFromHostsEntry(hostEntry));
      unknown.setLastUpdate(0); // Consider this node dead for reporting
      nodes.add(unknown);
    }
  }
  return nodes;
}
/**
 * Resets the node's last update time to 0, which makes
 * {@link #isDatanodeDead} consider it dead.
 *
 * @param node the datanode to mark
 * @throws IOException declared by the signature; nothing in this method
 *         visibly throws it
 */
private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
  final long neverUpdated = 0;
  node.setLastUpdate(neverUpdated);
}
/**
 * Handle heartbeat from datanodes.
 * <p>
 * The heartbeat is recorded and the commands the datanode should execute
 * next are returned. A datanode that is unknown, not alive, or fails the
 * lookup is told to re-register. A pending block recovery is returned on
 * its own; otherwise replication, invalidation, key update and balancer
 * bandwidth commands are collected into one reply.
 *
 * @param nodeReg registration of the datanode sending the heartbeat
 * @param blockPoolId block pool ID placed into the generated commands
 * @param capacity forwarded to the heartbeat manager
 * @param dfsUsed forwarded to the heartbeat manager
 * @param remaining forwarded to the heartbeat manager
 * @param blockPoolUsed forwarded to the heartbeat manager
 * @param xceiverCount forwarded to the heartbeat manager
 * @param maxTransfers limit passed when fetching the replication command
 * @param failedVolumes forwarded to the heartbeat manager
 * @return the commands for the datanode; an empty array if there are none
 * @throws IOException in particular a DisallowedDatanodeException when the
 *         datanode has been marked as disallowed
 */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
final String blockPoolId,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int maxTransfers, int failedVolumes
) throws IOException {
// Locks are taken in the order heartbeatManager, then datanodeMap.
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
// The storage ID is known but under a different transfer address.
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
if (nodeinfo == null || !nodeinfo.isAlive) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
remaining, blockPoolUsed, xceiverCount, failedVolumes);
//check lease recovery
BlockInfoUnderConstruction[] blocks = nodeinfo
.getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks != null) {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
for (BlockInfoUnderConstruction b : blocks) {
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
.getBlockRecoveryId()));
}
// A recovery command is sent alone; the checks below are skipped for
// this heartbeat.
return new DatanodeCommand[] { brCommand };
}
final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
//check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
if (pendingList != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
//check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks));
}
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
// check for balancer bandwidth update
if (nodeinfo.getBalancerBandwidth() > 0) {
cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
// set back to 0 to indicate that datanode has been sent the new value
nodeinfo.setBalancerBandwidth(0);
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
}
}
// Nothing to send to the datanode.
return new DatanodeCommand[0];
}
/**
 * Tell all datanodes to use a new, non-persistent bandwidth value for
 * dfs.datanode.balance.bandwidthPerSec.
 * <p>
 * A system administrator can tune the balancer bandwidth dynamically by
 * calling "dfsadmin -setBalancerBandwidth newbandwidth". The value is
 * stored on every known datanode descriptor; once a heartbeat reply has
 * carried it to a datanode, the stored value is set back to 0.
 *
 * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
 * @throws IOException declared by the signature; nothing in this method
 *         visibly throws it
 */
public void setBalancerBandwidth(long bandwidth) throws IOException {
  synchronized (datanodeMap) {
    for (DatanodeDescriptor descriptor : datanodeMap.values()) {
      descriptor.setBalancerBandwidth(bandwidth);
    }
  }
}
/**
 * Calls {@code markStaleAfterFailover()} on every datanode in the datanode
 * map while holding the map lock.
 */
public void markAllDatanodesStale() {
  LOG.info("Marking all datandoes as stale");
  synchronized (datanodeMap) {
    for (DatanodeDescriptor descriptor : datanodeMap.values()) {
      descriptor.markStaleAfterFailover();
    }
  }
}
/**
 * Clears the actions that are queued up for delivery to the datanodes on
 * their next heartbeats, such as block invalidations, recoveries and
 * replication requests, by clearing the block queues of every known
 * datanode.
 */
public void clearPendingQueues() {
  synchronized (datanodeMap) {
    for (DatanodeDescriptor descriptor : datanodeMap.values()) {
      descriptor.clearBlockQueues();
    }
  }
}
/**
 * Renders the simple class name followed by the host to datanode map.
 *
 * @return a textual description of this manager
 */
@Override
public String toString() {
  final String simpleName = getClass().getSimpleName();
  return simpleName + ": " + host2DatanodeMap;
}
}