blob: 19968f75fd4fb8cffb0897d75159e28467dbd5de [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
*/
@InterfaceAudience.Private
public class BlockManager {
/** Logger shared by this class. */
static final Log LOG = LogFactory.getLog(BlockManager.class);
/** Default load factor of map; passed to the BlocksMap constructor. */
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
/** Namesystem this manager belongs to; its locks and block pool id are used by the methods of this class. */
private final Namesystem namesystem;
/** Created in the constructor; used to resolve datanodes. */
private final DatanodeManager datanodeManager;
/** Obtained from the datanode manager in the constructor. */
private final HeartbeatManager heartbeatManager;
/** Null when block access tokens are disabled in the configuration. */
private final BlockTokenSecretManager blockTokenSecretManager;
// Counters exposed through the metric getters. updateState() refreshes the
// pending, under-replicated and corrupt counts; the other two are not
// written anywhere in the visible part of this file.
private volatile long pendingReplicationBlocksCount = 0L;
private volatile long corruptReplicaBlocksCount = 0L;
private volatile long underReplicatedBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
private volatile long excessBlocksCount = 0L;
/**
 * Used by metrics.
 *
 * @return the value currently held in {@code pendingReplicationBlocksCount}
 */
public long getPendingReplicationBlocksCount() {
  return this.pendingReplicationBlocksCount;
}
/**
 * Used by metrics.
 *
 * @return the value currently held in {@code underReplicatedBlocksCount}
 */
public long getUnderReplicatedBlocksCount() {
  return this.underReplicatedBlocksCount;
}
/**
 * Used by metrics.
 *
 * @return the value currently held in {@code corruptReplicaBlocksCount}
 */
public long getCorruptReplicaBlocksCount() {
  return this.corruptReplicaBlocksCount;
}
/**
 * Used by metrics.
 *
 * @return the value currently held in {@code scheduledReplicationBlocksCount}
 */
public long getScheduledReplicationBlocksCount() {
  return this.scheduledReplicationBlocksCount;
}
/**
 * Used by metrics.
 *
 * @return the number of blocks reported by {@code invalidateBlocks}
 */
public long getPendingDeletionBlocksCount() {
  return this.invalidateBlocks.numBlocks();
}
/**
 * Used by metrics.
 *
 * @return the value currently held in {@code excessBlocksCount}
 */
public long getExcessBlocksCount() {
  return this.excessBlocksCount;
}
/**
 * How often the namenode checks for new replication work. The constructor
 * stores the configured interval multiplied by 1000 (presumably seconds
 * converted to milliseconds; confirm against DFSConfigKeys).
 */
private final long replicationRecheckInterval;
/**
 * Mapping: Block -> { INode, datanodes, self ref }
 * Updated only in response to client-sent information.
 */
final BlocksMap blocksMap;
/** Replication thread; started by activate() and interrupted by close(). */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
/** Blocks to be invalidated. */
private final InvalidateBlocks invalidateBlocks;
//
// Keeps a collection of blocks for every named node. Each collection
// contains the blocks that are "extra" at that location. We'll
// eventually remove these extras.
// Mapping: StorageID -> Collection<Block>
//
public final Map<String, Collection<Block>> excessReplicateMap =
new TreeMap<String, Collection<Block>>();
//
// Store set of Blocks that need to be replicated 1 or more times.
// Pending replication orders are kept separately in pendingReplications.
//
public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
/** Replication requests in flight; started by activate() and stopped by close(). */
@VisibleForTesting
final PendingReplicationBlocks pendingReplications;
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
/** The maximum number of outgoing replication streams
 * a given node should have at one time
 */
int maxReplicationStreams;
/** Minimum copies needed or else write is disallowed */
public final short minReplication;
/** Default number of replicas */
public final int defaultReplication;
/** The maximum number of entries returned by getCorruptInodes() */
final int maxCorruptFilesReturned;
/**
 * Variable to enable check for enough racks; the constructor sets it to
 * true when the topology script file name key is present in the
 * configuration.
 */
final boolean shouldCheckForEnoughRacks;
/** Last block index used for replication work. */
private int replIndex = 0;
/** for block replicas placement; replaceable through setBlockPlacementPolicy */
private BlockPlacementPolicy blockplacement;
/**
 * Creates the block manager, wiring up its collaborators and reading the
 * replication settings from the configuration.
 *
 * @param namesystem the namesystem this manager belongs to
 * @param stats cluster statistics handed to the block placement policy
 * @param conf configuration supplying replication limits, intervals and
 *        block access token settings
 * @throws IOException if the configured minimum replication is not
 *         positive, the maximum exceeds {@link Short#MAX_VALUE}, or the
 *         minimum is greater than the maximum
 */
public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
    final Configuration conf) throws IOException {
  this.namesystem = namesystem;
  this.datanodeManager = new DatanodeManager(this, namesystem, conf);
  this.heartbeatManager = datanodeManager.getHeartbeatManager();
  this.invalidateBlocks = new InvalidateBlocks(datanodeManager);
  this.blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
  this.blockplacement = BlockPlacementPolicy.getInstance(
      conf, stats, datanodeManager.getNetworkTopology());

  // The configured timeout is scaled by 1000 before being handed over.
  final int pendingTimeout = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT);
  this.pendingReplications = new PendingReplicationBlocks(pendingTimeout * 1000L);

  this.blockTokenSecretManager = createBlockTokenSecretManager(conf);

  this.maxCorruptFilesReturned = conf.getInt(
      DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
      DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
  this.defaultReplication = conf.getInt(
      DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);

  final int configuredMax = conf.getInt(
      DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
      DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
  final int configuredMin = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);

  // Validate before narrowing to short: 0 < min <= max <= Short.MAX_VALUE.
  if (configuredMin <= 0) {
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
        + " = " + configuredMin + " <= 0");
  }
  if (configuredMax > Short.MAX_VALUE) {
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
        + " = " + configuredMax + " > " + Short.MAX_VALUE);
  }
  if (configuredMin > configuredMax) {
    throw new IOException("Unexpected configuration parameters: "
        + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
        + " = " + configuredMin + " > "
        + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
        + " = " + configuredMax);
  }
  this.minReplication = (short) configuredMin;
  this.maxReplication = (short) configuredMax;

  this.maxReplicationStreams = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
  // Rack checking is enabled exactly when a topology script is configured.
  this.shouldCheckForEnoughRacks =
      conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;

  final int recheckInterval = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT);
  this.replicationRecheckInterval = recheckInterval * 1000L;

  LOG.info("defaultReplication = " + defaultReplication);
  LOG.info("maxReplication = " + maxReplication);
  LOG.info("minReplication = " + minReplication);
  LOG.info("maxReplicationStreams = " + maxReplicationStreams);
  LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
  LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
}
/**
 * Builds the block token secret manager described by the configuration.
 * Logs whether block access tokens are enabled and, if so, the configured
 * key update interval and token lifetime. Both values are logged as
 * minutes and multiplied by 60*1000 before being passed on.
 *
 * @param conf configuration to read the block access token settings from
 * @return a new secret manager, or null when block access tokens are disabled
 * @throws IOException declared by the signature; presumably raised by the
 *         secret manager constructor - confirm
 */
private static BlockTokenSecretManager createBlockTokenSecretManager(
    final Configuration conf) throws IOException {
  final boolean tokensEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
      DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
  LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + "=" + tokensEnabled);

  if (tokensEnabled) {
    final long keyUpdateMinutes = conf.getLong(
        DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY,
        DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT);
    final long tokenLifetimeMinutes = conf.getLong(
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT);
    LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY
        + "=" + keyUpdateMinutes + " min(s), "
        + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY
        + "=" + tokenLifetimeMinutes + " min(s)");
    final long keyUpdateInterval = keyUpdateMinutes*60*1000L;
    final long tokenLifetime = tokenLifetimeMinutes*60*1000L;
    return new BlockTokenSecretManager(true, keyUpdateInterval, tokenLifetime);
  }
  return null;
}
/**
 * Get the BlockTokenSecretManager.
 *
 * @return the secret manager, or null when block access tokens are disabled
 */
BlockTokenSecretManager getBlockTokenSecretManager() {
  return this.blockTokenSecretManager;
}
/**
 * @return true if a block token secret manager exists, i.e. block access
 *         tokens are enabled
 */
private boolean isBlockTokenEnabled() {
  return !(this.blockTokenSecretManager == null);
}
/**
 * Should the access keys be updated?
 *
 * @param updateTime value forwarded to the secret manager's
 *        {@code updateKeys} call
 * @return false when block tokens are disabled, otherwise whatever the
 *         secret manager's {@code updateKeys} returns
 * @throws IOException if the secret manager fails to update its keys
 */
boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
  if (!isBlockTokenEnabled()) {
    return false;
  }
  return blockTokenSecretManager.updateKeys(updateTime);
}
/**
 * Starts the background work of this manager: the pending replication
 * tracker, then the datanode manager, then the replication thread.
 *
 * @param conf configuration forwarded to the datanode manager
 */
public void activate(Configuration conf) {
  this.pendingReplications.start();
  this.datanodeManager.activate(conf);
  replicationThread.start();
}
/**
 * Shuts this manager down: stops the pending replication tracker, closes
 * the blocks map and the datanode manager, and finally interrupts the
 * replication thread.
 */
public void close() {
  if (pendingReplications != null) {
    pendingReplications.stop();
  }
  blocksMap.close();
  datanodeManager.close();
  if (replicationThread != null) {
    replicationThread.interrupt();
  }
}
/**
 * @return the datanode manager created by this block manager's constructor
 */
public DatanodeManager getDatanodeManager() {
  return this.datanodeManager;
}
/**
 * @return the block placement policy currently in use
 */
public BlockPlacementPolicy getBlockPlacementPolicy() {
  return this.blockplacement;
}
/**
 * Set BlockPlacementPolicy.
 *
 * @param newpolicy the policy to use from now on; must not be null
 * @throws HadoopIllegalArgumentException if {@code newpolicy} is null
 */
public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
  if (null == newpolicy) {
    throw new HadoopIllegalArgumentException("newpolicy == null");
  }
  blockplacement = newpolicy;
}
/**
 * Dump meta data to out.
 * <p>
 * Writes the live and dead datanode counts, one line per block waiting for
 * replication (with replica counts and the state of each node holding it),
 * then the pending replications, the blocks waiting to be deleted and all
 * datanodes. The namesystem write lock must be held (asserted).
 * <p>
 * A block that no longer belongs to any file (its inode is null) is
 * printed with the file name placeholder {@code [orphaned]} instead of
 * failing with a NullPointerException.
 *
 * @param out writer receiving the dump
 */
public void metaSave(PrintWriter out) {
  assert namesystem.hasWriteLock();
  final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
  datanodeManager.fetchDatanodes(live, dead, false);
  out.println("Live Datanodes: " + live.size());
  out.println("Dead Datanodes: " + dead.size());
  //
  // Dump contents of neededReplication
  //
  synchronized (neededReplications) {
    out.println("Metasave: Blocks waiting for replication: " +
        neededReplications.size());
    for (Block block : neededReplications) {
      List<DatanodeDescriptor> containingNodes =
          new ArrayList<DatanodeDescriptor>();
      List<DatanodeDescriptor> containingLiveReplicasNodes =
          new ArrayList<DatanodeDescriptor>();
      NumberReplicas numReplicas = new NumberReplicas();
      // source node returned is not used; the call is made to fill in
      // the node lists and the replica counts
      chooseSourceDatanode(block, containingNodes,
          containingLiveReplicasNodes, numReplicas);
      assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
      int usableReplicas = numReplicas.liveReplicas() +
          numReplicas.decommissionedReplicas();

      if (block instanceof BlockInfo) {
        // The inode is null for a block that does not belong to any file;
        // print a placeholder rather than dereferencing it.
        final INodeFile inode = ((BlockInfo)block).getINode();
        final String fileName =
            (inode == null) ? "[orphaned]" : inode.getFullPathName();
        out.print(fileName + ": ");
      }
      // l: == live:, d: == decommissioned c: == corrupt e: == excess
      out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
          " (replicas:" +
          " l: " + numReplicas.liveReplicas() +
          " d: " + numReplicas.decommissionedReplicas() +
          " c: " + numReplicas.corruptReplicas() +
          " e: " + numReplicas.excessReplicas() + ") ");

      Collection<DatanodeDescriptor> corruptNodes =
          corruptReplicas.getNodes(block);

      for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
          jt.hasNext();) {
        DatanodeDescriptor node = jt.next();
        String state = "";
        if (corruptNodes != null && corruptNodes.contains(node)) {
          state = "(corrupt)";
        } else if (node.isDecommissioned() ||
            node.isDecommissionInProgress()) {
          // nodes still being decommissioned get the same label
          state = "(decommissioned)";
        }
        out.print(" " + node + state + " : ");
      }
      out.println("");
    }
  }

  // Dump blocks from pendingReplication
  pendingReplications.metaSave(out);

  // Dump blocks that are waiting to be deleted
  invalidateBlocks.dump(out);

  // Dump all datanodes
  getDatanodeManager().datanodeDump(out);
}
/**
 * @return the configured maximum number of outgoing replication streams
 *         per node
 */
public int getMaxReplicationStreams() {
  return this.maxReplicationStreams;
}
/**
 * Tests whether a block currently has enough live replicas.
 *
 * @param block the block to check
 * @return true if the number of live replicas is at least
 *         {@code minReplication}
 */
public boolean checkMinReplication(Block block) {
  final int liveReplicas = countNodes(block).liveReplicas();
  return liveReplicas >= minReplication;
}
/**
 * Commit a block of a file.
 * <p>
 * Does nothing if the block is already in the COMMITTED state. Otherwise
 * asserts that the client-reported length is not shorter than the stored
 * one and delegates to the block's own {@code commitBlock}.
 *
 * @param block block to be committed
 * @param commitBlock contains client reported block length and generation
 * @return true if the block is changed to committed state, false if it
 *         was already committed
 * @throws IOException declared by the signature; presumably propagated
 *         from the block's {@code commitBlock} - confirm
 */
private boolean commitBlock(final BlockInfoUnderConstruction block,
    final Block commitBlock) throws IOException {
  final boolean alreadyCommitted =
      block.getBlockUCState() == BlockUCState.COMMITTED;
  if (!alreadyCommitted) {
    assert block.getNumBytes() <= commitBlock.getNumBytes() :
      "commitBlock length is less than the stored one "
      + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
    block.commitBlock(commitBlock);
  }
  return !alreadyCommitted;
}
/**
 * Commit the last block of the file and mark it as complete if it meets
 * the minimum replication requirement.
 * <p>
 * Returns false without doing anything when there is nothing to commit:
 * {@code commitBlock} is null, the file has no blocks, or the last block
 * is already complete.
 *
 * @param fileINode file inode
 * @param commitBlock contains client reported block length and generation
 * @return true if the last block is changed to committed state
 * @throws IOException if committing or completing the block fails
 */
public boolean commitOrCompleteLastBlock(INodeFileUnderConstruction fileINode,
    Block commitBlock) throws IOException {
  if (commitBlock == null) {
    // not committing, this is a block allocation retry
    return false;
  }
  final BlockInfo lastBlock = fileINode.getLastBlock();
  if (lastBlock == null) {
    // no blocks in file yet
    return false;
  }
  if (lastBlock.isComplete()) {
    // already completed (e.g. by syncBlock)
    return false;
  }

  final boolean committed =
      commitBlock((BlockInfoUnderConstruction) lastBlock, commitBlock);
  final int liveReplicas = countNodes(lastBlock).liveReplicas();
  if (liveReplicas >= minReplication) {
    final int lastIndex = fileINode.numBlocks() - 1;
    completeBlock(fileINode, lastIndex);
  }
  return committed;
}
/**
 * Convert a specified block of the file to a complete block, enforcing
 * the replication and commit checks (non-forced completion).
 *
 * @param fileINode file
 * @param blkIndex block index in the file
 * @return the result of the three-argument {@code completeBlock} called
 *         with {@code force == false}
 * @throws IOException if the block does not have at least a minimal number
 *         of replicas reported from data-nodes, or is not COMMITTED
 */
private BlockInfo completeBlock(final INodeFile fileINode,
    final int blkIndex) throws IOException {
  final boolean force = false;
  return completeBlock(fileINode, blkIndex, force);
}
/**
 * Convert the block at the given index of the file to a complete block.
 *
 * @param fileINode file
 * @param blkIndex block index in the file; a negative index yields null
 * @param force when true, skip the minimal replication and COMMITTED
 *        state checks
 * @return null for a negative index, the block itself if it is already
 *         complete, otherwise the value returned by the blocks map after
 *         replacing the block with its complete form
 * @throws IOException if {@code force} is false and the block has fewer
 *         than {@code minReplication} nodes or is not COMMITTED
 */
public BlockInfo completeBlock(final INodeFile fileINode,
    final int blkIndex, final boolean force) throws IOException {
  if (blkIndex < 0) {
    return null;
  }
  final BlockInfo current = fileINode.getBlocks()[blkIndex];
  if (current.isComplete()) {
    return current;
  }

  final BlockInfoUnderConstruction underConstruction =
      (BlockInfoUnderConstruction) current;
  if (!force) {
    if (underConstruction.numNodes() < minReplication) {
      throw new IOException("Cannot complete block: " +
          "block does not satisfy minimal replication requirement.");
    }
    if (underConstruction.getBlockUCState() != BlockUCState.COMMITTED) {
      throw new IOException(
          "Cannot complete block: block has not been COMMITTED by the client");
    }
  }

  final BlockInfo completed = underConstruction.convertToCompleteBlock();
  // replace the block at blkIndex in the file
  fileINode.setBlock(blkIndex, completed);
  // replace block in the blocksMap
  return blocksMap.replaceBlock(completed);
}
/**
 * Completes the given block of the file, locating it by reference
 * identity among the file's blocks.
 *
 * @param fileINode file owning the block
 * @param block the block instance to complete
 * @return the result of completing the block at its index, or
 *         {@code block} unchanged if the file does not contain it
 * @throws IOException if the non-forced completion checks fail
 */
private BlockInfo completeBlock(final INodeFile fileINode,
    final BlockInfo block) throws IOException {
  final BlockInfo[] blocksOfFile = fileINode.getBlocks();
  int position = 0;
  while (position < blocksOfFile.length) {
    if (blocksOfFile[position] == block) {
      return completeBlock(fileINode, position);
    }
    position++;
  }
  return block;
}
/**
 * Convert the last block of the file to an under construction block.<p>
 * The conversion happens only if the file has blocks and the last one is
 * partial, i.e. its size differs from the preferred block size. The
 * converted block replaces the old one in the blocks map, is removed from
 * the replication queue, and is withdrawn from the pending deletions of
 * every datanode that holds it.<br>
 * The returned block locations carry a WRITE access token and let the
 * client form the data pipeline for this block. When null is returned the
 * client is supposed to allocate a new block with the next call.
 *
 * @param fileINode file
 * @return the last block locations if the block is partial or null otherwise
 * @throws IOException if the located block cannot be created
 */
public LocatedBlock convertLastBlockToUnderConstruction(
    INodeFileUnderConstruction fileINode) throws IOException {
  final BlockInfo lastBlock = fileINode.getLastBlock();
  if (lastBlock == null) {
    return null;
  }
  if (fileINode.getPreferredBlockSize() == lastBlock.getNumBytes()) {
    return null;
  }
  assert lastBlock == getStoredBlock(lastBlock) :
    "last block of the file is not in blocksMap";

  final DatanodeDescriptor[] targets = getNodes(lastBlock);
  final BlockInfoUnderConstruction ucBlock =
      fileINode.setLastBlock(lastBlock, targets);
  blocksMap.replaceBlock(ucBlock);

  // Remove block from replication queue.
  updateNeededReplications(lastBlock, 0, 0);

  // remove this block from the list of pending blocks to be deleted.
  for (int i = 0; i < targets.length; i++) {
    invalidateBlocks.remove(targets[i].getStorageID(), lastBlock);
  }

  // The block starts where the rest of the file ends.
  final long fileLength = fileINode.computeContentSummary().getLength();
  final long blockStart = fileLength - ucBlock.getNumBytes();
  return createLocatedBlock(ucBlock, blockStart, AccessMode.WRITE);
}
/**
 * Get all valid locations of the block.
 *
 * @param block the block to look up
 * @return storage IDs of the datanodes holding the block, excluding those
 *         on which the replica is already scheduled for invalidation
 */
private List<String> getValidLocations(Block block) {
  final List<String> validStorageIds =
      new ArrayList<String>(blocksMap.numNodes(block));
  final Iterator<DatanodeDescriptor> holders = blocksMap.nodeIterator(block);
  while (holders.hasNext()) {
    final String storageID = holders.next().getStorageID();
    // filter invalidate replicas
    if (invalidateBlocks.contains(storageID, block)) {
      continue;
    }
    validStorageIds.add(storageID);
  }
  return validStorageIds;
}
/**
 * Builds the located blocks covering the byte range
 * {@code [offset, offset + length)} of a file.
 * <p>
 * If the first block has length zero, the search for the starting block is
 * skipped and the list starts at the first block. If {@code offset} lies
 * at or beyond the end of the blocks an empty list is returned.
 *
 * @param blocks the file's blocks; must contain at least one element
 * @param offset starting byte offset within the file
 * @param length number of bytes requested
 * @param nrBlocksToReturn upper bound on the size of the returned list
 *        (at least one block is always returned when the offset is valid)
 * @param mode access mode for the block tokens, or null for no tokens
 * @return the located blocks for the range
 * @throws IOException if a located block cannot be created
 */
private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
    final long offset, final long length, final int nrBlocksToReturn,
    final AccessMode mode) throws IOException {
  final int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;

  // Find the block containing the requested offset.
  int curBlk = 0;
  long curPos = 0;
  while (curBlk < nrBlocks) {
    final long blkSize = blocks[curBlk].getNumBytes();
    assert blkSize > 0 : "Block of size 0";
    if (curPos + blkSize > offset) {
      break;
    }
    curPos += blkSize;
    curBlk++;
  }

  if (nrBlocks > 0 && curBlk == nrBlocks) {
    // offset >= end of file
    return Collections.<LocatedBlock>emptyList();
  }

  final long endOff = offset + length;
  final List<LocatedBlock> located = new ArrayList<LocatedBlock>(blocks.length);
  boolean more;
  do {
    final BlockInfo current = blocks[curBlk];
    located.add(createLocatedBlock(current, curPos, mode));
    curPos += current.getNumBytes();
    curBlk++;
    more = curPos < endOff
        && curBlk < blocks.length
        && located.size() < nrBlocksToReturn;
  } while (more);
  return located;
}
/**
 * Creates a LocatedBlock for the given block and, when an access mode is
 * supplied, attaches a block token for that mode.
 *
 * @param blk the block to locate
 * @param pos offset of the block within its file
 * @param mode access mode for the token, or null to skip the token
 * @return the located block
 * @throws IOException if the located block or its token cannot be created
 */
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
    final BlockTokenSecretManager.AccessMode mode) throws IOException {
  final LocatedBlock located = createLocatedBlock(blk, pos);
  if (mode == null) {
    return located;
  }
  setBlockToken(located, mode);
  return located;
}
/**
 * Creates a LocatedBlock for the given block.
 * <p>
 * For a block under construction the expected locations are returned and
 * the block is never flagged corrupt. For any other block the locations
 * are the datanodes recorded in the blocks map: if every replica is
 * corrupt, all of them are returned and the block is flagged corrupt;
 * otherwise only the replicas not listed in the corrupt replicas map are
 * returned.
 * <p>
 * The locations are collected in a list rather than a pre-sized array, so
 * a disagreement between the replica counts and the corrupt replicas map
 * (which is logged as a warning) can neither overrun the array nor leave
 * null entries in it.
 *
 * @param blk the block to locate
 * @param pos offset of the block within its file
 * @return a LocatedBlock for the given block
 * @throws IOException if {@code blk} is an under-construction instance
 *         that reports itself complete
 */
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
    ) throws IOException {
  if (blk instanceof BlockInfoUnderConstruction) {
    if (blk.isComplete()) {
      throw new IOException(
          "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
          + ", blk=" + blk);
    }
    final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
    final DatanodeDescriptor[] locations = uc.getExpectedLocations();
    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
    return new LocatedBlock(eb, locations, pos, false);
  }

  // get block locations
  final int numCorruptNodes = countNodes(blk).corruptReplicas();
  final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
  if (numCorruptNodes != numCorruptReplicas) {
    LOG.warn("Inconsistent number of corrupt replicas for "
        + blk + " blockMap has " + numCorruptNodes
        + " but corrupt replicas map has " + numCorruptReplicas);
  }

  final int numNodes = blocksMap.numNodes(blk);
  // Note: a block with no replicas at all is also reported as corrupt.
  final boolean isCorrupt = numCorruptNodes == numNodes;

  final List<DatanodeDescriptor> machines =
      new ArrayList<DatanodeDescriptor>(Math.max(numNodes, 0));
  for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
      it.hasNext();) {
    final DatanodeDescriptor d = it.next();
    final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
    // when all replicas are corrupt, report every one of them
    if (isCorrupt || !replicaCorrupt) {
      machines.add(d);
    }
  }

  final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
  return new LocatedBlock(eb,
      machines.toArray(new DatanodeDescriptor[machines.size()]),
      pos, isCorrupt);
}
/**
 * Create a LocatedBlocks.
 * <p>
 * The namesystem read or write lock must be held (asserted).
 *
 * @param blocks the file's blocks; null yields null, an empty array
 *        yields an empty LocatedBlocks
 * @param fileSizeExcludeBlocksUnderConstruction file length not counting
 *        blocks under construction
 * @param isFileUnderConstruction whether the file is under construction
 * @param offset starting byte offset of the requested range
 * @param length number of bytes requested
 * @param needBlockToken whether READ block tokens should be attached
 * @return the located blocks for the range together with the located last
 *         block of the file, or null if {@code blocks} is null
 * @throws IOException if a located block cannot be created
 */
public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
    final long fileSizeExcludeBlocksUnderConstruction,
    final boolean isFileUnderConstruction,
    final long offset, final long length, final boolean needBlockToken
    ) throws IOException {
  assert namesystem.hasReadOrWriteLock();
  if (blocks == null) {
    return null;
  }
  if (blocks.length == 0) {
    return new LocatedBlocks(0, isFileUnderConstruction,
        Collections.<LocatedBlock>emptyList(), null, false);
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
  }
  AccessMode mode = null;
  if (needBlockToken) {
    mode = AccessMode.READ;
  }
  final List<LocatedBlock> rangeBlocks = createLocatedBlockList(
      blocks, offset, length, Integer.MAX_VALUE, mode);

  final BlockInfo last = blocks[blocks.length - 1];
  // A complete last block is already counted in the file size, so it
  // starts its own length before the end; an incomplete one starts at it.
  final long lastPos;
  if (last.isComplete()) {
    lastPos = fileSizeExcludeBlocksUnderConstruction - last.getNumBytes();
  } else {
    lastPos = fileSizeExcludeBlocksUnderConstruction;
  }
  final LocatedBlock lastLocated = createLocatedBlock(last, lastPos, mode);
  return new LocatedBlocks(
      fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
      rangeBlocks, lastLocated, last.isComplete());
}
/**
 * @return the keys exported by the secret manager, or
 *         {@code ExportedBlockKeys.DUMMY_KEYS} when block tokens are
 *         disabled
 */
public ExportedBlockKeys getBlockKeys() {
  if (isBlockTokenEnabled()) {
    return blockTokenSecretManager.exportKeys();
  }
  return ExportedBlockKeys.DUMMY_KEYS;
}
/**
 * Generate a block token for the located block. Does nothing when block
 * tokens are disabled.
 *
 * @param b the located block that receives the token
 * @param mode the single access mode the token grants
 * @throws IOException if the token cannot be generated
 */
public void setBlockToken(final LocatedBlock b,
    final BlockTokenSecretManager.AccessMode mode) throws IOException {
  if (!isBlockTokenEnabled()) {
    return;
  }
  b.setBlockToken(
      blockTokenSecretManager.generateToken(b.getBlock(), EnumSet.of(mode)));
}
/**
 * Appends a key update command for the datanode if block tokens are
 * enabled and the node is flagged as needing new keys; the flag is
 * cleared once the command has been added.
 *
 * @param cmds command list to append to
 * @param nodeinfo the datanode whose {@code needKeyUpdate} flag is
 *        checked and reset
 */
void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
    final DatanodeDescriptor nodeinfo) {
  // check access key update
  final boolean updateWanted = isBlockTokenEnabled() && nodeinfo.needKeyUpdate;
  if (!updateWanted) {
    return;
  }
  cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
  nodeinfo.needKeyUpdate = false;
}
/**
 * Clamp the specified replication between the minimum and the maximum
 * replication levels.
 *
 * @param replication requested replication
 * @return {@code minReplication} if the request is below it,
 *         {@code maxReplication} if above it, otherwise the request itself
 */
public short adjustReplication(short replication) {
  if (replication < minReplication) {
    return minReplication;
  }
  if (replication > maxReplication) {
    return maxReplication;
  }
  return replication;
}
/**
 * Check whether the replication parameter is within the range
 * determined by system configuration.
 *
 * @param src file path, used only in the error message
 * @param replication requested replication
 * @param clientName client name for the error message; may be null
 * @throws IOException if {@code replication} is greater than
 *         {@code maxReplication} or less than {@code minReplication}
 */
public void verifyReplication(String src,
    short replication,
    String clientName) throws IOException {
  final boolean aboveMax = replication > maxReplication;
  final boolean belowMin = replication < minReplication;
  if (!aboveMax && !belowMin) {
    // common case. avoid building the message
    return;
  }

  final StringBuilder text = new StringBuilder("file ").append(src);
  if (clientName != null) {
    text.append(" on client ").append(clientName);
  }
  text.append(".\n").append("Requested replication ").append(replication);

  if (aboveMax) {
    throw new IOException(text + " exceeds maximum " + maxReplication);
  }
  // Only the below-minimum case can reach this point.
  throw new IOException(text + " is less than the required minimum " +
      minReplication);
}
/**
 * Return a list of blocks and their locations on <code>datanode</code>
 * whose total size is <code>size</code>.
 * <p>
 * Runs under the namesystem read lock and requires superuser privilege.
 *
 * @param datanode on which blocks are located
 * @param size total size of blocks
 * @return the selected blocks with their locations
 * @throws IOException if the privilege check or the lookup fails
 */
public BlocksWithLocations getBlocks(DatanodeID datanode, long size
    ) throws IOException {
  namesystem.readLock();
  try {
    namesystem.checkSuperuserPrivilege();
    final BlocksWithLocations selected = getBlocksWithLocations(datanode, size);
    return selected;
  } finally {
    namesystem.readUnlock();
  }
}
/**
 * Get blocks with location information from a datanode.
 * <p>
 * Starts at a randomly chosen block of the node, collects complete blocks
 * until their accumulated size reaches {@code size}, and wraps around to
 * the beginning of the node's block list if the end is reached first.
 *
 * @param datanode the datanode to read blocks from
 * @param size accumulated block size at which collection stops
 * @return the collected blocks with their locations; empty if the node
 *         has no blocks
 * @throws UnregisteredNodeException declared by the signature; presumably
 *         raised by the datanode lookup - confirm
 * @throws HadoopIllegalArgumentException if the datanode is not found
 */
private BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
    final long size) throws UnregisteredNodeException {
  final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
  if (node == null) {
    NameNode.stateChangeLog.warn("BLOCK* getBlocks: "
        + "Asking for blocks from an unrecorded node " + datanode.getName());
    throw new HadoopIllegalArgumentException(
        "Datanode " + datanode.getName() + " not found.");
  }

  final int numBlocks = node.numBlocks();
  if (numBlocks == 0) {
    return new BlocksWithLocations(new BlockWithLocations[0]);
  }

  Iterator<BlockInfo> iter = node.getBlockIterator();
  // starting from a random block
  final int startBlock = DFSUtil.getRandom().nextInt(numBlocks);
  // skip blocks
  int skipped = 0;
  while (skipped < startBlock) {
    iter.next();
    skipped++;
  }

  final List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
  long totalSize = 0;
  while (totalSize < size && iter.hasNext()) {
    final BlockInfo candidate = iter.next();
    if (candidate.isComplete()) {
      totalSize += addBlock(candidate, results);
    }
  }

  if (totalSize < size) {
    // start from the beginning and cover the blocks skipped above
    iter = node.getBlockIterator();
    for (int i = 0; i < startBlock && totalSize < size; i++) {
      final BlockInfo candidate = iter.next();
      if (candidate.isComplete()) {
        totalSize += addBlock(candidate, results);
      }
    }
  }

  return new BlocksWithLocations(
      results.toArray(new BlockWithLocations[results.size()]));
}
/**
 * Remove the blocks associated to the given datanode: every block the
 * node reports is removed as a stored block, the node's block list is
 * reset, and the node's pending invalidations are dropped.
 *
 * @param node the datanode whose blocks are removed
 */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
  for (final Iterator<? extends Block> blocks = node.getBlockIterator();
      blocks.hasNext();) {
    removeStoredBlock(blocks.next(), node);
  }
  node.resetBlocks();
  invalidateBlocks.remove(node.getStorageID());
}
/**
 * Adds block to list of blocks which will be invalidated on specified
 * datanode and log the operation.
 *
 * @param block the block to invalidate
 * @param datanode the datanode holding the replica
 */
void addToInvalidates(final Block block, final DatanodeInfo datanode) {
  final boolean logOperation = true;
  invalidateBlocks.add(block, datanode, logOperation);
}
/**
 * Adds block to list of blocks which will be invalidated on all its
 * datanodes. The individual additions are not logged; instead one summary
 * line naming all affected datanodes is written if there were any.
 *
 * @param b the block to invalidate everywhere
 */
private void addToInvalidates(Block b) {
  final StringBuilder nodeNames = new StringBuilder();
  final Iterator<DatanodeDescriptor> holders = blocksMap.nodeIterator(b);
  while (holders.hasNext()) {
    final DatanodeDescriptor holder = holders.next();
    invalidateBlocks.add(b, holder, false);
    nodeNames.append(holder.getName()).append(" ");
  }
  if (nodeNames.length() == 0) {
    return;
  }
  NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
      + b + " to " + nodeNames.toString());
}
/**
 * Mark the block belonging to datanode as corrupt. Runs under the
 * namesystem write lock. A block that is not in the blocks map is only
 * logged and otherwise ignored.
 *
 * @param blk Block to be marked as corrupt
 * @param dn Datanode which holds the corrupt replica
 * @throws IOException if the block cannot be marked as corrupt
 */
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
    final DatanodeInfo dn) throws IOException {
  namesystem.writeLock();
  try {
    final BlockInfo stored = getStoredBlock(blk.getLocalBlock());
    if (stored != null) {
      markBlockAsCorrupt(stored, dn);
    } else {
      // Check if the replica is in the blockMap, if not
      // ignore the request for now. This could happen when BlockScanner
      // thread of Datanode reports bad block before Block reports are sent
      // by the Datanode on startup
      NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
          + blk + " not found.");
    }
  } finally {
    namesystem.writeUnlock();
  }
}
/**
 * Records the replica of a stored block on the given datanode as corrupt.
 * <p>
 * A block without an inode (not belonging to any file) is simply queued
 * for invalidation on that node. Otherwise the replica is added to the
 * node and to the corrupt replicas map; then, if the block still has at
 * least as many live replicas as its file requires, the corrupt replica
 * is invalidated, else the block's entry in the needed replications is
 * updated (only while replication queues are being populated).
 *
 * @param storedBlock the stored block; must not be null
 * @param dn datanode holding the corrupt replica
 * @throws IOException if the datanode is unknown or invalidation fails
 */
private void markBlockAsCorrupt(BlockInfo storedBlock,
    DatanodeInfo dn) throws IOException {
  assert storedBlock != null : "storedBlock should not be null";
  final DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
  if (node == null) {
    throw new IOException("Cannot mark block " +
        storedBlock.getBlockName() +
        " as corrupt because datanode " + dn.getName() +
        " does not exist. ");
  }

  final INodeFile owner = storedBlock.getINode();
  if (owner == null) {
    NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " +
        "block " + storedBlock +
        " could not be marked as corrupt as it" +
        " does not belong to any file");
    addToInvalidates(storedBlock, node);
    return;
  }

  // Add replica to the data-node if it is not already there
  node.addBlock(storedBlock);

  // Add this replica to corruptReplicas Map
  corruptReplicas.addToCorruptReplicasMap(storedBlock, node);

  final int liveReplicas = countNodes(storedBlock).liveReplicas();
  final boolean enoughLiveReplicas = liveReplicas >= owner.getReplication();
  if (enoughLiveReplicas) {
    // the block is over-replicated so invalidate the replicas immediately
    invalidateBlock(storedBlock, node);
  } else if (namesystem.isPopulatingReplQueues()) {
    // add the block to neededReplication
    updateNeededReplications(storedBlock, -1, 0);
  }
}
/**
 * Invalidates the given block on the given datanode.
 * <p>
 * The replica is queued for deletion and removed from the stored blocks
 * only if at least one live replica of the block is counted; otherwise
 * it is kept and the decision is logged.
 *
 * @param blk the block to invalidate
 * @param dn the datanode holding the replica
 * @throws IOException if the datanode is unknown
 */
private void invalidateBlock(Block blk, DatanodeInfo dn)
    throws IOException {
  NameNode.stateChangeLog.info("BLOCK* invalidateBlock: "
      + blk + " on " + dn.getName());
  final DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
  if (node == null) {
    throw new IOException("Cannot invalidate block " + blk
        + " because datanode " + dn.getName() + " does not exist.");
  }

  // Check how many copies we have of the block. If we have at least one
  // copy on a live node, then we can delete it.
  final int liveCopies = countNodes(blk).liveReplicas();
  if (liveCopies < 1) {
    NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + blk + " on "
        + dn.getName() + " is the only copy and was not deleted.");
    return;
  }

  addToInvalidates(blk, dn);
  removeStoredBlock(blk, node);
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("BLOCK* invalidateBlocks: "
        + blk + " on " + dn.getName() + " listed for deletion.");
  }
}
/**
 * Refreshes the cached pending-replication, under-replicated and
 * corrupt-replica counters from the sizes of their backing structures.
 */
void updateState() {
  this.pendingReplicationBlocksCount = this.pendingReplications.size();
  this.underReplicatedBlocksCount = this.neededReplications.size();
  this.corruptReplicaBlocksCount = this.corruptReplicas.size();
}
/**
 * Return number of under-replicated but not missing blocks.
 *
 * @return the under-replicated block count reported by
 *         {@code neededReplications}
 */
public int getUnderReplicatedNotMissingBlocks() {
  return this.neededReplications.getUnderReplicatedBlockCount();
}
/**
 * Schedule blocks for deletion at datanodes. The storage IDs with pending
 * invalidations are shuffled and the first {@code nodesToProcess} of them
 * (or all, if there are fewer) are processed.
 *
 * @param nodesToProcess number of datanodes to schedule deletion work
 * @return total number of block for deletion
 */
int computeInvalidateWork(int nodesToProcess) {
  final List<String> storageIds = invalidateBlocks.getStorageIDs();
  Collections.shuffle(storageIds);

  final int limit = Math.min(storageIds.size(), nodesToProcess);
  int scheduledBlocks = 0;
  int index = 0;
  while (index < limit) {
    scheduledBlocks += invalidateWorkForOneNode(storageIds.get(index));
    index++;
  }
  return scheduledBlocks;
}
/**
 * Scan blocks in {@link #neededReplications} and assign replication
 * work to data-nodes they belong to.
 *
 * At most {@code blocksToProcess} blocks are chosen, grouped into one
 * list per priority, and each is handed to the per-block scheduling step
 * together with the index of its list.
 *
 * @param blocksToProcess upper bound on the number of blocks to choose
 * @return number of blocks scheduled for replication during this iteration.
 * @throws IOException if scheduling work for a block fails
 */
private int computeReplicationWork(int blocksToProcess) throws IOException {
  // Choose the blocks to be replicated
  final List<List<Block>> chosenByPriority =
      chooseUnderReplicatedBlocks(blocksToProcess);

  // replicate blocks
  int scheduled = 0;
  int priority = 0;
  while (priority < chosenByPriority.size()) {
    final List<Block> sameLevel = chosenByPriority.get(priority);
    for (Block candidate : sameLevel) {
      final boolean accepted = computeReplicationWorkForBlock(candidate, priority);
      if (accepted) {
        scheduled++;
      }
    }
    priority++;
  }
  return scheduled;
}
/**
* Get a list of block lists to be replicated. The index of each block list
* represents its replication priority.
* <p>
* Iteration over {@link #neededReplications} resumes at the position stored
* in replIndex and wraps around to the beginning of the queue when its end
* is reached. Runs under the namesystem write lock and synchronizes on
* {@link #neededReplications}.
*
* @param blocksToProcess upper bound on the number of blocks to pick; it is
* additionally capped by the current size of {@link #neededReplications}
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
*/
private List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
// initialize data structure for the return value
// (one empty list per priority level)
List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(
UnderReplicatedBlocks.LEVEL);
for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
blocksToReplicate.add(new ArrayList<Block>());
}
namesystem.writeLock();
try {
synchronized (neededReplications) {
// nothing queued: return the empty per-priority lists
if (neededReplications.size() == 0) {
return blocksToReplicate;
}
// Go through all blocks that need replications.
UnderReplicatedBlocks.BlockIterator neededReplicationsIterator =
neededReplications.iterator();
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
}
// never process more blocks than are currently queued
blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
// replIndex advances together with blkCnt so that the next call resumes
// after the last block handed out here
for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
if (!neededReplicationsIterator.hasNext()) {
// start from the beginning
replIndex = 0;
blocksToProcess = Math.min(blocksToProcess, neededReplications
.size());
if (blkCnt >= blocksToProcess)
break;
neededReplicationsIterator = neededReplications.iterator();
assert neededReplicationsIterator.hasNext() : "neededReplications should not be empty.";
}
Block block = neededReplicationsIterator.next();
int priority = neededReplicationsIterator.getPriority();
// a priority outside the prepared lists is logged and the block is skipped
if (priority < 0 || priority >= blocksToReplicate.size()) {
LOG.warn("Unexpected replication priority: "
+ priority + " " + block);
} else {
blocksToReplicate.get(priority).add(block);
}
} // end for
} // end synchronized neededReplication
} finally {
namesystem.writeUnlock();
}
return blocksToReplicate;
}
/**
* Try to schedule replication work for a single block.
* <p>
* The work is done in three phases:
* <ol>
* <li>Under the namesystem write lock: validate that the block still belongs
* to a file that is not under construction, choose a source datanode and
* compute how many additional replicas are required.</li>
* <li>Without the lock: ask the block placement policy for targets.</li>
* <li>Under the write lock again: repeat the validation (state may have
* changed while the lock was released), then queue the block on the source
* node and record it in pendingReplications.</li>
* </ol>
*
* @param block block to be replicated
* @param priority a hint of its priority in the neededReplication queue
* @return true if replication of the block was scheduled, false otherwise
*/
@VisibleForTesting
boolean computeReplicationWorkForBlock(Block block, int priority) {
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
DatanodeDescriptor srcNode;
INodeFile fileINode = null;
int additionalReplRequired;
// Phase 1: gather state under the global lock
namesystem.writeLock();
try {
synchronized (neededReplications) {
// block should belong to a file
fileINode = blocksMap.getINode(block);
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
// one entry left the queue, so step the resume index back
replIndex--;
return false;
}
requiredReplication = fileINode.getReplication();
// get a source data-node
containingNodes = new ArrayList<DatanodeDescriptor>();
liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
NumberReplicas numReplicas = new NumberReplicas();
srcNode = chooseSourceDatanode(
block, containingNodes, liveReplicaNodes, numReplicas);
if(srcNode == null) // block can not be replicated from any node
return false;
assert liveReplicaNodes.size() == numReplicas.liveReplicas();
// do not schedule more if enough replicas is already pending
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
NameNode.stateChangeLog.info("BLOCK* "
+ "Removing block " + block
+ " from neededReplications as it has enough replicas.");
return false;
}
}
// enough live replicas but still here means the rack requirement is unmet
if (numReplicas.liveReplicas() < requiredReplication) {
additionalReplRequired = requiredReplication - numEffectiveReplicas;
} else {
additionalReplRequired = 1; //Needed on a new rack
}
}
} finally {
namesystem.writeUnlock();
}
// Phase 2: choose targets without holding the global lock
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
for (DatanodeDescriptor dn : containingNodes) {
excludedNodes.put(dn, dn);
}
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
// It is costly to extract the filename for which chooseTargets is called,
// so for now we pass in the Inode itself.
DatanodeDescriptor targets[] =
blockplacement.chooseTarget(fileINode, additionalReplRequired,
srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
if(targets.length == 0)
return false;
// Phase 3: re-validate and commit under the global lock
namesystem.writeLock();
try {
synchronized (neededReplications) {
// Recheck since global lock was released
// block should belong to a file
fileINode = blocksMap.getINode(block);
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
return false;
}
requiredReplication = fileINode.getReplication();
// do not schedule more if enough replicas is already pending
NumberReplicas numReplicas = countNodes(block);
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
NameNode.stateChangeLog.info("BLOCK* "
+ "Removing block " + block
+ " from neededReplications as it has enough replicas.");
return false;
}
}
// replicating only to gain a rack: the first target must be on a
// different network location than the source
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
(!blockHasEnoughRacks(block)) ) {
if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
//No use continuing, unless a new rack in this case
return false;
}
}
// Add block to the to be replicated list
srcNode.addBlockToBeReplicated(block, targets);
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
}
// Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry
// replications that fail after an appropriate amount of time.
pendingReplications.add(block, targets.length);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* block " + block
+ " is moved from neededReplications to pendingReplications");
}
// remove from neededReplications only if the scheduled targets
// bring the block up to the required replication
if(numEffectiveReplicas + targets.length >= requiredReplication) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
}
if (NameNode.stateChangeLog.isInfoEnabled()) {
StringBuilder targetList = new StringBuilder("datanode(s)");
for (int k = 0; k < targets.length; k++) {
targetList.append(' ');
targetList.append(targets[k].getName());
}
NameNode.stateChangeLog.info(
"BLOCK* ask "
+ srcNode.getName() + " to replicate "
+ block + " to " + targetList);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* neededReplications = " + neededReplications.size()
+ " pendingReplications = " + pendingReplications.size());
}
}
}
} finally {
namesystem.writeUnlock();
}
return true;
}
/**
 * Choose target datanodes for a new block according to the replication
 * policy.
 *
 * @param src path of the file the block is being allocated for
 * @param numOfReplicas number of targets requested
 * @param client descriptor passed to the placement policy as the writer
 * @param excludedNodes nodes that must not be chosen; may be null
 * @param blocksize size of the block to be placed
 * @return the chosen targets, at least minReplication of them
 * @throws IOException if the number of targets < minimum replication.
 * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
 */
public DatanodeDescriptor[] chooseTarget(final String src,
    final int numOfReplicas, final DatanodeDescriptor client,
    final HashMap<Node, Node> excludedNodes,
    final long blocksize) throws IOException {
  // choose targets for the new block to be allocated.
  final DatanodeDescriptor[] chosen = blockplacement.chooseTarget(
      src, numOfReplicas, client, excludedNodes, blocksize);
  if (chosen.length >= minReplication) {
    return chosen;
  }
  throw new IOException("File " + src + " could only be replicated to "
      + chosen.length + " nodes instead of minReplication (="
      + minReplication + ").  There are "
      + getDatanodeManager().getNetworkTopology().getNumOfLeaves()
      + " datanode(s) running and "
      + (excludedNodes == null? "no": excludedNodes.size())
      + " node(s) are excluded in this operation.");
}
/**
* Parse the data-nodes the block belongs to and choose one,
* which will be the replication source.
*
* We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
* since the former do not have write traffic and hence are less busy.
* We do not use already decommissioned nodes as a source.
* Otherwise we choose a random node among those that did not reach their
* replication limit.
*
* In addition form a list of all nodes containing the block
* and calculate its replication numbers.
*
* @param block the block to find a replication source for
* @param containingNodes output: cleared, then filled with every node that
* holds the block
* @param nodesContainingLiveReplicas output: cleared, then filled with the
* nodes counted as holding a live replica
* @param numReplicas output: if not null, initialized with the live,
* decommissioned, corrupt and excess counts
* @return the chosen source node, or null if no node qualifies
*/
private DatanodeDescriptor chooseSourceDatanode(
Block block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeDescriptor> nodesContainingLiveReplicas,
NumberReplicas numReplicas) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
DatanodeDescriptor srcNode = null;
int live = 0;
int decommissioned = 0;
int corrupt = 0;
int excess = 0;
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
while(it.hasNext()) {
DatanodeDescriptor node = it.next();
Collection<Block> excessBlocks =
excessReplicateMap.get(node.getStorageID());
// classify the replica; each node is counted in exactly one category
// (note: "decommissioned" also counts nodes still being decommissioned)
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt++;
else if (node.isDecommissionInProgress() || node.isDecommissioned())
decommissioned++;
else if (excessBlocks != null && excessBlocks.contains(block)) {
excess++;
} else {
nodesContainingLiveReplicas.add(node);
live++;
}
containingNodes.add(node);
// the remaining checks decide whether this node may serve as the source
// Check if this replica is corrupt
// If so, do not select the node as src node
if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
continue;
if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
continue; // already reached replication limit
// the block must not be scheduled for removal on srcNode
if(excessBlocks != null && excessBlocks.contains(block))
continue;
// never use already decommissioned nodes
if(node.isDecommissioned())
continue;
// we prefer nodes that are in DECOMMISSION_INPROGRESS state
if(node.isDecommissionInProgress() || srcNode == null) {
srcNode = node;
continue;
}
// keep a decommission-in-progress source once one has been picked
if(srcNode.isDecommissionInProgress())
continue;
// switch to a different node randomly
// this to prevent from deterministically selecting the same node even
// if the node failed to replicate the block on previous iterations
if(DFSUtil.getRandom().nextBoolean())
srcNode = node;
}
if(numReplicas != null)
numReplicas.initialize(live, decommissioned, corrupt, excess);
return srcNode;
}
/**
 * Reap replication requests that timed out: every timed-out block that
 * still needs replication is put back into the neededReplications queue.
 */
private void processPendingReplications() {
  final Block[] timedOut = pendingReplications.getTimedOutBlocks();
  if (timedOut == null) {
    return;
  }
  namesystem.writeLock();
  try {
    for (Block blk : timedOut) {
      final NumberReplicas replicas = countNodes(blk);
      if (isNeededReplication(blk, getReplication(blk),
          replicas.liveReplicas())) {
        neededReplications.add(blk,
            replicas.liveReplicas(),
            replicas.decommissionedReplicas(),
            getReplication(blk));
      }
    }
  } finally {
    namesystem.writeUnlock();
  }
  /* If we know the target datanodes where the replication timedout,
   * we could invoke decBlocksScheduled() on it. Its ok for now.
   */
}
/**
 * Entry of the "toUC" list built while processing a block report: pairs an
 * under-construction block with the ReplicaState that the datanode reported
 * for it.
 */
private static class StatefulBlockInfo {
  /** Replica state reported by the datanode in the block report. */
  final ReplicaState reportedState;
  /** The under-construction block the report refers to. */
  final BlockInfoUnderConstruction storedBlock;

  StatefulBlockInfo(BlockInfoUnderConstruction ucBlock,
      ReplicaState state) {
    storedBlock = ucBlock;
    reportedState = state;
  }
}
/**
 * The given datanode is reporting all its blocks.
 * Update the (machine-->blocklist) and (block-->machinelist) maps.
 * <p>
 * The report is applied while holding the namesystem write lock. The first
 * report of a node (one that has no blocks recorded yet) takes the faster
 * processFirstBlockReport path. Later reports received while the namesystem
 * is still in startup safe mode are discarded.
 *
 * @param nodeID the datanode sending the report
 * @param poolId block pool id; not used by this method
 * @param newReport the reported blocks; may be null, which the helpers treat
 *        as an empty report
 * @throws IOException if the node is unregistered or not alive
 */
public void processReport(final DatanodeID nodeID, final String poolId,
    final BlockListAsLongs newReport) throws IOException {
  namesystem.writeLock();
  final long startTime = Util.now(); //after acquiring write lock
  final long endTime;
  try {
    final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
    if (node == null || !node.isAlive) {
      throw new IOException("ProcessReport from dead or unregistered node: "
          + nodeID.getName());
    }
    // To minimize startup time, we discard any second (or later) block reports
    // that we receive while still in startup phase.
    if (namesystem.isInStartupSafeMode() && node.numBlocks() > 0) {
      NameNode.stateChangeLog.info("BLOCK* processReport: "
          + "discarded non-initial block report from " + nodeID.getName()
          + " because namenode still in startup phase");
      return;
    }
    if (node.numBlocks() == 0) {
      // The first block report can be processed a lot more efficiently than
      // ordinary block reports.  This shortens restart times.
      processFirstBlockReport(node, newReport);
    } else {
      processReport(node, newReport);
    }
  } finally {
    endTime = Util.now();
    namesystem.writeUnlock();
  }
  // Log the block report processing stats from Namenode perspective
  NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
  // The helpers above accept a null report, so the summary line must not
  // dereference it either; a null report is logged as 0 blocks.
  NameNode.stateChangeLog.info("BLOCK* processReport: from "
      + nodeID.getName() + ", blocks: "
      + (newReport == null ? 0 : newReport.getNumberOfBlocks())
      + ", processing time: " + (endTime - startTime) + " msecs");
}
/**
 * Apply a regular (non-initial) block report of a datanode.
 * <p>
 * reportDiff sorts the reported blocks into work queues, which are then
 * processed in a fixed order: under-construction updates, removals,
 * additions, invalidations and finally corrupt replicas.
 *
 * @param node the datanode that sent the report
 * @param report the reported blocks
 * @throws IOException propagated from the per-block handlers
 */
private void processReport(final DatanodeDescriptor node,
    final BlockListAsLongs report) throws IOException {
  // Normal case:
  // Modify the (block-->datanode) map, according to the difference
  // between the old and new block report.
  final Collection<BlockInfo> addQueue = new LinkedList<BlockInfo>();
  final Collection<Block> removeQueue = new LinkedList<Block>();
  final Collection<Block> invalidateQueue = new LinkedList<Block>();
  final Collection<BlockInfo> corruptQueue = new LinkedList<BlockInfo>();
  final Collection<StatefulBlockInfo> ucQueue =
      new LinkedList<StatefulBlockInfo>();
  reportDiff(node, report, addQueue, removeQueue, invalidateQueue,
      corruptQueue, ucQueue);

  // Process the blocks on each queue
  for (StatefulBlockInfo ucEntry : ucQueue) {
    addStoredBlockUnderConstruction(ucEntry.storedBlock, node,
        ucEntry.reportedState);
  }
  for (Block gone : removeQueue) {
    removeStoredBlock(gone, node);
  }
  for (BlockInfo fresh : addQueue) {
    addStoredBlock(fresh, node, null, true);
  }
  for (Block orphan : invalidateQueue) {
    NameNode.stateChangeLog.info("BLOCK* processReport: block "
        + orphan + " on " + node.getName() + " size " + orphan.getNumBytes()
        + " does not belong to any file.");
    addToInvalidates(orphan, node);
  }
  for (BlockInfo bad : corruptQueue) {
    markBlockAsCorrupt(bad, node);
  }
}
/**
 * processFirstBlockReport is intended only for processing "initial" block
 * reports, the first block report received from a DN after it registers.
 * It just adds all the valid replicas to the datanode, without calculating
 * a toRemove list (since there won't be any).  It also silently discards
 * any invalid blocks, thereby deferring their processing until
 * the next block report.
 *
 * @param node - DatanodeDescriptor of the node that sent the report
 * @param report - the initial block report, to be processed; a null report
 *        is ignored
 * @throws IOException propagated from the per-block handlers
 */
private void processFirstBlockReport(final DatanodeDescriptor node,
    final BlockListAsLongs report) throws IOException {
  if (report == null) {
    return;
  }
  assert (namesystem.hasWriteLock());
  assert (node.numBlocks() == 0);

  for (BlockReportIterator reported = report.getBlockReportIterator();
      reported.hasNext();) {
    final Block replica = reported.next();
    final ReplicaState replicaState = reported.getCurrentReplicaState();
    final BlockInfo stored = blocksMap.getStoredBlock(replica);
    if (stored == null) {
      // block does not belong to any file: nothing to do for it now
      continue;
    }

    final BlockUCState storedState = stored.getBlockUCState();
    if (isReplicaCorrupt(replica, replicaState, stored, storedState, node)) {
      // corrupt replica: mark it and move on to the next block
      markBlockAsCorrupt(stored, node);
      continue;
    }

    if (isBlockUnderConstruction(stored, storedState, replicaState)) {
      // register the replica with the under-construction block, then
      // continue with the FINALIZED check below
      ((BlockInfoUnderConstruction) stored).addReplicaIfNotPresent(
          node, replica, replicaState);
    }

    // add replica if appropriate
    if (replicaState == ReplicaState.FINALIZED) {
      addStoredBlockImmediate(stored, node);
    }
  }
}
/**
* Compare a block report with the blocks currently recorded for the datanode
* and sort the differences into the passed-in collections.
* <p>
* A temporary delimiter block is added to the node's block list. Every
* reported block that is stored on the node is moved to the head of that
* list, so once the report has been scanned the blocks that were not
* reported are the ones following the delimiter; they are collected into
* toRemove. The delimiter is removed again before returning.
*
* @param dn the datanode that sent the report
* @param newReport the reported blocks; null is treated as an empty report
*/
private void reportDiff(DatanodeDescriptor dn,
BlockListAsLongs newReport,
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockInfo> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
boolean added = dn.addBlock(delimiter);
assert added : "Delimiting block cannot be present in the node";
if(newReport == null)
newReport = new BlockListAsLongs();
// scan the report and process newly reported blocks
BlockReportIterator itBR = newReport.getBlockReportIterator();
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState iState = itBR.getCurrentReplicaState();
BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
toAdd, toInvalidate, toCorrupt, toUC);
// move block to the head of the list
// (only blocks already recorded for this datanode can be moved)
if(storedBlock != null && storedBlock.findDatanode(dn) >= 0)
dn.moveBlockToHead(storedBlock);
}
// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
delimiter.getNext(0), dn);
while(it.hasNext())
toRemove.add(it.next());
// the delimiter was only a marker; take it out of the node's list again
dn.removeBlock(delimiter);
}
/**
 * Process a block replica reported by the data-node.
 * Besides debug logging, the only effect is adding to the passed-in
 * Collections.
 *
 * <ol>
 * <li>If the block is not known to the system (not in blocksMap) then the
 * data-node should be notified to invalidate this block.</li>
 * <li>If the reported replica is valid that is has the same generation stamp
 * and length as recorded on the name-node, then the replica location should
 * be added to the name-node.</li>
 * <li>If the reported replica is not valid, then it is marked as corrupt,
 * which triggers replication of the existing valid replicas.
 * Corrupt replicas are removed from the system when the block
 * is fully replicated.</li>
 * <li>If the reported replica is for a block currently marked "under
 * construction" in the NN, then it should be added to the
 * BlockInfoUnderConstruction's list of replicas.</li>
 * </ol>
 *
 * @param dn descriptor for the datanode that made the report
 * @param block reported block replica
 * @param reportedState reported replica state
 * @param toAdd add to DatanodeDescriptor
 * @param toInvalidate missing blocks (not in the blocks map)
 *        should be removed from the data-node
 * @param toCorrupt replicas with unexpected length or generation stamp;
 *        add to corrupt replicas
 * @param toUC replicas of blocks currently under construction
 * @return the block found in blocksMap for the reported replica, or null
 *         if blocksMap does not contain it
 */
private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
    final Block block, final ReplicaState reportedState,
    final Collection<BlockInfo> toAdd,
    final Collection<Block> toInvalidate,
    final Collection<BlockInfo> toCorrupt,
    final Collection<StatefulBlockInfo> toUC) {

  if (LOG.isDebugEnabled()) {
    LOG.debug("Reported block " + block
        + " on " + dn.getName() + " size " + block.getNumBytes()
        + " replicaState = " + reportedState);
  }

  // find block by blockId
  final BlockInfo stored = blocksMap.getStoredBlock(block);
  if (stored == null) {
    // If blocksMap does not contain reported block id,
    // the replica should be removed from the data-node.
    toInvalidate.add(new Block(block));
    return null;
  }

  // Block is on the NN
  final BlockUCState storedState = stored.getBlockUCState();
  if (LOG.isDebugEnabled()) {
    LOG.debug("In memory blockUCState = " + storedState);
  }

  // Ignore replicas already scheduled to be removed from the DN
  if (invalidateBlocks.contains(dn.getStorageID(), block)) {
    assert stored.findDatanode(dn) < 0 : "Block " + block
        + " in invalidated blocks set should not appear in DN " + dn;
    return stored;
  }

  if (isReplicaCorrupt(block, reportedState, stored, storedState, dn)) {
    toCorrupt.add(stored);
    return stored;
  }

  if (isBlockUnderConstruction(stored, storedState, reportedState)) {
    final StatefulBlockInfo ucEntry = new StatefulBlockInfo(
        (BlockInfoUnderConstruction) stored, reportedState);
    toUC.add(ucEntry);
    return stored;
  }

  // add replica if appropriate: finalized and not yet recorded on this node
  final boolean finalized = reportedState == ReplicaState.FINALIZED;
  if (finalized && stored.findDatanode(dn) < 0) {
    toAdd.add(stored);
  }
  return stored;
}
/*
* The next two methods test the various cases under which we must conclude
* the replica is corrupt, or under construction. These are laid out
* as switch statements, on the theory that it is easier to understand
* the combinatorics of reportedState and ucState that way. It should be
* at least as efficient as boolean expressions.
*/
/**
* Decide whether a reported replica must be considered corrupt.
*
* @param iblk the replica as reported by the datanode
* @param reportedState replica state reported by the datanode
* @param storedBlock the block recorded on the name-node
* @param ucState construction state of storedBlock
* @param dn the reporting datanode; used for log messages only
* @return true if the replica is to be treated as corrupt
*/
private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState,
BlockInfo storedBlock, BlockUCState ucState,
DatanodeDescriptor dn) {
switch(reportedState) {
case FINALIZED:
switch(ucState) {
case COMPLETE:
case COMMITTED:
// a finalized replica must match the stored generation stamp and length
return (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()
|| storedBlock.getNumBytes() != iblk.getNumBytes());
default:
return false;
}
case RBW:
case RWR:
if (!storedBlock.isComplete()) {
// the stored block is not complete yet, so the replica is not judged here
return false;
} else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
return true;
} else { // COMPLETE block, same genstamp
if (reportedState == ReplicaState.RBW) {
// If it's a RBW report for a COMPLETE block, it may just be that
// the block report got a little bit delayed after the pipeline
// closed. So, ignore this report, assuming we will get a
// FINALIZED replica later. See HDFS-2791
LOG.info("Received an RBW replica for block " + storedBlock +
" on " + dn.getName() + ": ignoring it, since the block is " +
"complete with the same generation stamp.");
return false;
} else {
// RWR replica of a complete block with the same generation stamp
return true;
}
}
case RUR: // should not be reported
case TEMPORARY: // should not be reported
default:
LOG.warn("Unexpected replica state " + reportedState
+ " for block: " + storedBlock +
" on " + dn.getName() + " size " + storedBlock.getNumBytes());
return true;
}
}
/**
 * Decide whether a reported replica belongs to a block that is still under
 * construction on the name-node.
 *
 * @param storedBlock the block recorded on the name-node
 * @param ucState construction state of storedBlock
 * @param reportedState replica state reported by the datanode
 * @return true if the replica should be handled as part of an
 *         under-construction block
 */
private boolean isBlockUnderConstruction(BlockInfo storedBlock,
    BlockUCState ucState, ReplicaState reportedState) {
  final boolean underConstruction;
  switch (reportedState) {
  case RBW:
  case RWR:
    underConstruction = !storedBlock.isComplete();
    break;
  case FINALIZED:
    switch (ucState) {
    case UNDER_RECOVERY:
    case UNDER_CONSTRUCTION:
      underConstruction = true;
      break;
    default:
      underConstruction = false;
      break;
    }
    break;
  default:
    // covers RUR and TEMPORARY, which should not be reported
    underConstruction = false;
    break;
  }
  return underConstruction;
}
/**
 * Record a reported replica of an under-construction block.
 * <p>
 * The replica is always registered with the block. If it was reported as
 * FINALIZED and the block is not yet recorded on the node, it is also
 * added through addStoredBlock.
 *
 * @param block the under-construction block
 * @param node the datanode that reported the replica
 * @param reportedState replica state reported by the datanode
 * @throws IOException propagated from addStoredBlock
 */
void addStoredBlockUnderConstruction(
    BlockInfoUnderConstruction block,
    DatanodeDescriptor node,
    ReplicaState reportedState)
    throws IOException {
  block.addReplicaIfNotPresent(node, block, reportedState);
  if (reportedState != ReplicaState.FINALIZED) {
    return;
  }
  if (block.findDatanode(node) >= 0) {
    // the node already holds this block
    return;
  }
  addStoredBlock(block, node, null, true);
}
/**
 * Faster version of
 * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)},
 * intended for use with initial block report at startup.  If not in startup
 * safe mode (or if the replication queues are already being populated), will
 * call standard addStoredBlock().
 * Assumes this method is called "immediately" so there is no need to
 * refresh the storedBlock from blocksMap.
 * Doesn't handle underReplication/overReplication, or worry about
 * pendingReplications or corruptReplicas, because it's in startup safe mode.
 * Doesn't log every block, because there are typically millions of them.
 *
 * @param storedBlock the block to record; must not be null
 * @param node the datanode holding the replica
 * @throws IOException propagated from addStoredBlock or completeBlock
 */
private void addStoredBlockImmediate(BlockInfo storedBlock,
    DatanodeDescriptor node)
    throws IOException {
  assert (storedBlock != null && namesystem.hasWriteLock());
  final boolean startupFastPath = namesystem.isInStartupSafeMode()
      && !namesystem.isPopulatingReplQueues();
  if (!startupFastPath) {
    addStoredBlock(storedBlock, node, null, false);
    return;
  }

  // just add it
  node.addBlock(storedBlock);

  // Now check for completion of blocks and safe block count
  final int liveReplicas = countLiveNodes(storedBlock);
  final boolean committed =
      storedBlock.getBlockUCState() == BlockUCState.COMMITTED;
  if (committed && liveReplicas >= minReplication) {
    storedBlock = completeBlock(storedBlock.getINode(), storedBlock);
  }

  // check whether safe replication is reached for the block
  // only complete blocks are counted towards that
  if (storedBlock.isComplete()) {
    namesystem.incrementSafeBlockCount(liveReplicas);
  }
}
/**
 * Modify (block-->datanode) map. Remove block from set of
 * needed replications if this takes care of the problem.
 * <p>
 * After the replica is recorded on the node, a COMMITTED block with enough
 * live replicas is completed, the safe block count is updated, and (unless
 * the file is under construction or the replication queues are not being
 * populated yet) under-/over-replication and corrupt replicas are handled.
 *
 * @param block the reported block
 * @param node the datanode that holds the replica
 * @param delNodeHint passed through to processOverReplicatedBlock; may be
 *        null
 * @param logEveryBlock whether each successful addition is logged at info
 *        level
 * @return the block that is stored in blockMap, or {@code block} itself if
 *         it does not belong to any file.
 * @throws IOException propagated from completeBlock
 */
private Block addStoredBlock(final BlockInfo block,
    DatanodeDescriptor node,
    DatanodeDescriptor delNodeHint,
    boolean logEveryBlock)
    throws IOException {
  assert block != null && namesystem.hasWriteLock();
  BlockInfo storedBlock;
  if (block instanceof BlockInfoUnderConstruction) {
    //refresh our copy in case the block got completed in another thread
    storedBlock = blocksMap.getStoredBlock(block);
  } else {
    storedBlock = block;
  }
  if (storedBlock == null || storedBlock.getINode() == null) {
    // If this block does not belong to anyfile, then we are done.
    NameNode.stateChangeLog.info("BLOCK* addStoredBlock: " + block + " on "
        + node.getName() + " size " + block.getNumBytes()
        + " but it does not belong to any file.");
    // we could add this block to invalidate set of this datanode.
    // it will happen in next block report otherwise.
    return block;
  }
  assert storedBlock != null : "Block must be stored by now";
  INodeFile fileINode = storedBlock.getINode();
  assert fileINode != null : "Block must belong to a file";

  // add block to the datanode
  boolean added = node.addBlock(storedBlock);

  // curReplicaDelta records whether this call actually added a replica
  int curReplicaDelta;
  if (added) {
    curReplicaDelta = 1;
    if (logEveryBlock) {
      NameNode.stateChangeLog.info("BLOCK* addStoredBlock: "
          + "blockMap updated: " + node.getName() + " is added to " +
          storedBlock + " size " + storedBlock.getNumBytes());
    }
  } else {
    curReplicaDelta = 0;
    NameNode.stateChangeLog.warn("BLOCK* addStoredBlock: "
        + "Redundant addStoredBlock request received for " + storedBlock
        + " on " + node.getName() + " size " + storedBlock.getNumBytes());
  }

  // Now check for completion of blocks and safe block count
  NumberReplicas num = countNodes(storedBlock);
  int numLiveReplicas = num.liveReplicas();
  // live replicas plus replications that are still pending
  int numCurrentReplica = numLiveReplicas
      + pendingReplications.getNumReplicas(storedBlock);

  if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
      numLiveReplicas >= minReplication)
    storedBlock = completeBlock(fileINode, storedBlock);

  // check whether safe replication is reached for the block
  // only complete blocks are counted towards that
  // Is no-op if not in safe mode.
  if(storedBlock.isComplete())
    namesystem.incrementSafeBlockCount(numCurrentReplica);

  // if file is under construction, then done for now
  if (fileINode.isUnderConstruction()) {
    return storedBlock;
  }

  // do not try to handle over/under-replicated blocks during safe mode
  if (!namesystem.isPopulatingReplQueues()) {
    return storedBlock;
  }

  // handle underReplication/overReplication
  short fileReplication = fileINode.getReplication();
  if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
    neededReplications.remove(storedBlock, numCurrentReplica,
        num.decommissionedReplicas(), fileReplication);
  } else {
    updateNeededReplications(storedBlock, curReplicaDelta, 0);
  }
  if (numCurrentReplica > fileReplication) {
    processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
  }
  // If the file replication has reached desired value
  // we can remove any corrupt replicas the block may have
  int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
  int numCorruptNodes = num.corruptReplicas();
  if (numCorruptNodes != corruptReplicasCount) {
    // a space after the block keeps it from running into "blockMap" in the log
    LOG.warn("Inconsistent number of corrupt replicas for " +
        storedBlock + " blockMap has " + numCorruptNodes +
        " but corrupt replicas map has " + corruptReplicasCount);
  }
  if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
    invalidateCorruptReplicas(storedBlock);
  return storedBlock;
}
/**
 * Invalidate corrupt replicas.
 * <p>
 * This will remove the replicas from the block's location list,
 * add them to {@link #invalidateBlocks} so that they could be further
 * deleted from the respective data-nodes,
 * and remove the block from corruptReplicasMap.
 * <p>
 * This method should be called when the block has sufficient
 * number of live replicas.
 * <p>
 * A failure to invalidate one replica is logged and does not stop the
 * remaining replicas from being processed, but in that case the block is
 * kept in the corrupt replicas map.
 *
 * @param blk Block whose corrupt replicas need to be invalidated
 */
private void invalidateCorruptReplicas(Block blk) {
  Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
  if (nodes == null) {
    return;
  }
  boolean gotException = false;
  // make a copy of the array of nodes in order to avoid
  // ConcurrentModificationException, when the block is removed from the node
  DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
  for (DatanodeDescriptor node : nodesCopy) {
    try {
      invalidateBlock(blk, node);
    } catch (IOException e) {
      // separate the node from the exception text so the message is readable
      NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
          "error in deleting bad block " + blk +
          " on " + node + ": " + e);
      gotException = true;
    }
  }
  // Remove the block from corruptReplicasMap
  if (!gotException) {
    corruptReplicas.removeFromCorruptReplicasMap(blk);
  }
}
/**
 * For each block in the name-node verify whether it belongs to any file,
 * over or under replicated. Place it into the respective queue.
 * <p>
 * The neededReplications queue is cleared first and rebuilt from scratch.
 * The caller must hold the namesystem write lock.
 */
public void processMisReplicatedBlocks() {
  assert namesystem.hasWriteLock();
  long invalidCount = 0;
  long overReplicatedCount = 0;
  long underReplicatedCount = 0;
  long underConstructionCount = 0;
  neededReplications.clear();
  for (BlockInfo blk : blocksMap.getBlocks()) {
    final INodeFile owner = blk.getINode();
    if (owner == null) {
      // block does not belong to any file
      invalidCount++;
      addToInvalidates(blk);
      continue;
    }
    if (!blk.isComplete()) {
      // Incomplete blocks are never considered mis-replicated --
      // they'll be reached when they are completed or recovered.
      underConstructionCount++;
      continue;
    }
    // calculate current replication
    final short wanted = owner.getReplication();
    final NumberReplicas replicas = countNodes(blk);
    final int live = replicas.liveReplicas();
    // add to under-replicated queue if need to be
    if (isNeededReplication(blk, wanted, live)
        && neededReplications.add(blk, live,
            replicas.decommissionedReplicas(), wanted)) {
      underReplicatedCount++;
    }
    if (live > wanted) {
      // over-replicated block
      overReplicatedCount++;
      processOverReplicatedBlock(blk, wanted, null, null);
    }
  }

  LOG.info("Total number of blocks            = " + blocksMap.size());
  LOG.info("Number of invalid blocks          = " + invalidCount);
  LOG.info("Number of under-replicated blocks = " + underReplicatedCount);
  LOG.info("Number of  over-replicated blocks = " + overReplicatedCount);
  LOG.info("Number of blocks being written    = " + underConstructionCount);
}
/**
 * Set replication for the blocks.
 * <p>
 * Does nothing if the replication factor is unchanged. Otherwise the
 * neededReplications priority queues are updated for every block, and when
 * the factor was lowered each block is also checked for excess replicas.
 *
 * @param oldRepl previous replication factor
 * @param newRepl new replication factor
 * @param src path of the file, used for logging
 * @param blocks the blocks of the file
 */
public void setReplication(final short oldRepl, final short newRepl,
    final String src, final Block... blocks) throws IOException {
  if (newRepl == oldRepl) {
    return;
  }

  // update needReplication priority queues
  final int replicationDelta = newRepl - oldRepl;
  for (int i = 0; i < blocks.length; i++) {
    updateNeededReplications(blocks[i], 0, replicationDelta);
  }

  if (newRepl > oldRepl) {
    // replication factor is increased
    LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
        + " for " + src);
    return;
  }

  // old replication > the new one; need to remove copies
  LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
      + " for " + src);
  for (int i = 0; i < blocks.length; i++) {
    processOverReplicatedBlock(blocks[i], newRepl, null, null);
  }
}
/**
 * Collect the replicas of a block that are candidates for removal and pass
 * them to {@code chooseExcessReplicates}.
 * <p>
 * A datanode holding the block is a candidate unless the block is already
 * recorded as excess on it, the node is decommissioning or decommissioned,
 * or its replica is listed as corrupt.
 * <p>
 * The caller must hold the namesystem write lock (asserted).
 *
 * @param block       block to examine
 * @param replication desired number of replicas
 * @param addedNode   node that just received the block; may be null
 * @param delNodeHint preferred node to delete from; ignored when it is the
 *                    same object as {@code addedNode}
 */
private void processOverReplicatedBlock(final Block block,
    final short replication, final DatanodeDescriptor addedNode,
    DatanodeDescriptor delNodeHint) {
  assert namesystem.hasWriteLock();
  if (addedNode == delNodeHint) {
    delNodeHint = null;
  }
  final Collection<DatanodeDescriptor> candidates =
      new ArrayList<DatanodeDescriptor>();
  final Collection<DatanodeDescriptor> corrupt =
      corruptReplicas.getNodes(block);
  final Iterator<DatanodeDescriptor> holders = blocksMap.nodeIterator(block);
  while (holders.hasNext()) {
    final DatanodeDescriptor dn = holders.next();
    final Collection<Block> excess =
        excessReplicateMap.get(dn.getStorageID());
    if (excess != null && excess.contains(block)) {
      continue; // already marked excess on this node
    }
    if (dn.isDecommissionInProgress() || dn.isDecommissioned()) {
      continue; // node is leaving or has left service
    }
    if (corrupt != null && corrupt.contains(dn)) {
      continue; // corrupt replicas are excluded
    }
    candidates.add(dn);
  }
  chooseExcessReplicates(candidates, block, replication,
      addedNode, delNodeHint, blockplacement);
}
/**
 * We want "replication" replicas for the block, but we now have too many.
 * Nodes are removed from 'nonExcess' one at a time until
 *
 * nonExcess.size() <= replication
 *
 * and every removed node is recorded as holding an excess replica and is
 * scheduled for invalidation of the block. Note that 'nonExcess' is modified
 * in place.
 *
 * Nodes are grouped by rack: 'priSet' holds nodes on racks with more than one
 * replica (removing one of them does not remove a rack), 'remains' holds the
 * others. On the first iteration only, 'delNodeHint' is chosen if it is
 * non-null, is in 'nonExcess', and either is in 'priSet' or 'addedNode' is
 * non-null and not in 'priSet'. Otherwise the choice is delegated to
 * replicator.chooseReplicaToDelete(); per the original description that
 * policy prefers nodes from 'priSet' and the node with least free space --
 * that logic is not visible here.
 *
 * The caller must hold the namesystem write lock (asserted).
 *
 * @param nonExcess nodes currently holding a non-excess replica; mutated
 * @param b the over-replicated block
 * @param replication the desired number of replicas
 * @param addedNode node that just received the block; may be null
 * @param delNodeHint preferred node to delete from; may be null
 * @param replicator placement policy used to pick the replica to delete
 */
private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
Block b, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
BlockPlacementPolicy replicator) {
assert namesystem.hasWriteLock();
// look up the owning file (passed to the placement policy below), then
// form a rack to datanodes map
INodeFile inode = getINode(b);
final Map<String, List<DatanodeDescriptor>> rackMap
= new HashMap<String, List<DatanodeDescriptor>>();
for(final Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
iter.hasNext(); ) {
final DatanodeDescriptor node = iter.next();
final String rackName = node.getNetworkLocation();
List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
if (datanodeList == null) {
datanodeList = new ArrayList<DatanodeDescriptor>();
rackMap.put(rackName, datanodeList);
}
datanodeList.add(node);
}
// split nodes into two sets
// priSet contains nodes on rack with more than one replica
// remains contains the remaining nodes
final List<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
if (datanodeList.size() == 1 ) {
remains.add(datanodeList.get(0));
} else {
priSet.addAll(datanodeList);
}
}
// pick one node to delete that favors the delete hint
// otherwise let the placement policy pick one from priSet / remains
boolean firstOne = true;
while (nonExcess.size() - replication > 0) {
// check if we can delete delNodeHint; the hint is only honored on the
// first iteration
final DatanodeInfo cur;
if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
&& (priSet.contains(delNodeHint)
|| (addedNode != null && !priSet.contains(addedNode))) ) {
cur = delNodeHint;
} else { // regular excessive replica removal
cur = replicator.chooseReplicaToDelete(inode, b, replication,
priSet, remains);
}
firstOne = false;
// adjust rackmap, priSet, and remains
String rack = cur.getNetworkLocation();
final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
datanodes.remove(cur);
if (datanodes.isEmpty()) {
rackMap.remove(rack);
}
if (priSet.remove(cur)) {
// the rack dropped to a single replica: its last node moves from
// priSet to remains
if (datanodes.size() == 1) {
priSet.remove(datanodes.get(0));
remains.add(datanodes.get(0));
}
} else {
remains.remove(cur);
}
nonExcess.remove(cur);
addToExcessReplicate(cur, b);
//
// The 'excessblocks' tracks blocks until we get confirmation
// that the datanode has deleted them; the only way we remove them
// is when we get a "removeBlock" message.
//
// The 'invalidate' list is used to inform the datanode the block
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the datanode.
// NOTE(review): the original comment said "namenode" here, presumably
// a typo -- confirm.
//
addToInvalidates(b, cur);
NameNode.stateChangeLog.info("BLOCK* chooseExcessReplicates: "
+"("+cur.getName()+", "+b+") is added to invalidated blocks set.");
}
}
/**
 * Record that the given datanode holds an excess replica of the block.
 * <p>
 * The per-storage set is created on first use. The excess counter is bumped
 * and a debug message emitted only when the block was not already recorded.
 * The caller must hold the namesystem write lock (asserted).
 *
 * @param dn    datanode holding the excess replica
 * @param block the block in question
 */
private void addToExcessReplicate(DatanodeInfo dn, Block block) {
  assert namesystem.hasWriteLock();
  final String storageId = dn.getStorageID();
  Collection<Block> excessOnNode = excessReplicateMap.get(storageId);
  if (excessOnNode == null) {
    excessOnNode = new TreeSet<Block>();
    excessReplicateMap.put(storageId, excessOnNode);
  }
  if (!excessOnNode.add(block)) {
    return; // already tracked as excess on this node
  }
  excessBlocksCount++;
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("BLOCK* addToExcessReplicate:"
        + " (" + dn.getName() + ", " + block
        + ") is added to excessReplicateMap");
  }
}
/**
 * Remove the (block, datanode) association from the blocks map and update
 * the dependent bookkeeping.
 * <p>
 * If the map did not contain the association, nothing else happens. If the
 * block still belongs to a file, the safe-block count is decremented and the
 * block is re-evaluated for replication. The block is also dropped from the
 * node's excess set and from the corrupt-replica map.
 * <p>
 * The caller must hold the namesystem write lock (asserted).
 *
 * @param block block whose replica disappeared
 * @param node  datanode that no longer stores the replica
 */
private void removeStoredBlock(Block block, DatanodeDescriptor node) {
  final boolean debug = NameNode.stateChangeLog.isDebugEnabled();
  if (debug) {
    NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
        + block + " from " + node.getName());
  }
  assert (namesystem.hasWriteLock());

  if (!blocksMap.removeNode(block, node)) {
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
          + block + " has already been removed from node " + node);
    }
    return;
  }

  // The replica may have vanished because of a datanode failure. When the
  // block is still owned by a file, check whether it now needs replication.
  if (blocksMap.getINode(block) != null) {
    namesystem.decrementSafeBlockCount(block);
    updateNeededReplications(block, -1, 0);
  }

  // The replica is gone from this node, so it cannot be "excess" there.
  final Collection<Block> excessOnNode =
      excessReplicateMap.get(node.getStorageID());
  if (excessOnNode != null && excessOnNode.remove(block)) {
    excessBlocksCount--;
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
          + block + " is removed from excessBlocks");
    }
    if (excessOnNode.isEmpty()) {
      excessReplicateMap.remove(node.getStorageID());
    }
  }

  // Finally forget any corrupt-replica record for this node.
  corruptReplicas.removeFromCorruptReplicasMap(block, node);
}
/**
 * Append the block together with its valid locations to {@code results}.
 *
 * @param block   block to add
 * @param results list receiving a new {@code BlockWithLocations} entry
 * @return the block length when the block was added; 0 when the block has
 *         no valid location and was therefore skipped
 */
private long addBlock(Block block, List<BlockWithLocations> results) {
  final List<String> locations = getValidLocations(block);
  if (locations.size() == 0) {
    return 0;
  }
  final String[] locationArray =
      locations.toArray(new String[locations.size()]);
  results.add(new BlockWithLocations(block, locationArray));
  return block.getNumBytes();
}
/**
 * Handle a datanode's report that it has received a block.
 * <p>
 * The node's scheduled-block counter is decremented, the optional deletion
 * hint is resolved to a datanode, the block is removed from the pending
 * replications, and the report is classified by {@code processReportedBlock}
 * as a FINALIZED replica. The block then lands in at most one of the
 * resulting to-do lists, each of which is processed.
 *
 * @param node    datanode that received the block
 * @param block   the received block
 * @param delHint identifier of a node from which a replica may be deleted;
 *                null or empty when there is no hint
 * @throws IOException propagated from the calls made here
 */
@VisibleForTesting
void addBlock(DatanodeDescriptor node, Block block, String delHint)
    throws IOException {
  // one fewer block is scheduled to arrive at this datanode
  node.decBlocksScheduled();

  // resolve the deletion hint, if one was supplied
  DatanodeDescriptor hintedNode = null;
  final boolean hasHint = delHint != null && delHint.length() != 0;
  if (hasHint) {
    hintedNode = datanodeManager.getDatanode(delHint);
    if (hintedNode == null) {
      NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + block
          + " is expected to be removed from an unrecorded node " + delHint);
    }
  }

  // update the blocks->datanode map and the node's own map
  pendingReplications.remove(block);

  // blockReceived always reports a finalized replica
  final Collection<BlockInfo> additions = new LinkedList<BlockInfo>();
  final Collection<Block> invalidations = new LinkedList<Block>();
  final Collection<BlockInfo> corruptions = new LinkedList<BlockInfo>();
  final Collection<StatefulBlockInfo> underConstruction =
      new LinkedList<StatefulBlockInfo>();
  processReportedBlock(node, block, ReplicaState.FINALIZED,
      additions, invalidations, corruptions, underConstruction);

  // The block is in at most one list; in none if the datanode already
  // had it.
  assert underConstruction.size() + additions.size() + invalidations.size() + corruptions.size() <= 1
      : "The block should be only in one of the lists.";

  for (StatefulBlockInfo uc : underConstruction) {
    addStoredBlockUnderConstruction(uc.storedBlock, node, uc.reportedState);
  }
  for (BlockInfo stored : additions) {
    addStoredBlock(stored, node, hintedNode, true);
  }
  for (Block orphan : invalidations) {
    NameNode.stateChangeLog.info("BLOCK* addBlock: block "
        + orphan + " on " + node.getName() + " size " + orphan.getNumBytes()
        + " does not belong to any file.");
    addToInvalidates(orphan, node);
  }
  for (BlockInfo bad : corruptions) {
    markBlockAsCorrupt(bad, node);
  }
}
/**
 * Entry point for a datanode's "block received" notification.
 * <p>
 * Runs under the namesystem write lock. The reporting node must be
 * registered and alive; otherwise a warning is logged and an
 * {@link IOException} is thrown. {@code poolId} is not used by this method.
 *
 * @param nodeID  identity of the reporting datanode
 * @param poolId  block pool id (unused here)
 * @param block   the received block
 * @param delHint deletion hint forwarded to {@code addBlock}
 * @throws IOException if the node is dead or unregistered, or if
 *                     processing the block fails
 */
public void blockReceived(final DatanodeID nodeID, final String poolId,
    final Block block, final String delHint) throws IOException {
  namesystem.writeLock();
  try {
    final DatanodeDescriptor reporter = datanodeManager.getDatanode(nodeID);
    final boolean usable = reporter != null && reporter.isAlive;
    if (!usable) {
      final String problem = block
          + " is received from dead or unregistered node "
          + nodeID.getName();
      NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + problem);
      throw new IOException(problem);
    }

    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block
          + " is received from " + nodeID.getName());
    }
    addBlock(reporter, block, delHint);
  } finally {
    namesystem.writeUnlock();
  }
}
/**
 * Classify the replicas of a block.
 * <p>
 * Each datanode holding the block is counted in exactly one category, tested
 * in this order: corrupt, decommissioning/decommissioned, excess, live.
 *
 * @param b the block to inspect
 * @return the live, decommissioned, corrupt and excess replica counts
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int liveCount = 0;
  int corruptCount = 0;
  int excessCount = 0;
  final Iterator<DatanodeDescriptor> holders = blocksMap.nodeIterator(b);
  final Collection<DatanodeDescriptor> corruptHolders =
      corruptReplicas.getNodes(b);
  while (holders.hasNext()) {
    final DatanodeDescriptor dn = holders.next();
    if (corruptHolders != null && corruptHolders.contains(dn)) {
      corruptCount++;
      continue;
    }
    if (dn.isDecommissionInProgress() || dn.isDecommissioned()) {
      decommissioned++;
      continue;
    }
    final Collection<Block> excessOnNode =
        excessReplicateMap.get(dn.getStorageID());
    final boolean isExcess = excessOnNode != null && excessOnNode.contains(b);
    if (isExcess) {
      excessCount++;
    } else {
      liveCount++;
    }
  }
  return new NumberReplicas(liveCount, decommissioned, corruptCount,
      excessCount);
}
/**
 * Simpler, faster form of {@link #countNodes(Block)} that only returns the
 * number of live nodes.
 * <p>
 * While the namesystem reports startup safe mode, every replica that is not
 * listed as corrupt is counted as live; excess replicas and decommissioning
 * or decommissioned nodes are not distinguished. Outside startup safe mode
 * the result is taken from {@link #countNodes(Block)}.
 *
 * @param b the block being tested
 * @return count of live nodes for this block
 */
int countLiveNodes(BlockInfo b) {
  if (namesystem.isInStartupSafeMode()) {
    // fast path: anything that is not corrupt counts
    int liveCount = 0;
    final Iterator<DatanodeDescriptor> holders = blocksMap.nodeIterator(b);
    final Collection<DatanodeDescriptor> corruptHolders =
        corruptReplicas.getNodes(b);
    while (holders.hasNext()) {
      final DatanodeDescriptor dn = holders.next();
      final boolean isCorrupt =
          corruptHolders != null && corruptHolders.contains(dn);
      if (!isCorrupt) {
        liveCount++;
      }
    }
    return liveCount;
  }
  return countNodes(b).liveReplicas();
}
/**
 * Log a one-line summary of a block's replication state.
 * <p>
 * The owning file is dereferenced without a null check, so the block must
 * belong to a file.
 *
 * @param block   block being described
 * @param srcNode datanode currently under examination
 * @param num     replica counts previously computed for the block
 */
private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
    NumberReplicas num) {
  final int live = num.liveReplicas();
  final int expected = getReplication(block);
  final INode owner = blocksMap.getINode(block);

  // space-terminated list of every datanode that stores the block
  final StringBuilder holders = new StringBuilder();
  for (Iterator<DatanodeDescriptor> nodes = blocksMap.nodeIterator(block);
      nodes.hasNext();) {
    final DatanodeDescriptor dn = nodes.next();
    holders.append(dn.name);
    holders.append(" ");
  }

  final String message = "Block: " + block + ", Expected Replicas: "
      + expected + ", live replicas: " + live
      + ", corrupt replicas: " + num.corruptReplicas()
      + ", decommissioned replicas: " + num.decommissionedReplicas()
      + ", excess replicas: " + num.excessReplicas()
      + ", Is Open File: " + owner.isUnderConstruction()
      + ", Datanodes having this block: " + holders + ", Current Datanode: "
      + srcNode.name + ", Is current datanode decommissioning: "
      + srcNode.isDecommissionInProgress();
  LOG.info(message);
}
/**
 * On stopping decommission, check if the node has excess replicas.
 * <p>
 * Every block stored on {@code srcNode} is examined; when its live replica
 * count exceeds the owning file's replication factor,
 * {@code processOverReplicatedBlock()} is called for it.
 * <p>
 * Blocks that do not belong to any file (no inode in the blocks map) are
 * skipped. They have no replication factor, and dereferencing the missing
 * inode would throw a {@link NullPointerException} and abort the scan of the
 * remaining blocks. This matches how {@code isReplicationInProgress} treats
 * such blocks.
 *
 * @param srcNode the datanode being put back into service
 */
void processOverReplicatedBlocksOnReCommission(
    final DatanodeDescriptor srcNode) {
  final Iterator<? extends Block> it = srcNode.getBlockIterator();
  while (it.hasNext()) {
    final Block block = it.next();
    final INodeFile fileINode = blocksMap.getINode(block);
    if (fileINode == null) {
      // block does not belong to any file; nothing to compare against
      continue;
    }
    final short expectedReplication = fileINode.getReplication();
    final NumberReplicas num = countNodes(block);
    final int numCurrentReplica = num.liveReplicas();
    if (numCurrentReplica > expectedReplication) {
      // over-replicated block
      processOverReplicatedBlock(block, expectedReplication, null, null);
    }
  }
}
/**
 * Determine whether any block on the node is still short of replicas, and
 * refresh the node's decommissioning statistics.
 * <p>
 * Blocks without an owning file are ignored. For each block that needs
 * replication, the under-replicated counters are updated when it has fewer
 * live replicas than expected (the first such block is logged in detail).
 * The block is also added to the under-replication queue when it is neither
 * queued nor pending already.
 *
 * @param srcNode the datanode being examined
 * @return true if at least one block has fewer live replicas than expected;
 *         false otherwise
 */
boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
  boolean inProgress = false;
  int underReplicated = 0;
  int decommissionOnly = 0;
  int underReplicatedOpen = 0;

  for (Iterator<? extends Block> blocks = srcNode.getBlockIterator();
      blocks.hasNext();) {
    final Block blk = blocks.next();
    final INode owner = blocksMap.getINode(blk);
    if (owner == null) {
      continue;
    }
    final NumberReplicas counts = countNodes(blk);
    final int live = counts.liveReplicas();
    final int expected = getReplication(blk);
    if (!isNeededReplication(blk, expected, live)) {
      continue;
    }

    if (expected > live) {
      // log details for the first block of this node needing replication
      if (!inProgress) {
        inProgress = true;
        logBlockReplicationInfo(blk, srcNode, counts);
      }
      underReplicated++;
      if ((live == 0) && (counts.decommissionedReplicas() > 0)) {
        decommissionOnly++;
      }
      if (owner.isUnderConstruction()) {
        underReplicatedOpen++;
      }
    }

    final boolean untracked = !neededReplications.contains(blk)
        && pendingReplications.getNumReplicas(blk) == 0;
    if (untracked) {
      // These blocks were reported by the datanode after startDecommission
      // ran; they were in flight when decommissioning was started.
      neededReplications.add(blk, live, counts.decommissionedReplicas(),
          expected);
    }
  }

  srcNode.decommissioningStatus.set(underReplicated,
      decommissionOnly,
      underReplicatedOpen);
  return inProgress;
}
/**
 * @return the size of the blocks map minus the number of blocks currently
 *         held in the invalidate set
 */
public int getActiveBlockCount() {
  final int total = blocksMap.size();
  final int pendingInvalidation = (int) invalidateBlocks.numBlocks();
  return total - pendingInvalidation;
}
/**
 * Copy the datanodes storing the given block into an array.
 *
 * @param block the block whose locations are wanted
 * @return an array of length {@code block.numNodes()}, filled in iteration
 *         order from the blocks map; slots beyond the iterated nodes stay
 *         null
 */
public DatanodeDescriptor[] getNodes(BlockInfo block) {
  final DatanodeDescriptor[] holders =
      new DatanodeDescriptor[block.numNodes()];
  final Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
  if (nodeIter != null) {
    int next = 0;
    while (nodeIter.hasNext()) {
      holders[next++] = nodeIter.next();
    }
  }
  return holders;
}
/** @return the number of entries in the blocks map */
public int getTotalBlocks() {
  final int total = blocksMap.size();
  return total;
}
/**
 * Remove a block entirely: queue it for invalidation, forget its
 * corrupt-replica records, and finally drop it from the blocks map.
 *
 * @param block the block to remove
 */
public void removeBlock(Block block) {
  // queue for invalidation first, while the block is still in the map
  addToInvalidates(block);
  // then discard the bookkeeping that refers to it
  corruptReplicas.removeFromCorruptReplicasMap(block);
  blocksMap.removeBlock(block);
}
/**
 * Look up the stored form of a block in the blocks map.
 *
 * @param block the block to look up
 * @return whatever {@code blocksMap.getStoredBlock} returns for it
 */
public BlockInfo getStoredBlock(Block block) {
  final BlockInfo stored = blocksMap.getStoredBlock(block);
  return stored;
}
/**
 * Re-evaluate a block's entry in the under-replication queue after its live
 * or expected replica count changed. Runs under the namesystem write lock.
 * <p>
 * If the block still needs replication its queue entry is updated; otherwise
 * it is removed, using the counts it had before the change.
 *
 * @param block                 the block to update
 * @param curReplicasDelta      change already applied to the live count
 * @param expectedReplicasDelta change already applied to the expected count
 */
private void updateNeededReplications(final Block block,
    final int curReplicasDelta, int expectedReplicasDelta) {
  namesystem.writeLock();
  try {
    final NumberReplicas counts = countNodes(block);
    final int expectedNow = getReplication(block);
    final int liveNow = counts.liveReplicas();
    if (!isNeededReplication(block, expectedNow, liveNow)) {
      // reconstruct the values the block was queued with
      final int liveBefore = counts.liveReplicas() - curReplicasDelta;
      final int expectedBefore = expectedNow - expectedReplicasDelta;
      neededReplications.remove(block, liveBefore,
          counts.decommissionedReplicas(), expectedBefore);
    } else {
      neededReplications.update(block, counts.liveReplicas(),
          counts.decommissionedReplicas(), expectedNow, curReplicasDelta,
          expectedReplicasDelta);
    }
  } finally {
    namesystem.writeUnlock();
  }
}
/**
 * Queue the block for replication if it currently needs it.
 *
 * @param block               the block to check
 * @param numExpectedReplicas the replication factor the block should have
 */
public void checkReplication(Block block, int numExpectedReplicas) {
  // countNodes() separates live replicas from decommissioned ones
  final NumberReplicas counts = countNodes(block);
  if (!isNeededReplication(block, numExpectedReplicas,
      counts.liveReplicas())) {
    return;
  }
  neededReplications.add(block,
      counts.liveReplicas(),
      counts.decommissionedReplicas(),
      numExpectedReplicas);
}
/**
 * Get the replication factor of a block.
 *
 * @param block the block to look up
 * @return the owning file's replication factor, or 0 when the block does
 *         not belong to any file
 */
private int getReplication(Block block) {
  final INodeFile owner = blocksMap.getINode(block);
  if (owner != null) {
    assert !owner.isDirectory() : "Block cannot belong to a directory.";
    return owner.getReplication();
  }
  // block does not belong to any file
  return 0;
}
/**
 * Get blocks to invalidate for <i>nodeId</i>
 * in {@link #invalidateBlocks}. Runs under the namesystem write lock.
 *
 * @param nodeId identifier of the datanode; must not be null when work is
 *               actually computed (asserted)
 * @return number of blocks scheduled for removal during this iteration;
 *         0 while the namesystem is in safe mode
 */
private int invalidateWorkForOneNode(String nodeId) {
  namesystem.writeLock();
  try {
    if (!namesystem.isInSafeMode()) {
      // fetch the invalidation work queued for this node
      assert nodeId != null;
      return invalidateBlocks.invalidateWork(nodeId);
    }
    // blocks should not be replicated or removed if safe mode is on
    return 0;
  } finally {
    namesystem.writeUnlock();
  }
}
/**
 * Check whether the usable replicas of a block are spread over enough racks.
 * <p>
 * Always true when rack checking is disabled. Otherwise only replicas on
 * nodes that are neither decommissioning/decommissioned nor corrupt are
 * considered. The answer is true as soon as one such replica exists for a
 * block whose expected replication is 1, or as soon as two such replicas
 * are found on different racks.
 *
 * @param b the block to check
 * @return true if the rack requirement is satisfied, false otherwise
 */
boolean blockHasEnoughRacks(Block b) {
  if (!this.shouldCheckForEnoughRacks) {
    return true;
  }
  final Collection<DatanodeDescriptor> corruptHolders =
      corruptReplicas.getNodes(b);
  final int expected = getReplication(b);
  String firstRack = null;
  final Iterator<DatanodeDescriptor> holders = blocksMap.nodeIterator(b);
  while (holders.hasNext()) {
    final DatanodeDescriptor dn = holders.next();
    if (dn.isDecommissionInProgress() || dn.isDecommissioned()) {
      continue;
    }
    if (corruptHolders != null && corruptHolders.contains(dn)) {
      continue;
    }
    if (expected == 1) {
      // a single usable replica is all that is required
      return true;
    }
    final String rack = dn.getNetworkLocation();
    if (firstRack == null) {
      firstRack = rack;
    } else if (!firstRack.equals(rack)) {
      // second distinct rack found
      return true;
    }
  }
  return false;
}
/**
 * Decide whether a block needs further replication.
 *
 * @param b                   the block
 * @param expectedReplication desired number of replicas
 * @param curReplicas         current number of live replicas
 * @return true if the block has fewer replicas than expected or is not on
 *         enough racks; false when both conditions are satisfied
 */
boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) {
  // the rack check is only evaluated once the replica count is sufficient
  return curReplicas < expectedReplication || !blockHasEnoughRacks(b);
}
/**
 * @return the corrupt-block size reported by the under-replication queue;
 *         read without taking the namesystem lock
 */
public long getMissingBlocksCount() {
  // deliberately not locking
  final long missing = this.neededReplications.getCorruptBlockSize();
  return missing;
}
/**
 * Associate a block with a file in the blocks map.
 *
 * @param block the block to register
 * @param iNode the file that owns the block
 * @return the {@code BlockInfo} returned by the blocks map
 */
public BlockInfo addINode(BlockInfo block, INodeFile iNode) {
  final BlockInfo stored = blocksMap.addINode(block, iNode);
  return stored;
}
/**
 * Look up the file that owns a block.
 *
 * @param b the block
 * @return the owning file as recorded in the blocks map; other code in this
 *         class treats null as "block belongs to no file"
 */
public INodeFile getINode(Block b) {
  final INodeFile owner = blocksMap.getINode(b);
  return owner;
}
/**
 * @param block the block whose locations are wanted
 * @return an iterator of the datanodes recorded for the block in the
 *         blocks map
 */
public Iterator<DatanodeDescriptor> datanodeIterator(final Block block) {
  final Iterator<DatanodeDescriptor> holders = blocksMap.nodeIterator(block);
  return holders;
}
/**
 * @param block the block to query
 * @return the number of corrupt replicas recorded for the block
 */
public int numCorruptReplicas(Block block) {
  final int corruptCount = corruptReplicas.numCorruptReplicas(block);
  return corruptCount;
}
/**
 * Drop a block from the blocks map and, because it is no longer tracked
 * there, from the corrupt-replica map as well. Unlike {@code removeBlock},
 * the block is not queued for invalidation.
 *
 * @param block the block to remove
 */
public void removeBlockFromMap(Block block) {
  blocksMap.removeBlock(block);
  // keep corruptReplicas consistent with blocksMap
  corruptReplicas.removeFromCorruptReplicasMap(block);
}
/**
 * @return the capacity reported by the blocks map, read under the
 *         namesystem read lock
 */
public int getCapacity() {
  final int capacity;
  namesystem.readLock();
  try {
    capacity = blocksMap.getCapacity();
  } finally {
    namesystem.readUnlock();
  }
  return capacity;
}
/**
 * Return a range of corrupt replica block ids; delegates to the
 * corrupt-replica map.
 * <p>
 * Up to numExpectedBlocks ids are returned, starting at the block after
 * startingBlockId (fewer if that many are not available). A null
 * startingBlockId means "start from the beginning". If startingBlockId
 * cannot be found, null is returned.
 *
 * @param numExpectedBlocks Number of block ids to return.
 *        0 <= numExpectedBlocks <= 100
 * @param startingBlockId Block id from which to start. If null, start at
 *        beginning.
 * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
 */
public long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
    Long startingBlockId) {
  final long[] ids = corruptReplicas.getCorruptReplicaBlockIds(
      numExpectedBlocks, startingBlockId);
  return ids;
}
/**
 * Return an iterator over the under-replication queue selected by
 * {@code UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS}, i.e. the set of
 * blocks for which there are no replicas.
 */
public Iterator<Block> getCorruptReplicaBlockIterator() {
  final int queue = UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS;
  return neededReplications.iterator(queue);
}
/** @return the number of blocks held in the under-replication queue */
public int numOfUnderReplicatedBlocks() {
  final int queued = neededReplications.size();
  return queued;
}
/**
 * Background task that, while the namesystem is running, repeatedly computes
 * datanode work, processes pending replications, and then sleeps for
 * {@code replicationRecheckInterval}.
 * <p>
 * An interrupt ends the loop. An {@link IOException} is logged and the loop
 * continues. Any other throwable is logged and terminates the JVM with exit
 * status -1.
 */
private class ReplicationMonitor implements Runnable {
  /** Percentage of live datanodes given invalidation work per iteration. */
  private static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
  /** Blocks scheduled for replication per live datanode per iteration. */
  private static final int REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;

  @Override
  public void run() {
    while (namesystem.isRunning()) {
      try {
        computeDatanodeWork();
        processPendingReplications();
        Thread.sleep(replicationRecheckInterval);
      } catch (InterruptedException interrupted) {
        LOG.warn("ReplicationMonitor thread received InterruptedException.", interrupted);
        // stop monitoring; the loop is the last thing this method does
        return;
      } catch (IOException ioFailure) {
        // recoverable: report and try again on the next pass
        LOG.warn("ReplicationMonitor thread received exception. " , ioFailure);
      } catch (Throwable fatal) {
        LOG.warn("ReplicationMonitor thread received Runtime exception. ", fatal);
        Runtime.getRuntime().exit(-1);
      }
    }
  }
}
/**
 * Compute block replication and block invalidation work that can be
 * scheduled on datanodes. Per the original documentation, the datanodes are
 * informed of this work at their next heartbeat.
 * <p>
 * The amount of work is scaled by the number of live datanodes. Nothing is
 * scheduled while the namesystem is in safe mode.
 *
 * @return number of blocks scheduled for replication or removal
 * @throws IOException propagated from the work computation
 */
int computeDatanodeWork() throws IOException {
  // Blocks should not be replicated or removed if in safe mode. Checking
  // without the lock is acceptable: in the worst case extra replications
  // are scheduled and get fixed up later.
  if (namesystem.isInSafeMode()) {
    return 0;
  }

  final int liveNodes = heartbeatManager.getLiveDatanodeCount();
  final int replicationBudget = liveNodes
      * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
  final int invalidationBudget = (int) Math.ceil(liveNodes
      * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100.0);

  final int replicationWork = this.computeReplicationWork(replicationBudget);

  // publish the refreshed counters under the write lock
  namesystem.writeLock();
  try {
    this.updateState();
    this.scheduledReplicationBlocksCount = replicationWork;
  } finally {
    namesystem.writeUnlock();
  }

  final int invalidationWork = this.computeInvalidateWork(invalidationBudget);
  return replicationWork + invalidationWork;
}
}