/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ListMultimap;
/**
* Periodically scans the data directories for block and block metadata files.
* Reconciles the differences with block information maintained in the dataset.
*/
@InterfaceAudience.Private
public class DirectoryScanner implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(DirectoryScanner.class);
private static final int DEFAULT_MAP_SIZE = 32768;
private final int reconcileBlocksBatchSize;
private final long reconcileBlocksBatchInterval;
private final FsDatasetSpi<?> dataset;
private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread;
private final long scanPeriodMsecs;
private final long throttleLimitMsPerSec;
private final AtomicBoolean shouldRun = new AtomicBoolean();
private boolean retainDiffs = false;
/**
* Total combined wall clock time (in milliseconds) spent by the report
* compiler threads executing. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeRunningMs = new AtomicLong(0L);
/**
* Total combined wall clock time (in milliseconds) spent by the report
* compiler threads blocked by the throttle. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeWaitingMs = new AtomicLong(0L);
/**
* The complete list of block differences indexed by block pool ID.
*/
@VisibleForTesting
final BlockPoolReport diffs = new BlockPoolReport();
/**
* Statistics about the block differences in each blockpool, indexed by block
* pool ID.
*/
@VisibleForTesting
final Map<String, Stats> stats;
/**
* Allow retaining diffs for unit test and analysis. Defaults to false (off).
*
* @param b whether to retain diffs
*/
@VisibleForTesting
public void setRetainDiffs(boolean b) {
retainDiffs = b;
}
/**
 * Stats tracked for reporting and testing, per blockpool.
*/
@VisibleForTesting
static class Stats {
final String bpid;
long totalBlocks = 0;
long missingMetaFile = 0;
long missingBlockFile = 0;
long missingMemoryBlocks = 0;
long mismatchBlocks = 0;
long duplicateBlocks = 0;
/**
* Create a new Stats object for the given blockpool ID.
*
* @param bpid blockpool ID
*/
public Stats(String bpid) {
this.bpid = bpid;
}
@Override
public String toString() {
return "BlockPool " + bpid + " Total blocks: " + totalBlocks
+ ", missing metadata files: " + missingMetaFile
+ ", missing block files: " + missingBlockFile
+ ", missing blocks in memory: " + missingMemoryBlocks
+ ", mismatched blocks: " + mismatchBlocks;
}
}
/**
* Helper class for compiling block info reports from report compiler threads.
* Contains a volume, a set of block pool IDs, and a collection of ScanInfo
* objects. If a block pool exists but has no ScanInfo objects associated with
* it, there will be no mapping for that particular block pool.
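 *
 * A minimal usage sketch (illustrative only; {@code vol} and {@code infos}
 * are assumed to exist):
 * <pre>{@code
 * ScanInfoVolumeReport report =
 *     new ScanInfoVolumeReport(vol, Arrays.asList("BP-1"));
 * report.addAll("BP-1", infos);
 * List<ScanInfo> scanned = report.getScanInfo("BP-1");
 * }</pre>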
*/
@VisibleForTesting
public static class ScanInfoVolumeReport {
@SuppressWarnings("unused")
private static final long serialVersionUID = 1L;
private final FsVolumeSpi volume;
private final BlockPoolReport blockPoolReport;
/**
* Create a new info list.
*
 * @param volume the volume to be scanned
*/
ScanInfoVolumeReport(final FsVolumeSpi volume) {
this.volume = volume;
this.blockPoolReport = new BlockPoolReport();
}
/**
 * Create a new info list initialized with the given set of known block
 * pools.
 *
 * @param volume the volume to be scanned
 * @param blockPools list of known block pools
*/
ScanInfoVolumeReport(final FsVolumeSpi volume,
final Collection<String> blockPools) {
this.volume = volume;
this.blockPoolReport = new BlockPoolReport(blockPools);
}
public void addAll(final String bpid,
final Collection<ScanInfo> scanInfos) {
this.blockPoolReport.addAll(bpid, scanInfos);
}
public Set<String> getBlockPoolIds() {
return this.blockPoolReport.getBlockPoolIds();
}
public List<ScanInfo> getScanInfo(final String bpid) {
return this.blockPoolReport.getScanInfo(bpid);
}
public FsVolumeSpi getVolume() {
return volume;
}
@Override
public String toString() {
return "ScanInfoVolumeReport [volume=" + volume + ", blockPoolReport="
+ blockPoolReport + "]";
}
}
/**
* Helper class for compiling block info reports per block pool.
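 *
 * A minimal usage sketch (illustrative only; {@code infos} stands in for a
 * collection of ScanInfo objects gathered elsewhere):
 * <pre>{@code
 * BlockPoolReport report = new BlockPoolReport();
 * report.addAll("BP-1", infos); // register the pool and its entries
 * report.sortBlocks();          // sort each pool's entries by block ID
 * List<ScanInfo> sorted = report.getScanInfo("BP-1");
 * }</pre>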
*/
@VisibleForTesting
public static class BlockPoolReport {
@SuppressWarnings("unused")
private static final long serialVersionUID = 1L;
private final Set<String> blockPools;
private final ListMultimap<String, ScanInfo> map;
/**
* Create a block pool report.
*/
BlockPoolReport() {
this.blockPools = new HashSet<>(2);
this.map = ArrayListMultimap.create(2, DEFAULT_MAP_SIZE);
}
/**
* Create a new block pool report initialized to the given expected size.
*
* @param blockPools initial list of known block pools
*/
BlockPoolReport(final Collection<String> blockPools) {
this.blockPools = new HashSet<>(blockPools);
this.map = ArrayListMultimap.create(blockPools.size(), DEFAULT_MAP_SIZE);
}
public void addAll(final String bpid,
final Collection<ScanInfo> scanInfos) {
this.blockPools.add(bpid);
this.map.putAll(bpid, scanInfos);
}
public void sortBlocks() {
for (final String bpid : this.map.keySet()) {
final List<ScanInfo> list = this.map.get(bpid);
// Sort array based on blockId
Collections.sort(list);
}
}
public Set<String> getBlockPoolIds() {
return Collections.unmodifiableSet(this.blockPools);
}
public List<ScanInfo> getScanInfo(final String bpid) {
return this.map.get(bpid);
}
public Collection<Map.Entry<String, ScanInfo>> getEntries() {
return Collections.unmodifiableCollection(this.map.entries());
}
public void clear() {
this.map.clear();
this.blockPools.clear();
}
@Override
public String toString() {
return "BlockPoolReport [blockPools=" + blockPools + ", map=" + map + "]";
}
}
/**
 * Create a new directory scanner, but do not start it running yet.
*
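 * A minimal construction sketch (illustrative; {@code dataset} is assumed
 * to be an existing FsDatasetSpi instance):
 * <pre>{@code
 * Configuration conf = new Configuration();
 * conf.setTimeDuration(
 *     DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
 *     6, TimeUnit.HOURS);
 * DirectoryScanner scanner = new DirectoryScanner(dataset, conf);
 * scanner.start(); // first scan fires at a random offset within the period
 * }</pre>
 *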
* @param dataset the dataset to scan
* @param conf the Configuration object
*/
public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
this.dataset = dataset;
this.stats = new HashMap<>(DEFAULT_MAP_SIZE);
int interval = (int) conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT,
TimeUnit.SECONDS);
scanPeriodMsecs = TimeUnit.SECONDS.toMillis(interval);
int throttle = conf.getInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
if (throttle >= TimeUnit.SECONDS.toMillis(1)) {
LOG.warn(
"{} set to value above 1000 ms/sec. Assuming default value of {}",
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
throttle =
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
}
throttleLimitMsPerSec = throttle;
int threads =
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
reportCompileThreadPool =
Executors.newFixedThreadPool(threads, new Daemon.DaemonFactory());
masterThread =
new ScheduledThreadPoolExecutor(1, new Daemon.DaemonFactory());
int reconcileBatchSize =
conf.getInt(DFSConfigKeys.
DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE,
DFSConfigKeys.
DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE_DEFAULT);
if (reconcileBatchSize <= 0) {
LOG.warn("Invalid value configured for " +
"dfs.datanode.reconcile.blocks.batch.size, " +
"should be greater than 0, Using default.");
reconcileBatchSize =
DFSConfigKeys.
DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE_DEFAULT;
}
reconcileBlocksBatchSize = reconcileBatchSize;
long reconcileBatchInterval =
conf.getTimeDuration(DFSConfigKeys.
DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL,
DFSConfigKeys.
DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
if (reconcileBatchInterval <= 0) {
LOG.warn("Invalid value configured for " +
"dfs.datanode.reconcile.blocks.batch.interval, " +
"should be greater than 0, Using default.");
reconcileBatchInterval =
DFSConfigKeys.
DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL_DEFAULT;
}
reconcileBlocksBatchInterval = reconcileBatchInterval;
}
/**
* Start the scanner. The scanner will run every
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
@VisibleForTesting
public void start() {
shouldRun.set(true);
long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);
LOG.info(
"Periodic Directory Tree Verification scan starting in {}ms with interval of {}ms and throttle limit of {}ms/s",
firstScanTime, scanPeriodMsecs, throttleLimitMsPerSec);
masterThread.scheduleAtFixedRate(this, firstScanTime, scanPeriodMsecs,
TimeUnit.MILLISECONDS);
}
/**
* Return whether the scanner has been started.
*
* @return whether the scanner has been started
*/
@VisibleForTesting
boolean getRunStatus() {
return shouldRun.get();
}
/**
* Clear the current cache of diffs and statistics.
*/
private void clear() {
synchronized (diffs) {
diffs.clear();
}
stats.clear();
}
/**
 * Main program loop for DirectoryScanner. Runs {@link #reconcile()} and
* handles any exceptions.
*/
@Override
public void run() {
if (!shouldRun.get()) {
// shutdown has been activated
LOG.warn(
"This cycle terminating immediately because 'shouldRun' has been deactivated");
return;
}
try {
reconcile();
} catch (Exception e) {
// Log and continue - allows Executor to run again next cycle
LOG.error(
"Exception during DirectoryScanner execution - will continue next cycle",
e);
} catch (Error er) {
// Non-recoverable error - re-throw after logging the problem
LOG.error(
"System Error during DirectoryScanner execution - permanently terminating periodic scanner",
er);
throw er;
}
}
/**
* Stops the directory scanner. This method will wait for 1 minute for the
* main thread to exit and an additional 1 minute for the report compilation
* threads to exit. If a thread does not exit in that time period, it is left
* running, and an error is logged.
*/
void shutdown() {
LOG.info("Shutdown has been called");
if (!shouldRun.getAndSet(false)) {
LOG.warn("Shutdown has been called, but periodic scanner not started");
}
if (masterThread != null) {
masterThread.shutdown();
}
if (reportCompileThreadPool != null) {
reportCompileThreadPool.shutdownNow();
}
if (masterThread != null) {
try {
masterThread.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error(
"interrupted while waiting for masterThread to " + "terminate", e);
}
}
if (reportCompileThreadPool != null) {
try {
reportCompileThreadPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error("interrupted while waiting for reportCompileThreadPool to "
+ "terminate", e);
}
}
if (!retainDiffs) {
clear();
}
}
/**
 * Reconcile differences between disk and in-memory blocks.
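 *
 * Updates are applied in batches: after each {@code reconcileBlocksBatchSize}
 * calls to {@code checkAndUpdate}, the thread sleeps for
 * {@code reconcileBlocksBatchInterval} ms so the dataset lock is not held
 * continuously (HDFS-14476). For example (an illustrative calculation), with
 * a batch size of 1000 and an interval of 2000 ms, a diff of 5000 blocks
 * pauses five times for two seconds each.
 *
 * @throws IOException if an error occurs while updating the dataset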
*/
@VisibleForTesting
public void reconcile() throws IOException {
LOG.debug("reconcile start DirectoryScanning");
scan();
// HDFS-14476: run checkAndUpdate in batches to avoid holding the lock
// for too long
int loopCount = 0;
synchronized (diffs) {
for (final Map.Entry<String, ScanInfo> entry : diffs.getEntries()) {
dataset.checkAndUpdate(entry.getKey(), entry.getValue());
if (loopCount % reconcileBlocksBatchSize == 0) {
try {
Thread.sleep(reconcileBlocksBatchInterval);
} catch (InterruptedException e) {
// do nothing
}
}
loopCount++;
}
}
if (!retainDiffs) {
clear();
}
}
/**
 * Scan for differences between disk and in-memory blocks. Scans only the
 * "finalized blocks" lists of both disk and memory.
*/
private void scan() {
BlockPoolReport blockPoolReport = new BlockPoolReport();
clear();
Collection<ScanInfoVolumeReport> volumeReports = getVolumeReports();
for (ScanInfoVolumeReport volumeReport : volumeReports) {
for (String blockPoolId : volumeReport.getBlockPoolIds()) {
List<ScanInfo> scanInfos = volumeReport.getScanInfo(blockPoolId);
blockPoolReport.addAll(blockPoolId, scanInfos);
}
}
// Pre-sort the reports outside of the lock
blockPoolReport.sortBlocks();
for (final String bpid : blockPoolReport.getBlockPoolIds()) {
List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
Stats statsRecord = new Stats(bpid);
stats.put(bpid, statsRecord);
Collection<ScanInfo> diffRecord = new ArrayList<>();
statsRecord.totalBlocks = blockpoolReport.size();
final List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
Collections.sort(bl); // Sort based on blockId
int d = 0; // index for blockpoolReport
int m = 0; // index for memReport
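// Merge-walk the two sorted lists in lock step, comparing block IDs to
// classify each entry: on disk but not in memory, in memory but not on
// disk, or present in both but mismatched.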
while (m < bl.size() && d < blockpoolReport.size()) {
ReplicaInfo memBlock = bl.get(m);
ScanInfo info = blockpoolReport.get(d);
if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
// Block is missing in memory
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
continue;
}
if (info.getBlockId() > memBlock.getBlockId()) {
// Block is missing on the disk
addDifference(diffRecord, statsRecord, memBlock.getBlockId(),
info.getVolume());
m++;
continue;
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
if (info.getBlockFile() == null) {
// Block metadata file exists and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
|| info.getBlockLength() != memBlock.getNumBytes()) {
// Block metadata file is missing or has wrong generation stamp,
// or block file length is different than expected
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
} else if (memBlock.compareWith(info) != 0) {
// volumeMap record and on-disk files do not match.
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
if (d < blockpoolReport.size()) {
// There may be multiple on-disk records for the same block; if so, do
// not advance the memory record pointer.
ScanInfo nextInfo = blockpoolReport.get(d);
if (nextInfo.getBlockId() != info.getBlockId()) {
++m;
}
} else {
++m;
}
}
while (m < bl.size()) {
ReplicaInfo current = bl.get(m++);
addDifference(diffRecord, statsRecord, current.getBlockId(),
current.getVolume());
}
while (d < blockpoolReport.size()) {
if (!dataset.isDeletingBlock(bpid,
blockpoolReport.get(d).getBlockId())) {
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, blockpoolReport.get(d));
}
d++;
}
synchronized (diffs) {
diffs.addAll(bpid, diffRecord);
}
LOG.info("Scan Results: {}", statsRecord);
}
}
/**
* Add the ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is found on the disk, but
* the in-memory block is missing or does not match the block on the disk.
*
* @param diffRecord the collection to which to add the info
* @param statsRecord the stats to update
* @param info the differing info
*/
private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
ScanInfo info) {
statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0;
statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0;
diffRecord.add(info);
}
/**
* Add a new ScanInfo object to the collection of differences and adjust the
* stats accordingly. This method is called when a block is not found on the
* disk.
*
* @param diffRecord the collection to which to add the info
* @param statsRecord the stats to update
* @param blockId the id of the missing block
* @param vol the volume that contains the missing block
*/
private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
long blockId, FsVolumeSpi vol) {
statsRecord.missingBlockFile++;
statsRecord.missingMetaFile++;
diffRecord.add(new ScanInfo(blockId, null, null, null, vol));
}
/**
* Get the lists of blocks on the disks in the data set.
*/
@VisibleForTesting
public Collection<ScanInfoVolumeReport> getVolumeReports() {
List<ScanInfoVolumeReport> volReports = new ArrayList<>();
List<Future<ScanInfoVolumeReport>> compilersInProgress = new ArrayList<>();
// First get list of data directories
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
for (final FsVolumeSpi volume : volumes) {
// Disable scanning PROVIDED volumes to keep overhead low
if (volume.getStorageType() != StorageType.PROVIDED) {
ReportCompiler reportCompiler = new ReportCompiler(volume);
Future<ScanInfoVolumeReport> result =
reportCompileThreadPool.submit(reportCompiler);
compilersInProgress.add(result);
}
}
for (Future<ScanInfoVolumeReport> future : compilersInProgress) {
try {
final ScanInfoVolumeReport result = future.get();
if (!CollectionUtils.addIgnoreNull(volReports, result)) {
// This compiler thread was interrupted; give up on this run
volReports.clear();
break;
}
} catch (Exception ex) {
LOG.warn("Error compiling report. Continuing.", ex);
}
}
} catch (IOException e) {
LOG.error("Unexpected IOException by closing FsVolumeReference", e);
}
return volReports;
}
/**
* The ReportCompiler class encapsulates the process of searching a datanode's
 * disks for block information. It operates by performing a depth-first
 * traversal of the volume to discover block information.
*
 * When the ReportCompiler discovers block information, it creates a new
* ScanInfo object for it and adds that object to its report list. The report
* list is returned by the {@link #call()} method.
*/
public class ReportCompiler implements Callable<ScanInfoVolumeReport> {
private final FsVolumeSpi volume;
// Variable for tracking time spent running for throttling purposes
private final StopWatch throttleTimer = new StopWatch();
// Variable for tracking time spent running and waiting for testing
// purposes
private final StopWatch perfTimer = new StopWatch();
/**
* Create a report compiler for the given volume.
*
* @param volume the target volume
*/
public ReportCompiler(FsVolumeSpi volume) {
this.volume = volume;
}
/**
* Run this report compiler thread.
*
* @return the block info report list
* @throws IOException if the block pool is not found
*/
@Override
public ScanInfoVolumeReport call() throws IOException {
String[] bpList = volume.getBlockPoolList();
ScanInfoVolumeReport result =
new ScanInfoVolumeReport(volume, Arrays.asList(bpList));
perfTimer.start();
throttleTimer.start();
for (String bpid : bpList) {
List<ScanInfo> report = new ArrayList<>(DEFAULT_MAP_SIZE);
perfTimer.reset().start();
throttleTimer.reset().start();
try {
// ScanInfos are added directly to 'report' list
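// The volume implementation is handed this compiler so it can call
// back into throttle() and pace the scan.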
volume.compileReport(bpid, report, this);
result.addAll(bpid, report);
} catch (InterruptedException ex) {
// Exit quickly and flag the scanner to do the same
result = null;
break;
}
}
LOG.trace("Scanner volume report: {}", result);
return result;
}
/**
* Called by the thread before each potential disk scan so that a pause can
* be optionally inserted to limit the number of scans per second. The limit
* is controlled by
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
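 *
 * For example (an illustrative calculation): with a limit of 400 ms/s, once
 * the compiler has run for at least 400 ms of the current second, it sleeps
 * for (1000 - 400) ms plus any overrun past the limit before resuming.
 *
 * @throws InterruptedException if the sleep is interrupted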
*/
public void throttle() throws InterruptedException {
accumulateTimeRunning();
if (throttleLimitMsPerSec > 0L) {
final long runningTime = throttleTimer.now(TimeUnit.MILLISECONDS);
if (runningTime >= throttleLimitMsPerSec) {
final long sleepTime;
if (runningTime >= 1000L) {
LOG.warn("Unable to throttle within the second. Blocking for 1s.");
sleepTime = 1000L;
} else {
// Sleep for the expected time plus any time processing ran over
final long overTime = runningTime - throttleLimitMsPerSec;
sleepTime = (1000L - throttleLimitMsPerSec) + overTime;
}
Thread.sleep(sleepTime);
throttleTimer.reset().start();
}
accumulateTimeWaiting();
}
}
/**
* Helper method to measure time running.
*/
private void accumulateTimeRunning() {
timeRunningMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
perfTimer.reset().start();
}
/**
* Helper method to measure time waiting.
*/
private void accumulateTimeWaiting() {
timeWaitingMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
perfTimer.reset().start();
}
}
}