/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Manage the heartbeats received from datanodes.
* The datanode list and statistics are synchronized
* by the heartbeat manager lock.
*/
class HeartbeatManager implements DatanodeStatistics {
static final Logger LOG = LoggerFactory.getLogger(HeartbeatManager.class);
/**
* Stores a subset of the datanodeMap in DatanodeManager,
* containing nodes that are considered alive.
* The HeartbeatMonitor periodically checks for outdated entries,
* and removes them from the list.
* It is synchronized by the heartbeat manager lock.
*/
private final List<DatanodeDescriptor> datanodes = new ArrayList<>();
/** Statistics, which are synchronized by the heartbeat manager lock. */
private final DatanodeStats stats = new DatanodeStats();
/** The interval between checks for expired datanodes. */
private final long heartbeatRecheckInterval;
/** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor());
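/**
* Measures the time elapsed in the current monitor cycle, so that an excessive
* pause (e.g. a long GC) can be detected and the heartbeat check skipped.
*/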
private final StopWatch heartbeatStopWatch = new StopWatch();
final Namesystem namesystem;
final BlockManager blockManager;
HeartbeatManager(final Namesystem namesystem,
final BlockManager blockManager, final Configuration conf) {
this.namesystem = namesystem;
this.blockManager = blockManager;
boolean avoidStaleDataNodesForWrite = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
long recheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
long staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
this.heartbeatRecheckInterval = staleInterval;
LOG.info("Setting heartbeat recheck interval to " + staleInterval
+ " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
+ " is less than "
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
} else {
this.heartbeatRecheckInterval = recheckInterval;
}
}
void activate() {
heartbeatThread.start();
}
void close() {
heartbeatThread.interrupt();
try {
// This has no effect if the thread hasn't yet been started.
heartbeatThread.join(3000);
} catch (InterruptedException ignored) {
}
}
synchronized int getLiveDatanodeCount() {
return datanodes.size();
}
@Override
public long getCapacityTotal() {
return stats.getCapacityTotal();
}
@Override
public long getCapacityUsed() {
return stats.getCapacityUsed();
}
@Override
public float getCapacityUsedPercent() {
return stats.getCapacityUsedPercent();
}
@Override
public long getCapacityRemaining() {
return stats.getCapacityRemaining();
}
@Override
public float getCapacityRemainingPercent() {
return stats.getCapacityRemainingPercent();
}
@Override
public long getBlockPoolUsed() {
return stats.getBlockPoolUsed();
}
@Override
public float getPercentBlockPoolUsed() {
return stats.getPercentBlockPoolUsed();
}
@Override
public long getCapacityUsedNonDFS() {
return stats.getCapacityUsedNonDFS();
}
@Override
public int getXceiverCount() {
return stats.getXceiverCount();
}
@Override
public int getInServiceXceiverCount() {
return stats.getNodesInServiceXceiverCount();
}
@Override
public int getNumDatanodesInService() {
return stats.getNodesInService();
}
@Override
public long getCacheCapacity() {
return stats.getCacheCapacity();
}
@Override
public long getCacheUsed() {
return stats.getCacheUsed();
}
@Override
public synchronized long[] getStats() {
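// Only the capacity-related statistics are tracked here; the remaining
// slots are left as -1 placeholders.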
return new long[] {getCapacityTotal(),
getCapacityUsed(),
getCapacityRemaining(),
-1L,
-1L,
-1L,
-1L,
-1L};
}
@Override
public int getExpiredHeartbeats() {
return stats.getExpiredHeartbeats();
}
@Override
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return stats.getStatsMap();
}
synchronized void register(final DatanodeDescriptor d) {
if (!d.isAlive()) {
addDatanode(d);
//update its timestamp
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
}
}
synchronized DatanodeDescriptor[] getDatanodes() {
return datanodes.toArray(new DatanodeDescriptor[datanodes.size()]);
}
synchronized void addDatanode(final DatanodeDescriptor d) {
// update the aggregate statistics, including the in-service node count
stats.add(d);
datanodes.add(d);
d.setAlive(true);
}
synchronized void removeDatanode(DatanodeDescriptor node) {
if (node.isAlive()) {
stats.subtract(node);
datanodes.remove(node);
node.setAlive(false);
}
}
synchronized void updateHeartbeat(final DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
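// Subtract the node's old values from the aggregate stats, apply the new
// report, then add the updated values back.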
stats.subtract(node);
node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
stats.add(node);
}
synchronized void startDecommission(final DatanodeDescriptor node) {
if (!node.isAlive()) {
LOG.info("Dead node {} is decommissioned immediately.", node);
node.setDecommissioned();
} else {
stats.subtract(node);
node.startDecommission();
stats.add(node);
}
}
synchronized void stopDecommission(final DatanodeDescriptor node) {
LOG.info("Stopping decommissioning of {} node {}",
node.isAlive() ? "live" : "dead", node);
if (!node.isAlive()) {
node.stopDecommission();
} else {
stats.subtract(node);
node.stopDecommission();
stats.add(node);
}
}
@VisibleForTesting
void restartHeartbeatStopWatch() {
heartbeatStopWatch.reset().start();
}
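/**
* Check whether the time elapsed since the stopwatch was restarted, adjusted
* by the given offset, exceeds the heartbeat recheck interval (i.e. the
* monitor has been paused for too long).
*/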
@VisibleForTesting
boolean shouldAbortHeartbeatCheck(long offset) {
long elapsed = heartbeatStopWatch.now(TimeUnit.MILLISECONDS);
return elapsed + offset > heartbeatRecheckInterval;
}
/**
* Check if there are any expired heartbeats, and if so,
* whether any blocks have to be re-replicated.
* While removing dead datanodes, make sure that only one datanode is marked
* dead at a time within the synchronized section. Otherwise, a cascading
* effect causes more datanodes to be declared dead.
* Check if there are any failed storages and, if so,
* remove all the blocks on those storages. This also covers the following less
* common scenarios. After a DatanodeStorage is marked FAILED, it is still
* possible to receive an IBR for this storage.
* 1) The DN could deliver an IBR for a failed storage due to its implementation.
* a) The DN queues a pending IBR request.
* b) The storage of the block fails.
* c) The DN first sends a heartbeat; the NN marks the storage FAILED.
* d) The DN then sends the pending IBR request.
* 2) The SBN processes a block message from pendingDNMessages.
* It is possible to have messages in pendingDNMessages that refer
* to some failed storage.
* a) The SBN receives an IBR and puts it in pendingDNMessages.
* b) The storage of the block fails.
* c) Edit log replay gets the IBR from pendingDNMessages.
* Alternatively, these scenarios can be resolved with the following approaches.
* A. Make sure the DN doesn't deliver IBRs for failed storages.
* B. Remove all blocks in PendingDataNodeMessages for the failed storage
* when we remove all blocks from BlocksMap for that storage.
*/
@VisibleForTesting
void heartbeatCheck() {
final DatanodeManager dm = blockManager.getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here; we re-check
// for safe mode again before removing a datanode.
if (namesystem.isInStartupSafeMode()) {
return;
}
boolean allAlive = false;
while (!allAlive) {
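// Each pass handles at most one dead node and one failed storage, then
// re-scans until none remain.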
// locate the first dead node.
DatanodeID dead = null;
// locate the first failed storage that isn't on a dead node.
DatanodeStorageInfo failedStorage = null;
// check the number of stale nodes
int numOfStaleNodes = 0;
int numOfStaleStorages = 0;
synchronized(this) {
for (DatanodeDescriptor d : datanodes) {
// check if an excessive GC pause has occurred
if (shouldAbortHeartbeatCheck(0)) {
return;
}
if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
}
if (d.isStale(dm.getStaleInterval())) {
numOfStaleNodes++;
}
DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
for(DatanodeStorageInfo storageInfo : storageInfos) {
if (storageInfo.areBlockContentsStale()) {
numOfStaleStorages++;
}
if (failedStorage == null &&
storageInfo.areBlocksOnFailedStorage() &&
d != dead) {
failedStorage = storageInfo;
}
}
}
// Set the number of stale nodes in the DatanodeManager
dm.setNumStaleNodes(numOfStaleNodes);
dm.setNumStaleStorages(numOfStaleStorages);
}
allAlive = dead == null && failedStorage == null;
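// Don't remove anything while in startup safe mode.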
if (!allAlive && namesystem.isInStartupSafeMode()) {
return;
}
if (dead != null) {
// acquire the fsnamesystem lock, and then remove the dead node.
blockManager.writeLock();
try {
dm.removeDeadDatanode(dead);
} finally {
blockManager.writeUnlock();
}
}
if (failedStorage != null) {
// acquire the fsnamesystem lock, and remove blocks on the storage.
blockManager.writeLock();
try {
blockManager.removeBlocksAssociatedTo(failedStorage);
} finally {
blockManager.writeUnlock();
}
}
}
}
/** Periodically check heartbeats and update block keys. */
private class Monitor implements Runnable {
private long lastHeartbeatCheck;
private long lastBlockKeyUpdate;
@Override
public void run() {
while(namesystem.isRunning()) {
restartHeartbeatStopWatch();
try {
final long now = Time.monotonicNow();
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
heartbeatCheck();
lastHeartbeatCheck = now;
}
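// Periodically flag every datanode so that updated block keys (used for
// block access tokens) can be sent to it on a subsequent heartbeat.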
if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
synchronized(HeartbeatManager.this) {
for(DatanodeDescriptor d : datanodes) {
d.setNeedKeyUpdate(true);
}
}
lastBlockKeyUpdate = now;
}
} catch (Exception e) {
LOG.error("Exception while checking heartbeat", e);
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ignored) {
}
// avoid declaring nodes dead for another cycle if a GC pause lasts
// longer than the node recheck interval
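// (the -5000 offset discounts the expected 5 second sleep from the
// measured elapsed time)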
if (shouldAbortHeartbeatCheck(-5000)) {
LOG.warn("Skipping next heartbeat scan due to excessive pause");
lastHeartbeatCheck = Time.monotonicNow();
}
}
}
}
}