/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.UnexpectedAddPathBasedCacheDirectiveException;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
/**
* The Cache Manager handles caching on DataNodes.
*
 * This class is instantiated by the FSNamesystem. It maintains the mapping
 * of cached blocks to datanodes by processing datanode cache reports. Based
 * on these reports and on the addition and removal of caching directives,
 * it schedules caching and uncaching work.
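 *
 * A rough lifecycle sketch (hypothetical caller; in practice FSNamesystem
 * drives these calls while holding its write lock):
 * <pre>{@code
 * CacheManager cm = new CacheManager(fsn, conf, blockManager);
 * cm.activate();                          // starts CacheReplicationMonitor
 * cm.processCacheReport(dnId, blockIds);  // ingest a datanode's cache state
 * cm.deactivate();                        // stops the monitor thread
 * }</pre>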
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
public final class CacheManager {
public static final Log LOG = LogFactory.getLog(CacheManager.class);
// TODO: add pending / underCached / schedule cached blocks stats.
/**
* The FSNamesystem that contains this CacheManager.
*/
private final FSNamesystem namesystem;
/**
* The BlockManager associated with the FSN that owns this CacheManager.
*/
private final BlockManager blockManager;
/**
* Cache entries, sorted by ID.
*
* listPathBasedCacheDescriptors relies on the ordering of elements in this map
* to track what has already been listed by the client.
*/
private final TreeMap<Long, PathBasedCacheEntry> entriesById =
new TreeMap<Long, PathBasedCacheEntry>();
/**
* The entry ID to use for a new entry. Entry IDs always increase, and are
* never reused.
*/
private long nextEntryId;
/**
* Cache entries, sorted by path
*/
private final TreeMap<String, List<PathBasedCacheEntry>> entriesByPath =
new TreeMap<String, List<PathBasedCacheEntry>>();
/**
* Cache pools, sorted by name.
*/
private final TreeMap<String, CachePool> cachePools =
new TreeMap<String, CachePool>();
/**
* Maximum number of cache pools to list in one operation.
*/
private final int maxListCachePoolsResponses;
/**
* Maximum number of cache pool directives to list in one operation.
*/
private final int maxListCacheDescriptorsResponses;
/**
* Interval between scans in milliseconds.
*/
private final long scanIntervalMs;
/**
* Whether caching is enabled.
*
* If caching is disabled, we will not process cache reports or store
* information about what is cached where. We also do not start the
* CacheReplicationMonitor thread. This will save resources, but provide
* less functionality.
*
* Even when caching is disabled, we still store path-based cache
* information. This information is stored in the edit log and fsimage. We
* don't want to lose it just because a configuration setting was turned off.
* However, we will not act on this information if caching is disabled.
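   *
   * A minimal hdfs-site.xml snippet to turn caching on (the key string is
   * assumed to match DFS_NAMENODE_CACHING_ENABLED_KEY):
   * <pre>{@code
   * <property>
   *   <name>dfs.namenode.caching.enabled</name>
   *   <value>true</value>
   * </property>
   * }</pre>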
*/
private final boolean enabled;
/**
* Whether the CacheManager is active.
*
* When the CacheManager is active, it tells the DataNodes what to cache
* and uncache. The CacheManager cannot become active if enabled = false.
*/
private boolean active = false;
/**
* All cached blocks.
*/
private final GSet<CachedBlock, CachedBlock> cachedBlocks;
/**
* The CacheReplicationMonitor.
*/
private CacheReplicationMonitor monitor;
CacheManager(FSNamesystem namesystem, Configuration conf,
BlockManager blockManager) {
this.namesystem = namesystem;
this.blockManager = blockManager;
this.nextEntryId = 1;
this.maxListCachePoolsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
this.maxListCacheDescriptorsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT);
scanIntervalMs = conf.getLong(
DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
this.enabled = conf.getBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY,
DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
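    // The cachedBlocks GSet is sized at roughly 0.25% of the maximum JVM
    // heap (the fraction passed to LightWeightGSet.computeCapacity), and is
    // left null when caching is disabled.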
this.cachedBlocks = !enabled ? null :
new LightWeightGSet<CachedBlock, CachedBlock>(
LightWeightGSet.computeCapacity(0.25, "cachedBlocks"));
}
/**
* Activate the cache manager.
*
   * When the cache manager is active, it tells the datanodes where to cache
   * files.
*/
public void activate() {
assert namesystem.hasWriteLock();
if (enabled && (!active)) {
LOG.info("Activating CacheManager. " +
"Starting replication monitor thread...");
active = true;
monitor = new CacheReplicationMonitor(namesystem, this,
scanIntervalMs);
monitor.start();
}
}
/**
* Deactivate the cache manager.
*
* When the cache manager is inactive, it does not tell the datanodes where to
* cache files.
*/
public void deactivate() {
assert namesystem.hasWriteLock();
if (active) {
LOG.info("Deactivating CacheManager. " +
"stopping CacheReplicationMonitor thread...");
active = false;
IOUtils.closeQuietly(monitor);
monitor = null;
LOG.info("CacheReplicationMonitor thread stopped and deactivated.");
}
}
/**
* Return true only if the cache manager is active.
* Must be called under the FSN read or write lock.
*/
public boolean isActive() {
return active;
}
public TreeMap<Long, PathBasedCacheEntry> getEntriesById() {
assert namesystem.hasReadOrWriteLock();
return entriesById;
}
@VisibleForTesting
public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
assert namesystem.hasReadOrWriteLock();
return cachedBlocks;
}
private long getNextEntryId() throws IOException {
assert namesystem.hasWriteLock();
if (nextEntryId == Long.MAX_VALUE) {
throw new IOException("No more available IDs");
}
return nextEntryId++;
}
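  /**
   * Add a new PathBasedCache directive. The caller must hold the FSN write
   * lock; a null permission checker bypasses the pool write-permission
   * check.
   *
   * @return the descriptor for the new entry (the directive plus its newly
   *         assigned ID).
   */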
public PathBasedCacheDescriptor addDirective(
PathBasedCacheDirective directive, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
CachePool pool = cachePools.get(directive.getPool());
if (pool == null) {
LOG.info("addDirective " + directive + ": pool not found.");
throw new InvalidPoolNameError(directive);
}
if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
LOG.info("addDirective " + directive + ": write permission denied.");
throw new PoolWritePermissionDeniedError(directive);
}
try {
directive.validate();
} catch (IOException ioe) {
LOG.info("addDirective " + directive + ": validation failed: "
+ ioe.getClass().getName() + ": " + ioe.getMessage());
throw ioe;
}
// Add a new entry with the next available ID.
PathBasedCacheEntry entry;
try {
entry = new PathBasedCacheEntry(getNextEntryId(),
directive.getPath().toUri().getPath(),
directive.getReplication(), pool);
} catch (IOException ioe) {
throw new UnexpectedAddPathBasedCacheDirectiveException(directive);
}
LOG.info("addDirective " + directive + ": added cache directive "
+ directive);
// Success!
// First, add it to the various maps
entriesById.put(entry.getEntryId(), entry);
String path = directive.getPath().toUri().getPath();
List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
if (entryList == null) {
entryList = new ArrayList<PathBasedCacheEntry>(1);
entriesByPath.put(path, entryList);
}
entryList.add(entry);
if (monitor != null) {
monitor.kick();
}
return entry.getDescriptor();
}
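  /**
   * Remove the cache entry with the given descriptor ID. The caller must
   * hold the FSN write lock; a null permission checker bypasses the pool
   * write-permission check.
   */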
public void removeDescriptor(long id, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
// Check for invalid IDs.
if (id <= 0) {
LOG.info("removeDescriptor " + id + ": invalid non-positive " +
"descriptor ID.");
throw new InvalidIdException(id);
}
// Find the entry.
PathBasedCacheEntry existing = entriesById.get(id);
if (existing == null) {
LOG.info("removeDescriptor " + id + ": entry not found.");
throw new NoSuchIdException(id);
}
CachePool pool = cachePools.get(existing.getDescriptor().getPool());
if (pool == null) {
LOG.info("removeDescriptor " + id + ": pool not found for directive " +
existing.getDescriptor());
throw new UnexpectedRemovePathBasedCacheDescriptorException(id);
}
if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
LOG.info("removeDescriptor " + id + ": write permission denied to " +
"pool " + pool + " for entry " + existing);
throw new RemovePermissionDeniedException(id);
}
// Remove the corresponding entry in entriesByPath.
String path = existing.getDescriptor().getPath().toUri().getPath();
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
if (entries == null || !entries.remove(existing)) {
throw new UnexpectedRemovePathBasedCacheDescriptorException(id);
}
if (entries.size() == 0) {
entriesByPath.remove(path);
}
entriesById.remove(id);
if (monitor != null) {
monitor.kick();
}
LOG.info("removeDescriptor successful for PathCacheEntry id " + id);
}
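  /**
   * List cache descriptors in ascending ID order, optionally filtered by
   * pool and path, returning at most maxListCacheDescriptorsResponses
   * entries per call.
   *
   * Callers page through the results by passing back the last ID they saw.
   * A rough client-side sketch (hypothetical caller, assuming the proper
   * lock is held for each call):
   * <pre>{@code
   * long prevId = 0;
   * BatchedListEntries<PathBasedCacheDescriptor> batch;
   * do {
   *   batch = cacheManager.listPathBasedCacheDescriptors(
   *       prevId, null, null, pc);
   *   for (int i = 0; i < batch.size(); i++) {
   *     prevId = batch.get(i).getEntryId();
   *   }
   * } while (batch.hasMore());
   * }</pre>
   */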
public BatchedListEntries<PathBasedCacheDescriptor>
listPathBasedCacheDescriptors(long prevId, String filterPool,
String filterPath, FSPermissionChecker pc) throws IOException {
assert namesystem.hasReadOrWriteLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
if (filterPath != null) {
if (!DFSUtil.isValidName(filterPath)) {
throw new IOException("invalid path name '" + filterPath + "'");
}
}
ArrayList<PathBasedCacheDescriptor> replies =
new ArrayList<PathBasedCacheDescriptor>(NUM_PRE_ALLOCATED_ENTRIES);
int numReplies = 0;
SortedMap<Long, PathBasedCacheEntry> tailMap = entriesById.tailMap(prevId + 1);
for (Entry<Long, PathBasedCacheEntry> cur : tailMap.entrySet()) {
if (numReplies >= maxListCacheDescriptorsResponses) {
return new BatchedListEntries<PathBasedCacheDescriptor>(replies, true);
}
      PathBasedCacheEntry curEntry = cur.getValue();
      PathBasedCacheDescriptor directive = curEntry.getDescriptor();
      if (filterPool != null &&
          !directive.getPool().equals(filterPool)) {
        continue;
      }
      if (filterPath != null &&
          !directive.getPath().toUri().getPath().equals(filterPath)) {
        continue;
      }
      // A null permission checker means the caller is a superuser and may
      // list entries in any pool.
      if (pc == null ||
          pc.checkPermission(curEntry.getPool(), FsAction.READ)) {
        replies.add(directive);
        numReplies++;
      }
}
return new BatchedListEntries<PathBasedCacheDescriptor>(replies, false);
}
/**
* Create a cache pool.
*
* Only the superuser should be able to call this function.
*
* @param info The info for the cache pool to create.
* @return Information about the cache pool we created.
*/
public CachePoolInfo addCachePool(CachePoolInfo info)
throws IOException {
assert namesystem.hasWriteLock();
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
CachePool pool = cachePools.get(poolName);
if (pool != null) {
throw new IOException("cache pool " + poolName + " already exists.");
}
pool = CachePool.createFromInfoAndDefaults(info);
cachePools.put(pool.getPoolName(), pool);
LOG.info("created new cache pool " + pool);
return pool.getInfo(true);
}
/**
* Modify a cache pool.
*
* Only the superuser should be able to call this function.
*
* @param info
* The info for the cache pool to modify.
*/
public void modifyCachePool(CachePoolInfo info)
throws IOException {
assert namesystem.hasWriteLock();
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new IOException("cache pool " + poolName + " does not exist.");
}
StringBuilder bld = new StringBuilder();
String prefix = "";
if (info.getOwnerName() != null) {
pool.setOwnerName(info.getOwnerName());
bld.append(prefix).
append("set owner to ").append(info.getOwnerName());
prefix = "; ";
}
if (info.getGroupName() != null) {
pool.setGroupName(info.getGroupName());
bld.append(prefix).
append("set group to ").append(info.getGroupName());
prefix = "; ";
}
if (info.getMode() != null) {
pool.setMode(info.getMode());
bld.append(prefix).append("set mode to " + info.getMode());
prefix = "; ";
}
if (info.getWeight() != null) {
pool.setWeight(info.getWeight());
bld.append(prefix).
append("set weight to ").append(info.getWeight());
prefix = "; ";
}
if (prefix.isEmpty()) {
bld.append("no changes.");
}
LOG.info("modified " + poolName + "; " + bld.toString());
}
/**
* Remove a cache pool.
*
* Only the superuser should be able to call this function.
*
* @param poolName
* The name for the cache pool to remove.
*/
public void removeCachePool(String poolName)
throws IOException {
assert namesystem.hasWriteLock();
CachePoolInfo.validateName(poolName);
CachePool pool = cachePools.remove(poolName);
if (pool == null) {
throw new IOException("can't remove non-existent cache pool " + poolName);
}
// Remove entries using this pool
// TODO: could optimize this somewhat to avoid the need to iterate
// over all entries in entriesById
Iterator<Entry<Long, PathBasedCacheEntry>> iter =
entriesById.entrySet().iterator();
while (iter.hasNext()) {
Entry<Long, PathBasedCacheEntry> entry = iter.next();
      if (entry.getValue().getPool() == pool) {
        PathBasedCacheEntry victim = entry.getValue();
        // Remove only this entry from the path index; entries in other pools
        // may share the same path.
        List<PathBasedCacheEntry> victims = entriesByPath.get(victim.getPath());
        if (victims != null) {
          victims.remove(victim);
          if (victims.isEmpty()) {
            entriesByPath.remove(victim.getPath());
          }
        }
        iter.remove();
      }
}
if (monitor != null) {
monitor.kick();
}
}
public BatchedListEntries<CachePoolInfo>
listCachePools(FSPermissionChecker pc, String prevKey) {
assert namesystem.hasReadOrWriteLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
ArrayList<CachePoolInfo> results =
new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);
SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
int numListed = 0;
for (Entry<String, CachePool> cur : tailMap.entrySet()) {
if (numListed++ >= maxListCachePoolsResponses) {
return new BatchedListEntries<CachePoolInfo>(results, true);
}
if (pc == null) {
results.add(cur.getValue().getInfo(true));
} else {
results.add(cur.getValue().getInfo(pc));
}
}
return new BatchedListEntries<CachePoolInfo>(results, false);
}
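  /**
   * Annotate a LocatedBlock with the datanodes that currently cache it, so
   * clients can prefer reading from cached replicas. A no-op when caching
   * is disabled.
   */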
public void setCachedLocations(LocatedBlock block) {
if (!enabled) {
return;
}
CachedBlock cachedBlock =
new CachedBlock(block.getBlock().getBlockId(),
(short)0, false);
cachedBlock = cachedBlocks.get(cachedBlock);
if (cachedBlock == null) {
return;
}
List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
for (DatanodeDescriptor datanode : datanodes) {
block.addCachedLoc(datanode);
}
}
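  /**
   * Process a cache report from a datanode, replacing the NameNode's view
   * of what that node currently has cached. Takes the FSN write lock
   * internally.
   */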
public final void processCacheReport(final DatanodeID datanodeID,
final List<Long> blockIds) throws IOException {
if (!enabled) {
LOG.info("Ignoring cache report from " + datanodeID +
" because " + DFS_NAMENODE_CACHING_ENABLED_KEY + " = false. " +
"number of blocks: " + blockIds.size());
return;
}
namesystem.writeLock();
final long startTime = Time.monotonicNow();
final long endTime;
try {
final DatanodeDescriptor datanode =
blockManager.getDatanodeManager().getDatanode(datanodeID);
      if (datanode == null || !datanode.isAlive) {
        throw new IOException(
            "processCacheReport from dead or unregistered datanode: " +
            datanodeID);
}
processCacheReportImpl(datanode, blockIds);
} finally {
endTime = Time.monotonicNow();
namesystem.writeUnlock();
}
// Log the block report processing stats from Namenode perspective
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
if (metrics != null) {
metrics.addCacheBlockReport((int) (endTime - startTime));
}
LOG.info("Processed cache report from "
+ datanodeID + ", blocks: " + blockIds.size()
+ ", processing time: " + (endTime - startTime) + " msecs");
}
private void processCacheReportImpl(final DatanodeDescriptor datanode,
final List<Long> blockIds) {
CachedBlocksList cached = datanode.getCached();
cached.clear();
for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
Block block = new Block(iter.next());
BlockInfo blockInfo = blockManager.getStoredBlock(block);
if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) {
// The NameNode will eventually remove or update the out-of-date block.
// Until then, we pretend that it isn't cached.
LOG.warn("Genstamp in cache report disagrees with our genstamp for " +
block + ": expected genstamp " + blockInfo.getGenerationStamp());
continue;
}
if (!blockInfo.isComplete()) {
LOG.warn("Ignoring block id " + block.getBlockId() + ", because " +
"it is in not complete yet. It is in state " +
blockInfo.getBlockUCState());
continue;
}
Collection<DatanodeDescriptor> corruptReplicas =
blockManager.getCorruptReplicas(blockInfo);
if ((corruptReplicas != null) && corruptReplicas.contains(datanode)) {
// The NameNode will eventually remove or update the corrupt block.
// Until then, we pretend that it isn't cached.
LOG.warn("Ignoring cached replica on " + datanode + " of " + block +
" because it is corrupt.");
continue;
}
CachedBlock cachedBlock =
new CachedBlock(block.getBlockId(), (short)0, false);
CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
// Use the existing CachedBlock if it's present; otherwise,
// insert a new one.
if (prevCachedBlock != null) {
cachedBlock = prevCachedBlock;
} else {
cachedBlocks.put(cachedBlock);
}
if (!cachedBlock.isPresent(datanode.getCached())) {
datanode.getCached().add(cachedBlock);
}
if (cachedBlock.isPresent(datanode.getPendingCached())) {
datanode.getPendingCached().remove(cachedBlock);
}
}
}
/**
* Saves the current state of the CacheManager to the DataOutput. Used
* to persist CacheManager state in the FSImage.
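   *
   * Serialized layout, in write order (mirrored by {@link #loadState}):
   * <pre>
   *   nextEntryId : long
   *   pool count  : int, then each CachePoolInfo via writeTo()
   *   entry count : int, then per entry: id (long), path (Text),
   *                 replication (short), pool name (Text)
   * </pre>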
* @param out DataOutput to persist state
* @param sdPath path of the storage directory
* @throws IOException
*/
public void saveState(DataOutput out, String sdPath)
throws IOException {
out.writeLong(nextEntryId);
savePools(out, sdPath);
saveEntries(out, sdPath);
}
/**
* Reloads CacheManager state from the passed DataInput. Used during namenode
* startup to restore CacheManager state from an FSImage.
* @param in DataInput from which to restore state
* @throws IOException
*/
public void loadState(DataInput in) throws IOException {
nextEntryId = in.readLong();
// pools need to be loaded first since entries point to their parent pool
loadPools(in);
loadEntries(in);
}
/**
* Save cache pools to fsimage
*/
private void savePools(DataOutput out,
String sdPath) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(cachePools.size());
for (CachePool pool: cachePools.values()) {
pool.getInfo(true).writeTo(out);
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
  /**
   * Save cache entries to fsimage
   */
private void saveEntries(DataOutput out, String sdPath)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, entriesById.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(entriesById.size());
for (PathBasedCacheEntry entry: entriesById.values()) {
out.writeLong(entry.getEntryId());
Text.writeString(out, entry.getPath());
out.writeShort(entry.getReplication());
Text.writeString(out, entry.getPool().getPoolName());
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/**
* Load cache pools from fsimage
*/
private void loadPools(DataInput in)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numberOfPools = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfPools; i++) {
addCachePool(CachePoolInfo.readFrom(in));
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
/**
* Load cache entries from the fsimage
*/
private void loadEntries(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numberOfEntries = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfEntries);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfEntries; i++) {
long entryId = in.readLong();
String path = Text.readString(in);
short replication = in.readShort();
String poolName = Text.readString(in);
// Get pool reference by looking it up in the map
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new IOException("Entry refers to pool " + poolName +
", which does not exist.");
}
PathBasedCacheEntry entry =
new PathBasedCacheEntry(entryId, path, replication, pool);
if (entriesById.put(entry.getEntryId(), entry) != null) {
throw new IOException("An entry with ID " + entry.getEntryId() +
" already exists");
}
List<PathBasedCacheEntry> entries = entriesByPath.get(entry.getPath());
if (entries == null) {
entries = new LinkedList<PathBasedCacheEntry>();
entriesByPath.put(entry.getPath(), entries);
}
entries.add(entry);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
}