blob: 045c8cd2eecd96579987feaa2d7d5fc9c4e48c9c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.InstrumentedReadWriteLock;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
static final Logger LOG = LoggerFactory.getLogger(FsDatasetImpl.class);
private final static boolean isNativeIOAvailable;
private Timer timer;
static {
isNativeIOAvailable = NativeIO.isAvailable();
if (Path.WINDOWS && !isNativeIOAvailable) {
LOG.warn("Data node cannot fully support concurrent reading"
+ " and writing without native code extensions on Windows.");
@Override // FsDatasetSpi
public FsVolumeReferences getFsVolumeReferences() {
return new FsVolumeReferences(volumes.getVolumes());
public DatanodeStorage getStorage(final String storageUuid) {
return storageMap.get(storageUuid);
@Override // FsDatasetSpi
public StorageReport[] getStorageReports(String bpid)
throws IOException {
List<StorageReport> reports;
// Volumes are the references from a copy-on-write snapshot, so the
// access on the volume metrics doesn't require an additional lock.
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
reports = new ArrayList<>(curVolumes.size());
for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) {
StorageReport sr = new StorageReport(volume.toDatanodeStorage(),
} catch (ClosedChannelException e) {
return reports.toArray(new StorageReport[reports.size()]);
public FsVolumeImpl getVolume(final ExtendedBlock b) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final ReplicaInfo r =
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null ? (FsVolumeImpl) r.getVolume() : null;
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
ReplicaInfo r = volumeMap.get(bpid, blkid);
if (r == null) {
return null;
return new Block(blkid, r.getBytesOnDisk(), r.getGenerationStamp());
* The deepCopyReplica call doesn't use the datasetock since it will lead the
* potential deadlock with the {@link FsVolumeList#addBlockPool} call.
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<? extends Replica> replicas;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
replicas =
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
: volumeMap.replicas(bpid));
return Collections.unmodifiableSet(replicas);
* This should be primarily used for testing.
* @return clone of replica store in datanode memory
ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
ReplicaInfo r = volumeMap.get(bpid, blockId);
if (r == null) {
return null;
switch(r.getState()) {
case RBW:
case RWR:
case RUR:
return new ReplicaBuilder(r.getState()).from(r).build();
return null;
@Override // FsDatasetSpi
public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
throws IOException {
ReplicaInfo info = getBlockReplica(b);
if (info == null || !info.metadataExists()) {
return null;
return info.getMetadataInputStream(0);
final DataNode datanode;
final DataStorage dataStorage;
private final FsVolumeList volumes;
final Map<String, DatanodeStorage> storageMap;
final FsDatasetAsyncDiskService asyncDiskService;
final Daemon lazyWriter;
final FsDatasetCache cacheManager;
private final Configuration conf;
private final int volFailuresTolerated;
private final int volsConfigured;
private volatile boolean fsRunning;
final ReplicaMap volumeMap;
final Map<String, Set<Long>> deletingBlock;
final RamDiskReplicaTracker ramDiskReplicaTracker;
final RamDiskAsyncLazyPersistService asyncLazyPersistService;
private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
private final int smallBufferSize;
final LocalFileSystem localFS;
private boolean blockPinningEnabled;
private final int maxDataLength;
final AutoCloseableLock datasetWriteLock;
final AutoCloseableLock datasetReadLock;
final InstrumentedReadWriteLock datasetRWLock;
private final Condition datasetWriteLockCondition;
private static String blockPoolId = "";
* An FSDataset has a directory where it loads its data files.
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
this.fsRunning = true;
this.datanode = datanode;
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
this.datasetRWLock = new InstrumentedReadWriteLock(
"FsDatasetRWLock", LOG, conf.getTimeDuration(
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
boolean enableRL = conf.getBoolean(
// The read lock can be disabled by the above config key. If it is disabled
// then we simply make the both the read and write lock variables hold
// the write lock. All accesses to the lock are via these variables, so that
// effectively disables the read lock.
if (enableRL) {"The datanode lock is a read write lock");
this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
} else {"The datanode lock is an exclusive write lock");
this.datasetReadLock = this.datasetWriteLock;
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
dataLocations, storage);
volsConfigured = datanode.getDnConf().getVolsConfigured();
int volsFailed = volumeFailureInfos.size();
if (volFailuresTolerated < DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT
|| volFailuresTolerated >= volsConfigured) {
throw new DiskErrorException("Invalid value configured for "
+ "dfs.datanode.failed.volumes.tolerated - " + volFailuresTolerated
+ ". Value configured is either less than maxVolumeFailureLimit or greater than "
+ "to the number of configured volumes (" + volsConfigured + ").");
if (volFailuresTolerated == DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
if (volsConfigured == volsFailed) {
throw new DiskErrorException(
"Too many failed volumes - " + "current valid volumes: "
+ storage.getNumStorageDirs() + ", volumes configured: "
+ volsConfigured + ", volumes failed: " + volsFailed
+ ", volume failures tolerated: " + volFailuresTolerated);
} else {
if (volsFailed > volFailuresTolerated) {
throw new DiskErrorException(
"Too many failed volumes - " + "current valid volumes: "
+ storage.getNumStorageDirs() + ", volumes configured: "
+ volsConfigured + ", volumes failed: " + volsFailed
+ ", volume failures tolerated: " + volFailuresTolerated);
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
deletingBlock = new HashMap<String, Set<Long>>();
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
cacheManager = new FsDatasetCache(this);
// Start the lazy writer once we have built the replica maps.
// We need to start the lazy writer even if MaxLockedMemory is set to
// zero because we may have un-persisted replicas in memory from before
// the process restart. To minimize the chances of data loss we'll
// ensure they get written to disk now.
if (ramDiskReplicaTracker.numReplicasNotPersisted() > 0 ||
datanode.getDnConf().getMaxLockedMemory() > 0) {
lazyWriter = new Daemon(new LazyWriter(conf));
} else {
lazyWriter = null;
// Add a Metrics2 Source Interface. This is same
// data as MXBean. We can remove the registerMbean call
// in a release where we can break backward compatibility
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.register("FSDatasetState", "FSDatasetState", this);
localFS = FileSystem.getLocal(conf);
blockPinningEnabled = conf.getBoolean(
maxDataLength = conf.getInt(
public AutoCloseableLock acquireDatasetLock() {
return datasetWriteLock.acquire();
public AutoCloseableLock acquireDatasetReadLock() {
return datasetReadLock.acquire();
* Gets initial volume failure information for all volumes that failed
* immediately at startup. The method works by determining the set difference
* between all configured storage locations and the actual storage locations in
* use after attempting to put all of them into service.
* @return each storage location that has failed
private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
Collection<StorageLocation> dataLocations, DataStorage storage) {
Set<StorageLocation> failedLocationSet = Sets.newHashSetWithExpectedSize(
for (StorageLocation sl: dataLocations) {
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory sd =;
List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
long failureDate =;
for (StorageLocation failedStorageLocation: failedLocationSet) {
volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
return volumeFailureInfos;
* Activate a volume to serve requests.
* @throws IOException if the storage UUID already exists.
private void activateVolume(
ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
final String errorMsg = String.format(
"Found duplicated storage UUID: %s in %s.",
sd.getStorageUuid(), sd.getVersionFile());
throw new IOException(errorMsg);
new DatanodeStorage(sd.getStorageUuid(),
asyncDiskService.addVolume((FsVolumeImpl) ref.getVolume());
private void addVolume(Storage.StorageDirectory sd) throws IOException {
final StorageLocation storageLocation = sd.getStorageLocation();
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
// nothing needed to be rolled back to make various data structures, e.g.,
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImplBuilder()
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap =
new ReplicaMap(datasetReadLock, datasetWriteLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);"Added volume - " + storageLocation + ", StorageType: " +
public FsVolumeImpl createFsVolume(String storageUuid,
Storage.StorageDirectory sd,
final StorageLocation location) throws IOException {
return new FsVolumeImplBuilder()
public void addVolume(final StorageLocation location,
final List<NamespaceInfo> nsInfos)
throws IOException {
// Prepare volume in DataStorage
final DataStorage.VolumeBuilder builder;
try {
builder = dataStorage.prepareVolume(datanode, location, nsInfos);
} catch (IOException e) {
volumes.addVolumeFailureInfo(new VolumeFailureInfo(location,;
throw e;
final Storage.StorageDirectory sd = builder.getStorageDirectory();
StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd, location);
final ReplicaMap tempVolumeMap =
new ReplicaMap(datasetReadLock, datasetWriteLock);
ArrayList<IOException> exceptions = Lists.newArrayList();
for (final NamespaceInfo nsInfo : nsInfos) {
String bpid = nsInfo.getBlockPoolID();
try {
fsVolume.addBlockPool(bpid, this.conf, this.timer);
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
} catch (IOException e) {
LOG.warn("Caught exception when adding " + fsVolume +
". Will throw later.", e);
if (!exceptions.isEmpty()) {
try {
} catch (IOException e) {
throw MultipleIOException.createIOException(exceptions);
final FsVolumeReference ref = fsVolume.obtainReference();
activateVolume(tempVolumeMap, sd, storageType, ref);"Added volume - " + location + ", StorageType: " + storageType);
* Removes a set of volumes from FsDataset.
* @param storageLocsToRemove a set of
* {@link StorageLocation}s for each volume.
* @param clearFailure set true to clear failure information.
public void removeVolumes(
final Collection<StorageLocation> storageLocsToRemove,
boolean clearFailure) {
Collection<StorageLocation> storageLocationsToRemove =
new ArrayList<>(storageLocsToRemove);
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
List<String> storageToRemove = new ArrayList<>();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
final StorageLocation sdLocation = sd.getStorageLocation();"Checking removing StorageLocation " +
sdLocation + " with id " + sd.getStorageUuid());
if (storageLocationsToRemove.contains(sdLocation)) {"Removing StorageLocation " + sdLocation + " with id " +
sd.getStorageUuid() + " from FsDataset.");
// Disable the volume from the service.
volumes.removeVolume(sdLocation, clearFailure);
volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
// Removed all replica information for the blocks on the volume.
// Unlike updating the volumeMap in addVolume(), this operation does
// not scan disks.
for (String bpid : volumeMap.getBlockPoolList()) {
List<ReplicaInfo> blocks = blkToInvalidate
.computeIfAbsent(bpid, (k) -> new ArrayList<>());
for (Iterator<ReplicaInfo> it =
volumeMap.replicas(bpid).iterator(); it.hasNext();) {
ReplicaInfo block =;
final StorageLocation blockStorageLocation =
LOG.trace("checking for block " + block.getBlockId() +
" with storageLocation " + blockStorageLocation);
if (blockStorageLocation.equals(sdLocation)) {
// A reconfigure can remove the storage location which is already
// removed when the failure was detected by DataNode#checkDiskErrorAsync.
// Now, lets remove this from the failed volume list.
if (clearFailure) {
for (StorageLocation storageLocToRemove : storageLocationsToRemove) {
// Call this outside the lock.
for (Map.Entry<String, List<ReplicaInfo>> entry :
blkToInvalidate.entrySet()) {
String bpid = entry.getKey();
List<ReplicaInfo> blocks = entry.getValue();
for (ReplicaInfo block : blocks) {
invalidate(bpid, block);
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for(String storageUuid : storageToRemove) {
* Return the total space used by dfs datanode
@Override // FSDatasetMBean
public long getDfsUsed() throws IOException {
return volumes.getDfsUsed();
* Return the total space used by dfs datanode
@Override // FSDatasetMBean
public long getBlockPoolUsed(String bpid) throws IOException {
return volumes.getBlockPoolUsed(bpid);
* Return true - if there are still valid volumes on the DataNode.
@Override // FsDatasetSpi
public boolean hasEnoughResource() {
if (volFailuresTolerated == DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
// If volFailuresTolerated configured maxVolumeFailureLimit then minimum
// one volume is required.
return volumes.getVolumes().size() >= 1;
} else {
return getNumFailedVolumes() <= volFailuresTolerated;
* Return total capacity, used and unused
@Override // FSDatasetMBean
public long getCapacity() throws IOException {
return volumes.getCapacity();
* Return how many bytes can still be stored in the FSDataset
@Override // FSDatasetMBean
public long getRemaining() throws IOException {
return volumes.getRemaining();
* Return the number of failed volumes in the FSDataset.
@Override // FSDatasetMBean
public int getNumFailedVolumes() {
return volumes.getVolumeFailureInfos().length;
@Override // FSDatasetMBean
public String[] getFailedStorageLocations() {
VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos();
List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
for (VolumeFailureInfo info: infos) {
return failedStorageLocations.toArray(
new String[failedStorageLocations.size()]);
@Override // FSDatasetMBean
public long getLastVolumeFailureDate() {
long lastVolumeFailureDate = 0;
for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) {
long failureDate = info.getFailureDate();
if (failureDate > lastVolumeFailureDate) {
lastVolumeFailureDate = failureDate;
return lastVolumeFailureDate;
@Override // FSDatasetMBean
public long getEstimatedCapacityLostTotal() {
long estimatedCapacityLostTotal = 0;
for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) {
estimatedCapacityLostTotal += info.getEstimatedCapacityLost();
return estimatedCapacityLostTotal;
@Override // FsDatasetSpi
public VolumeFailureSummary getVolumeFailureSummary() {
VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos();
if (infos.length == 0) {
return null;
List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
long lastVolumeFailureDate = 0;
long estimatedCapacityLostTotal = 0;
for (VolumeFailureInfo info: infos) {
long failureDate = info.getFailureDate();
if (failureDate > lastVolumeFailureDate) {
lastVolumeFailureDate = failureDate;
estimatedCapacityLostTotal += info.getEstimatedCapacityLost();
return new VolumeFailureSummary(
failedStorageLocations.toArray(new String[failedStorageLocations.size()]),
lastVolumeFailureDate, estimatedCapacityLostTotal);
@Override // FSDatasetMBean
public long getCacheUsed() {
return cacheManager.getCacheUsed();
@Override // FSDatasetMBean
public long getCacheCapacity() {
return cacheManager.getCacheCapacity();
@Override // FSDatasetMBean
public long getNumBlocksFailedToCache() {
return cacheManager.getNumBlocksFailedToCache();
@Override // FSDatasetMBean
public long getNumBlocksFailedToUncache() {
return cacheManager.getNumBlocksFailedToUncache();
* Get metrics from the metrics source
* @param collector to contain the resulting metrics snapshot
* @param all if true, return all metrics even if unchanged.
public void getMetrics(MetricsCollector collector, boolean all) {
try {
DataNodeMetricHelper.getMetrics(collector, this, "FSDatasetState");
} catch (Exception e) {
LOG.warn("Exception thrown while metric collection. Exception : "
+ e.getMessage());
@Override // FSDatasetMBean
public long getNumBlocksCached() {
return cacheManager.getNumBlocksCached();
* Find the block's on-disk length
@Override // FsDatasetSpi
public long getLength(ExtendedBlock b) throws IOException {
return getBlockReplica(b).getBlockDataLength();
* Get File name for a given block.
private ReplicaInfo getBlockReplica(ExtendedBlock b) throws IOException {
return getBlockReplica(b.getBlockPoolId(), b.getBlockId());
* Get File name for a given block.
ReplicaInfo getBlockReplica(String bpid, long blockId) throws IOException {
ReplicaInfo r = validateBlockFile(bpid, blockId);
if (r == null) {
throw new FileNotFoundException("BlockId " + blockId + " is not valid.");
return r;
@Override // FsDatasetSpi
public InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
ReplicaInfo info;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
if (info != null && info.getVolume().isTransientStorage()) {
ramDiskReplicaTracker.touch(b.getBlockPoolId(), b.getBlockId());
if (info == null || !info.blockDataExists()) {
throw new IOException("No data exists for block " + b);
return getBlockInputStreamWithCheckingPmemCache(info, b, seekOffset);
* Check whether the replica is cached to persistent memory.
* If so, get DataInputStream of the corresponding cache file on pmem.
private InputStream getBlockInputStreamWithCheckingPmemCache(
ReplicaInfo info, ExtendedBlock b, long seekOffset) throws IOException {
String cachePath = cacheManager.getReplicaCachePath(
b.getBlockPoolId(), b.getBlockId());
if (cachePath != null) {
long addr = cacheManager.getCacheAddress(
b.getBlockPoolId(), b.getBlockId());
if (addr != -1) {
LOG.debug("Get InputStream by cache address.");
return FsDatasetUtil.getDirectInputStream(
addr + seekOffset, info.getBlockDataLength() - seekOffset);
LOG.debug("Get InputStream by cache file path.");
return FsDatasetUtil.getInputStreamAndSeek(
new File(cachePath), seekOffset);
return info.getDataInputStream(seekOffset);
* Get the meta info of a block stored in volumeMap. To find a block,
* block pool Id, block Id and generation stamp must match.
* @param b extended block
* @return the meta replica information
* @throws ReplicaNotFoundException if no entry is in the map or
* there is a generation stamp mismatch
ReplicaInfo getReplicaInfo(ExtendedBlock b)
throws ReplicaNotFoundException {
ReplicaInfo info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
if (info == null) {
if (volumeMap.get(b.getBlockPoolId(), b.getLocalBlock().getBlockId())
== null) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
} else {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b);
return info;
* Get the meta info of a block stored in volumeMap. Block is looked up
* without matching the generation stamp.
* @param bpid block pool Id
* @param blkid block Id
* @return the meta replica information; null if block was not found
* @throws ReplicaNotFoundException if no entry is in the map or
* there is a generation stamp mismatch
ReplicaInfo getReplicaInfo(String bpid, long blkid)
throws ReplicaNotFoundException {
ReplicaInfo info = volumeMap.get(bpid, blkid);
if (info == null) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
return info;
* Returns handles to the block file and its metadata file
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
InputStream blockInStream = info.getDataInputStream(blkOffset);
try {
InputStream metaInStream = info.getMetadataInputStream(metaOffset);
return new ReplicaInputStreams(
blockInStream, metaInStream, ref, datanode.getFileIoProvider());
} catch (IOException e) {
IOUtils.cleanup(null, blockInStream);
throw e;
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
static File moveBlockFiles(Block b, ReplicaInfo replicaInfo, File destdir)
throws IOException {
final File dstfile = new File(destdir, b.getBlockName());
final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
try {
} catch (IOException e) {
throw new IOException("Failed to move meta file for " + b
+ " from " + replicaInfo.getMetadataURI() + " to " + dstmeta, e);
try {
} catch (IOException e) {
throw new IOException("Failed to move block file for " + b
+ " from " + replicaInfo.getBlockURI() + " to "
+ dstfile.getAbsolutePath(), e);
if (LOG.isDebugEnabled()) {
LOG.debug("addFinalizedBlock: Moved " + replicaInfo.getMetadataURI()
+ " to " + dstmeta + " and " + replicaInfo.getBlockURI()
+ " to " + dstfile);
return dstfile;
* Copy the block and meta files for the given block to the given destination.
* @return the new meta and block files.
* @throws IOException
static File[] copyBlockFiles(long blockId, long genStamp,
ReplicaInfo srcReplica, File destRoot, boolean calculateChecksum,
int smallBufferSize, final Configuration conf) throws IOException {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
// blockName is same as the filename for the block
final File dstFile = new File(destDir, srcReplica.getBlockName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
return copyBlockFiles(srcReplica, dstMeta, dstFile, calculateChecksum,
smallBufferSize, conf);
static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
File dstFile, boolean calculateChecksum,
int smallBufferSize, final Configuration conf)
throws IOException {
if (calculateChecksum) {
computeChecksum(srcReplica, dstMeta, smallBufferSize, conf);
} else {
try {
} catch (IOException e) {
throw new IOException("Failed to copy " + srcReplica + " metadata to "
+ dstMeta, e);
try {
} catch (IOException e) {
throw new IOException("Failed to copy " + srcReplica + " block file to "
+ dstFile, e);
if (LOG.isDebugEnabled()) {
if (calculateChecksum) {
LOG.debug("Copied " + srcReplica.getMetadataURI() + " meta to "
+ dstMeta + " and calculated checksum");
} else {
LOG.debug("Copied " + srcReplica.getBlockURI() + " to " + dstFile);
return new File[] {dstMeta, dstFile};
* Move block files from one storage to another storage.
* @return Returns the Old replicaInfo
* @throws IOException
public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
StorageType targetStorageType, String targetStorageId)
throws IOException {
ReplicaInfo replicaInfo = getReplicaInfo(block);
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
if (replicaInfo.getNumBytes() != block.getNumBytes()) {
throw new IOException("Corrupted replica " + replicaInfo
+ " with a length of " + replicaInfo.getNumBytes()
+ " expected length is " + block.getNumBytes());
if (replicaInfo.getVolume().getStorageType() == targetStorageType) {
throw new ReplicaAlreadyExistsException("Replica " + replicaInfo
+ " already exists on storage " + targetStorageType);
if (replicaInfo.isOnTransientStorage()) {
// Block movement from RAM_DISK will be done by LazyPersist mechanism
throw new IOException("Replica " + replicaInfo
+ " cannot be moved from storageType : "
+ replicaInfo.getVolume().getStorageType());
FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
try {
moveBlock(block, replicaInfo, volumeRef);
} finally {
if (volumeRef != null) {
// Replace the old block if any to reschedule the scanning.
return replicaInfo;
* Moves a block from a given volume to another.
* @param block - Extended Block
* @param replicaInfo - ReplicaInfo
* @param volumeRef - Volume Ref - Closed by caller.
* @return newReplicaInfo
* @throws IOException
ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
FsVolumeReference volumeRef) throws IOException {
ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
finalizeNewReplica(newReplicaInfo, block);
removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
return newReplicaInfo;
* Cleanup the replicaInfo object passed.
* @param bpid - block pool id
* @param replicaInfo - ReplicaInfo
private void cleanupReplica(String bpid, ReplicaInfo replicaInfo) {
if (replicaInfo.deleteBlockData() || !replicaInfo.blockDataExists()) {
FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
volume.onBlockFileDeletion(bpid, replicaInfo.getBytesOnDisk());
if (replicaInfo.deleteMetadata() || !replicaInfo.metadataExists()) {
volume.onMetaFileDeletion(bpid, replicaInfo.getMetadataLength());
* Create a new temporary replica of replicaInfo object in specified volume.
* @param block - Extended Block
* @param replicaInfo - ReplicaInfo
* @param volumeRef - Volume Ref - Closed by caller.
* @return newReplicaInfo new replica object created in specified volume.
* @throws IOException
ReplicaInfo copyReplicaToVolume(ExtendedBlock block, ReplicaInfo replicaInfo,
FsVolumeReference volumeRef) throws IOException {
FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
// Copy files to temp dir first
ReplicaInfo newReplicaInfo = targetVolume.moveBlockToTmpLocation(block,
replicaInfo, smallBufferSize, conf);
return newReplicaInfo;
* Finalizes newReplica by calling finalizeReplica internally.
* @param newReplicaInfo - ReplicaInfo
* @param block - Extended Block
* @throws IOException
void finalizeNewReplica(ReplicaInfo newReplicaInfo,
ExtendedBlock block) throws IOException {
// Finalize the copied files
try {
String bpid = block.getBlockPoolId();
finalizeReplica(bpid, newReplicaInfo);
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
} catch (IOException ioe) {
// Cleanup block data and metadata
// Decrement of dfsUsed and noOfBlocks for volume not required
throw ioe;
* Moves a given block from one volume to another volume. This is used by disk
* balancer.
* @param block - ExtendedBlock
* @param destination - Destination volume
* @return Old replica info
public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi
destination) throws IOException {
ReplicaInfo replicaInfo = getReplicaInfo(block);
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = destination.obtainReference();
try {
moveBlock(block, replicaInfo, volumeRef);
} finally {
if (volumeRef != null) {
return replicaInfo;
* Compute and store the checksum for a block file that does not already have
* its checksum computed.
* @param srcReplica source {@link ReplicaInfo}, containing only the checksum
* header, not a calculated checksum
* @param dstMeta destination meta file, into which this method will write a
* full computed checksum
* @param smallBufferSize buffer size to use
* @param conf the {@link Configuration}
* @throws IOException
static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
int smallBufferSize, final Configuration conf)
throws IOException {
final File srcMeta = new File(srcReplica.getMetadataURI());
DataChecksum checksum;
try (FileInputStream fis =
srcReplica.getVolume(), srcMeta)) {
checksum = BlockMetadataHeader.readDataChecksum(
fis, DFSUtilClient.getIoFileBufferSize(conf), srcMeta);
final byte[] data = new byte[1 << 16];
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
DataOutputStream metaOut = null;
try {
File parentFile = dstMeta.getParentFile();
if (parentFile != null) {
if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
throw new IOException("Destination '" + parentFile
+ "' directory cannot be created");
metaOut = new DataOutputStream(new BufferedOutputStream(
new FileOutputStream(dstMeta), smallBufferSize));
BlockMetadataHeader.writeHeader(metaOut, checksum);
int offset = 0;
try (InputStream dataIn = srcReplica.getDataInputStream(0)) {
for (int n; (n =, offset, data.length - offset)) != -1; ) {
if (n > 0) {
n += offset;
offset = n % checksum.getBytesPerChecksum();
final int length = n - offset;
if (length > 0) {
checksum.calculateChunkedSums(data, 0, length, crcs, 0);
metaOut.write(crcs, 0, checksum.getChecksumSize(length));
System.arraycopy(data, length, data, 0, offset);
// calculate and write the last crc
checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
metaOut.write(crcs, 0, 4);
metaOut = null;
} finally {
@Override // FsDatasetSpi
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
// re-opens the connection and retries sending those packets.
// The other reason is that an "append" is occurring to this block.
// check the validity of the parameter
if (newGS < b.getGenerationStamp()) {
throw new IOException("The new generation stamp " + newGS +
" should be greater than the replica " + b + "'s generation stamp");
ReplicaInfo replicaInfo = getReplicaInfo(b);"Appending to " + replicaInfo);
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
if (replicaInfo.getNumBytes() != expectedBlockLen) {
throw new IOException("Corrupted replica " + replicaInfo +
" with a length of " + replicaInfo.getNumBytes() +
" expected length is " + expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaInPipeline replica = null;
try {
replica = append(b.getBlockPoolId(), replicaInfo, newGS,
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
return new ReplicaHandler(replica, ref);
/** Append to a finalized replica
* Change a finalized replica to be a RBW replica and
* bump its generation stamp to be the newGS
* @param bpid block pool Id
* @param replicaInfo a finalized replica
* @param newGS new generation stamp
* @param estimateBlockLen estimate block length
* @return a RBW replica
* @throws IOException if moving the replica from finalized directory
* to rbw directory fails
private ReplicaInPipeline append(String bpid,
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// If the block is cached, start uncaching it.
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new IOException("Only a Finalized replica can be appended to; "
+ "Replica with blk id " + replicaInfo.getBlockId() + " has state "
+ replicaInfo.getState());
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
// If there are any hardlinks to the block, break them. This ensures
// we are not appending to a file that is part of a previous/ directory.
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
ReplicaInPipeline rip = v.append(bpid, replicaInfo,
newGS, estimateBlockLen);
if (rip.getReplicaInfo().getState() != ReplicaState.RBW) {
throw new IOException("Append on block " + replicaInfo.getBlockId() +
" returned a replica of state " + rip.getReplicaInfo().getState()
+ "; expected RBW");
// Replace finalized replica by a RBW replica in replicas map
volumeMap.add(bpid, rip.getReplicaInfo());
return rip;
private static class MustStopExistingWriter extends Exception {
private final ReplicaInPipeline rip;
MustStopExistingWriter(ReplicaInPipeline rip) { = rip;
ReplicaInPipeline getReplicaInPipeline() {
return rip;
private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException, MustStopExistingWriter {
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check state
if (replicaInfo.getState() != ReplicaState.FINALIZED &&
replicaInfo.getState() != ReplicaState.RBW) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
// check generation stamp
long replicaGenerationStamp = replicaInfo.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
replicaGenerationStamp > newGS) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
+ ". Expected GS range is [" + b.getGenerationStamp() + ", " +
newGS + "].");
// stop the previous writer before check a replica's length
long replicaLen = replicaInfo.getNumBytes();
if (replicaInfo.getState() == ReplicaState.RBW) {
ReplicaInPipeline rbw = (ReplicaInPipeline) replicaInfo;
if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
throw new MustStopExistingWriter(rbw);
// check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
if (replicaLen != rbw.getBytesOnDisk()
|| replicaLen != rbw.getBytesAcked()) {
throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo +
"bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" +
rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
") are not the same.");
// check block length
if (replicaLen != expectedBlockLen) {
throw new IOException("Corrupted replica " + replicaInfo +
" with a length of " + replicaLen +
" expected length is " + expectedBlockLen);
return replicaInfo;
@Override // FsDatasetSpi
public ReplicaHandler recoverAppend(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {"Recover failed append to " + b);
while (true) {
try {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaInPipeline replica;
try {
// change the replica's state/gs etc.
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
replica = append(b.getBlockPoolId(), replicaInfo,
newGS, b.getNumBytes());
} else { //RBW
replica = (ReplicaInPipeline) replicaInfo;
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
return new ReplicaHandler(replica, ref);
} catch (MustStopExistingWriter e) {
@Override // FsDatasetSpi
public Replica recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException {"Recover failed close " + b);
while (true) {
try {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
// finalize the replica if RBW
if (replicaInfo.getState() == ReplicaState.RBW) {
finalizeReplica(b.getBlockPoolId(), replicaInfo);
return replicaInfo;
} catch (MustStopExistingWriter e) {
@Override // FsDatasetSpi
public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
" already exists in state " + replicaInfo.getState() +
" and thus cannot be created.");
// create a new block
FsVolumeReference ref = null;
// Use ramdisk only if block size is a multiple of OS page size.
// This simplifies reservation for partially used replicas
// significantly.
if (allowLazyPersist &&
lazyWriter != null &&
b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
reserveLockedMemory(b.getNumBytes())) {
try {
// First try to place the block on a transient volume.
ref = volumes.getNextTransientVolume(b.getNumBytes());
} catch (DiskOutOfSpaceException de) {
// Ignore the exception since we just fall back to persistent storage.
LOG.warn("Insufficient space for placing the block on a transient "
+ "volume, fall back to persistent storage: "
+ de.getMessage());
} finally {
if (ref == null) {
if (ref == null) {
ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create an rbw file to hold block in the designated volume
if (allowLazyPersist && !v.isTransientStorage()) {
ReplicaInPipeline newReplicaInfo;
try {
newReplicaInfo = v.createRbw(b);
if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
throw new IOException("CreateRBW returned a replica of state "
+ newReplicaInfo.getReplicaInfo().getState()
+ " for block " + b.getBlockId());
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
@Override // FsDatasetSpi
public ReplicaHandler recoverRbw(
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {"Recover RBW replica " + b);
while (true) {
try {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
if (replicaInfo.getState() != ReplicaState.RBW) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
ReplicaInPipeline rbw = (ReplicaInPipeline)replicaInfo;
if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
throw new MustStopExistingWriter(rbw);
}"At " + datanode.getDisplayName() + ", Recovering " + rbw);
return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
} catch (MustStopExistingWriter e) {
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
replicaGenerationStamp > newGS) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
". Expected GS range is [" + b.getGenerationStamp() + ", " +
newGS + "].");
// check replica length
long bytesAcked = rbw.getBytesAcked();
long numBytes = rbw.getNumBytes();
if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd) {
throw new ReplicaNotFoundException("Unmatched length replica " +
rbw + ": BytesAcked = " + bytesAcked +
" BytesRcvd = " + numBytes + " are not in the range of [" +
minBytesRcvd + ", " + maxBytesRcvd + "].");
long bytesOnDisk = rbw.getBytesOnDisk();
long blockDataLength = rbw.getReplicaInfo().getBlockDataLength();
if (bytesOnDisk != blockDataLength) {"Resetting bytesOnDisk to match blockDataLength (={}) for " +
"replica {}", blockDataLength, rbw);
bytesOnDisk = blockDataLength;
rbw.setLastChecksumAndDataLen(bytesOnDisk, null);
if (bytesOnDisk < bytesAcked) {
throw new ReplicaNotFoundException("Found fewer bytesOnDisk than " +
"bytesAcked for replica " + rbw);
FsVolumeReference ref = rbw.getReplicaInfo()
try {
// Truncate the potentially corrupt portion.
// If the source was client and the last node in the pipeline was lost,
// any corrupt data written after the acked length can go unnoticed.
if (bytesOnDisk > bytesAcked) {
rbw.setLastChecksumAndDataLen(bytesAcked, null);
// bump the replica's generation stamp to newGS
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
return new ReplicaHandler(rbw, ref);
@Override // FsDatasetSpi
public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();"Convert " + b + " from Temporary to RBW, visible length="
+ visible);
final ReplicaInfo temp;
// get replica
final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
if (r == null) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
// check the replica's state
if (r.getState() != ReplicaState.TEMPORARY) {
throw new ReplicaAlreadyExistsException(
"r.getState() != ReplicaState.TEMPORARY, r=" + r);
temp = r;
// check generation stamp
if (temp.getGenerationStamp() != expectedGs) {
throw new ReplicaAlreadyExistsException(
"temp.getGenerationStamp() != expectedGs = " + expectedGs
+ ", temp=" + temp);
// TODO: check writer?
// set writer to the current thread
// temp.setWriter(Thread.currentThread());
// check length
final long numBytes = temp.getNumBytes();
if (numBytes < visible) {
throw new IOException(numBytes + " = numBytes < visible = "
+ visible + ", temp=" + temp);
// check volume
final FsVolumeImpl v = (FsVolumeImpl) temp.getVolume();
if (v == null) {
throw new IOException("r.getVolume() = null, temp=" + temp);
final ReplicaInPipeline rbw = v.convertTemporaryToRbw(b, temp);
if(rbw.getState() != ReplicaState.RBW) {
throw new IOException("Expected replica state: " + ReplicaState.RBW
+ " obtained " + rbw.getState() + " for converting block "
+ b.getBlockId());
// overwrite the RBW in the volume map
volumeMap.add(b.getBlockPoolId(), rbw.getReplicaInfo());
return rbw;
private boolean isReplicaProvided(ReplicaInfo replicaInfo) {
if (replicaInfo == null) {
return false;
return replicaInfo.getVolume().getStorageType() == StorageType.PROVIDED;
@Override // FsDatasetSpi
public ReplicaHandler createTemporary(StorageType storageType,
String storageId, ExtendedBlock b, boolean isTransfer)
throws IOException {
long startTimeMs = Time.monotonicNow();
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
ReplicaInfo lastFoundReplicaInfo = null;
boolean isInPipeline = false;
do {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
} else {
isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
|| currentReplicaInfo.getState() == ReplicaState.RBW;
* If the current block is not PROVIDED and old, reject.
* else If transfer request, then accept it.
* else if state is not RBW/Temporary, then reject
* If current block is PROVIDED, ignore the replica.
if (((currentReplicaInfo.getGenerationStamp() >= b
.getGenerationStamp()) || (!isTransfer && !isInPipeline))
&& !isReplicaProvided(currentReplicaInfo)) {
throw new ReplicaAlreadyExistsException("Block " + b
+ " already exists in state " + currentReplicaInfo.getState()
+ " and thus cannot be created.");
lastFoundReplicaInfo = currentReplicaInfo;
if (!isInPipeline) {
// Hang too long, just bail out. This is not supposed to happen.
long writerStopMs = Time.monotonicNow() - startTimeMs;
if (writerStopMs > writerStopTimeoutMs) {
LOG.warn("Unable to stop existing writer for block " + b + " after "
+ writerStopMs + " miniseconds.");
throw new IOException("Unable to stop existing writer for block " + b
+ " after " + writerStopMs + " miniseconds.");
// if lastFoundReplicaInfo is PROVIDED and FINALIZED,
// stopWriter isn't required.
if (isReplicaProvided(lastFoundReplicaInfo) &&
lastFoundReplicaInfo.getState() == ReplicaState.FINALIZED) {
// Stop the previous writer
} while (true);
if (lastFoundReplicaInfo != null
&& !isReplicaProvided(lastFoundReplicaInfo)) {
// Old blockfile should be deleted synchronously as it might collide
// with the new block if allocated in same volume.
// Do the deletion outside of lock as its DISK IO.
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
try {
newReplicaInfo = v.createTemporary(b);
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
* Sets the offset in the meta file so that the
* last checksum will be overwritten.
@Override // FsDatasetSpi
public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
int checksumSize) throws IOException {
FileOutputStream file = (FileOutputStream)streams.getChecksumOut();
FileChannel channel = file.getChannel();
long oldPos = channel.position();
long newPos = oldPos - checksumSize;
if (LOG.isDebugEnabled()) {
LOG.debug("Changing meta file offset of block " + b + " from " +
oldPos + " to " + newPos);
// REMIND - mjc - eventually we should have a timeout system
// in place to clean up block files left by abandoned clients.
// We should have some timer in place, so that if a blockfile
// is created but non-valid, and has been idle for >48 hours,
// we can GC it safely.
* Complete the block write!
@Override // FsDatasetSpi
public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
throws IOException {
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has
// been opened for append but never modified
finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo);
* Sync the directory after rename from tmp/rbw to Finalized if
* configured. Though rename should be atomic operation, sync on both
* dest and src directories are done because IOUtils.fsync() calls
* directory's channel sync, not the journal itself.
if (fsyncDir && finalizedReplicaInfo instanceof FinalizedReplica
&& replicaInfo instanceof LocalReplica) {
FinalizedReplica finalizedReplica =
(FinalizedReplica) finalizedReplicaInfo;
LocalReplica localReplica = (LocalReplica) replicaInfo;
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) {
throw new IOException("Generation Stamp should be monotonically "
+ "increased.");
ReplicaInfo newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR &&
== ReplicaState.FINALIZED) {
newReplicaInfo = replicaInfo.getOriginalReplica();
} else {
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
if (v == null) {
throw new IOException("No volume for block " + replicaInfo);
newReplicaInfo = v.addFinalizedBlock(
bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved());
if (v.isTransientStorage()) {
- replicaInfo.getNumBytes(),
bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
assert newReplicaInfo.getState() == ReplicaState.FINALIZED
: "Replica should be finalized";
volumeMap.add(bpid, newReplicaInfo);
return newReplicaInfo;
* Remove the temporary block file (if any)
@Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
if (replicaInfo != null &&
replicaInfo.getState() == ReplicaState.TEMPORARY) {
// remove from volumeMap
volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
// delete the on-disk temp file
if (delBlockFromDisk(replicaInfo)) {
LOG.warn("Block " + b + " unfinalized and removed. ");
if (replicaInfo.getVolume().isTransientStorage()) {
b.getBlockId(), true);
* Remove a block from disk
* @param info the replica that needs to be deleted
* @return true if data for the replica are deleted; false otherwise
private boolean delBlockFromDisk(ReplicaInfo info) {
if (!info.deleteBlockData()) {
LOG.warn("Not able to delete the block data for replica " + info);
return false;
} else { // remove the meta file
if (!info.deleteMetadata()) {
LOG.warn("Not able to delete the meta data for replica " + info);
return false;
return true;
@Override // FsDatasetSpi
public List<Long> getCacheReport(String bpid) {
return cacheManager.getCachedBlocks(bpid);
public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
Map<DatanodeStorage, BlockListAsLongs> blockReportsMap =
new HashMap<DatanodeStorage, BlockListAsLongs>();
Map<String, BlockListAsLongs.Builder> builders =
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = null;
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
Set<String> missingVolumesReported = new HashSet<>();
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
// skip PROVIDED replicas.
if (b.getVolume().getStorageType() == StorageType.PROVIDED) {
String volStorageID = b.getVolume().getStorageID();
switch(b.getState()) {
case RBW:
case RWR:
case RUR:
// use the original replica.
b = b.getOriginalReplica();
assert false : "Illegal ReplicaInfo state.";
BlockListAsLongs.Builder storageBuilder = builders.get(volStorageID);
// a storage in the process of failing will not be in the volumes list
// but will be in the replica map.
if (storageBuilder != null) {
} else {
if (!missingVolumesReported.contains(volStorageID)) {
LOG.warn("Storage volume: " + volStorageID + " missing for the"
+ " replica block: " + b + ". Probably being removed!");
for (FsVolumeImpl v : curVolumes) {
return blockReportsMap;
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
* {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
* pool.
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
if (b.getState() == ReplicaState.FINALIZED) {
return finalized;
* Check if a block is valid.
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
* @throws ReplicaNotFoundException If the replica is not found
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
* @throws FileNotFoundException If the block file is not found or there
* was an error locating it.
* @throws EOFException If the replica length is too short.
* @throws IOException May be thrown from the methods called.
@Override // FsDatasetSpi
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException,
FileNotFoundException, EOFException, IOException {
final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
if (replicaInfo == null) {
throw new ReplicaNotFoundException(b);
if (replicaInfo.getState() != state) {
throw new UnexpectedReplicaStateException(b,state);
if (!replicaInfo.blockDataExists()) {
throw new FileNotFoundException(replicaInfo.getBlockURI().toString());
long onDiskLength = getLength(b);
if (onDiskLength < minLength) {
throw new EOFException(b + "'s on-disk length " + onDiskLength
+ " is shorter than minLength " + minLength);
* Check whether the given block is a valid one.
* valid means finalized
@Override // FsDatasetSpi
public boolean isValidBlock(ExtendedBlock b) {
// If block passed is null, we should return false.
if (b == null) {
return false;
return isValid(b, ReplicaState.FINALIZED);
* Check whether the given block is a valid RBW.
@Override // {@link FsDatasetSpi}
public boolean isValidRbw(final ExtendedBlock b) {
// If block passed is null, we should return false.
if (b == null) {
return false;
return isValid(b, ReplicaState.RBW);
/** Does the block exist and have the given state? */
private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
try {
checkBlock(b, 0, state);
} catch (IOException e) {
return false;
return true;
* Find the file corresponding to the block and return it if it exists.
ReplicaInfo validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final ReplicaInfo r;
r = volumeMap.get(bpid, blockId);
if (r != null) {
if (r.blockDataExists()) {
return r;
// if file is not null, but doesn't exist - possibly disk failed
if (LOG.isDebugEnabled()) {
LOG.debug("blockId=" + blockId + ", replica=" + r);
return null;
/** Check the files of a replica. */
static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
//check replica's data exists
if (!r.blockDataExists()) {
throw new FileNotFoundException("Block data not found, r=" + r);
if (r.getBytesOnDisk() != r.getBlockDataLength()) {
throw new IOException("Block length mismatch, len="
+ r.getBlockDataLength() + " but r=" + r);
//check replica's meta file
if (!r.metadataExists()) {
throw new IOException(r.getMetadataURI() + " does not exist, r=" + r);
if (r.getMetadataLength() == 0) {
throw new IOException("Metafile is empty, r=" + r);
* We're informed that a block is no longer valid. Delete it.
@Override // FsDatasetSpi
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
invalidate(bpid, invalidBlks, true);
private void invalidate(String bpid, Block[] invalidBlks, boolean async)
throws IOException {
final List<String> errors = new ArrayList<String>();
for (int i = 0; i < invalidBlks.length; i++) {
final ReplicaInfo removing;
final FsVolumeImpl v;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
if (info == null) {
ReplicaInfo infoByBlockId =
volumeMap.get(bpid, invalidBlks[i].getBlockId());
if (infoByBlockId == null) {
// It is okay if the block is not found -- it
// may be deleted earlier."Failed to delete replica " + invalidBlks[i]
+ ": ReplicaInfo not found.");
} else {
errors.add("Failed to delete replica " + invalidBlks[i]
+ ": GenerationStamp not matched, existing replica is "
+ Block.toString(infoByBlockId));
v = (FsVolumeImpl)info.getVolume();
if (v == null) {
errors.add("Failed to delete replica " + invalidBlks[i]
+ ". No volume for replica " + info);
try {
File blockFile = new File(info.getBlockURI());
if (blockFile != null && blockFile.getParentFile() == null) {
errors.add("Failed to delete replica " + invalidBlks[i]
+ ". Parent not found for block file: " + blockFile);
} catch(IllegalArgumentException e) {
LOG.warn("Parent directory check failed; replica " + info
+ " is not backed by a local file");
removing = volumeMap.remove(bpid, invalidBlks[i]);
addDeletingBlock(bpid, removing.getBlockId());
if (LOG.isDebugEnabled()) {
LOG.debug("Block file " + removing.getBlockURI()
+ " is to be deleted");
if (removing instanceof ReplicaInPipeline) {
((ReplicaInPipeline) removing).releaseAllBytesReserved();
if (v.isTransientStorage()) {
RamDiskReplica replicaInfo =
ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
if (replicaInfo != null) {
if (!replicaInfo.getIsPersisted()) {
replicaInfo.getBlockId(), true);
// If a DFSClient has the replica in its cache of short-circuit file
// descriptors (and the client is using ShortCircuitShm), invalidate it.
new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid));
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
try {
if (async) {
// Delete the block asynchronously to make sure we can do it fast
// enough.
// It's ok to unlink the block file before the uncache operation
// finishes.
asyncDiskService.deleteAsync(v.obtainReference(), removing,
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForReplica(bpid, removing));
} else {
asyncDiskService.deleteSync(v.obtainReference(), removing,
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForReplica(bpid, removing));
} catch (ClosedChannelException e) {
LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
"block " + invalidBlks[i]);
if (!errors.isEmpty()) {
StringBuilder b = new StringBuilder("Failed to delete ")
.append(errors.size()).append(" (out of ").append(invalidBlks.length)
.append(") replica(s):");
for(int i = 0; i < errors.size(); i++) {
b.append("\n").append(i).append(") ").append(errors.get(i));
throw new IOException(b.toString());
* Invalidate a block but does not delete the actual on-disk block file.
* It should only be used when deactivating disks.
* @param bpid the block pool ID.
* @param block The block to be invalidated.
public void invalidate(String bpid, ReplicaInfo block) {
// If a DFSClient has the replica in its cache of short-circuit file
// descriptors (and the client is using ShortCircuitShm), invalidate it.
new ExtendedBlockId(block.getBlockId(), bpid));
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, block.getBlockId());
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
* Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
private void cacheBlock(String bpid, long blockId) {
FsVolumeImpl volume;
String blockFileName;
long length, genstamp;
Executor volumeExecutor;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
boolean success = false;
try {
if (info == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": ReplicaInfo not found.");
if (info.getState() != ReplicaState.FINALIZED) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": replica is not finalized; it is in state " +
try {
volume = (FsVolumeImpl)info.getVolume();
if (volume == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": volume not found.");
} catch (ClassCastException e) {
LOG.warn("Failed to cache block with id " + blockId +
": volume was not an instance of FsVolumeImpl.");
if (volume.isTransientStorage()) {
LOG.warn("Caching not supported on block with id " + blockId +
" since the volume is backed by RAM.");
success = true;
} finally {
if (!success) {
blockFileName = info.getBlockURI().toString();
length = info.getVisibleLength();
genstamp = info.getGenerationStamp();
volumeExecutor = volume.getCacheExecutor();
cacheManager.cacheBlock(blockId, bpid,
blockFileName, length, genstamp, volumeExecutor);
@Override // FsDatasetSpi
public void cache(String bpid, long[] blockIds) {
for (int i=0; i < blockIds.length; i++) {
cacheBlock(bpid, blockIds[i]);
@Override // FsDatasetSpi
public void uncache(String bpid, long[] blockIds) {
for (int i=0; i < blockIds.length; i++) {
cacheManager.uncacheBlock(bpid, blockIds[i]);
public boolean isCached(String bpid, long blockId) {
return cacheManager.isCached(bpid, blockId);
@Override // FsDatasetSpi
public boolean contains(final ExtendedBlock block) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final long blockId = block.getLocalBlock().getBlockId();
final String bpid = block.getBlockPoolId();
final ReplicaInfo r = volumeMap.get(bpid, blockId);
return (r != null && r.blockDataExists());
* check if a data directory is healthy
* if some volumes failed - the caller must emove all the blocks that belong
* to these failed volumes.
* @return the failed volumes. Returns null if no volume failed.
* @param failedVolumes
@Override // FsDatasetSpi
public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
@Override // FsDatasetSpi
public String toString() {
return "FSDataset{dirpath='"+volumes+"'}";
private ObjectName mbeanName;
* Register the FSDataset MBean using the name
* "hadoop:service=DataNode,name=FSDatasetState-<datanodeUuid>"
void registerMBean(final String datanodeUuid) {
// We wrap to bypass standard mbean naming convetion.
// This wraping can be removed in java 6 as it is more flexible in
// package naming for mbeans and their impl.
try {
StandardMBean bean = new StandardMBean(this,FSDatasetMBean.class);
mbeanName = MBeans.register("DataNode", "FSDatasetState-" + datanodeUuid, bean);
} catch (NotCompliantMBeanException e) {
LOG.warn("Error registering FSDatasetState MBean", e);
}"Registered FSDatasetState MBean");
@Override // FsDatasetSpi
public void shutdown() {
fsRunning = false;
if (lazyWriter != null) {
((LazyWriter) lazyWriter.getRunnable()).stop();
if (mbeanName != null) {
if (asyncDiskService != null) {
if (asyncLazyPersistService != null) {
if(volumes != null) {
if (lazyWriter != null) {
try {
} catch (InterruptedException ie) {
LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
"from LazyWriter.join");
@Override // FSDatasetMBean
public String getStorageInfo() {
return toString();
* Reconcile the difference between blocks on the disk and blocks in
* volumeMap
* Check the given block for inconsistencies. Look at the
* current state of the block and reconcile the differences as follows:
* <ul>
* <li>If the block file is missing, delete the block from volumeMap</li>
* <li>If the block file exists and the block is missing in volumeMap,
* add the block to volumeMap <li>
* <li>If generation stamp does not match, then update the block with right
* generation stamp</li>
* <li>If the block length in memory does not match the actual block file length
* then mark the block as corrupt and update the block length in memory</li>
* <li>If the file in {@link ReplicaInfo} does not match the file on
* the disk, update {@link ReplicaInfo} with the correct file</li>
* </ul>
* @param bpid block pool ID
* @param scanInfo {@link ScanInfo} for a given block
public void checkAndUpdate(String bpid, ScanInfo scanInfo)
throws IOException {
long blockId = scanInfo.getBlockId();
File diskFile = scanInfo.getBlockFile();
File diskMetaFile = scanInfo.getMetaFile();
FsVolumeSpi vol = scanInfo.getVolume();
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) {
// Block is not finalized - ignore the difference
final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
final boolean diskMetaFileExists = diskMetaFile != null &&
fileIoProvider.exists(vol, diskMetaFile);
final boolean diskFileExists = diskFile != null &&
fileIoProvider.exists(vol, diskFile);
final long diskGS = diskMetaFileExists ?
Block.getGenerationStamp(diskMetaFile.getName()) :
if (vol.getStorageType() == StorageType.PROVIDED) {
if (memBlockInfo == null) {
// replica exists on provided store but not in memory
ReplicaInfo diskBlockInfo =
new ReplicaBuilder(ReplicaState.FINALIZED)
volumeMap.add(bpid, diskBlockInfo);
LOG.warn("Added missing block to memory " + diskBlockInfo);
} else {
// replica exists in memory but not in the provided store
volumeMap.remove(bpid, blockId);
LOG.warn("Deleting missing provided block " + memBlockInfo);
if (!diskFileExists) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
// If metadata file exists then delete it
if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
LOG.warn("Deleted a metadata file without a block "
+ diskMetaFile.getAbsolutePath());
if (!memBlockInfo.blockDataExists()) {
// Block is in memory and not on the disk
// Remove the block from volumeMap
volumeMap.remove(bpid, blockId);
if (vol.isTransientStorage()) {
ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
LOG.warn("Deleted a metadata file for the deleted block "
+ diskMetaFile.getAbsolutePath());
* Block file exists on the disk
if (memBlockInfo == null) {
// Block is missing in memory - add the block to volumeMap
ReplicaInfo diskBlockInfo = new ReplicaBuilder(ReplicaState.FINALIZED)
volumeMap.add(bpid, diskBlockInfo);
if (vol.isTransientStorage()) {
long lockedBytesReserved =
cacheManager.reserve(diskBlockInfo.getNumBytes()) > 0 ?
diskBlockInfo.getNumBytes() : 0;
bpid, blockId, (FsVolumeImpl) vol, lockedBytesReserved);
LOG.warn("Added missing block to memory " + diskBlockInfo);
* Block exists in volumeMap and the block file exists on the disk
// Compare block files
if (memBlockInfo.blockDataExists()) {
if (memBlockInfo.getBlockURI().compareTo(diskFile.toURI()) != 0) {
if (diskMetaFileExists) {
if (memBlockInfo.metadataExists()) {
// We have two sets of block+meta files. Decide which one to
// keep.
ReplicaInfo diskBlockInfo =
new ReplicaBuilder(ReplicaState.FINALIZED)
((FsVolumeImpl) vol).resolveDuplicateReplicas(bpid,
memBlockInfo, diskBlockInfo, volumeMap);
} else {
if (!fileIoProvider.delete(vol, diskFile)) {
LOG.warn("Failed to delete " + diskFile);
} else {
// Block refers to a block file that does not exist.
// Update the block with the file found on the disk. Since the block
// file and metadata file are found as a pair on the disk, update
// the block based on the metadata file found on the disk
LOG.warn("Block file in replica "
+ memBlockInfo.getBlockURI()
+ " does not exist. Updating it to the file found during scan "
+ diskFile.getAbsolutePath());
LOG.warn("Updating generation stamp for block " + blockId
+ " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
// Compare generation stamp
if (memBlockInfo.getGenerationStamp() != diskGS) {
File memMetaFile = FsDatasetUtil.getMetaFile(diskFile,
if (fileIoProvider.exists(vol, memMetaFile)) {
String warningPrefix = "Metadata file in memory "
+ memMetaFile.getAbsolutePath()
+ " does not match file found by scan ";
if (!diskMetaFileExists) {
LOG.warn(warningPrefix + "null");
} else if (memMetaFile.compareTo(diskMetaFile) != 0) {
LOG.warn(warningPrefix + diskMetaFile.getAbsolutePath());
} else {
// Metadata file corresponding to block in memory is missing
// If metadata file found during the scan is on the same directory
// as the block file, then use the generation stamp from it
try {
File memFile = new File(memBlockInfo.getBlockURI());
long gs = diskMetaFileExists &&
diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
LOG.warn("Updating generation stamp for block " + blockId
+ " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
} catch (IllegalArgumentException e) {
//exception arises because the URI cannot be converted to a file
LOG.warn("Block URI could not be resolved to a file", e);
// Compare block size
if (memBlockInfo.getNumBytes() != memBlockInfo.getBlockDataLength()) {
// Update the length based on the block file
corruptBlock = new Block(memBlockInfo);
LOG.warn("Updating size of block " + blockId + " from "
+ memBlockInfo.getNumBytes() + " to "
+ memBlockInfo.getBlockDataLength());
// Send corrupt block report outside the lock
if (corruptBlock != null) {
LOG.warn("Reporting the block " + corruptBlock
+ " as corrupt due to length mismatch");
try {
datanode.reportBadBlocks(new ExtendedBlock(bpid, corruptBlock),
} catch (IOException e) {
LOG.warn("Failed to report bad block " + corruptBlock, e);
* @deprecated use {@link #fetchReplicaInfo(String, long)} instead.
@Override // FsDatasetSpi
public ReplicaInfo getReplica(String bpid, long blockId) {
return volumeMap.get(bpid, blockId);
public String getReplicaString(String bpid, long blockId) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica r = volumeMap.get(bpid, blockId);
return r == null ? "null" : r.toString();
@Override // FsDatasetSpi
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
throws IOException {
return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap,
rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(),
/** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */
static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
while (true) {
try {
try (AutoCloseableLock lock = map.getLock().acquire()) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
} catch (MustStopExistingWriter e) {
static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
Block block, long recoveryId)
throws IOException, MustStopExistingWriter {
final ReplicaInfo replica = map.get(bpid, block.getBlockId());"initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);
//check replica
if (replica == null) {
return null;
//stop writer if there is any
if (replica.getState() == ReplicaState.TEMPORARY ||
replica.getState() == ReplicaState.RBW) {
final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
if (!rip.attemptToSetWriter(null, Thread.currentThread())) {
throw new MustStopExistingWriter(rip);
//check replica bytes on disk.
if (replica.getBytesOnDisk() < replica.getVisibleLength()) {
throw new IOException("getBytesOnDisk() < getVisibleLength(), rip="
+ replica);
//check the replica's files
//check generation stamp
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ block + ", replica=" + replica);
//check recovery id
if (replica.getGenerationStamp() >= recoveryId) {
+ " replica.getGenerationStamp() >= recoveryId = " + recoveryId
+ ", block=" + block + ", replica=" + replica);
//check RUR
final ReplicaInfo rur;
if (replica.getState() == ReplicaState.RUR) {
rur = replica;
if (rur.getRecoveryID() >= recoveryId) {
throw new RecoveryInProgressException(
"rur.getRecoveryID() >= recoveryId = " + recoveryId
+ ", block=" + block + ", rur=" + rur);
final long oldRecoveryID = rur.getRecoveryID();
rur.setRecoveryID(recoveryId);"initReplicaRecovery: update recovery id for " + block
+ " from " + oldRecoveryID + " to " + recoveryId);
else {
rur = new ReplicaBuilder(ReplicaState.RUR)
map.add(bpid, rur);"initReplicaRecovery: changing replica state for "
+ block + " from " + replica.getState()
+ " to " + rur.getState());
if (replica.getState() == ReplicaState.TEMPORARY || replica
.getState() == ReplicaState.RBW) {
((ReplicaInPipeline) replica).releaseAllBytesReserved();
return rur.createInfo();
@Override // FsDatasetSpi
public Replica updateReplicaUnderRecovery(
final ExtendedBlock oldBlock,
final long recoveryId,
final long newBlockId,
final long newlength) throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());"updateReplica: " + oldBlock
+ ", recoveryId=" + recoveryId
+ ", length=" + newlength
+ ", replica=" + replica);
//check replica
if (replica == null) {
throw new ReplicaNotFoundException(oldBlock);
//check replica state
if (replica.getState() != ReplicaState.RUR) {
throw new IOException("replica.getState() != " + ReplicaState.RUR
+ ", replica=" + replica);
//check replica's byte on disk
if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
+ " replica.getBytesOnDisk() != block.getNumBytes(), block="
+ oldBlock + ", replica=" + replica);
//check replica files before update
//update replica
final ReplicaInfo finalized = updateReplicaUnderRecovery(oldBlock
.getBlockPoolId(), replica, recoveryId,
newBlockId, newlength);
boolean copyTruncate = newBlockId != oldBlock.getBlockId();
if (!copyTruncate) {
assert finalized.getBlockId() == oldBlock.getBlockId()
&& finalized.getGenerationStamp() == recoveryId
&& finalized.getNumBytes() == newlength
: "Replica information mismatched: oldBlock=" + oldBlock
+ ", recoveryId=" + recoveryId + ", newlength=" + newlength
+ ", newBlockId=" + newBlockId + ", finalized=" + finalized;
} else {
assert finalized.getBlockId() == oldBlock.getBlockId()
&& finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
&& finalized.getNumBytes() == oldBlock.getNumBytes()
: "Finalized and old information mismatched: oldBlock=" + oldBlock
+ ", genStamp=" + oldBlock.getGenerationStamp()
+ ", len=" + oldBlock.getNumBytes()
+ ", finalized=" + finalized;
//check replica files after update
return finalized;
private ReplicaInfo updateReplicaUnderRecovery(
String bpid,
ReplicaInfo rur,
long recoveryId,
long newBlockId,
long newlength) throws IOException {
//check recovery id
if (rur.getRecoveryID() != recoveryId) {
throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
+ ", rur=" + rur);
boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId;
// bump rur's GS to be recovery id
if(!copyOnTruncate) {
//update length
if (rur.getNumBytes() < newlength) {
throw new IOException("rur.getNumBytes() < newlength = " + newlength
+ ", rur=" + rur);
if (rur.getNumBytes() > newlength) {
if(!copyOnTruncate) {
// update RUR with the new length
} else {
// Copying block to a new block with new blockId.
// Not truncating original block.
FsVolumeImpl volume = (FsVolumeImpl) rur.getVolume();
ReplicaInPipeline newReplicaInfo = volume.updateRURCopyOnTruncate(
rur, bpid, newBlockId, recoveryId, newlength);
if (newReplicaInfo.getState() != ReplicaState.RBW) {
throw new IOException("Append on block " + rur.getBlockId()
+ " returned a replica of state " + newReplicaInfo.getState()
+ "; expected RBW");
volumeMap.add(bpid, newReplicaInfo.getReplicaInfo());
finalizeReplica(bpid, newReplicaInfo.getReplicaInfo());
// finalize the block
return finalizeReplica(bpid, rur);
@Override // FsDatasetSpi
public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ block + ", replica=" + replica);
return replica.getVisibleLength();
public void addBlockPool(String bpid, Configuration conf)
throws IOException {"Adding block pool " + bpid);
AddBlockPoolException volumeExceptions = new AddBlockPoolException();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try {
volumes.addBlockPool(bpid, conf);
} catch (AddBlockPoolException e) {
try {
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
} catch (AddBlockPoolException e) {
if (volumeExceptions.hasExceptions()) {
throw volumeExceptions;
// For test use only.
if (!blockPoolId.isEmpty()) {
bpid = blockPoolId;
public static void setBlockPoolId(String bpid) {
blockPoolId = bpid;
public void shutdownBlockPool(String bpid) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {"Removing block pool " + bpid);
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
= getBlockReports(bpid);
volumes.removeBlockPool(bpid, blocksPerVolume);
* Class for representing the Datanode volume information
private static class VolumeInfo {
final String directory;
final long usedSpace; // size of space used by HDFS
final long freeSpace; // size of free space excluding reserved space
final long reservedSpace; // size of space reserved for non-HDFS
final long reservedSpaceForReplicas; // size of space reserved RBW or
// re-replication
final long numBlocks;
final StorageType storageType;
VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) { = v.toString();
this.usedSpace = usedSpace;
this.freeSpace = freeSpace;
this.reservedSpace = v.getReserved();
this.reservedSpaceForReplicas = v.getReservedForReplicas();
this.numBlocks = v.getNumBlocks();
this.storageType = v.getStorageType();
private Collection<VolumeInfo> getVolumeInfo() {
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
for (FsVolumeImpl volume : volumes.getVolumes()) {
long used = 0;
long free = 0;
try (FsVolumeReference ref = volume.obtainReference()) {
used = volume.getDfsUsed();
free = volume.getAvailable();
} catch (ClosedChannelException e) {
} catch (IOException e) {
used = 0;
free = 0;
info.add(new VolumeInfo(volume, used, free));
return info;
public Map<String, Object> getVolumeInfoMap() {
final Map<String, Object> info = new HashMap<String, Object>();
Collection<VolumeInfo> volumes = getVolumeInfo();
for (VolumeInfo v : volumes) {
final Map<String, Object> innerInfo = new HashMap<String, Object>();
innerInfo.put("usedSpace", v.usedSpace);
innerInfo.put("freeSpace", v.freeSpace);
innerInfo.put("reservedSpace", v.reservedSpace);
innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
innerInfo.put("numBlocks", v.numBlocks);
innerInfo.put("storageType", v.storageType);
info.put(, innerInfo);
return info;
@Override //FsDatasetSpi
public void deleteBlockPool(String bpid, boolean force)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
if (!force) {
for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) {
if (!volume.isBPDirEmpty(bpid)) {
+ " has some block files, cannot delete unless forced");
throw new IOException("Cannot delete block pool, "
+ "it contains some block files");
} catch (ClosedChannelException e) {
// ignore.
for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) {
volume.deleteBPDirectories(bpid, force);
} catch (ClosedChannelException e) {
// ignore.
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
if (replica == null) {
throw new ReplicaNotFoundException(block);
synchronized(replica) {
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"Replica generation stamp < block generation stamp, block="
+ block + ", replica=" + replica);
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
ReplicaInfo r = getBlockReplica(block);
File blockFile = new File(r.getBlockURI());
File metaFile = new File(r.getMetadataURI());
BlockLocalPathInfo info = new BlockLocalPathInfo(block,
blockFile.getAbsolutePath(), metaFile.toString());
return info;
public void enableTrash(String bpid) {
public void clearTrash(String bpid) {
public boolean trashEnabled(String bpid) {
return dataStorage.trashEnabled(bpid);
public void setRollingUpgradeMarker(String bpid) throws IOException {
public void clearRollingUpgradeMarker(String bpid) throws IOException {
public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
+ savedFiles[1].length());
// Update metrics (ignore the metadata file size)
Time.monotonicNow() - creationTime);
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter: Finish persisting RamDisk block: "
+ " block pool Id: " + bpId + " block id: " + blockId
+ " to block file " + savedFiles[1] + " and meta file " + savedFiles[0]
+ " on target volume " + targetVolume);
public void onFailLazyPersist(String bpId, long blockId) {
RamDiskReplica block = null;
block = ramDiskReplicaTracker.getReplica(bpId, blockId);
if (block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
FsVolumeImpl fsVolumeImpl = this.getVolume(block);
asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, outs, offset,
nbytes, flags);
private boolean ramDiskConfigured() {
for (FsVolumeImpl v: volumes.getVolumes()){
if (v.isTransientStorage()) {
return true;
return false;
// Add/Remove per DISK volume async lazy persist thread when RamDisk volume is
// added or removed.
// This should only be called when the FsDataSetImpl#volumes list is finalized.
private void setupAsyncLazyPersistThreads() {
for (FsVolumeImpl v: volumes.getVolumes()){
private void setupAsyncLazyPersistThread(final FsVolumeImpl v) {
// Skip transient volumes
if (v.isTransientStorage()) {
boolean ramDiskConfigured = ramDiskConfigured();
// Add thread for DISK volume if RamDisk is configured
if (ramDiskConfigured &&
asyncLazyPersistService != null &&
!asyncLazyPersistService.queryVolume(v)) {
// Remove thread for DISK volume if RamDisk is not configured
if (!ramDiskConfigured &&
asyncLazyPersistService != null &&
asyncLazyPersistService.queryVolume(v)) {
* Cleanup the old replica and notifies the NN about new replica.
* @param replicaInfo - Old replica to be deleted
* @param newReplicaInfo - New replica object
* @param bpid - block pool id
private void removeOldReplica(ReplicaInfo replicaInfo,
ReplicaInfo newReplicaInfo, final String bpid) {
// Before deleting the files from old storage we must notify the
// NN that the files are on the new storage. Else a blockReport from
// the transient storage might cause the NN to think the blocks are lost.
// Replicas must be evicted from client short-circuit caches, because the
// storage will no longer be same, and thus will require validating
// checksum. This also stops a client from holding file descriptors,
// which would prevent the OS from reclaiming the memory.
ExtendedBlock extendedBlock =
new ExtendedBlock(bpid, newReplicaInfo);
extendedBlock, null, newReplicaInfo.getStorageUuid(),
// Remove the old replicas
cleanupReplica(bpid, replicaInfo);
// If deletion failed then the directory scanner will cleanup the blocks
// eventually.
class LazyWriter implements Runnable {
private volatile boolean shouldRun = true;
final int checkpointerInterval;
public LazyWriter(Configuration conf) {
this.checkpointerInterval = conf.getInt(
* Checkpoint a pending replica to persistent storage now.
* If we fail then move the replica to the end of the queue.
* @return true if there is more work to be done, false otherwise.
private boolean saveNextReplica() {
RamDiskReplica block = null;
FsVolumeReference targetReference;
FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo;
boolean succeeded = false;
try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
// If replicaInfo is null, the block was either deleted before
// it could be checkpointed or it is already on persistent storage.
// This can occur if a second replica on persistent storage was found
// after the lazy write was scheduled.
if (replicaInfo != null &&
replicaInfo.getVolume().isTransientStorage()) {
// Pick a target volume to persist the block.
targetReference = volumes.getNextVolume(
StorageType.DEFAULT, null, replicaInfo.getNumBytes());
targetVolume = (FsVolumeImpl) targetReference.getVolume();
block.getBlockPoolId(), block.getBlockId(), targetVolume);
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter: Start persisting RamDisk block:"
+ " block pool Id: " + block.getBlockPoolId()
+ " block id: " + block.getBlockId()
+ " on target volume " + targetVolume);
block.getBlockPoolId(), block.getBlockId(),
replicaInfo.getGenerationStamp(), block.getCreationTime(),
replicaInfo, targetReference);
succeeded = true;
} catch(IOException ioe) {
LOG.warn("Exception saving replica " + block, ioe);
} finally {
if (!succeeded && block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
return succeeded;
* Attempt to evict one or more transient block replicas until we
* have at least bytesNeeded bytes free.
public void evictBlocks(long bytesNeeded) throws IOException {
int iterations = 0;
final long cacheCapacity = cacheManager.getMemCacheCapacity();
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
(cacheCapacity - cacheManager.getMemCacheUsed()) < bytesNeeded) {
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
if (replicaState == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Evicting block " + replicaState);
ReplicaInfo replicaInfo, newReplicaInfo;
final String bpid = replicaState.getBlockPoolId();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
replicaState.getBlockId(), false);
// Move the replica from lazyPersist/ to finalized/ on
// the target volume
newReplicaInfo =
replicaInfo, replicaState);
// Update the volumeMap entry.
volumeMap.add(bpid, newReplicaInfo);
// Update metrics
Time.monotonicNow() - replicaState.getCreationTime());
if (replicaState.getNumReads() == 0) {
// Delete the block+meta files from RAM disk and release locked
// memory.
removeOldReplica(replicaInfo, newReplicaInfo, bpid);
public void run() {
int numSuccessiveFailures = 0;
while (fsRunning && shouldRun) {
try {
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
// Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist
// operations are failing we don't keep retrying them in a tight loop.
if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
Thread.sleep(checkpointerInterval * 1000);
numSuccessiveFailures = 0;
} catch (InterruptedException e) {"LazyWriter was interrupted, exiting");
} catch (Exception e) {
LOG.warn("Ignoring exception in LazyWriter:", e);
public void stop() {
shouldRun = false;
public void setPinning(ExtendedBlock block) throws IOException {
if (!blockPinningEnabled) {
ReplicaInfo r = getBlockReplica(block);
public boolean getPinning(ExtendedBlock block) throws IOException {
if (!blockPinningEnabled) {
return false;
ReplicaInfo r = getBlockReplica(block);
return r.getPinning(localFS);
public boolean isDeletingBlock(String bpid, long blockId) {
synchronized(deletingBlock) {
Set<Long> s = deletingBlock.get(bpid);
return s != null ? s.contains(blockId) : false;
public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
synchronized (deletingBlock) {
Set<Long> s = deletingBlock.get(bpid);
if (s != null) {
for (Long id : blockIds) {
private void addDeletingBlock(String bpid, Long blockId) {
synchronized(deletingBlock) {
Set<Long> s = deletingBlock.get(bpid);
if (s == null) {
s = new HashSet<Long>();
deletingBlock.put(bpid, s);
void releaseLockedMemory(long count, boolean roundup) {
if (roundup) {
} else {
* Attempt to evict blocks from cache Manager to free the requested
* bytes.
* @param bytesNeeded
public void evictLazyPersistBlocks(long bytesNeeded) {
try {
((LazyWriter) lazyWriter.getRunnable()).evictBlocks(bytesNeeded);
} catch(IOException ioe) {"Ignoring exception ", ioe);
* Attempt to reserve the given amount of memory with the cache Manager.
* @param bytesNeeded
* @return
boolean reserveLockedMemory(long bytesNeeded) {
if (cacheManager.reserve(bytesNeeded) > 0) {
return true;
// Round up bytes needed to osPageSize and attempt to evict
// one more more blocks to free up the reservation.
bytesNeeded = cacheManager.roundUpPageSize(bytesNeeded);
return cacheManager.reserve(bytesNeeded) > 0;
public int getNonPersistentReplicas() {
return ramDiskReplicaTracker.numReplicasNotPersisted();
public void setTimer(Timer newTimer) {
this.timer = newTimer;
* Return the number of BP service count.
public int getBPServiceCount() {
return datanode.getBpOsCount();
* Return the number of volume.
public int getVolumeCount() {
return volumes.getVolumes().size();
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for (String bpid : volumeMap.getBlockPoolList()) {
Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
for (ReplicaInfo replicaInfo : replicas) {
if ((replicaInfo.getState() == ReplicaState.TEMPORARY
|| replicaInfo.getState() == ReplicaState.RBW)
&& replicaInfo.getVolume().equals(volume)) {
ReplicaInPipeline replicaInPipeline =
(ReplicaInPipeline) replicaInfo;