/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
/**
* Manage the heartbeats received from datanodes.
* The datanode list and statistics are synchronized
* by the heartbeat manager lock.
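*
* <p>A rough lifecycle sketch (simplified; in the real code the
* DatanodeManager owns this object and drives these calls):
* <pre>{@code
* HeartbeatManager hm = new HeartbeatManager(namesystem, blockManager, conf);
* hm.activate(conf);           // start the heartbeat monitor daemon
* hm.register(datanode);       // when a datanode registers
* hm.updateHeartbeat(datanode, // on every datanode heartbeat
*     capacity, dfsUsed, remaining, blockPoolUsed, xceiverCount, failedVolumes);
* hm.close();                  // on shutdown
* }</pre>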
*/
class HeartbeatManager implements DatanodeStatistics {
static final Log LOG = LogFactory.getLog(HeartbeatManager.class);
/**
* Stores a subset of the datanodeMap in DatanodeManager,
* containing nodes that are considered alive.
* The HeartbeatMonitor periodically checks for outdated entries,
* and removes them from the list.
* It is synchronized by the heartbeat manager lock.
*/
private final List<DatanodeDescriptor> datanodes = new ArrayList<DatanodeDescriptor>();
/** Statistics, which are synchronized by the heartbeat manager lock. */
private final Stats stats = new Stats();
/** The period, in milliseconds, between checks for expired datanodes. */
private final long heartbeatRecheckInterval;
/** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor());
final Namesystem namesystem;
final BlockManager blockManager;
HeartbeatManager(final Namesystem namesystem,
final BlockManager blockManager, final Configuration conf) {
this.namesystem = namesystem;
this.blockManager = blockManager;
boolean avoidStaleDataNodesForWrite = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
long recheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
long staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
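// When stale datanodes are avoided for writes, recheck at least as often as
// the stale interval so that stale nodes are detected promptly.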
if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
this.heartbeatRecheckInterval = staleInterval;
LOG.info("Setting heartbeat recheck interval to " + staleInterval
+ " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
+ " is less than "
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
} else {
this.heartbeatRecheckInterval = recheckInterval;
}
}
void activate(Configuration conf) {
heartbeatThread.start();
}
void close() {
heartbeatThread.interrupt();
try {
// This has no effect if the thread hasn't yet been started.
heartbeatThread.join(3000);
} catch (InterruptedException e) {
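// Ignore: we are shutting down anyway.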
}
}
synchronized int getLiveDatanodeCount() {
return datanodes.size();
}
@Override
public synchronized long getCapacityTotal() {
return stats.capacityTotal;
}
@Override
public synchronized long getCapacityUsed() {
return stats.capacityUsed;
}
@Override
public synchronized float getCapacityUsedPercent() {
return DFSUtil.getPercentUsed(stats.capacityUsed, stats.capacityTotal);
}
@Override
public synchronized long getCapacityRemaining() {
return stats.capacityRemaining;
}
@Override
public synchronized float getCapacityRemainingPercent() {
return DFSUtil.getPercentRemaining(
stats.capacityRemaining, stats.capacityTotal);
}
@Override
public synchronized long getBlockPoolUsed() {
return stats.blockPoolUsed;
}
@Override
public synchronized float getPercentBlockPoolUsed() {
return DFSUtil.getPercentUsed(stats.blockPoolUsed, stats.capacityTotal);
}
@Override
public synchronized long getCapacityUsedNonDFS() {
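// Non-DFS used = total - remaining - DFS used; clamp at zero in case the
// reported values are momentarily inconsistent.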
final long nonDFSUsed = stats.capacityTotal
- stats.capacityRemaining - stats.capacityUsed;
return nonDFSUsed < 0L ? 0L : nonDFSUsed;
}
@Override
public synchronized int getXceiverCount() {
return stats.xceiverCount;
}
@Override
public synchronized long[] getStats() {
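// The -1 entries are placeholders for the under-replicated, corrupt-replica,
// and missing-block counts, which the caller fills in.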
return new long[] {getCapacityTotal(),
getCapacityUsed(),
getCapacityRemaining(),
-1L,
-1L,
-1L,
getBlockPoolUsed()};
}
@Override
public synchronized int getExpiredHeartbeats() {
return stats.expiredHeartbeats;
}
synchronized void register(final DatanodeDescriptor d) {
if (!datanodes.contains(d)) {
addDatanode(d);
// record zeroed stats and refresh the node's heartbeat timestamp
d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
}
}
synchronized DatanodeDescriptor[] getDatanodes() {
return datanodes.toArray(new DatanodeDescriptor[datanodes.size()]);
}
synchronized void addDatanode(final DatanodeDescriptor d) {
datanodes.add(d);
d.isAlive = true;
}
synchronized void removeDatanode(DatanodeDescriptor node) {
if (node.isAlive) {
stats.subtract(node);
datanodes.remove(node);
node.isAlive = false;
}
}
synchronized void updateHeartbeat(final DatanodeDescriptor node,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int failedVolumes) {
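// Keep the aggregate consistent: subtract the node's old values, update the
// node, then add the new values back. The decommission transitions below use
// the same subtract/update/add pattern.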
stats.subtract(node);
node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, failedVolumes);
stats.add(node);
}
synchronized void startDecommission(final DatanodeDescriptor node) {
stats.subtract(node);
node.startDecommission();
stats.add(node);
}
synchronized void stopDecommission(final DatanodeDescriptor node) {
stats.subtract(node);
node.stopDecommission();
stats.add(node);
}
/**
* Check if there are any expired heartbeats, and if so,
* whether any blocks have to be re-replicated.
* While removing dead datanodes, make sure that only one datanode is marked
* dead at a time within the synchronized section. Otherwise, a cascading
* effect causes more datanodes to be declared dead.
*/
void heartbeatCheck() {
final DatanodeManager dm = blockManager.getDatanodeManager();
// It's OK to check safe mode without taking the lock here; we re-check
// for safe mode after taking the lock, before removing a datanode.
if (namesystem.isInStartupSafeMode()) {
return;
}
boolean allAlive = false;
while (!allAlive) {
// locate the first dead node.
DatanodeID dead = null;
// check the number of stale nodes
int numOfStaleNodes = 0;
synchronized(this) {
for (DatanodeDescriptor d : datanodes) {
if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
}
if (d.isStale(dm.getStaleInterval())) {
numOfStaleNodes++;
}
}
// Set the number of stale nodes in the DatanodeManager
dm.setNumStaleNodes(numOfStaleNodes);
}
allAlive = dead == null;
if (!allAlive) {
// acquire the fsnamesystem lock, and then remove the dead node.
namesystem.writeLock();
try {
if (namesystem.isInStartupSafeMode()) {
return;
}
synchronized(this) {
dm.removeDeadDatanode(dead);
}
} finally {
namesystem.writeUnlock();
}
}
}
}
/** Periodically check heartbeats and flag datanodes for block key updates. */
private class Monitor implements Runnable {
private long lastHeartbeatCheck;
private long lastBlockKeyUpdate;
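// Both timestamps default to 0, so the first pass through run() performs a
// heartbeat check immediately.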
@Override
public void run() {
while(namesystem.isRunning()) {
try {
final long now = Time.now();
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
heartbeatCheck();
lastHeartbeatCheck = now;
}
if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
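// The block keys have rolled; mark every live datanode so that its next
// heartbeat response carries the updated keys.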
synchronized(HeartbeatManager.this) {
for(DatanodeDescriptor d : datanodes) {
d.needKeyUpdate = true;
}
}
lastBlockKeyUpdate = now;
}
} catch (Exception e) {
LOG.error("Exception while checking heartbeat", e);
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ie) {
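// Fall through and re-check namesystem.isRunning().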
}
}
}
}
/** Datanode statistics.
* For decommissioning/decommissioned nodes, only used capacity is counted.
*/
private static class Stats {
private long capacityTotal = 0L;
private long capacityUsed = 0L;
private long capacityRemaining = 0L;
private long blockPoolUsed = 0L;
private int xceiverCount = 0;
private int expiredHeartbeats = 0;
private void add(final DatanodeDescriptor node) {
capacityUsed += node.getDfsUsed();
blockPoolUsed += node.getBlockPoolUsed();
xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining();
} else {
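// The free space on a decommissioning/decommissioned node is not
// writable, so only its used space counts toward the total.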
capacityTotal += node.getDfsUsed();
}
}
private void subtract(final DatanodeDescriptor node) {
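// Mirror of add(): subtract exactly what add() contributed for the node's
// previous state, so that subtract-then-add leaves the totals consistent.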
capacityUsed -= node.getDfsUsed();
blockPoolUsed -= node.getBlockPoolUsed();
xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining();
} else {
capacityTotal -= node.getDfsUsed();
}
}
/** Increment expired heartbeat counter. */
private void incrExpiredHeartbeats() {
expiredHeartbeats++;
}
}
}