/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Manages decommissioning and maintenance state for DataNodes. A background
* monitor thread periodically checks the status of DataNodes that are
* decommissioning or entering maintenance state.
* <p>
* A DataNode can be decommissioned in a few situations:
* <ul>
* <li>If a DN is dead, it is decommissioned immediately.</li>
* <li>If a DN is alive, it is decommissioned after all of its blocks
* are sufficiently replicated. Merely under-replicated blocks do not
* block decommissioning as long as they are above a replication
* threshold.</li>
* </ul>
* In the second case, the DataNode transitions to a DECOMMISSION_INPROGRESS
* state and is tracked by the monitor thread. The monitor periodically scans
* through the list of insufficiently replicated blocks on these DataNodes to
* determine if they can be DECOMMISSIONED. The monitor also prunes this list
* as blocks become replicated, so monitor scans will become more efficient
* over time.
* <p>
* DECOMMISSION_INPROGRESS nodes that become dead do not progress to
* DECOMMISSIONED until they become live again. This prevents potential
* durability loss for singly-replicated blocks (see HDFS-6791).
* <p>
* DataNodes can also be put into maintenance state for short-duration
* maintenance operations. Unlike decommissioning, blocks are not always
* re-replicated for a DataNode to enter maintenance state. If its blocks
* are already replicated at least dfs.namenode.maintenance.replication.min
* times, a DataNode transitions directly to IN_MAINTENANCE state. Otherwise,
* just as in decommissioning, the DataNode transitions to
* ENTERING_MAINTENANCE state, waits for its blocks to be sufficiently
* replicated, and then transitions to IN_MAINTENANCE state. The block
* replication requirement is relaxed until, at the latest, the maintenance
* expiry time. If a DataNode has not left maintenance or rejoined the
* cluster by the expiry time, its blocks are re-replicated just as in the
* decommissioning case, so as to avoid read or write performance
* degradation.
* <p>
* This class depends on the FSNamesystem lock for synchronization.
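* <p>
* For illustration, the behavior described above is driven by a handful of
* configuration keys whose constants live in DFSConfigKeys and are read in
* {@link #activate(Configuration)}. A minimal sketch, with example values
* rather than recommendations:
* <pre>{@code
* <property>
*   <name>dfs.namenode.decommission.interval</name>
*   <value>30s</value> <!-- how often the monitor thread wakes up -->
* </property>
* <property>
*   <name>dfs.namenode.maintenance.replication.min</name>
*   <value>1</value> <!-- min replication to enter IN_MAINTENANCE -->
* </property>
* }</pre>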
*/
@InterfaceAudience.Private
public class DatanodeAdminManager {
private static final Logger LOG =
LoggerFactory.getLogger(DatanodeAdminManager.class);
private final Namesystem namesystem;
private final BlockManager blockManager;
private final HeartbeatManager hbManager;
private final ScheduledExecutorService executor;
private DatanodeAdminMonitorInterface monitor = null;
DatanodeAdminManager(final Namesystem namesystem,
final BlockManager blockManager, final HeartbeatManager hbManager) {
this.namesystem = namesystem;
this.blockManager = blockManager;
this.hbManager = hbManager;
executor = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("DatanodeAdminMonitor-%d")
.setDaemon(true).build());
}
/**
* Start the DataNode admin monitor thread.
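* <p>
* The monitor implementation is pluggable. A minimal sketch of the wiring
* this method performs, assuming the key name held in
* DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MONITOR_CLASS and the default
* implementation class that this file references:
* <pre>{@code
* <property>
*   <name>dfs.namenode.decommission.monitor.class</name>
*   <value>org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminDefaultMonitor</value>
* </property>
* }</pre>
* Whatever class is configured must implement DatanodeAdminMonitorInterface,
* since the instance created below is cast to that interface.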
* @param conf the configuration from which the monitor settings are read.
*/
void activate(Configuration conf) {
final int intervalSecs = (int) conf.getTimeDuration(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT,
TimeUnit.SECONDS);
checkArgument(intervalSecs >= 0, "Cannot set a negative " +
"value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
int blocksPerInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
final String deprecatedKey =
"dfs.namenode.decommission.nodes.per.interval";
final String strNodes = conf.get(deprecatedKey);
if (strNodes != null) {
LOG.warn("Deprecated configuration key {} will be ignored.",
deprecatedKey);
LOG.warn("Please update your configuration to use {} instead.",
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
}
checkArgument(blocksPerInterval > 0,
"Must set a positive value for "
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
final int maxConcurrentTrackedNodes = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
DFSConfigKeys
.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
"value for "
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
Class<?> cls = null;
try {
cls = conf.getClass(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MONITOR_CLASS,
Class.forName(DFSConfigKeys
.DFS_NAMENODE_DECOMMISSION_MONITOR_CLASS_DEFAULT));
monitor =
(DatanodeAdminMonitorInterface)ReflectionUtils.newInstance(cls, conf);
monitor.setBlockManager(blockManager);
monitor.setNameSystem(namesystem);
monitor.setDatanodeAdminManager(this);
} catch (Exception e) {
throw new RuntimeException("Unable to create the Decommission monitor " +
"from " + cls, e);
}
executor.scheduleWithFixedDelay(monitor, intervalSecs, intervalSecs,
TimeUnit.SECONDS);
LOG.debug("Activating DatanodeAdminManager with interval {} seconds, " +
"{} max blocks per interval, " +
"{} max concurrently tracked nodes.", intervalSecs,
blocksPerInterval, maxConcurrentTrackedNodes);
}
/**
* Stop the admin monitor thread, waiting briefly for it to terminate.
*/
void close() {
executor.shutdownNow();
try {
executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Ignored: shutdownNow() has already been issued and we are closing.
}
}
/**
* Start decommissioning the specified datanode.
* @param node the datanode to decommission.
*/
@VisibleForTesting
public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.startDecommission(node);
// hbManager.startDecommission will set dead node to decommissioned.
if (node.isDecommissionInProgress()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Starting decommission of {} {} with {} blocks",
node, storage, storage.numBlocks());
}
node.getLeavingServiceStatus().setStartTime(monotonicNow());
monitor.startTrackingNode(node);
}
} else {
LOG.trace("startDecommission: Node {} in {}, nothing to do.",
node, node.getAdminState());
}
}
/**
* Stop decommissioning the specified datanode.
* @param node the datanode on which to stop decommissioning.
*/
@VisibleForTesting
public void stopDecommission(DatanodeDescriptor node) {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.stopDecommission(node);
// Extra-redundancy blocks will be detected and processed when
// the dead node comes back and sends in its full block report.
if (node.isAlive()) {
blockManager.processExtraRedundancyBlocksOnInService(node);
}
// Remove from tracking in DatanodeAdminManager
monitor.stopTrackingNode(node);
} else {
LOG.trace("stopDecommission: Node {} in {}, nothing to do.",
node, node.getAdminState());
}
}
/**
* Start maintenance of the specified datanode.
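* <p>
* A minimal usage sketch (illustrative only; it assumes the expiry is an
* absolute wall-clock time in milliseconds):
* <pre>{@code
* // Put the node into maintenance for roughly one hour.
* long expiry = Time.now() + TimeUnit.HOURS.toMillis(1);
* datanodeAdminManager.startMaintenance(dn, expiry);
* }</pre>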
* @param node the datanode to put into maintenance.
* @param maintenanceExpireTimeInMS the absolute time, in milliseconds,
*        at which the maintenance state expires.
*/
@VisibleForTesting
public void startMaintenance(DatanodeDescriptor node,
long maintenanceExpireTimeInMS) {
// Even if the node is already in maintenance, we still need to adjust
// the expiration time.
node.setMaintenanceExpireTimeInMS(maintenanceExpireTimeInMS);
if (!node.isMaintenance()) {
// Update DN stats maintained by HeartbeatManager
hbManager.startMaintenance(node);
// hbManager.startMaintenance will set dead node to IN_MAINTENANCE.
if (node.isEnteringMaintenance()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Starting maintenance of {} {} with {} blocks",
node, storage, storage.numBlocks());
}
node.getLeavingServiceStatus().setStartTime(monotonicNow());
}
// Track the node regardless of whether it is ENTERING_MAINTENANCE or
// IN_MAINTENANCE to support maintenance expiration.
monitor.startTrackingNode(node);
} else {
LOG.trace("startMaintenance: Node {} in {}, nothing to do.",
node, node.getAdminState());
}
}
/**
* Stop maintenance of the specified datanode.
* @param node the datanode to take out of maintenance.
*/
@VisibleForTesting
public void stopMaintenance(DatanodeDescriptor node) {
if (node.isMaintenance()) {
// Update DN stats maintained by HeartbeatManager
hbManager.stopMaintenance(node);
// Extra-redundancy blocks will be detected and processed when
// the dead node comes back and sends in its full block report.
if (!node.isAlive()) {
// The node became dead when it was in maintenance, at which point
// the replicas weren't removed from block maps.
// When the node leaves maintenance, the replicas should be removed
// from the block maps to trigger the necessary replication to
// maintain the safety property of "# of live replicas + maintenance
// replicas" >= the expected redundancy.
blockManager.removeBlocksAssociatedTo(node);
} else {
// Even though entering maintenance does not, by itself, bring the live
// replica count up to the expected replication factor, blocks can
// still be over-replicated by the time the node leaves maintenance.
// First scenario:
// a. A node dies while at AdminStates.NORMAL, so its blocks are
// re-replicated until 3 replicas exist on other nodes.
// b. Admins put the dead node into maintenance mode and then
// have the node rejoin the cluster.
// c. Take the node out of maintenance mode.
// Second scenario:
// a. With replication factor 3, put the node holding one replica
// into maintenance, so the block has 1 maintenance replica and
// 2 live replicas.
// b. Change the replication factor to 2. The block still has
// 1 maintenance replica and 2 live replicas.
// c. Take the node out of maintenance mode.
blockManager.processExtraRedundancyBlocksOnInService(node);
}
// Remove from tracking in DatanodeAdminManager
monitor.stopTrackingNode(node);
} else {
LOG.trace("stopMaintenance: Node {} in {}, nothing to do.",
node, node.getAdminState());
}
}
protected void setDecommissioned(DatanodeDescriptor dn) {
dn.setDecommissioned();
LOG.info("Decommissioning complete for node {}", dn);
}
protected void setInMaintenance(DatanodeDescriptor dn) {
dn.setInMaintenance();
LOG.info("Node {} has entered maintenance mode.", dn);
}
/**
* Checks whether a block is sufficiently replicated/stored for
* DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE datanodes. For replicated
* blocks or striped blocks, full-strength replication or storage is not
* always necessary, hence "sufficient".
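* <p>
* Worked example with illustrative numbers: for a closed block with default
* replication 3, numLive = 2 is not sufficient for decommission while
* numLive = 3 is; for the under-construction last block of an open file,
* numLive only has to reach the minimum storage number; for maintenance,
* numLive only has to reach getMinReplicationToBeInMaintenance().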
* @return true if sufficient, else false.
*/
protected boolean isSufficient(BlockInfo block, BlockCollection bc,
NumberReplicas numberReplicas,
boolean isDecommission,
boolean isMaintenance) {
if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) {
// Block has enough replicas, skip.
LOG.trace("Block {} does not need replication.", block);
return true;
}
final int numExpected = blockManager.getExpectedLiveRedundancyNum(block,
numberReplicas);
final int numLive = numberReplicas.liveReplicas();
// Block is under-replicated
LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
numLive);
if (isDecommission && numExpected > numLive) {
if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
// Can decom a UC block as long as there will still be minReplicas
if (blockManager.hasMinStorage(block, numLive)) {
LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
+ ">= minR ({})", block, numLive,
blockManager.getMinStorageNum(block));
return true;
} else {
LOG.trace("UC block {} insufficiently-replicated since numLive "
+ "({}) < minR ({})", block, numLive,
blockManager.getMinStorageNum(block));
}
} else {
// Can decom a non-UC block as long as the default replication is met
if (numLive >= blockManager.getDefaultStorageNum(block)) {
return true;
}
}
}
if (isMaintenance
&& numLive >= blockManager.getMinReplicationToBeInMaintenance()) {
return true;
}
return false;
}
protected void logBlockReplicationInfo(BlockInfo block,
BlockCollection bc,
DatanodeDescriptor srcNode, NumberReplicas num,
Iterable<DatanodeStorageInfo> storages) {
if (!NameNode.blockStateChangeLog.isInfoEnabled()) {
return;
}
int curReplicas = num.liveReplicas();
int curExpectedRedundancy = blockManager.getExpectedRedundancyNum(block);
StringBuilder nodeList = new StringBuilder();
for (DatanodeStorageInfo storage : storages) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
nodeList.append(node).append(' ');
}
NameNode.blockStateChangeLog.info(
"Block: " + block + ", Expected Replicas: "
+ curExpectedRedundancy + ", live replicas: " + curReplicas
+ ", corrupt replicas: " + num.corruptReplicas()
+ ", decommissioned replicas: " + num.decommissioned()
+ ", decommissioning replicas: " + num.decommissioning()
+ ", maintenance replicas: " + num.maintenanceReplicas()
+ ", live entering maintenance replicas: "
+ num.liveEnteringMaintenanceReplicas()
+ ", replicas on stale nodes: " + num.replicasOnStaleNodes()
+ ", readonly replicas: " + num.readOnlyReplicas()
+ ", excess replicas: " + num.excessReplicas()
+ ", Is Open File: " + bc.isUnderConstruction()
+ ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ srcNode + ", Is current datanode decommissioning: "
+ srcNode.isDecommissionInProgress() +
", Is current datanode entering maintenance: "
+ srcNode.isEnteringMaintenance());
}
@VisibleForTesting
public int getNumPendingNodes() {
return monitor.getPendingNodeCount();
}
@VisibleForTesting
public int getNumTrackedNodes() {
return monitor.getTrackedNodeCount();
}
@VisibleForTesting
public int getNumNodesChecked() {
return monitor.getNumNodesChecked();
}
@VisibleForTesting
public Queue<DatanodeDescriptor> getPendingNodes() {
return monitor.getPendingNodes();
}
@VisibleForTesting
void runMonitorForTest() throws ExecutionException, InterruptedException {
executor.submit(monitor).get();
}
}