/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
/**
* Periodically scans the data directories for block and block metadata files.
* Reconciles the differences with block information maintained in the dataset.
*/
@InterfaceAudience.Private
public class DirectoryScanner implements Runnable {
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
private static final int MILLIS_PER_SECOND = 1000;
private static final String START_MESSAGE =
"Periodic Directory Tree Verification scan"
+ " starting at %s with interval of %dms";
private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
+ " and throttle limit of %dms/s";
private final FsDatasetSpi<?> dataset;
private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread;
private final long scanPeriodMsecs;
private final int throttleLimitMsPerSec;
private volatile boolean shouldRun = false;
private boolean retainDiffs = false;
private final DataNode datanode;
/**
* Total combined wall clock time (in milliseconds) spent by the report
* compiler threads executing. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeRunningMs = new AtomicLong(0L);
/**
* Total combined wall clock time (in milliseconds) spent by the report
* compiler threads blocked by the throttle. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeWaitingMs = new AtomicLong(0L);
/**
* The complete list of block differences indexed by block pool ID.
*/
@VisibleForTesting
final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
/**
* Statistics about the block differences in each blockpool, indexed by
* block pool ID.
*/
@VisibleForTesting
final Map<String, Stats> stats = new HashMap<String, Stats>();
/**
* Allow retaining diffs for unit tests and analysis. Defaults to false (off).
* @param b whether to retain diffs
*/
@VisibleForTesting
void setRetainDiffs(boolean b) {
retainDiffs = b;
}
/**
* Stats tracked for reporting and testing, per blockpool
*/
@VisibleForTesting
static class Stats {
final String bpid;
long totalBlocks = 0;
long missingMetaFile = 0;
long missingBlockFile = 0;
long missingMemoryBlocks = 0;
long mismatchBlocks = 0;
long duplicateBlocks = 0;
/**
* Create a new Stats object for the given blockpool ID.
* @param bpid blockpool ID
*/
public Stats(String bpid) {
this.bpid = bpid;
}
@Override
public String toString() {
return "BlockPool " + bpid
+ " Total blocks: " + totalBlocks + ", missing metadata files:"
+ missingMetaFile + ", missing block files:" + missingBlockFile
+ ", missing blocks in memory:" + missingMemoryBlocks
+ ", mismatched blocks:" + mismatchBlocks;
}
}
/**
* Helper class for compiling block info reports from report compiler threads.
*/
static class ScanInfoPerBlockPool extends
HashMap<String, LinkedList<ScanInfo>> {
private static final long serialVersionUID = 1L;
/**
* Create a new info list.
*/
ScanInfoPerBlockPool() {super();}
/**
* Create a new info list initialized to the given expected size.
* See {@link java.util.HashMap#HashMap(int)}.
*
* @param sz initial expected size
*/
ScanInfoPerBlockPool(int sz) {super(sz);}
/**
* Merges {@code that} ScanInfoPerBlockPool into this one.
*
* @param that the ScanInfoPerBlockPool to merge
*/
public void addAll(ScanInfoPerBlockPool that) {
if (that == null) return;
for (Entry<String, LinkedList<ScanInfo>> entry : that.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> list = entry.getValue();
if (this.containsKey(bpid)) {
//merge that per-bpid linked list with this one
this.get(bpid).addAll(list);
} else {
//add that new bpid and its linked list to this
this.put(bpid, list);
}
}
}
/**
* Convert all the LinkedList values in this ScanInfoPerBlockPool map
* into sorted arrays, and return a new map of these arrays per blockpool
*
* @return a map of ScanInfo arrays per blockpool
*/
public Map<String, ScanInfo[]> toSortedArrays() {
Map<String, ScanInfo[]> result =
new HashMap<String, ScanInfo[]>(this.size());
for (Entry<String, LinkedList<ScanInfo>> entry : this.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> list = entry.getValue();
// convert list to array
ScanInfo[] record = list.toArray(new ScanInfo[list.size()]);
// Sort array based on blockId
Arrays.sort(record);
result.put(bpid, record);
}
return result;
}
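/*
* Usage sketch (illustrative; "reportFromVol1" and "reportFromVol2" are
* hypothetical locals, not part of this class): each report compiler
* thread produces one ScanInfoPerBlockPool, and the scanner merges and
* sorts them roughly like this:
*
* ScanInfoPerBlockPool merged = new ScanInfoPerBlockPool();
* merged.addAll(reportFromVol1); // existing per-bpid lists are extended
* merged.addAll(reportFromVol2); // unseen bpids are added as-is
* Map<String, ScanInfo[]> sorted = merged.toSortedArrays();
* // each sorted.get(bpid) is now ordered by block ID for the merge scan
*/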
}
/**
* Tracks the files and other information related to a block on the disk.
* A missing file is indicated by setting the corresponding member
* to null.
*
* Because millions of these structures may be created, we try to save
* memory here. So instead of storing full paths, we store path suffixes.
* The block file, if it exists, will have a path like this:
* <volume_base_path>/<block_path>
* So we don't need to store the volume path, since we already know what the
* volume is.
*
* The metadata file, if it exists, will have a path like this:
* <volume_base_path>/<block_path>_<genstamp>.meta
* So if we have a block file, there isn't any need to store the block path
* again.
*
* The accessor functions take care of these manipulations.
*/
static class ScanInfo implements Comparable<ScanInfo> {
private final long blockId;
/**
* The block file path, relative to the volume's base directory.
* If there was no block file found, this may be null. If 'vol'
* is null, then this is the full path of the block file.
*/
private final String blockSuffix;
/**
* The suffix of the meta file path relative to the block file.
* If blockSuffix is null, then this will be the entire path relative
* to the volume base directory, or an absolute path if vol is also
* null.
*/
private final String metaSuffix;
private final FsVolumeSpi volume;
/**
* The block file's length, cached when this object is created so the
* async block scan does not need to touch the disk again.
*/
private final long blockFileLength;
private final static Pattern CONDENSED_PATH_REGEX =
Pattern.compile("(?<!^)(\\\\|/){2,}");
private final static String QUOTED_FILE_SEPARATOR =
Matcher.quoteReplacement(File.separator);
/**
* Get the most condensed version of the path.
*
* For example, the condensed version of /foo//bar is /foo/bar
* Unlike {@link File#getCanonicalPath()}, this will never perform I/O
* on the filesystem.
*
* @param path the path to condense
* @return the condensed path
*/
private static String getCondensedPath(String path) {
return CONDENSED_PATH_REGEX.matcher(path).
replaceAll(QUOTED_FILE_SEPARATOR);
}
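/*
* For example (paths are illustrative):
* getCondensedPath("/data//current///subdir0") returns
* "/data/current/subdir0" (on a platform where File.separator is "/").
* The (?<!^) lookbehind means a run of separators at the very start of
* the path is left untouched, and no filesystem I/O is performed.
*/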
/**
* Get a path suffix.
*
* @param f The file to get the suffix for.
* @param prefix The prefix we're stripping off.
*
* @return A suffix such that prefix + suffix = path to f
*/
private static String getSuffix(File f, String prefix) {
String fullPath = getCondensedPath(f.getAbsolutePath());
if (fullPath.startsWith(prefix)) {
return fullPath.substring(prefix.length());
}
throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
}
/**
* Create a ScanInfo object for a block. This constructor will examine
* the block data and meta-data files.
*
* @param blockId the block ID
* @param blockFile the path to the block data file
* @param metaFile the path to the block meta-data file
* @param vol the volume that contains the block
*/
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
this.blockId = blockId;
String condensedVolPath = vol == null ? null :
getCondensedPath(vol.getBasePath());
this.blockSuffix = blockFile == null ? null :
getSuffix(blockFile, condensedVolPath);
this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
if (metaFile == null) {
this.metaSuffix = null;
} else if (blockFile == null) {
this.metaSuffix = getSuffix(metaFile, condensedVolPath);
} else {
this.metaSuffix = getSuffix(metaFile,
condensedVolPath + blockSuffix);
}
this.volume = vol;
}
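/*
* Worked example (paths are invented for illustration): for a volume
* with base path /data/dn and files
* blockFile = /data/dn/current/finalized/subdir0/blk_123
* metaFile = /data/dn/current/finalized/subdir0/blk_123_1001.meta
* this constructor stores only
* blockSuffix = "/current/finalized/subdir0/blk_123"
* metaSuffix = "_1001.meta"
* getBlockFile() and getMetaFile() rebuild the full paths from the
* volume's base path, which keeps per-block memory usage small.
*/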
/**
* Returns the block data file.
*
* @return the block data file
*/
File getBlockFile() {
return (blockSuffix == null) ? null :
new File(volume.getBasePath(), blockSuffix);
}
/**
* Return the length of the data block. The length returned is the length
* cached when this object was created.
*
* @return the length of the data block
*/
long getBlockFileLength() {
return blockFileLength;
}
/**
* Returns the block meta data file or null if there isn't one.
*
* @return the block meta data file
*/
File getMetaFile() {
if (metaSuffix == null) {
return null;
} else if (blockSuffix == null) {
return new File(volume.getBasePath(), metaSuffix);
} else {
return new File(volume.getBasePath(), blockSuffix + metaSuffix);
}
}
/**
* Returns the block ID.
*
* @return the block ID
*/
long getBlockId() {
return blockId;
}
/**
* Returns the volume that contains the block that this object describes.
*
* @return the volume
*/
FsVolumeSpi getVolume() {
return volume;
}
@Override // Comparable
public int compareTo(ScanInfo b) {
return Long.compare(blockId, b.blockId);
}
@Override // Object
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ScanInfo)) {
return false;
}
return blockId == ((ScanInfo) o).blockId;
}
@Override // Object
public int hashCode() {
return (int) (blockId ^ (blockId >>> 32));
}
public long getGenStamp() {
return metaSuffix != null ? Block.getGenerationStamp(
getMetaFile().getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
}
}
/**
* Create a new directory scanner, but do not start it yet.
*
* @param datanode the parent datanode
* @param dataset the dataset to scan
* @param conf the Configuration object
*/
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
int throttle =
conf.getInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) {
if (throttle > MILLIS_PER_SECOND) {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value above 1000 ms/sec. Assuming default value of " +
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
} else {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value below 1 ms/sec. Assuming default value of " +
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
}
throttleLimitMsPerSec =
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
} else {
throttleLimitMsPerSec = throttle;
}
int threads =
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
reportCompileThreadPool = Executors.newFixedThreadPool(threads,
new Daemon.DaemonFactory());
masterThread = new ScheduledThreadPoolExecutor(1,
new Daemon.DaemonFactory());
}
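/*
* Configuration sketch (the property names correspond to the
* DFSConfigKeys constants used above; the values are only examples):
*
* Configuration conf = new Configuration();
* conf.setInt("dfs.datanode.directoryscan.interval", 21600); // seconds
* conf.setInt("dfs.datanode.directoryscan.threads", 1);
* // values in (0, 1000) throttle the scan; out-of-range values fall
* // back to the default, as handled in the constructor above
* conf.setInt("dfs.datanode.directoryscan.throttle.limit.ms.per.sec", 500);
*/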
/**
* Start the scanner. The scanner will run every
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
void start() {
shouldRun = true;
long offset = ThreadLocalRandom.current().nextInt(
(int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec
long firstScanTime = Time.now() + offset;
String logMsg;
if (throttleLimitMsPerSec < MILLIS_PER_SECOND) {
logMsg = String.format(START_MESSAGE_WITH_THROTTLE,
FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs,
throttleLimitMsPerSec);
} else {
logMsg = String.format(START_MESSAGE,
FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs);
}
LOG.info(logMsg);
masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs,
TimeUnit.MILLISECONDS);
}
/**
* Return whether the scanner has been started.
*
* @return whether the scanner has been started
*/
@VisibleForTesting
boolean getRunStatus() {
return shouldRun;
}
/**
* Clear the current cache of diffs and statistics.
*/
private void clear() {
diffs.clear();
stats.clear();
}
/**
* Main program loop for DirectoryScanner. Runs {@link #reconcile()}
* and handles any exceptions.
*/
@Override
public void run() {
try {
if (!shouldRun) {
//shutdown has been activated
LOG.warn("this cycle terminating immediately because 'shouldRun' has been deactivated");
return;
}
//We're okay to run - do it
reconcile();
} catch (Exception e) {
//Log and continue - allows Executor to run again next cycle
LOG.error("Exception during DirectoryScanner execution - will continue next cycle", e);
} catch (Error er) {
//Non-recoverable error - re-throw after logging the problem
LOG.error("System Error during DirectoryScanner execution - permanently terminating periodic scanner", er);
throw er;
}
}
/**
* Stops the directory scanner. This method will wait for 1 minute for the
* main thread to exit and an additional 1 minute for the report compilation
* threads to exit. If a thread does not exit in that time period, it is
* left running, and an error is logged.
*/
void shutdown() {
if (!shouldRun) {
LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
} else {
LOG.warn("DirectoryScanner: shutdown has been called");
}
shouldRun = false;
if (masterThread != null) masterThread.shutdown();
if (reportCompileThreadPool != null) {
reportCompileThreadPool.shutdownNow();
}
if (masterThread != null) {
try {
masterThread.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error("interrupted while waiting for masterThread to " +
"terminate", e);
}
}
if (reportCompileThreadPool != null) {
try {
reportCompileThreadPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error("interrupted while waiting for reportCompileThreadPool to " +
"terminate", e);
}
}
if (!retainDiffs) clear();
}
/**
* Reconcile differences between disk and in-memory blocks
*/
@VisibleForTesting
void reconcile() throws IOException {
scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> diff = entry.getValue();
for (ScanInfo info : diff) {
dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
info.getMetaFile(), info.getVolume());
}
}
if (!retainDiffs) clear();
}
/**
* Scan for the differences between disk and in-memory blocks.
* Scans only the "finalized blocks" lists of both disk and memory.
*/
private void scan() {
clear();
Map<String, ScanInfo[]> diskReport = getDiskReport();
// Hold FSDataset lock to prevent further changes to the block map
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
String bpid = entry.getKey();
ScanInfo[] blockpoolReport = entry.getValue();
Stats statsRecord = new Stats(bpid);
stats.put(bpid, statsRecord);
LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
diffs.put(bpid, diffRecord);
statsRecord.totalBlocks = blockpoolReport.length;
final List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
Collections.sort(bl); // Sort based on blockId
int d = 0; // index for blockpoolReport
int m = 0; // index for memory report (bl)
while (m < bl.size() && d < blockpoolReport.length) {
FinalizedReplica memBlock = bl.get(m);
ScanInfo info = blockpoolReport[d];
if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
// Block is missing in memory
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
continue;
}
if (info.getBlockId() > memBlock.getBlockId()) {
// Block is missing on the disk
addDifference(diffRecord, statsRecord,
memBlock.getBlockId(), info.getVolume());
m++;
continue;
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
if (info.getBlockFile() == null) {
// Block metadata file exists and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
|| info.getBlockFileLength() != memBlock.getNumBytes()) {
// Block metadata file is missing or has wrong generation stamp,
// or block file length is different than expected
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
} else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
// volumeMap record and on-disk files don't match.
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
if (d < blockpoolReport.length) {
// There may be multiple on-disk records for the same block, don't increment
// the memory record pointer if so.
ScanInfo nextInfo = blockpoolReport[d];
if (nextInfo.getBlockId() != info.blockId) {
++m;
}
} else {
++m;
}
}
while (m < bl.size()) {
FinalizedReplica current = bl.get(m++);
addDifference(diffRecord, statsRecord,
current.getBlockId(), current.getVolume());
}
while (d < blockpoolReport.length) {
if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, blockpoolReport[d]);
}
d++;
}
LOG.info(statsRecord.toString());
} //end for
} //end try (dataset lock)
}
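/*
* Worked example of the merge above (block IDs are invented): with a
* sorted disk report [1, 2, 4] and a sorted memory list [2, 3, 4]:
* - id 1: disk only -> counted as missingMemoryBlocks (unless the
* block is queued for deletion)
* - id 2: in both -> checked for genstamp/length mismatch
* - id 3: memory only -> recorded as missing block and meta file
* - id 4: in both -> checked as for id 2
* The indices d and m advance exactly as in a two-pointer merge of
* sorted lists.
*/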
/**
* Add the ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is found on the disk,
* but the in-memory block is missing or does not match the block on the disk.
*
* @param diffRecord the list to which to add the info
* @param statsRecord the stats to update
* @param info the differing info
*/
private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, ScanInfo info) {
statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0;
statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0;
diffRecord.add(info);
}
/**
* Add a new ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is not found on the disk.
*
* @param diffRecord the list to which to add the info
* @param statsRecord the stats to update
* @param blockId the id of the missing block
* @param vol the volume that contains the missing block
*/
private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, long blockId,
FsVolumeSpi vol) {
statsRecord.missingBlockFile++;
statsRecord.missingMetaFile++;
diffRecord.add(new ScanInfo(blockId, null, null, vol));
}
/**
* Get the lists of blocks on the disks in the dataset, sorted by blockId.
* The returned map contains one entry per blockpool, keyed by the blockpool
* ID.
*
* @return a map of sorted arrays of block information
*/
private Map<String, ScanInfo[]> getDiskReport() {
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
ScanInfoPerBlockPool[] dirReports = null;
// First get list of data directories
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
// Use an array since the threads may return out of order and
// compilersInProgress#keySet may return out of order as well.
dirReports = new ScanInfoPerBlockPool[volumes.size()];
Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
for (int i = 0; i < volumes.size(); i++) {
ReportCompiler reportCompiler =
new ReportCompiler(datanode, volumes.get(i));
Future<ScanInfoPerBlockPool> result =
reportCompileThreadPool.submit(reportCompiler);
compilersInProgress.put(i, result);
}
for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
compilersInProgress.entrySet()) {
Integer index = report.getKey();
try {
dirReports[index] = report.getValue().get();
// If our compiler threads were interrupted, give up on this run
if (dirReports[index] == null) {
dirReports = null;
break;
}
} catch (Exception ex) {
FsVolumeSpi fsVolumeSpi = volumes.get(index);
LOG.error("Error compiling report for the volume, StorageId: "
+ fsVolumeSpi.getStorageID(), ex);
// Continue scanning the other volumes
}
}
} catch (IOException e) {
LOG.error("Unexpected IOException by closing FsVolumeReference", e);
}
if (dirReports != null) {
// Compile consolidated report for all the volumes
for (ScanInfoPerBlockPool report : dirReports) {
if(report != null){
list.addAll(report);
}
}
}
return list.toSortedArrays();
}
/**
* Helper method to determine if a file name is consistent with a block
* meta-data file.
*
* @param blockId the block ID
* @param metaFile the file to check
* @return whether the file name is a block meta-data file name
*/
private static boolean isBlockMetaFile(String blockId, String metaFile) {
return metaFile.startsWith(blockId)
&& metaFile.endsWith(Block.METADATA_EXTENSION);
}
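/*
* For example: isBlockMetaFile("blk_123", "blk_123_1001.meta") is true.
* The check is deliberately loose (a startsWith plus the .meta suffix),
* so isBlockMetaFile("blk_123", "blk_1234_1001.meta") is also true;
* callers rely on the sorted directory listing in compileReport() to
* pair block and meta files.
*/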
/**
* The ReportCompiler class encapsulates the process of searching a datanode's
* disks for block information. It operates by performing a depth-first
* traversal of the volume to discover block information.
*
* When the ReportCompiler discovers block information, it creates a new
* ScanInfo object for it and adds that object to its report list. The report
* list is returned by the {@link #call()} method.
*/
private class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
private final FsVolumeSpi volume;
private final DataNode datanode;
// Variable for tracking time spent running for throttling purposes
private final StopWatch throttleTimer = new StopWatch();
// Variable for tracking time spent running and waiting for testing
// purposes
private final StopWatch perfTimer = new StopWatch();
/**
* Create a report compiler for the given volume on the given datanode.
*
* @param datanode the target datanode
* @param volume the target volume
*/
public ReportCompiler(DataNode datanode, FsVolumeSpi volume) {
this.datanode = datanode;
this.volume = volume;
}
/**
* Run this report compiler thread.
*
* @return the block info report list
* @throws IOException if the block pool isn't found
*/
@Override
public ScanInfoPerBlockPool call() throws IOException {
String[] bpList = volume.getBlockPoolList();
ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
perfTimer.start();
throttleTimer.start();
for (String bpid : bpList) {
LinkedList<ScanInfo> report = new LinkedList<>();
File bpFinalizedDir = volume.getFinalizedDir(bpid);
try {
result.put(bpid,
compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
} catch (InterruptedException ex) {
// Exit quickly and flag the scanner to do the same
result = null;
break;
}
}
return result;
}
/**
* Compile a list of {@link ScanInfo} for the blocks in the directory
* given by {@code dir}.
*
* @param vol the volume that contains the directory to scan
* @param bpFinalizedDir the root directory of the directory to scan
* @param dir the directory to scan
* @param report the list onto which block reports are placed
*/
private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
throws InterruptedException {
throttle();
final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
List <String> fileNames;
try {
fileNames = fileIoProvider.listDirectory(
volume, dir, BlockDirFilter.INSTANCE);
} catch (IOException ioe) {
LOG.warn("Exception occured while compiling report: ", ioe);
// Initiate a check on disk failure.
datanode.checkDiskErrorAsync(volume);
// Ignore this directory and proceed.
return report;
}
Collections.sort(fileNames);
/*
* Assumption: in the sorted list of files, a block file appears
* immediately before its metadata file. This holds for the current
* naming convention, where block files are named blk_<blockid> and
* meta files blk_<blockid>_<genstamp>.meta.
*/
for (int i = 0; i < fileNames.size(); i++) {
// Make sure this thread can make a timely exit. With a low throttle
// rate, completing a run can take a very long time.
if (Thread.interrupted()) {
throw new InterruptedException();
}
File file = new File(dir, fileNames.get(i));
if (file.isDirectory()) {
compileReport(vol, bpFinalizedDir, file, report);
continue;
}
if (!Block.isBlockFilename(file)) {
if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
long blockId = Block.getBlockId(file.getName());
verifyFileLocation(file, bpFinalizedDir,
blockId);
report.add(new ScanInfo(blockId, null, file, vol));
}
continue;
}
File blockFile = file;
long blockId = Block.filename2id(file.getName());
File metaFile = null;
// Skip all the files that start with the block name until
// reaching the metafile for the block
while (i + 1 < fileNames.size()) {
File blkMetaFile = new File(dir, fileNames.get(i + 1));
if (!(blkMetaFile.isFile()
&& blkMetaFile.getName().startsWith(blockFile.getName()))) {
break;
}
i++;
if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
metaFile = blkMetaFile;
break;
}
}
verifyFileLocation(blockFile, bpFinalizedDir, blockId);
report.add(new ScanInfo(blockId, blockFile, metaFile, vol));
}
return report;
}
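/*
* Illustration of the sorted-listing assumption (names invented): a
* directory listing sorts as
* blk_100, blk_100_1001.meta, blk_101_1002.meta, subdir0
* so blk_100 is paired with blk_100_1001.meta in the inner while loop,
* the orphan blk_101_1002.meta becomes a ScanInfo with a null block
* file, and subdir0 is recursed into by the isDirectory() branch.
*/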
/**
* Verify that the block file resides in the directory path expected
* for its block ID, logging a warning if it does not.
*/
private void verifyFileLocation(File actualBlockFile,
File bpFinalizedDir, long blockId) {
File expectedBlockDir =
DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
File actualBlockDir = actualBlockFile.getParentFile();
if (actualBlockDir.compareTo(expectedBlockDir) != 0) {
LOG.warn("Block: " + blockId +
" found in invalid directory. Expected directory: " +
expectedBlockDir + ". Actual directory: " + actualBlockDir);
}
}
/**
* Called by the thread before each potential disk scan so that a pause
* can be optionally inserted to limit the number of scans per second.
* The limit is controlled by
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
*/
private void throttle() throws InterruptedException {
accumulateTimeRunning();
if ((throttleLimitMsPerSec < 1000) &&
(throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) {
Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec);
throttleTimer.reset().start();
}
accumulateTimeWaiting();
}
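/*
* Throttle arithmetic (numbers are examples): with
* throttleLimitMsPerSec = 200, once the throttle timer shows more than
* 200 ms of running time, the thread sleeps for 1000 - 200 = 800 ms and
* restarts the timer, so scanning consumes at most roughly 200 ms of
* each wall-clock second.
*/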
/**
* Helper method to measure time running.
*/
private void accumulateTimeRunning() {
timeRunningMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
perfTimer.reset().start();
}
/**
* Helper method to measure time waiting.
*/
private void accumulateTimeWaiting() {
timeWaitingMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
perfTimer.reset().start();
}
}
private enum BlockDirFilter implements FilenameFilter {
INSTANCE;
@Override
public boolean accept(File dir, String name) {
return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
|| name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
|| name.startsWith(Block.BLOCK_FILE_PREFIX);
}
}
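/*
* For example, the filter accepts directory entries such as "subdir0"
* and "finalized" as well as block and meta files like "blk_123" and
* "blk_123_1001.meta", while unrelated files are never handed to
* compileReport().
*/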
}