/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
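/**
* Manages the {@link VolumeScanner} threads of a DataNode: one scanner per
* storage volume, each of which slowly re-reads the block replicas on its
* volume and verifies their checksums so that corrupt replicas can be
* reported.
*
* <p>A minimal sketch of programmatic configuration, for illustration only;
* the DataNode normally constructs its own BlockScanner, and {@code datanode}
* below stands for an existing {@link DataNode} instance.
* <pre>{@code
* Configuration conf = new HdfsConfiguration();
* // Limit each volume scanner to 1 MB/s, the default rate.
* conf.setLong(DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576L);
* // Scan each block once every 504 hours (3 weeks, the default period).
* // A negative period disables the scanner entirely.
* conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 504L);
* BlockScanner blockScanner = new BlockScanner(datanode, conf);
* }</pre>
*/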
@InterfaceAudience.Private
public class BlockScanner {
public static final Logger LOG =
LoggerFactory.getLogger(BlockScanner.class);
/**
* The DataNode that this scanner is associated with.
*/
private final DataNode datanode;
/**
* Maps Storage IDs to VolumeScanner objects.
*/
private final TreeMap<String, VolumeScanner> scanners =
new TreeMap<String, VolumeScanner>();
/**
* The scanner configuration.
*/
private Conf conf;
/**
* Timeout, in milliseconds, to wait for each {@link VolumeScanner} to stop
* inside {@link #removeAllVolumeScanners}.
*/
private long joinVolumeScannersTimeOutMs;
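/**
* Replace the scanner configuration and propagate it to every registered
* {@link VolumeScanner}. Visible for testing.
*/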
@VisibleForTesting
void setConf(Conf conf) {
this.conf = conf;
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
entry.getValue().setConf(conf);
}
}
/**
* The cached scanner configuration.
*/
static class Conf {
// These are a few internal configuration keys used for unit tests.
// They can't be set unless the static boolean allowUnitTestSettings has
// been set to true.
@VisibleForTesting
static final String INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS =
"internal.dfs.datanode.scan.period.ms.key";
@VisibleForTesting
static final String INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER =
"internal.volume.scanner.scan.result.handler";
@VisibleForTesting
static final String INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS =
"internal.dfs.block.scanner.max_staleness.ms";
@VisibleForTesting
static final long INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT =
TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
@VisibleForTesting
static final String INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS =
"dfs.block.scanner.cursor.save.interval.ms";
@VisibleForTesting
static final long
INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT =
TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
static boolean allowUnitTestSettings = false;
// Maximum number of bytes each volume scanner reads per second.
final long targetBytesPerSec;
// Internal staleness bound, in milliseconds (test-only override).
final long maxStalenessMs;
// Period, in milliseconds, over which every block should be scanned.
final long scanPeriodMs;
// Interval, in milliseconds, at which a volume scanner saves its cursor.
final long cursorSaveMs;
// Whether to skip scanning blocks that were accessed recently.
final boolean skipRecentAccessed;
// Handler class invoked with the result of each block scan.
final Class<? extends ScanResultHandler> resultHandler;
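/**
* Read an internal, test-only configuration key, returning the default
* unless {@link #allowUnitTestSettings} is true.
*/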
private static long getUnitTestLong(Configuration conf, String key,
long defVal) {
if (allowUnitTestSettings) {
return conf.getLong(key, defVal);
} else {
return defVal;
}
}
/**
* Determine the configured block scan period.
*
* For compatibility with prior releases of HDFS, a configured value of
* zero selects the default period of 3 weeks (504 hours), while a value
* less than zero disables the scanner.
*
* @param conf Configuration object.
* @return block scan period in milliseconds.
*/
private static long getConfiguredScanPeriodMs(Configuration conf) {
long tempScanPeriodMs = getUnitTestLong(
conf, INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS,
TimeUnit.MILLISECONDS.convert(conf.getLong(
DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT), TimeUnit.HOURS));
if (tempScanPeriodMs == 0) {
tempScanPeriodMs = TimeUnit.MILLISECONDS.convert(
DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT, TimeUnit.HOURS);
}
return tempScanPeriodMs;
}
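/**
* Load the scanner settings from the given {@link Configuration}, falling
* back to the defaults where no value is set.
*/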
@SuppressWarnings("unchecked")
Conf(Configuration conf) {
this.targetBytesPerSec = Math.max(0L, conf.getLong(
DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT));
this.maxStalenessMs = Math.max(0L, getUnitTestLong(conf,
INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS,
INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT));
this.scanPeriodMs = getConfiguredScanPeriodMs(conf);
this.cursorSaveMs = Math.max(0L, getUnitTestLong(conf,
INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS,
INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT));
this.skipRecentAccessed = conf.getBoolean(
DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED,
DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT);
if (allowUnitTestSettings) {
this.resultHandler = (Class<? extends ScanResultHandler>)
conf.getClass(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
ScanResultHandler.class);
} else {
this.resultHandler = ScanResultHandler.class;
}
}
}
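/**
* Create a block scanner using the given DataNode's own configuration.
*
* @param datanode The DataNode that owns this scanner.
*/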
public BlockScanner(DataNode datanode) {
this(datanode, datanode.getConf());
}
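/**
* Create a block scanner with an explicitly supplied configuration.
*
* @param datanode The DataNode that owns this scanner.
* @param conf The configuration to read the scanner settings from.
*/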
public BlockScanner(DataNode datanode, Configuration conf) {
this.datanode = datanode;
setJoinVolumeScannersTimeOutMs(
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT));
this.conf = new Conf(conf);
if (isEnabled()) {
LOG.info("Initialized block scanner with targetBytesPerSec {}",
this.conf.targetBytesPerSec);
} else {
LOG.info("Disabled block scanner.");
}
}
/**
* Returns true if the block scanner is enabled.
*
* If the block scanner is disabled, no volume scanners will be created, and
* no threads will start.
*/
public boolean isEnabled() {
return (conf.scanPeriodMs > 0) && (conf.targetBytesPerSec > 0);
}
/**
* Returns true if there is any scanner thread registered.
*/
public synchronized boolean hasAnyRegisteredScanner() {
return !scanners.isEmpty();
}
/**
* Set up a scanner for the given block pool and volume.
*
* @param ref A reference to the volume.
*/
public synchronized void addVolumeScanner(FsVolumeReference ref) {
boolean success = false;
try {
FsVolumeSpi volume = ref.getVolume();
if (!isEnabled()) {
LOG.debug("Not adding volume scanner for {}, because the block " +
"scanner is disabled.", volume);
return;
}
VolumeScanner scanner = scanners.get(volume.getStorageID());
if (scanner != null) {
LOG.error("Already have a scanner for volume {}.",
volume);
return;
}
LOG.debug("Adding scanner for volume {} (StorageID {})",
volume, volume.getStorageID());
scanner = new VolumeScanner(conf, datanode, ref);
scanner.start();
scanners.put(volume.getStorageID(), scanner);
success = true;
} finally {
if (!success) {
// If we didn't create a new VolumeScanner object, we don't
// need this reference to the volume.
IOUtils.cleanupWithLogger(null, ref);
}
}
}
/**
* Stops and removes a volume scanner.
*
* This function will block until the volume scanner has stopped.
*
* @param volume The volume to remove.
*/
public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
if (!isEnabled()) {
LOG.debug("Not removing volume scanner for {}, because the block " +
"scanner is disabled.", volume.getStorageID());
return;
}
VolumeScanner scanner = scanners.get(volume.getStorageID());
if (scanner == null) {
LOG.warn("No scanner found to remove for volumeId {}",
volume.getStorageID());
return;
}
LOG.info("Removing scanner for volume {} (StorageID {})",
volume, volume.getStorageID());
scanner.shutdown();
scanners.remove(volume.getStorageID());
Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
}
/**
* Stops and removes all volume scanners.
*
* This function is called on shutdown. It will return even if some of
* the scanners don't terminate in time. Since the scanners are daemon
* threads and do not alter the block content, it is safe to ignore
* such conditions on shutdown.
*/
public synchronized void removeAllVolumeScanners() {
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
entry.getValue().shutdown();
}
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
Uninterruptibles.joinUninterruptibly(entry.getValue(),
getJoinVolumeScannersTimeOutMs(), TimeUnit.MILLISECONDS);
}
scanners.clear();
}
/**
* Enable scanning a given block pool id.
*
* @param bpid The block pool id to enable scanning for.
*/
synchronized void enableBlockPoolId(String bpid) {
Preconditions.checkNotNull(bpid);
for (VolumeScanner scanner : scanners.values()) {
scanner.enableBlockPoolId(bpid);
}
}
/**
* Disable scanning a given block pool id.
*
* @param bpid The block pool id to disable scanning for.
*/
synchronized void disableBlockPoolId(String bpid) {
Preconditions.checkNotNull(bpid);
for (VolumeScanner scanner : scanners.values()) {
scanner.disableBlockPoolId(bpid);
}
}
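/**
* Get the statistics of the scanner registered for the given volume, or
* null if no scanner is registered for it. Visible for testing.
*/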
@VisibleForTesting
synchronized VolumeScanner.Statistics getVolumeStats(String volumeId) {
VolumeScanner scanner = scanners.get(volumeId);
if (scanner == null) {
return null;
}
return scanner.getStatistics();
}
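/**
* Append the statistics of every registered volume scanner to the given
* StringBuilder.
*
* @param p The StringBuilder to append to.
*/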
synchronized void printStats(StringBuilder p) {
// TODO: should this also print out the block pool ids being scanned?
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
entry.getValue().printStats(p);
}
}
/**
* Mark a block as "suspect."
*
* This means that we should try to rescan it soon. Note that the
* VolumeScanner keeps a list of recently suspicious blocks, which
* it uses to avoid rescanning the same block over and over in a short
* time frame.
*
* @param storageId The ID of the storage where the block replica
* is being stored.
* @param block The block's ID and block pool id.
*/
synchronized void markSuspectBlock(String storageId, ExtendedBlock block) {
if (!isEnabled()) {
LOG.debug("Not scanning suspicious block {} on {}, because the block " +
"scanner is disabled.", block, storageId);
return;
}
VolumeScanner scanner = scanners.get(storageId);
if (scanner == null) {
// This could happen if the volume is in the process of being removed.
// The removal process shuts down the VolumeScanner, but the volume
// object stays around as long as there are references to it (which
// should not be that long.)
LOG.info("Not scanning suspicious block {} on {}, because there is no " +
"volume scanner for that storageId.", block, storageId);
return;
}
scanner.markSuspectBlock(block);
}
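/**
* @return The timeout, in milliseconds, used when joining volume scanner
* threads in {@link #removeAllVolumeScanners}.
*/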
public long getJoinVolumeScannersTimeOutMs() {
return joinVolumeScannersTimeOutMs;
}
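/**
* Set the timeout, in milliseconds, used when joining volume scanner
* threads in {@link #removeAllVolumeScanners}.
*/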
public void setJoinVolumeScannersTimeOutMs(long joinScannersTimeOutMs) {
this.joinVolumeScannersTimeOutMs = joinScannersTimeOutMs;
}
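/**
* Servlet that reports block scanner statistics as plain text. It is
* registered by the DataNode's HTTP server (typically under
* /blockScannerReport) and renders the output of
* {@link BlockScanner#printStats(StringBuilder)}.
*/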
@InterfaceAudience.Private
public static class Servlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@Override
public void doGet(HttpServletRequest request,
HttpServletResponse response) throws IOException {
response.setContentType("text/plain");
DataNode datanode = (DataNode)
getServletContext().getAttribute("datanode");
BlockScanner blockScanner = datanode.getBlockScanner();
StringBuilder buffer = new StringBuilder(8 * 1024);
if (!blockScanner.isEnabled()) {
LOG.warn("Periodic block scanner is not running");
buffer.append("Periodic block scanner is not running. " +
"Please check the datanode log if this is unexpected.");
} else {
buffer.append("Block Scanner Statistics\n\n");
blockScanner.printStats(buffer);
}
String resp = buffer.toString();
LOG.trace("Returned Servlet info {}", resp);
response.getWriter().write(resp);
}
}
}