/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.Time;
/**
* Scans the namesystem, scheduling blocks to be cached as appropriate.
*
* The CacheReplicationMonitor does a full scan when the NameNode first
* starts up, and at configurable intervals afterwards.
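 *
 * A minimal usage sketch (hypothetical wiring; in HDFS the monitor is
 * normally created and started by the CacheManager):
 * <pre>
 * CacheReplicationMonitor monitor =
 *     new CacheReplicationMonitor(namesystem, cacheManager, 30000);
 * monitor.start();  // initial full scan, then one scan per interval
 * monitor.kick();   // request an immediate rescan
 * monitor.close();  // shut down and join the thread
 * </pre>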
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
public class CacheReplicationMonitor extends Thread implements Closeable {
private static final Log LOG =
LogFactory.getLog(CacheReplicationMonitor.class);
private final FSNamesystem namesystem;
private final BlockManager blockManager;
private final CacheManager cacheManager;
private final GSet<CachedBlock, CachedBlock> cachedBlocks;
/**
 * Pseudorandom number source used when choosing which replicas to
 * cache or uncache.
*/
private final Random random = new Random();
/**
* The interval at which we scan the namesystem for caching changes.
*/
private final long intervalMs;
/**
 * True if we should rescan immediately, regardless of how much time
 * has elapsed since the previous scan.
*/
private boolean rescanImmediately;
/**
* The monotonic time at which the current scan started.
*/
private long scanTimeMs;
/**
 * The mark used by the current scan. The mark flips at the start of
 * every scan; blocks whose mark does not match afterwards were not
 * covered by any cache directive and are candidates for uncaching.
*/
private boolean mark = false;
/**
* True if this monitor should terminate.
*/
private boolean shutdown;
/**
 * The number of cache directives found in the previous scan.
*/
private int scannedDirectives;
/**
 * The number of blocks found in the previous scan.
*/
private long scannedBlocks;
public CacheReplicationMonitor(FSNamesystem namesystem,
CacheManager cacheManager, long intervalMs) {
this.namesystem = namesystem;
this.blockManager = namesystem.getBlockManager();
this.cacheManager = cacheManager;
this.cachedBlocks = cacheManager.getCachedBlocks();
this.intervalMs = intervalMs;
}
@Override
public void run() {
shutdown = false;
rescanImmediately = true;
scanTimeMs = 0;
LOG.info("Starting CacheReplicationMonitor with interval " +
intervalMs + " milliseconds");
try {
long curTimeMs = Time.monotonicNow();
while (true) {
synchronized(this) {
while (true) {
if (shutdown) {
LOG.info("Shutting down CacheReplicationMonitor");
return;
}
if (rescanImmediately) {
LOG.info("Rescanning on request");
rescanImmediately = false;
break;
}
long delta = (scanTimeMs + intervalMs) - curTimeMs;
if (delta <= 0) {
LOG.info("Rescanning after " + (curTimeMs - scanTimeMs) +
" milliseconds");
break;
}
this.wait(delta);
curTimeMs = Time.monotonicNow();
}
}
scanTimeMs = curTimeMs;
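        // Flip the mark so that blocks reached during this scan carry the
        // new value; any block still carrying the old mark afterwards was
        // not covered by a directive.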
mark = !mark;
rescan();
curTimeMs = Time.monotonicNow();
LOG.info("Scanned " + scannedDirectives + " directive(s) and " +
scannedBlocks + " block(s) in " + (curTimeMs - scanTimeMs) + " " +
"millisecond(s).");
}
} catch (Throwable t) {
LOG.fatal("Thread exiting", t);
terminate(1, t);
}
}
/**
* Kick the monitor thread.
*
* If it is sleeping, it will wake up and start scanning.
* If it is currently scanning, it will finish the scan and immediately do
* another one.
*/
public synchronized void kick() {
rescanImmediately = true;
this.notifyAll();
}
/**
* Shut down and join the monitor thread.
*/
@Override
public void close() throws IOException {
synchronized(this) {
if (shutdown) return;
shutdown = true;
this.notifyAll();
}
try {
if (this.isAlive()) {
this.join(60000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void rescan() {
scannedDirectives = 0;
scannedBlocks = 0;
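    // The scan runs in two phases, each under the namesystem write lock.
    // Dropping the lock between phases lets other namesystem operations
    // make progress during a long rescan.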
namesystem.writeLock();
try {
rescanPathBasedCacheEntries();
} finally {
namesystem.writeUnlock();
}
namesystem.writeLock();
try {
rescanCachedBlockMap();
} finally {
namesystem.writeUnlock();
}
}
/**
* Scan all PathBasedCacheEntries. Use the information to figure out
 * what cache replication factor each block should have. The current
 * value of the mark field determines whether this scan sets or clears
 * the mark on each block it reaches.
 */
private void rescanPathBasedCacheEntries() {
FSDirectory fsDir = namesystem.getFSDirectory();
for (PathBasedCacheEntry pce : cacheManager.getEntriesById().values()) {
scannedDirectives++;
String path = pce.getPath();
INode node;
try {
node = fsDir.getINode(path);
} catch (UnresolvedLinkException e) {
// We don't cache through symlinks
continue;
}
if (node == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No inode found at " + path);
}
} else if (node.isDirectory()) {
INodeDirectory dir = node.asDirectory();
ReadOnlyList<INode> children = dir.getChildrenList(null);
for (INode child : children) {
if (child.isFile()) {
rescanFile(pce, child.asFile());
}
}
} else if (node.isFile()) {
rescanFile(pce, node.asFile());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring non-directory, non-file inode " + node +
" found at " + path);
}
}
}
}
/**
* Apply a PathBasedCacheEntry to a file.
*
* @param pce The PathBasedCacheEntry to apply.
* @param file The file.
*/
private void rescanFile(PathBasedCacheEntry pce, INodeFile file) {
BlockInfo[] blockInfos = file.getBlocks();
for (BlockInfo blockInfo : blockInfos) {
if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
// We don't try to cache blocks that are under construction.
continue;
}
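      // CachedBlock entries are keyed by block ID alone; the generation
      // stamp is not needed for cache bookkeeping.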
Block block = new Block(blockInfo.getBlockId());
CachedBlock ncblock = new CachedBlock(block.getBlockId(),
pce.getReplication(), mark);
CachedBlock ocblock = cachedBlocks.get(ncblock);
if (ocblock == null) {
cachedBlocks.put(ncblock);
} else {
if (mark != ocblock.getMark()) {
// Mark hasn't been set in this scan, so update replication and mark.
ocblock.setReplicationAndMark(pce.getReplication(), mark);
} else {
// Mark already set in this scan. Set replication to highest value in
// any PathBasedCacheEntry that covers this file.
ocblock.setReplicationAndMark((short)Math.max(
pce.getReplication(), ocblock.getReplication()), mark);
}
}
}
}
/**
* Scan through the cached block map.
 * Blocks that are cached on fewer DataNodes than their cache replication
 * factor are assigned additional DataNodes; blocks cached on more
 * DataNodes than needed are scheduled for uncaching.
*/
private void rescanCachedBlockMap() {
for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
cbIter.hasNext(); ) {
scannedBlocks++;
CachedBlock cblock = cbIter.next();
List<DatanodeDescriptor> pendingCached =
cblock.getDatanodes(Type.PENDING_CACHED);
List<DatanodeDescriptor> cached =
cblock.getDatanodes(Type.CACHED);
List<DatanodeDescriptor> pendingUncached =
cblock.getDatanodes(Type.PENDING_UNCACHED);
// Remove nodes from PENDING_UNCACHED if they were actually uncached.
for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
iter.hasNext(); ) {
DatanodeDescriptor datanode = iter.next();
if (!cblock.isInList(datanode.getCached())) {
datanode.getPendingUncached().remove(cblock);
iter.remove();
}
}
      // If the block's mark doesn't match the mark of this scan, the block
      // was not reached during this scan, so it no longer needs to be cached.
int neededCached = (cblock.getMark() != mark) ?
0 : cblock.getReplication();
int numCached = cached.size();
if (numCached >= neededCached) {
// If we have enough replicas, drop all pending cached.
for (DatanodeDescriptor datanode : pendingCached) {
datanode.getPendingCached().remove(cblock);
}
pendingCached.clear();
}
if (numCached < neededCached) {
// If we don't have enough replicas, drop all pending uncached.
for (DatanodeDescriptor datanode : pendingUncached) {
datanode.getPendingUncached().remove(cblock);
}
pendingUncached.clear();
}
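      // Replicas to uncache = replicas cached now, minus those already
      // scheduled for uncaching, minus the number still needed; a positive
      // result means too many copies remain cached.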
int neededUncached = numCached -
(pendingUncached.size() + neededCached);
if (neededUncached > 0) {
addNewPendingUncached(neededUncached, cblock, cached,
pendingUncached);
} else {
int additionalCachedNeeded = neededCached -
(numCached + pendingCached.size());
if (additionalCachedNeeded > 0) {
addNewPendingCached(additionalCachedNeeded, cblock, cached,
pendingCached);
}
}
if ((neededCached == 0) &&
pendingUncached.isEmpty() &&
pendingCached.isEmpty()) {
// we have nothing more to do with this block.
cbIter.remove();
}
}
}
/**
* Add new entries to the PendingUncached list.
*
* @param neededUncached The number of replicas that need to be uncached.
* @param cachedBlock The block which needs to be uncached.
* @param cached A list of DataNodes currently caching the block.
* @param pendingUncached A list of DataNodes that will soon uncache the
* block.
*/
private void addNewPendingUncached(int neededUncached,
CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
List<DatanodeDescriptor> pendingUncached) {
if (!cacheManager.isActive()) {
return;
}
// Figure out which replicas can be uncached.
LinkedList<DatanodeDescriptor> possibilities =
new LinkedList<DatanodeDescriptor>();
for (DatanodeDescriptor datanode : cached) {
if (!pendingUncached.contains(datanode)) {
possibilities.add(datanode);
}
}
while (neededUncached > 0) {
if (possibilities.isEmpty()) {
LOG.warn("Logic error: we're trying to uncache more replicas than " +
"actually exist for " + cachedBlock);
return;
}
DatanodeDescriptor datanode =
possibilities.remove(random.nextInt(possibilities.size()));
pendingUncached.add(datanode);
boolean added = datanode.getPendingUncached().add(cachedBlock);
assert added;
neededUncached--;
}
}
/**
* Add new entries to the PendingCached list.
*
* @param neededCached The number of replicas that need to be cached.
* @param cachedBlock The block which needs to be cached.
* @param cached A list of DataNodes currently caching the block.
* @param pendingCached A list of DataNodes that will soon cache the
* block.
*/
private void addNewPendingCached(int neededCached,
CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
List<DatanodeDescriptor> pendingCached) {
if (!cacheManager.isActive()) {
return;
}
// To figure out which replicas can be cached, we consult the
// blocksMap. We don't want to try to cache a corrupt replica, though.
BlockInfo blockInfo = blockManager.
getStoredBlock(new Block(cachedBlock.getBlockId()));
    if (blockInfo == null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not caching block " + cachedBlock + " because it " +
            "was deleted from all DataNodes.");
      }
      return;
    }
if (!blockInfo.isComplete()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not caching block " + cachedBlock + " because it " +
"is not yet complete.");
}
return;
}
    List<DatanodeDescriptor> possibilities =
        new LinkedList<DatanodeDescriptor>();
int numReplicas = blockInfo.getCapacity();
Collection<DatanodeDescriptor> corrupt =
blockManager.getCorruptReplicas(blockInfo);
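    // Walk the block's replica slots (getCapacity() is the slot count and
    // individual slots may be null), skipping DataNodes that are already
    // scheduled to cache this block or that hold a corrupt replica.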
for (int i = 0; i < numReplicas; i++) {
DatanodeDescriptor datanode = blockInfo.getDatanode(i);
if ((datanode != null) &&
((!pendingCached.contains(datanode)) &&
((corrupt == null) || (!corrupt.contains(datanode))))) {
possibilities.add(datanode);
}
}
while (neededCached > 0) {
if (possibilities.isEmpty()) {
LOG.warn("We need " + neededCached + " more replica(s) than " +
"actually exist to provide a cache replication of " +
cachedBlock.getReplication() + " for " + cachedBlock);
return;
}
DatanodeDescriptor datanode =
possibilities.remove(random.nextInt(possibilities.size()));
if (LOG.isDebugEnabled()) {
LOG.debug("AddNewPendingCached: datanode " + datanode +
" will now cache block " + cachedBlock);
}
pendingCached.add(datanode);
boolean added = datanode.getPendingCached().add(cachedBlock);
assert added;
neededCached--;
}
}
}