blob: dd59b6f9ec4c5d0e82812bbe7155258a0a7802fb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.net.DFSTopologyNodeImpl;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class extends the DatanodeInfo class with ephemeral information (eg
* health, capacity, what blocks are associated with the Datanode) that is
* private to the Namenode, ie this class is not exposed to clients.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DatanodeDescriptor extends DatanodeInfo {
public static final Logger LOG =
LoggerFactory.getLogger(DatanodeDescriptor.class);
public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
/** Block and targets pair */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public static class BlockTargetPair {
public final Block block;
public final DatanodeStorageInfo[] targets;
BlockTargetPair(Block block, DatanodeStorageInfo[] targets) {
this.block = block;
this.targets = targets;
}
}
/** A BlockTargetPair queue. */
private static class BlockQueue<E> {
private final Queue<E> blockq = new LinkedList<>();
/** Size of the queue */
synchronized int size() {return blockq.size();}
/** Enqueue */
synchronized boolean offer(E e) {
return blockq.offer(e);
}
/** Dequeue */
synchronized List<E> poll(int numBlocks) {
if (numBlocks <= 0 || blockq.isEmpty()) {
return null;
}
List<E> results = new ArrayList<>();
for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
results.add(blockq.poll());
}
return results;
}
/**
* Returns <tt>true</tt> if the queue contains the specified element.
*/
synchronized boolean contains(E e) {
return blockq.contains(e);
}
synchronized void clear() {
blockq.clear();
}
}
/**
* A list of CachedBlock objects on this datanode.
*/
public static class CachedBlocksList extends IntrusiveCollection<CachedBlock> {
public enum Type {
PENDING_CACHED,
CACHED,
PENDING_UNCACHED
}
private final DatanodeDescriptor datanode;
private final Type type;
CachedBlocksList(DatanodeDescriptor datanode, Type type) {
this.datanode = datanode;
this.type = type;
}
public DatanodeDescriptor getDatanode() {
return datanode;
}
public Type getType() {
return type;
}
}
// Stores status of decommissioning.
// If node is not decommissioning, do not use this object for anything.
private final LeavingServiceStatus leavingServiceStatus =
new LeavingServiceStatus();
private final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<>();
/**
* The blocks which we want to cache on this DataNode.
*/
private final CachedBlocksList pendingCached =
new CachedBlocksList(this, CachedBlocksList.Type.PENDING_CACHED);
/**
* The blocks which we know are cached on this datanode.
* This list is updated by periodic cache reports.
*/
private final CachedBlocksList cached =
new CachedBlocksList(this, CachedBlocksList.Type.CACHED);
/**
* The blocks which we want to uncache on this DataNode.
*/
private final CachedBlocksList pendingUncached =
new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);
/**
* The time when the last batch of caching directives was sent, in
* monotonic milliseconds.
*/
private long lastCachingDirectiveSentTimeMs;
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
private boolean isAlive = false;
private boolean needKeyUpdate = false;
private boolean forceRegistration = false;
// A system administrator can tune the balancer bandwidth parameter
// (dfs.balance.bandwidthPerSec) dynamically by calling
// "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
// following 'bandwidth' variable gets updated with the new value for each
// node. Once the heartbeat command is issued to update the value on the
// specified datanode, this value will be set back to 0.
private long bandwidth;
/** A queue of blocks to be replicated by this datanode */
private final BlockQueue<BlockTargetPair> replicateBlocks =
new BlockQueue<>();
/** A queue of blocks to be recovered by this datanode */
private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>();
/** A set of blocks to be invalidated by this datanode */
private final LightWeightHashSet<Block> invalidateBlocks =
new LightWeightHashSet<>();
/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
* in case of errors (e.g. datanode does not report if an error occurs
* while writing the block).
*/
private EnumCounters<StorageType> currApproxBlocksScheduled
= new EnumCounters<>(StorageType.class);
private EnumCounters<StorageType> prevApproxBlocksScheduled
= new EnumCounters<>(StorageType.class);
private long lastBlocksScheduledRollTime = 0;
private int volumeFailures = 0;
private VolumeFailureSummary volumeFailureSummary = null;
/**
* When set to true, the node is not in include list and is not allowed
* to communicate with the namenode
*/
private boolean disallowed = false;
// The number of replication work pending before targets are determined
private int PendingReplicationWithoutTargets = 0;
// HB processing can use it to tell if it is the first HB since DN restarted
private boolean heartbeatedSinceRegistration = false;
/**
* DatanodeDescriptor constructor
* @param nodeID id of the data node
*/
public DatanodeDescriptor(DatanodeID nodeID) {
super(nodeID);
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
}
/**
* DatanodeDescriptor constructor
* @param nodeID id of the data node
* @param networkLocation location of the data node in network
*/
public DatanodeDescriptor(DatanodeID nodeID,
String networkLocation) {
super(nodeID, networkLocation);
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
}
public CachedBlocksList getPendingCached() {
return pendingCached;
}
public CachedBlocksList getCached() {
return cached;
}
public CachedBlocksList getPendingUncached() {
return pendingUncached;
}
public boolean isAlive() {
return isAlive;
}
public void setAlive(boolean isAlive) {
this.isAlive = isAlive;
}
public synchronized boolean needKeyUpdate() {
return needKeyUpdate;
}
public synchronized void setNeedKeyUpdate(boolean needKeyUpdate) {
this.needKeyUpdate = needKeyUpdate;
}
public LeavingServiceStatus getLeavingServiceStatus() {
return leavingServiceStatus;
}
@VisibleForTesting
public boolean isHeartbeatedSinceRegistration() {
return heartbeatedSinceRegistration;
}
@VisibleForTesting
public DatanodeStorageInfo getStorageInfo(String storageID) {
synchronized (storageMap) {
return storageMap.get(storageID);
}
}
@VisibleForTesting
public DatanodeStorageInfo[] getStorageInfos() {
synchronized (storageMap) {
final Collection<DatanodeStorageInfo> storages = storageMap.values();
return storages.toArray(new DatanodeStorageInfo[storages.size()]);
}
}
public EnumSet<StorageType> getStorageTypes() {
EnumSet<StorageType> storageTypes = EnumSet.noneOf(StorageType.class);
for (DatanodeStorageInfo dsi : getStorageInfos()) {
storageTypes.add(dsi.getStorageType());
}
return storageTypes;
}
public StorageReport[] getStorageReports() {
final DatanodeStorageInfo[] infos = getStorageInfos();
final StorageReport[] reports = new StorageReport[infos.length];
for(int i = 0; i < infos.length; i++) {
reports[i] = infos[i].toStorageReport();
}
return reports;
}
boolean hasStaleStorages() {
synchronized (storageMap) {
for (DatanodeStorageInfo storage : storageMap.values()) {
if (storage.areBlockContentsStale()) {
return true;
}
}
return false;
}
}
public void resetBlocks() {
updateStorageStats(this.getStorageReports(), 0L, 0L, 0, 0, null);
this.invalidateBlocks.clear();
this.volumeFailures = 0;
// pendingCached, cached, and pendingUncached are protected by the
// FSN lock.
this.pendingCached.clear();
this.cached.clear();
this.pendingUncached.clear();
}
public void clearBlockQueues() {
synchronized (invalidateBlocks) {
this.invalidateBlocks.clear();
}
this.recoverBlocks.clear();
this.replicateBlocks.clear();
// pendingCached, cached, and pendingUncached are protected by the
// FSN lock.
this.pendingCached.clear();
this.cached.clear();
this.pendingUncached.clear();
}
public int numBlocks() {
int blocks = 0;
for (DatanodeStorageInfo entry : getStorageInfos()) {
blocks += entry.numBlocks();
}
return blocks;
}
/**
* Updates stats from datanode heartbeat.
*/
public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures, volumeFailureSummary);
heartbeatedSinceRegistration = true;
}
/**
* process datanode heartbeat or stats initialization.
*/
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures, volumeFailureSummary);
setLastUpdate(Time.now());
setLastUpdateMonotonic(Time.monotonicNow());
rollBlocksScheduled(getLastUpdateMonotonic());
}
private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
long totalCapacity = 0;
long totalRemaining = 0;
long totalBlockPoolUsed = 0;
long totalDfsUsed = 0;
long totalNonDfsUsed = 0;
Set<DatanodeStorageInfo> failedStorageInfos = null;
// Decide if we should check for any missing StorageReport and mark it as
// failed. There are different scenarios.
// 1. When DN is running, a storage failed. Given the current DN
// implementation doesn't add recovered storage back to its storage list
// until DN restart, we can assume volFailures won't decrease
// during the current DN registration session.
// When volumeFailures == this.volumeFailures, it implies there is no
// state change. No need to check for failed storage. This is an
// optimization. Recent versions of the DataNode report a
// VolumeFailureSummary containing the date/time of the last volume
// failure. If that's available, then we check that instead for greater
// accuracy.
// 2. After DN restarts, volFailures might not increase and it is possible
// we still have new failed storage. For example, admins reduce
// available storages in configuration. Another corner case
// is the failed volumes might change after restart; a) there
// is one good storage A, one restored good storage B, so there is
// one element in storageReports and that is A. b) A failed. c) Before
// DN sends HB to NN to indicate A has failed, DN restarts. d) After DN
// restarts, storageReports has one element which is B.
final boolean checkFailedStorages;
if (volumeFailureSummary != null && this.volumeFailureSummary != null) {
checkFailedStorages = volumeFailureSummary.getLastVolumeFailureDate() >
this.volumeFailureSummary.getLastVolumeFailureDate();
} else {
checkFailedStorages = (volFailures > this.volumeFailures) ||
!heartbeatedSinceRegistration;
}
if (checkFailedStorages) {
if (this.volumeFailures != volFailures) {
LOG.info("Number of failed storages changes from {} to {}",
this.volumeFailures, volFailures);
}
synchronized (storageMap) {
failedStorageInfos =
new HashSet<>(storageMap.values());
}
}
setCacheCapacity(cacheCapacity);
setCacheUsed(cacheUsed);
setXceiverCount(xceiverCount);
this.volumeFailures = volFailures;
this.volumeFailureSummary = volumeFailureSummary;
for (StorageReport report : reports) {
DatanodeStorageInfo storage = updateStorage(report.getStorage());
if (checkFailedStorages) {
failedStorageInfos.remove(storage);
}
storage.receivedHeartbeat(report);
totalCapacity += report.getCapacity();
totalRemaining += report.getRemaining();
totalBlockPoolUsed += report.getBlockPoolUsed();
totalDfsUsed += report.getDfsUsed();
totalNonDfsUsed += report.getNonDfsUsed();
}
// Update total metrics for the node.
setCapacity(totalCapacity);
setRemaining(totalRemaining);
setBlockPoolUsed(totalBlockPoolUsed);
setDfsUsed(totalDfsUsed);
setNonDfsUsed(totalNonDfsUsed);
if (checkFailedStorages) {
updateFailedStorage(failedStorageInfos);
}
long storageMapSize;
synchronized (storageMap) {
storageMapSize = storageMap.size();
}
if (storageMapSize != reports.length) {
pruneStorageMap(reports);
}
}
/**
* Remove stale storages from storageMap. We must not remove any storages
* as long as they have associated block replicas.
*/
private void pruneStorageMap(final StorageReport[] reports) {
synchronized (storageMap) {
LOG.debug("Number of storages reported in heartbeat={};"
+ " Number of storages in storageMap={}", reports.length,
storageMap.size());
HashMap<String, DatanodeStorageInfo> excessStorages;
// Init excessStorages with all known storages.
excessStorages = new HashMap<>(storageMap);
// Remove storages that the DN reported in the heartbeat.
for (final StorageReport report : reports) {
excessStorages.remove(report.getStorage().getStorageID());
}
// For each remaining storage, remove it if there are no associated
// blocks.
for (final DatanodeStorageInfo storageInfo : excessStorages.values()) {
if (storageInfo.numBlocks() == 0) {
DatanodeStorageInfo info =
storageMap.remove(storageInfo.getStorageID());
if (!hasStorageType(info.getStorageType())) {
// we removed a storage, and as result there is no more such storage
// type, inform the parent about this.
if (getParent() instanceof DFSTopologyNodeImpl) {
((DFSTopologyNodeImpl) getParent()).childRemoveStorage(getName(),
info.getStorageType());
}
}
LOG.info("Removed storage {} from DataNode {}", storageInfo, this);
} else {
// This can occur until all block reports are received.
LOG.debug("Deferring removal of stale storage {} with {} blocks",
storageInfo, storageInfo.numBlocks());
}
}
}
}
private void updateFailedStorage(
Set<DatanodeStorageInfo> failedStorageInfos) {
for (DatanodeStorageInfo storageInfo : failedStorageInfos) {
if (storageInfo.getState() != DatanodeStorage.State.FAILED) {
LOG.info("{} failed.", storageInfo);
storageInfo.setState(DatanodeStorage.State.FAILED);
}
}
}
private static class BlockIterator implements Iterator<BlockInfo> {
private int index = 0;
private final List<Iterator<BlockInfo>> iterators;
private BlockIterator(final int startBlock,
final DatanodeStorageInfo... storages) {
if(startBlock < 0) {
throw new IllegalArgumentException(
"Illegal value startBlock = " + startBlock);
}
List<Iterator<BlockInfo>> iterators = new ArrayList<>();
int s = startBlock;
int sumBlocks = 0;
for (DatanodeStorageInfo e : storages) {
int numBlocks = e.numBlocks();
sumBlocks += numBlocks;
if(sumBlocks <= startBlock) {
s -= numBlocks;
} else {
iterators.add(e.getBlockIterator());
}
}
this.iterators = Collections.unmodifiableList(iterators);
// skip to the storage containing startBlock
for(; s > 0 && hasNext(); s--) {
next();
}
}
@Override
public boolean hasNext() {
update();
return index < iterators.size() && iterators.get(index).hasNext();
}
@Override
public BlockInfo next() {
update();
return iterators.get(index).next();
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove unsupported.");
}
private void update() {
while(index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
index++;
}
}
}
Iterator<BlockInfo> getBlockIterator() {
return getBlockIterator(0);
}
/**
* Get iterator, which starts iterating from the specified block.
*/
Iterator<BlockInfo> getBlockIterator(final int startBlock) {
return new BlockIterator(startBlock, getStorageInfos());
}
void incrementPendingReplicationWithoutTargets() {
PendingReplicationWithoutTargets++;
}
void decrementPendingReplicationWithoutTargets() {
PendingReplicationWithoutTargets--;
}
/**
* Store block replication work.
*/
void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
assert(block != null && targets != null && targets.length > 0);
replicateBlocks.offer(new BlockTargetPair(block, targets));
}
/**
* Store block recovery work.
*/
void addBlockToBeRecovered(BlockInfo block) {
if(recoverBlocks.contains(block)) {
// this prevents adding the same block twice to the recovery queue
BlockManager.LOG.info(block + " is already in the recovery queue");
return;
}
recoverBlocks.offer(block);
}
/**
* Store block invalidation work.
*/
void addBlocksToBeInvalidated(List<Block> blocklist) {
assert(blocklist != null && blocklist.size() > 0);
synchronized (invalidateBlocks) {
for(Block blk : blocklist) {
invalidateBlocks.add(blk);
}
}
}
/**
* The number of work items that are pending to be replicated
*/
int getNumberOfBlocksToBeReplicated() {
return PendingReplicationWithoutTargets + replicateBlocks.size();
}
public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
return replicateBlocks.poll(maxTransfers);
}
public BlockInfo[] getLeaseRecoveryCommand(int maxTransfers) {
List<BlockInfo> blocks = recoverBlocks.poll(maxTransfers);
if(blocks == null)
return null;
return blocks.toArray(new BlockInfo[blocks.size()]);
}
/**
* Remove the specified number of blocks to be invalidated
*/
public Block[] getInvalidateBlocks(int maxblocks) {
synchronized (invalidateBlocks) {
Block[] deleteList = invalidateBlocks.pollToArray(new Block[Math.min(
invalidateBlocks.size(), maxblocks)]);
return deleteList.length == 0 ? null : deleteList;
}
}
/**
* Find whether the datanode contains good storage of given type to
* place block of size <code>blockSize</code>.
*
* <p>Currently datanode only cares about the storage type, in this
* method, the first storage of given type we see is returned.
*
* @param t requested storage type
* @param blockSize requested block size
*/
public DatanodeStorageInfo chooseStorage4Block(StorageType t,
long blockSize) {
final long requiredSize =
blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
final long scheduledSize = blockSize * getBlocksScheduled(t);
long remaining = 0;
DatanodeStorageInfo storage = null;
for (DatanodeStorageInfo s : getStorageInfos()) {
if (s.getState() == State.NORMAL && s.getStorageType() == t) {
if (storage == null) {
storage = s;
}
long r = s.getRemaining();
if (r >= requiredSize) {
remaining += r;
}
}
}
if (requiredSize > remaining - scheduledSize) {
BlockPlacementPolicy.LOG.debug(
"The node {} does not have enough {} space (required={},"
+ " scheduled={}, remaining={}).",
this, t, requiredSize, scheduledSize, remaining);
return null;
}
return storage;
}
/**
* @return Approximate number of blocks currently scheduled to be written
* to the given storage type of this datanode.
*/
public int getBlocksScheduled(StorageType t) {
return (int)(currApproxBlocksScheduled.get(t)
+ prevApproxBlocksScheduled.get(t));
}
/**
* @return Approximate number of blocks currently scheduled to be written
* to this datanode.
*/
public int getBlocksScheduled() {
return (int)(currApproxBlocksScheduled.sum()
+ prevApproxBlocksScheduled.sum());
}
/** Increment the number of blocks scheduled. */
void incrementBlocksScheduled(StorageType t) {
currApproxBlocksScheduled.add(t, 1);
}
/** Decrement the number of blocks scheduled. */
void decrementBlocksScheduled(StorageType t) {
if (prevApproxBlocksScheduled.get(t) > 0) {
prevApproxBlocksScheduled.subtract(t, 1);
} else if (currApproxBlocksScheduled.get(t) > 0) {
currApproxBlocksScheduled.subtract(t, 1);
}
// its ok if both counters are zero.
}
/** Adjusts curr and prev number of blocks scheduled every few minutes. */
private void rollBlocksScheduled(long now) {
if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
prevApproxBlocksScheduled.set(currApproxBlocksScheduled);
currApproxBlocksScheduled.reset();
lastBlocksScheduledRollTime = now;
}
}
@Override
public int hashCode() {
// Super implementation is sufficient
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
// Sufficient to use super equality as datanodes are uniquely identified
// by DatanodeID
return (this == obj) || super.equals(obj);
}
/** Leaving service status. */
public class LeavingServiceStatus {
private int underReplicatedBlocks;
private int outOfServiceOnlyReplicas;
private int underReplicatedInOpenFiles;
private long startTime;
synchronized void set(int underRepBlocks,
int outOfServiceOnlyRep, int underRepInOpenFiles) {
if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return;
}
underReplicatedBlocks = underRepBlocks;
outOfServiceOnlyReplicas = outOfServiceOnlyRep;
underReplicatedInOpenFiles = underRepInOpenFiles;
}
/** @return the number of under-replicated blocks */
public synchronized int getUnderReplicatedBlocks() {
if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return 0;
}
return underReplicatedBlocks;
}
/** @return the number of blocks with out-of-service-only replicas */
public synchronized int getOutOfServiceOnlyReplicas() {
if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return 0;
}
return outOfServiceOnlyReplicas;
}
/** @return the number of under-replicated blocks in open files */
public synchronized int getUnderReplicatedInOpenFiles() {
if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return 0;
}
return underReplicatedInOpenFiles;
}
/** Set start time */
public synchronized void setStartTime(long time) {
if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return;
}
startTime = time;
}
/** @return start time */
public synchronized long getStartTime() {
if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return 0;
}
return startTime;
}
} // End of class DecommissioningStatus
/**
* Set the flag to indicate if this datanode is disallowed from communicating
* with the namenode.
*/
public void setDisallowed(boolean flag) {
disallowed = flag;
}
/** Is the datanode disallowed from communicating with the namenode? */
public boolean isDisallowed() {
return disallowed;
}
/**
* @return number of failed volumes in the datanode.
*/
public int getVolumeFailures() {
return volumeFailures;
}
/**
* Returns info about volume failures.
*
* @return info about volume failures, possibly null
*/
public VolumeFailureSummary getVolumeFailureSummary() {
return volumeFailureSummary;
}
/**
* @param nodeReg DatanodeID to update registration for.
*/
@Override
public void updateRegInfo(DatanodeID nodeReg) {
super.updateRegInfo(nodeReg);
// must re-process IBR after re-registration
for(DatanodeStorageInfo storage : getStorageInfos()) {
storage.setBlockReportCount(0);
}
heartbeatedSinceRegistration = false;
forceRegistration = false;
}
/**
* @return balancer bandwidth in bytes per second for this datanode
*/
public synchronized long getBalancerBandwidth() {
return this.bandwidth;
}
/**
* @param bandwidth balancer bandwidth in bytes per second for this datanode
*/
public synchronized void setBalancerBandwidth(long bandwidth) {
this.bandwidth = bandwidth;
}
@Override
public String dumpDatanode() {
StringBuilder sb = new StringBuilder(super.dumpDatanode());
int repl = replicateBlocks.size();
if (repl > 0) {
sb.append(" ").append(repl).append(" blocks to be replicated;");
}
int inval = invalidateBlocks.size();
if (inval > 0) {
sb.append(" ").append(inval).append(" blocks to be invalidated;");
}
int recover = recoverBlocks.size();
if (recover > 0) {
sb.append(" ").append(recover).append(" blocks to be recovered;");
}
return sb.toString();
}
DatanodeStorageInfo updateStorage(DatanodeStorage s) {
synchronized (storageMap) {
DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
DFSTopologyNodeImpl parent = null;
if (getParent() instanceof DFSTopologyNodeImpl) {
parent = (DFSTopologyNodeImpl) getParent();
}
if (storage == null) {
LOG.info("Adding new storage ID {} for DN {}", s.getStorageID(),
getXferAddr());
StorageType type = s.getStorageType();
if (!hasStorageType(type) && parent != null) {
// we are about to add a type this node currently does not have,
// inform the parent that a new type is added to this datanode
parent.childAddStorage(getName(), s.getStorageType());
}
storage = new DatanodeStorageInfo(this, s);
storageMap.put(s.getStorageID(), storage);
} else if (storage.getState() != s.getState() ||
storage.getStorageType() != s.getStorageType()) {
// For backwards compatibility, make sure that the type and
// state are updated. Some reports from older datanodes do
// not include these fields so we may have assumed defaults.
StorageType oldType = storage.getStorageType();
StorageType newType = s.getStorageType();
if (oldType != newType && !hasStorageType(newType) && parent != null) {
// we are about to add a type this node currently does not have
// inform the parent that a new type is added to this datanode
// if old == new, nothing's changed. don't bother
parent.childAddStorage(getName(), newType);
}
storage.updateFromStorage(s);
storageMap.put(storage.getStorageID(), storage);
if (oldType != newType && !hasStorageType(oldType) && parent != null) {
// there is no more old type storage on this datanode, inform parent
// about this change.
parent.childRemoveStorage(getName(), oldType);
}
}
return storage;
}
}
/**
* @return The time at which we last sent caching directives to this
* DataNode, in monotonic milliseconds.
*/
public long getLastCachingDirectiveSentTimeMs() {
return this.lastCachingDirectiveSentTimeMs;
}
/**
* @param time The time at which we last sent caching directives to this
* DataNode, in monotonic milliseconds.
*/
public void setLastCachingDirectiveSentTimeMs(long time) {
this.lastCachingDirectiveSentTimeMs = time;
}
/**
* @return whether at least first block report has been received
*/
public boolean checkBlockReportReceived() {
if(this.getStorageInfos().length == 0) {
return false;
}
for(DatanodeStorageInfo storageInfo: this.getStorageInfos()) {
if(storageInfo.getBlockReportCount() == 0 )
return false;
}
return true;
}
public void setForceRegistration(boolean force) {
forceRegistration = force;
}
public boolean isRegistered() {
return isAlive() && !forceRegistration;
}
public boolean hasStorageType(StorageType type) {
for (DatanodeStorageInfo dnStorage : getStorageInfos()) {
if (dnStorage.getStorageType() == type) {
return true;
}
}
return false;
}
}