/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
/**
* The Cache Manager handles caching on DataNodes.
*
* This class is instantiated by the FSNamesystem.
* It maintains the mapping of cached blocks to datanodes by processing
* datanode cache reports. Based on these reports and on the addition and
* removal of caching directives, it schedules caching and uncaching work.
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
public final class CacheManager {
public static final Log LOG = LogFactory.getLog(CacheManager.class);
private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f;
// TODO: add pending / underCached / schedule cached blocks stats.
/**
* The FSNamesystem that contains this CacheManager.
*/
private final FSNamesystem namesystem;
/**
* The BlockManager associated with the FSN that owns this CacheManager.
*/
private final BlockManager blockManager;
/**
* Cache directives, sorted by ID.
*
* listCacheDirectives relies on the ordering of elements in this map
* to track what has already been listed by the client.
*/
private final TreeMap<Long, CacheDirective> directivesById =
new TreeMap<Long, CacheDirective>();
/**
* The directive ID to use for a new directive. IDs always increase, and are
* never reused.
*/
private long nextDirectiveId;
/**
* Cache directives, sorted by path
*/
private final TreeMap<String, List<CacheDirective>> directivesByPath =
new TreeMap<String, List<CacheDirective>>();
/**
* Cache pools, sorted by name.
*/
private final TreeMap<String, CachePool> cachePools =
new TreeMap<String, CachePool>();
/**
* Maximum number of cache pools to list in one operation.
*/
private final int maxListCachePoolsResponses;
/**
* Maximum number of cache directives to list in one operation.
*/
private final int maxListCacheDirectivesNumResponses;
/**
* Interval between scans in milliseconds.
*/
private final long scanIntervalMs;
/**
* All cached blocks.
*/
private final GSet<CachedBlock, CachedBlock> cachedBlocks;
/**
* Lock which protects the CacheReplicationMonitor.
*/
private final ReentrantLock crmLock = new ReentrantLock();
private final SerializerCompat serializerCompat = new SerializerCompat();
/**
* The CacheReplicationMonitor.
*/
private CacheReplicationMonitor monitor;
CacheManager(FSNamesystem namesystem, Configuration conf,
BlockManager blockManager) {
this.namesystem = namesystem;
this.blockManager = blockManager;
this.nextDirectiveId = 1;
this.maxListCachePoolsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
this.maxListCacheDirectivesNumResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT);
scanIntervalMs = conf.getLong(
DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
float cachedBlocksPercent = conf.getFloat(
DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT,
DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT);
if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) {
LOG.info("Using minimum value " + MIN_CACHED_BLOCKS_PERCENT +
" for " + DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT;
}
this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>(
LightWeightGSet.computeCapacity(cachedBlocksPercent,
"cachedBlocks"));
}
/**
* Resets all tracked directives and pools. Called during 2NN checkpointing to
* reset FSNamesystem state. See {@link FSNamesystem#clear()}.
*/
void clear() {
directivesById.clear();
directivesByPath.clear();
cachePools.clear();
nextDirectiveId = 1;
}
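/**
* Starts the CacheReplicationMonitor thread if it is not already running.
* Guarded by the CRM lock.
*/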
public void startMonitorThread() {
crmLock.lock();
try {
if (this.monitor == null) {
this.monitor = new CacheReplicationMonitor(namesystem, this,
scanIntervalMs, crmLock);
this.monitor.start();
}
} finally {
crmLock.unlock();
}
}
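/**
* Stops and closes the CacheReplicationMonitor thread, if one is running.
* Guarded by the CRM lock.
*/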
public void stopMonitorThread() {
crmLock.lock();
try {
if (this.monitor != null) {
CacheReplicationMonitor prevMonitor = this.monitor;
this.monitor = null;
IOUtils.closeQuietly(prevMonitor);
}
} finally {
crmLock.unlock();
}
}
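/**
* Resets the statistics tracked for every cache directive. Must be called
* with the FSNamesystem write lock held.
*/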
public void clearDirectiveStats() {
assert namesystem.hasWriteLock();
for (CacheDirective directive : directivesById.values()) {
directive.resetStatistics();
}
}
/**
* @return Unmodifiable view of the collection of CachePools.
*/
public Collection<CachePool> getCachePools() {
assert namesystem.hasReadLock();
return Collections.unmodifiableCollection(cachePools.values());
}
/**
* @return Unmodifiable view of the collection of CacheDirectives.
*/
public Collection<CacheDirective> getCacheDirectives() {
assert namesystem.hasReadLock();
return Collections.unmodifiableCollection(directivesById.values());
}
@VisibleForTesting
public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
assert namesystem.hasReadLock();
return cachedBlocks;
}
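/**
* Returns the next unused directive ID and advances the counter. IDs are
* never reused; an IOException is thrown once the ID space is exhausted.
*/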
private long getNextDirectiveId() throws IOException {
assert namesystem.hasWriteLock();
if (nextDirectiveId >= Long.MAX_VALUE - 1) {
throw new IOException("No more available IDs.");
}
return nextDirectiveId++;
}
// Helper getter / validation methods
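/**
* Verifies that the caller has WRITE access to the given pool. A null
* permission checker means no checking is performed.
*/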
private static void checkWritePermission(FSPermissionChecker pc,
CachePool pool) throws AccessControlException {
if (pc != null) {
pc.checkPermission(pool, FsAction.WRITE);
}
}
private static String validatePoolName(CacheDirectiveInfo directive)
throws InvalidRequestException {
String pool = directive.getPool();
if (pool == null) {
throw new InvalidRequestException("No pool specified.");
}
if (pool.isEmpty()) {
throw new InvalidRequestException("Invalid empty pool name.");
}
return pool;
}
private static String validatePath(CacheDirectiveInfo directive)
throws InvalidRequestException {
if (directive.getPath() == null) {
throw new InvalidRequestException("No path specified.");
}
String path = directive.getPath().toUri().getPath();
if (!DFSUtil.isValidName(path)) {
throw new InvalidRequestException("Invalid path '" + path + "'.");
}
return path;
}
private static short validateReplication(CacheDirectiveInfo directive,
short defaultValue) throws InvalidRequestException {
short repl = (directive.getReplication() != null)
? directive.getReplication() : defaultValue;
if (repl <= 0) {
throw new InvalidRequestException("Invalid replication factor " + repl
+ " <= 0");
}
return repl;
}
/**
* Calculates the absolute expiry time of the directive from the
* {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration
* into an absolute time based on the local clock.
*
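* For example, a relative expiration of 300000 ms evaluated when the local
* clock reads {@code now} becomes the absolute expiry time {@code now + 300000},
* provided 300000 does not exceed the pool's {@code maxRelativeExpiryTime}.
*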
* @param info to validate.
* @param maxRelativeExpiryTime of the info's pool.
* @return the expiration time, or the pool's max absolute expiration if the
* info's expiration was not set.
* @throws InvalidRequestException if the info's Expiration is invalid.
*/
private static long validateExpiryTime(CacheDirectiveInfo info,
long maxRelativeExpiryTime) throws InvalidRequestException {
if (LOG.isTraceEnabled()) {
LOG.trace("Validating directive " + info
+ " pool maxRelativeExpiryTime " + maxRelativeExpiryTime);
}
final long now = new Date().getTime();
final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime;
if (info == null || info.getExpiration() == null) {
return maxAbsoluteExpiryTime;
}
Expiration expiry = info.getExpiration();
if (expiry.getMillis() < 0L) {
throw new InvalidRequestException("Cannot set a negative expiration: "
+ expiry.getMillis());
}
long relExpiryTime, absExpiryTime;
if (expiry.isRelative()) {
relExpiryTime = expiry.getMillis();
absExpiryTime = now + relExpiryTime;
} else {
absExpiryTime = expiry.getMillis();
relExpiryTime = absExpiryTime - now;
}
// Need to cap the expiry so we don't overflow a long when doing math
if (relExpiryTime > Expiration.MAX_RELATIVE_EXPIRY_MS) {
throw new InvalidRequestException("Expiration "
+ expiry.toString() + " is too far in the future!");
}
// Fail if the requested expiry is greater than the max
if (relExpiryTime > maxRelativeExpiryTime) {
throw new InvalidRequestException("Expiration " + expiry.toString()
+ " exceeds the max relative expiration time of "
+ maxRelativeExpiryTime + " ms.");
}
return absExpiryTime;
}
/**
* Throws an exception if the CachePool does not have enough capacity to
* cache the given path at the replication factor.
*
* @param pool CachePool where the path is being cached
* @param path Path that is being cached
* @param replication Replication factor of the path
* @throws InvalidRequestException if the pool does not have enough capacity
*/
private void checkLimit(CachePool pool, String path,
short replication) throws InvalidRequestException {
CacheDirectiveStats stats = computeNeeded(path, replication);
if (pool.getLimit() == CachePoolInfo.LIMIT_UNLIMITED) {
return;
}
if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool
.getLimit()) {
throw new InvalidRequestException("Caching path " + path + " of size "
+ stats.getBytesNeeded() + " bytes at replication "
+ replication + " would exceed pool " + pool.getPoolName()
+ "'s remaining capacity of "
+ (pool.getLimit() - pool.getBytesNeeded()) + " bytes.");
}
}
/**
* Computes the needed number of bytes and files for a path.
* @return CacheDirectiveStats describing the needed stats for this path
*/
private CacheDirectiveStats computeNeeded(String path, short replication) {
FSDirectory fsDir = namesystem.getFSDirectory();
INode node;
long requestedBytes = 0;
long requestedFiles = 0;
CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
try {
node = fsDir.getINode(path);
} catch (UnresolvedLinkException e) {
// We don't cache through symlinks
return builder.build();
}
if (node == null) {
return builder.build();
}
if (node.isFile()) {
requestedFiles = 1;
INodeFile file = node.asFile();
requestedBytes = file.computeFileSize();
} else if (node.isDirectory()) {
INodeDirectory dir = node.asDirectory();
ReadOnlyList<INode> children = dir
.getChildrenList(Snapshot.CURRENT_STATE_ID);
requestedFiles = children.size();
for (INode child : children) {
if (child.isFile()) {
requestedBytes += child.asFile().computeFileSize();
}
}
}
return new CacheDirectiveStats.Builder()
.setBytesNeeded(requestedBytes)
.setFilesNeeded(requestedFiles)
.build();
}
/**
* Get a CacheDirective by ID, validating the ID and that the directive
* exists.
*/
private CacheDirective getById(long id) throws InvalidRequestException {
// Check for invalid IDs.
if (id <= 0) {
throw new InvalidRequestException("Invalid negative ID.");
}
// Find the directive.
CacheDirective directive = directivesById.get(id);
if (directive == null) {
throw new InvalidRequestException("No directive with ID " + id
+ " found.");
}
return directive;
}
/**
* Get a CachePool by name, validating that it exists.
*/
private CachePool getCachePool(String poolName)
throws InvalidRequestException {
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new InvalidRequestException("Unknown pool " + poolName);
}
return pool;
}
// RPC handlers
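/**
* Inserts an already-validated directive into the pool's directive list and
* the by-ID and by-path indexes, updates its needed statistics, and flags
* the CacheReplicationMonitor for a rescan.
*/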
private void addInternal(CacheDirective directive, CachePool pool) {
boolean addedDirective = pool.getDirectiveList().add(directive);
assert addedDirective;
directivesById.put(directive.getId(), directive);
String path = directive.getPath();
List<CacheDirective> directives = directivesByPath.get(path);
if (directives == null) {
directives = new ArrayList<CacheDirective>(1);
directivesByPath.put(path, directives);
}
directives.add(directive);
// Fix up pool stats
CacheDirectiveStats stats =
computeNeeded(directive.getPath(), directive.getReplication());
directive.addBytesNeeded(stats.getBytesNeeded());
directive.addFilesNeeded(stats.getFilesNeeded());
setNeedsRescan();
}
/**
* Adds a directive, skipping most error checking. This should only be called
* internally in special scenarios like edit log replay.
*/
CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
throws InvalidRequestException {
long id = directive.getId();
CacheDirective entry = new CacheDirective(directive);
CachePool pool = cachePools.get(directive.getPool());
addInternal(entry, pool);
if (nextDirectiveId <= id) {
nextDirectiveId = id + 1;
}
return entry.toInfo();
}
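/**
* Adds a new cache directive on behalf of a client. Validates the pool,
* path, replication and expiration, checks the caller's write permission on
* the pool, and enforces the pool limit unless {@link CacheFlag#FORCE} is set.
* Must be called with the FSNamesystem write lock held.
*/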
public CacheDirectiveInfo addDirective(
CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
throws IOException {
assert namesystem.hasWriteLock();
CacheDirective directive;
try {
CachePool pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool);
String path = validatePath(info);
short replication = validateReplication(info, (short)1);
long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs());
// Do quota validation if required
if (!flags.contains(CacheFlag.FORCE)) {
checkLimit(pool, path, replication);
}
// All validation passed
// Add a new entry with the next available ID.
long id = getNextDirectiveId();
directive = new CacheDirective(id, path, replication, expiryTime);
addInternal(directive, pool);
} catch (IOException e) {
LOG.warn("addDirective of " + info + " failed: ", e);
throw e;
}
LOG.info("addDirective of " + info + " successful.");
return directive.toInfo();
}
/**
* Factory method that makes a new CacheDirectiveInfo by applying fields in a
* CacheDirectiveInfo to an existing CacheDirective.
*
* @param info with some or all fields set.
* @param defaults directive providing default values for unset fields in
* info.
*
* @return new CacheDirectiveInfo of the info applied to the defaults.
*/
private static CacheDirectiveInfo createFromInfoAndDefaults(
CacheDirectiveInfo info, CacheDirective defaults) {
// Initialize the builder with the default values
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder(defaults.toInfo());
// Replace default with new value if present
if (info.getPath() != null) {
builder.setPath(info.getPath());
}
if (info.getReplication() != null) {
builder.setReplication(info.getReplication());
}
if (info.getPool() != null) {
builder.setPool(info.getPool());
}
if (info.getExpiration() != null) {
builder.setExpiration(info.getExpiration());
}
return builder.build();
}
/**
* Modifies a directive, skipping most error checking. This is for careful
* internal use only. modifyDirective can be non-deterministic since its error
* checking depends on current system time, which poses a problem for edit log
* replay.
*/
void modifyDirectiveFromEditLog(CacheDirectiveInfo info)
throws InvalidRequestException {
// Check for invalid IDs.
Long id = info.getId();
if (id == null) {
throw new InvalidRequestException("Must supply an ID.");
}
CacheDirective prevEntry = getById(id);
CacheDirectiveInfo newInfo = createFromInfoAndDefaults(info, prevEntry);
removeInternal(prevEntry);
addInternal(new CacheDirective(newInfo), getCachePool(newInfo.getPool()));
}
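/**
* Modifies an existing directive identified by {@code info.getId()}. Unset
* fields keep their previous values. The change is applied by removing the
* old directive and re-adding the merged one, so the directive keeps its ID
* but is re-validated against the (possibly new) destination pool.
* Must be called with the FSNamesystem write lock held.
*/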
public void modifyDirective(CacheDirectiveInfo info,
FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
assert namesystem.hasWriteLock();
String idString =
(info.getId() == null) ?
"(null)" : info.getId().toString();
try {
// Check for invalid IDs.
Long id = info.getId();
if (id == null) {
throw new InvalidRequestException("Must supply an ID.");
}
CacheDirective prevEntry = getById(id);
checkWritePermission(pc, prevEntry.getPool());
// Fill in defaults
CacheDirectiveInfo infoWithDefaults =
createFromInfoAndDefaults(info, prevEntry);
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder(infoWithDefaults);
// Do validation
validatePath(infoWithDefaults);
validateReplication(infoWithDefaults, (short)-1);
// Need to test the pool being set here to avoid rejecting a modify for a
// directive that's already been forced into a pool
CachePool srcPool = prevEntry.getPool();
CachePool destPool = getCachePool(validatePoolName(infoWithDefaults));
if (!srcPool.getPoolName().equals(destPool.getPoolName())) {
checkWritePermission(pc, destPool);
if (!flags.contains(CacheFlag.FORCE)) {
checkLimit(destPool, infoWithDefaults.getPath().toUri().getPath(),
infoWithDefaults.getReplication());
}
}
// Verify the expiration against the destination pool
validateExpiryTime(infoWithDefaults, destPool.getMaxRelativeExpiryMs());
// Indicate changes to the CRM
setNeedsRescan();
// Validation passed
removeInternal(prevEntry);
addInternal(new CacheDirective(builder.build()), destPool);
} catch (IOException e) {
LOG.warn("modifyDirective of " + idString + " failed: ", e);
throw e;
}
LOG.info("modifyDirective of " + idString + " successfully applied " +
info + ".");
}
private void removeInternal(CacheDirective directive)
throws InvalidRequestException {
assert namesystem.hasWriteLock();
// Remove the corresponding entry in directivesByPath.
String path = directive.getPath();
List<CacheDirective> directives = directivesByPath.get(path);
if (directives == null || !directives.remove(directive)) {
throw new InvalidRequestException("Failed to locate entry " +
directive.getId() + " by path " + directive.getPath());
}
if (directives.size() == 0) {
directivesByPath.remove(path);
}
// Fix up the stats from removing the directive
final CachePool pool = directive.getPool();
directive.addBytesNeeded(-directive.getBytesNeeded());
directive.addFilesNeeded(-directive.getFilesNeeded());
directivesById.remove(directive.getId());
pool.getDirectiveList().remove(directive);
assert directive.getPool() == null;
setNeedsRescan();
}
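/**
* Removes the directive with the given ID after checking the caller's write
* permission on its pool. Must be called with the FSNamesystem write lock held.
*/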
public void removeDirective(long id, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
try {
CacheDirective directive = getById(id);
checkWritePermission(pc, directive.getPool());
removeInternal(directive);
} catch (IOException e) {
LOG.warn("removeDirective of " + id + " failed: ", e);
throw e;
}
LOG.info("removeDirective of " + id + " successful.");
}
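/**
* Lists cache directives in ascending ID order, starting after {@code prevId}.
* Optional pool and path filters are applied, entries the caller cannot read
* are skipped, and at most maxListCacheDirectivesNumResponses entries are
* returned per batch.
*/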
public BatchedListEntries<CacheDirectiveEntry>
listCacheDirectives(long prevId,
CacheDirectiveInfo filter,
FSPermissionChecker pc) throws IOException {
assert namesystem.hasReadLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
String filterPath = null;
if (filter.getId() != null) {
throw new IOException("Filtering by ID is unsupported.");
}
if (filter.getPath() != null) {
filterPath = validatePath(filter);
}
if (filter.getReplication() != null) {
throw new IOException("Filtering by replication is unsupported.");
}
ArrayList<CacheDirectiveEntry> replies =
new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
int numReplies = 0;
SortedMap<Long, CacheDirective> tailMap =
directivesById.tailMap(prevId + 1);
for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) {
if (numReplies >= maxListCacheDirectivesNumResponses) {
return new BatchedListEntries<CacheDirectiveEntry>(replies, true);
}
CacheDirective curDirective = cur.getValue();
CacheDirectiveInfo info = cur.getValue().toInfo();
if (filter.getPool() != null &&
!info.getPool().equals(filter.getPool())) {
continue;
}
if (filterPath != null &&
!info.getPath().toUri().getPath().equals(filterPath)) {
continue;
}
boolean hasPermission = true;
if (pc != null) {
try {
pc.checkPermission(curDirective.getPool(), FsAction.READ);
} catch (AccessControlException e) {
hasPermission = false;
}
}
if (hasPermission) {
replies.add(new CacheDirectiveEntry(info, cur.getValue().toStats()));
numReplies++;
}
}
return new BatchedListEntries<CacheDirectiveEntry>(replies, false);
}
/**
* Create a cache pool.
*
* Only the superuser should be able to call this function.
*
* @param info The info for the cache pool to create.
* @return Information about the cache pool we created.
*/
public CachePoolInfo addCachePool(CachePoolInfo info)
throws IOException {
assert namesystem.hasWriteLock();
CachePool pool;
try {
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
pool = cachePools.get(poolName);
if (pool != null) {
throw new InvalidRequestException("Cache pool " + poolName
+ " already exists.");
}
pool = CachePool.createFromInfoAndDefaults(info);
cachePools.put(pool.getPoolName(), pool);
} catch (IOException e) {
LOG.info("addCachePool of " + info + " failed: ", e);
throw e;
}
LOG.info("addCachePool of " + info + " successful.");
return pool.getInfo(true);
}
/**
* Modify a cache pool.
*
* Only the superuser should be able to call this function.
*
* @param info
* The info for the cache pool to modify.
*/
public void modifyCachePool(CachePoolInfo info)
throws IOException {
assert namesystem.hasWriteLock();
StringBuilder bld = new StringBuilder();
try {
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new InvalidRequestException("Cache pool " + poolName
+ " does not exist.");
}
String prefix = "";
if (info.getOwnerName() != null) {
pool.setOwnerName(info.getOwnerName());
bld.append(prefix).
append("set owner to ").append(info.getOwnerName());
prefix = "; ";
}
if (info.getGroupName() != null) {
pool.setGroupName(info.getGroupName());
bld.append(prefix).
append("set group to ").append(info.getGroupName());
prefix = "; ";
}
if (info.getMode() != null) {
pool.setMode(info.getMode());
bld.append(prefix).append("set mode to " + info.getMode());
prefix = "; ";
}
if (info.getLimit() != null) {
pool.setLimit(info.getLimit());
bld.append(prefix).append("set limit to " + info.getLimit());
prefix = "; ";
// A new limit changes the pool's stats, so flag the CRM for a rescan
setNeedsRescan();
}
if (info.getMaxRelativeExpiryMs() != null) {
final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
pool.setMaxRelativeExpiryMs(maxRelativeExpiry);
bld.append(prefix).append("set maxRelativeExpiry to "
+ maxRelativeExpiry);
prefix = "; ";
}
if (prefix.isEmpty()) {
bld.append("no changes.");
}
} catch (IOException e) {
LOG.info("modifyCachePool of " + info + " failed: ", e);
throw e;
}
LOG.info("modifyCachePool of " + info.getPoolName() + " successful; "
+ bld.toString());
}
/**
* Remove a cache pool.
*
* Only the superuser should be able to call this function.
*
* @param poolName
* The name for the cache pool to remove.
*/
public void removeCachePool(String poolName)
throws IOException {
assert namesystem.hasWriteLock();
try {
CachePoolInfo.validateName(poolName);
CachePool pool = cachePools.remove(poolName);
if (pool == null) {
throw new InvalidRequestException(
"Cannot remove non-existent cache pool " + poolName);
}
// Remove all directives in this pool.
Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
while (iter.hasNext()) {
CacheDirective directive = iter.next();
directivesByPath.remove(directive.getPath());
directivesById.remove(directive.getId());
iter.remove();
}
setNeedsRescan();
} catch (IOException e) {
LOG.info("removeCachePool of " + poolName + " failed: ", e);
throw e;
}
LOG.info("removeCachePool of " + poolName + " successful.");
}
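/**
* Lists cache pools in name order, starting after {@code prevKey}, returning
* at most maxListCachePoolsResponses entries per batch.
*/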
public BatchedListEntries<CachePoolEntry>
listCachePools(FSPermissionChecker pc, String prevKey) {
assert namesystem.hasReadLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
ArrayList<CachePoolEntry> results =
new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
int numListed = 0;
for (Entry<String, CachePool> cur : tailMap.entrySet()) {
if (numListed++ >= maxListCachePoolsResponses) {
return new BatchedListEntries<CachePoolEntry>(results, true);
}
results.add(cur.getValue().getEntry(pc));
}
return new BatchedListEntries<CachePoolEntry>(results, false);
}
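/**
* Adds the datanodes that currently cache the given block to its list of
* cached locations, if the block is tracked in the cachedBlocks map.
*/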
public void setCachedLocations(LocatedBlock block) {
CachedBlock cachedBlock =
new CachedBlock(block.getBlock().getBlockId(),
(short)0, false);
cachedBlock = cachedBlocks.get(cachedBlock);
if (cachedBlock == null) {
return;
}
List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
for (DatanodeDescriptor datanode : datanodes) {
block.addCachedLoc(datanode);
}
}
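/**
* Processes a full cache report from a datanode under the FSNamesystem write
* lock and records the processing time in the NameNode metrics.
*/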
public final void processCacheReport(final DatanodeID datanodeID,
final List<Long> blockIds) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow();
final long endTime;
try {
final DatanodeDescriptor datanode =
blockManager.getDatanodeManager().getDatanode(datanodeID);
if (datanode == null || !datanode.isAlive) {
throw new IOException(
"processCacheReport from dead or unregistered datanode: " +
datanode);
}
processCacheReportImpl(datanode, blockIds);
} finally {
endTime = Time.monotonicNow();
namesystem.writeUnlock();
}
// Log the block report processing stats from Namenode perspective
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
if (metrics != null) {
metrics.addCacheBlockReport((int) (endTime - startTime));
}
LOG.info("Processed cache report from "
+ datanodeID + ", blocks: " + blockIds.size()
+ ", processing time: " + (endTime - startTime) + " msecs");
}
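/**
* Rebuilds the datanode's cached-block list from the reported block IDs,
* registering previously unseen blocks in the global cachedBlocks map and
* removing reported blocks from the datanode's pending-cached list.
*/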
private void processCacheReportImpl(final DatanodeDescriptor datanode,
final List<Long> blockIds) {
CachedBlocksList cachedList = datanode.getCached();
cachedList.clear();
CachedBlocksList pendingCachedList = datanode.getPendingCached();
for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
long blockId = iter.next();
CachedBlock cachedBlock =
new CachedBlock(blockId, (short)0, false);
CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
// Add the block ID from the cache report to the cachedBlocks map
// if it's not already there.
if (prevCachedBlock != null) {
cachedBlock = prevCachedBlock;
} else {
cachedBlocks.put(cachedBlock);
}
// Add the block to the datanode's implicit cached block list
// if it's not already there. Similarly, remove it from the pending
// cached block list if it exists there.
if (!cachedBlock.isPresent(cachedList)) {
cachedList.add(cachedBlock);
}
if (cachedBlock.isPresent(pendingCachedList)) {
pendingCachedList.remove(cachedBlock);
}
}
}
/**
* Saves the current state of the CacheManager to the DataOutput. Used
* to persist CacheManager state in the FSImage.
* @param out DataOutput to persist state
* @param sdPath path of the storage directory
* @throws IOException
*/
public void saveStateCompat(DataOutputStream out, String sdPath)
throws IOException {
serializerCompat.save(out, sdPath);
}
/**
* Reloads CacheManager state from the passed DataInput. Used during namenode
* startup to restore CacheManager state from an FSImage.
* @param in DataInput from which to restore state
* @throws IOException
*/
public void loadStateCompat(DataInput in) throws IOException {
serializerCompat.load(in);
}
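/**
* Serializer for the legacy (compat) fsimage layout of CacheManager state,
* used by {@link #saveStateCompat} and {@link #loadStateCompat}.
*/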
private final class SerializerCompat {
private void save(DataOutputStream out, String sdPath) throws IOException {
out.writeLong(nextDirectiveId);
savePools(out, sdPath);
saveDirectives(out, sdPath);
}
private void load(DataInput in) throws IOException {
nextDirectiveId = in.readLong();
// pools need to be loaded first since directives point to their parent pool
loadPools(in);
loadDirectives(in);
}
/**
* Save cache pools to fsimage
*/
private void savePools(DataOutputStream out,
String sdPath) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(cachePools.size());
for (CachePool pool: cachePools.values()) {
FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/**
* Save cache entries to fsimage
*/
private void saveDirectives(DataOutputStream out, String sdPath)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(directivesById.size());
for (CacheDirective directive : directivesById.values()) {
FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/**
* Load cache pools from fsimage
*/
private void loadPools(DataInput in)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numberOfPools = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfPools; i++) {
addCachePool(FSImageSerialization.readCachePoolInfo(in));
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
/**
* Load cache directives from the fsimage
*/
private void loadDirectives(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numDirectives = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numDirectives; i++) {
CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
// Get pool reference by looking it up in the map
final String poolName = info.getPool();
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new IOException("Directive refers to pool " + poolName +
", which does not exist.");
}
CacheDirective directive =
new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
info.getReplication(), info.getExpiration().getAbsoluteMillis());
boolean addedDirective = pool.getDirectiveList().add(directive);
assert addedDirective;
if (directivesById.put(directive.getId(), directive) != null) {
throw new IOException("A directive with ID " + directive.getId() +
" already exists");
}
List<CacheDirective> directives =
directivesByPath.get(directive.getPath());
if (directives == null) {
directives = new LinkedList<CacheDirective>();
directivesByPath.put(directive.getPath(), directives);
}
directives.add(directive);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
}
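/**
* Waits for the CacheReplicationMonitor to complete a rescan, if one is
* needed; a no-op when the monitor is not running.
*/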
public void waitForRescanIfNeeded() {
crmLock.lock();
try {
if (monitor != null) {
monitor.waitForRescanIfNeeded();
}
} finally {
crmLock.unlock();
}
}
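/**
* Marks the CacheReplicationMonitor as needing a rescan, if it is running.
*/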
private void setNeedsRescan() {
crmLock.lock();
try {
if (monitor != null) {
monitor.setNeedsRescan();
}
} finally {
crmLock.unlock();
}
}
@VisibleForTesting
public Thread getCacheReplicationMonitor() {
crmLock.lock();
try {
return monitor;
} finally {
crmLock.unlock();
}
}
}