/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
/**
* Scans the block files under a block pool and verifies that the
* files are not corrupt.
* This keeps track of blocks and their last verification times.
 * Currently it does not modify the metadata of a block.
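 * <p>
 * Typically driven by the DataNode's block scanner (note the references to
 * {@code datanode.blockScanner} below), which is expected to call
 * {@link #scanBlockPoolSlice()} periodically and {@link #shutdown()} when
 * the scanner is stopped.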
*/
class BlockPoolSliceScanner {
public static final Log LOG = LogFactory.getLog(BlockPoolSliceScanner.class);
private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
private static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
private static final String VERIFICATION_PREFIX = "dncp_block_verification.log";
private final String blockPoolId;
private final long scanPeriod;
private final AtomicLong lastScanTime = new AtomicLong();
private final DataNode datanode;
private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
private final SortedSet<BlockScanInfo> blockInfoSet
= new TreeSet<BlockScanInfo>();
private final Map<Block, BlockScanInfo> blockMap
= new HashMap<Block, BlockScanInfo>();
  // processedBlocks keeps track of which blocks have been scanned
  // since the last run.
private volatile HashMap<Long, Integer> processedBlocks;
private long totalScans = 0;
private long totalScanErrors = 0;
private long totalTransientErrors = 0;
  private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for tests only
private long currentPeriodStart = Time.now();
private long bytesLeft = 0; // Bytes to scan in this period
private long totalBytesToScan = 0;
private final LogFileHandler verificationLog;
private final DataTransferThrottler throttler = new DataTransferThrottler(
200, MAX_SCAN_RATE);
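  // Throttles verification reads so that scanning does not monopolize
  // disk bandwidth; adjustThrottler() retunes it as the period progresses
  // (the 200 ms argument is the throttler's enforcement period).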
  private enum ScanType {
    VERIFICATION_SCAN,     // scanned as part of periodic verification
NONE,
}
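  /**
   * Per-block scan state. Each instance is kept both in {@code blockMap}
   * (keyed by block) and in {@code blockInfoSet}, which sorts by last scan
   * time so that the first element is always the block most overdue for
   * verification.
   */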
static class BlockScanInfo implements Comparable<BlockScanInfo> {
Block block;
long lastScanTime = 0;
ScanType lastScanType = ScanType.NONE;
boolean lastScanOk = true;
BlockScanInfo(Block block) {
this.block = block;
}
@Override
public int hashCode() {
return block.hashCode();
}
@Override
public boolean equals(Object other) {
return other instanceof BlockScanInfo &&
compareTo((BlockScanInfo)other) == 0;
}
long getLastScanTime() {
return (lastScanType == ScanType.NONE) ? 0 : lastScanTime;
}
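    // Orders by last scan time, breaking ties by block, so that the
    // TreeSet iterates from least to most recently scanned. equals()
    // above is deliberately defined via compareTo() to keep equality
    // consistent with this ordering, as SortedSet requires.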
@Override
public int compareTo(BlockScanInfo other) {
long t1 = lastScanTime;
long t2 = other.lastScanTime;
return ( t1 < t2 ) ? -1 :
(( t1 > t2 ) ? 1 : block.compareTo(other.block));
}
}
BlockPoolSliceScanner(String bpid, DataNode datanode,
FsDatasetSpi<? extends FsVolumeSpi> dataset, Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
this.blockPoolId = bpid;
long hours = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT);
if (hours <= 0) {
hours = DEFAULT_SCAN_PERIOD_HOURS;
}
this.scanPeriod = hours * 3600 * 1000;
LOG.info("Periodic Block Verification Scanner initialized with interval "
+ hours + " hours for block pool " + bpid);
// get the list of blocks and arrange them in random order
List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
Collections.shuffle(arr);
long scanTime = -1;
for (Block block : arr) {
BlockScanInfo info = new BlockScanInfo( block );
info.lastScanTime = scanTime--;
      // leave 'info.lastScanType' as NONE.
addBlockInfo(info);
}
RollingLogs rollingLogs = null;
try {
rollingLogs = dataset.createRollingLogs(blockPoolId, VERIFICATION_PREFIX);
} catch (IOException e) {
LOG.warn("Could not open verfication log. " +
"Verification times are not stored.");
}
verificationLog = rollingLogs == null? null: new LogFileHandler(rollingLogs);
}
String getBlockPoolId() {
return blockPoolId;
}
private void updateBytesToScan(long len, long lastScanTime) {
// len could be negative when a block is deleted.
totalBytesToScan += len;
if ( lastScanTime < currentPeriodStart ) {
bytesLeft += len;
}
    // Should we change the throttler bandwidth every time bytesLeft
    // changes? Not really required.
}
private synchronized void addBlockInfo(BlockScanInfo info) {
boolean added = blockInfoSet.add(info);
blockMap.put(info.block, info);
if (added) {
updateBytesToScan(info.block.getNumBytes(), info.lastScanTime);
}
}
private synchronized void delBlockInfo(BlockScanInfo info) {
boolean exists = blockInfoSet.remove(info);
blockMap.remove(info.block);
if (exists) {
updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
}
}
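  // Note: lastScanTime takes part in blockInfoSet's ordering, so a
  // BlockScanInfo must be removed from the set before that field is
  // modified, and re-added afterwards; see updateBlockInfo() and
  // updateScanStatus().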
  /** Updates the block info in blockMap from the given LogEntry. */
private synchronized void updateBlockInfo(LogEntry e) {
BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
if(info != null && e.verificationTime > 0 &&
info.lastScanTime < e.verificationTime) {
delBlockInfo(info);
info.lastScanTime = e.verificationTime;
info.lastScanType = ScanType.VERIFICATION_SCAN;
addBlockInfo(info);
}
}
private synchronized long getNewBlockScanTime() {
    /* If there are a lot of blocks, this returns a random time within
     * the scan period. Otherwise, something sooner.
     */
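    // For example, with 1000 tracked blocks and the default three-week
    // scan period, 'period' below is min(21 days, 1000 * 10 min), i.e.
    // about 7 days, so the returned time falls in the oldest third of
    // the window and the new block gets scanned relatively soon.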
long period = Math.min(scanPeriod,
Math.max(blockMap.size(),1) * 600 * 1000L);
int periodInt = Math.abs((int)period);
return Time.now() - scanPeriod +
DFSUtil.getRandom().nextInt(periodInt);
}
  /** Adds a block to the set of tracked blocks. */
synchronized void addBlock(ExtendedBlock block) {
BlockScanInfo info = blockMap.get(block.getLocalBlock());
if ( info != null ) {
LOG.warn("Adding an already existing block " + block);
delBlockInfo(info);
}
info = new BlockScanInfo(block.getLocalBlock());
info.lastScanTime = getNewBlockScanTime();
addBlockInfo(info);
adjustThrottler();
}
/** Deletes the block from internal structures */
synchronized void deleteBlock(Block block) {
BlockScanInfo info = blockMap.get(block);
if ( info != null ) {
delBlockInfo(info);
}
}
@VisibleForTesting
long getTotalScans() {
return totalScans;
}
/** @return the last scan time for the block pool. */
long getLastScanTime() {
return lastScanTime.get();
}
  /** @return the last scan time for the given block. */
synchronized long getLastScanTime(Block block) {
BlockScanInfo info = blockMap.get(block);
return info == null? 0: info.lastScanTime;
}
/** Deletes blocks from internal structures */
void deleteBlocks(Block[] blocks) {
for ( Block b : blocks ) {
deleteBlock(b);
}
}
private synchronized void updateScanStatus(Block block,
ScanType type,
boolean scanOk) {
BlockScanInfo info = blockMap.get(block);
if ( info != null ) {
delBlockInfo(info);
} else {
      // It might already be removed. That's OK; it will be caught next time.
info = new BlockScanInfo(block);
}
long now = Time.now();
info.lastScanType = type;
info.lastScanTime = now;
info.lastScanOk = scanOk;
addBlockInfo(info);
    // Don't update metadata if the verification failed.
if (!scanOk) {
return;
}
if (verificationLog != null) {
verificationLog.append(now, block.getGenerationStamp(),
block.getBlockId());
}
}
private void handleScanFailure(ExtendedBlock block) {
LOG.info("Reporting bad " + block);
try {
datanode.reportBadBlocks(block);
} catch (IOException ie) {
      // it is bad, but not bad enough to shut down the scanner
LOG.warn("Cannot report bad " + block.getBlockId());
}
}
  private static class LogEntry {
long blockId = -1;
long verificationTime = -1;
long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
    /**
     * The format consists of a single line with multiple entries, each
     * entry in the form name="value". For example (values illustrative):
     *   date="2013-01-01 12:00:00,000"  time="1357041600000"  genstamp="1001"  id="42"
     * This is simple text, easily extendable, and easily parseable with
     * a regex.
     */
private static Pattern entryPattern =
Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
static String toString(long verificationTime, long genStamp, long blockId,
DateFormat dateFormat) {
return "\ndate=\"" + dateFormat.format(new Date(verificationTime))
+ "\"\t time=\"" + verificationTime
+ "\"\t genstamp=\"" + genStamp
+ "\"\t id=\"" + blockId + "\"";
}
static LogEntry parseEntry(String line) {
LogEntry entry = new LogEntry();
Matcher matcher = entryPattern.matcher(line);
while (matcher.find()) {
String name = matcher.group(1);
String value = matcher.group(2);
try {
if (name.equals("id")) {
entry.blockId = Long.valueOf(value);
} else if (name.equals("time")) {
entry.verificationTime = Long.valueOf(value);
} else if (name.equals("genstamp")) {
entry.genStamp = Long.valueOf(value);
}
} catch(NumberFormatException nfe) {
LOG.warn("Cannot parse line: " + line, nfe);
return null;
}
}
return entry;
}
}
private synchronized void adjustThrottler() {
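    // Target bandwidth: spread the remaining bytes over the time left in
    // the current period, clamped to [MIN_SCAN_RATE, MAX_SCAN_RATE].
    // (Assumes timeLeft is positive; callers adjust right after the
    // period-rollover check in scan(), so this generally holds.)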
long timeLeft = currentPeriodStart+scanPeriod - Time.now();
long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE);
throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
}
@VisibleForTesting
void verifyBlock(ExtendedBlock block) {
BlockSender blockSender = null;
    /* In case of failure, attempt to read a second time to rule out
     * transient errors. (Open question: how do we flush block data from
     * kernel buffers before the second read?)
     */
for (int i=0; i<2; i++) {
boolean second = (i > 0);
try {
adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, true, true,
datanode, null);
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
blockSender.sendBlock(out, null, throttler);
LOG.info((second ? "Second " : "") +
"Verification succeeded for " + block);
if ( second ) {
totalTransientErrors++;
}
updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, true);
return;
} catch (IOException e) {
updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false);
        // If the block does not exist anymore, then it's not an error
if (!dataset.contains(block)) {
LOG.info(block + " is no longer in the dataset");
deleteBlock(block.getLocalBlock());
return;
}
        // If the block exists, the exception may be due to a race with a
        // writer: the BlockSender got an old block path in rbw. The
        // BlockReceiver moved the block from rbw to finalized, but the
        // BlockSender tried to open the file before the BlockReceiver
        // updated the VolumeMap. The state of the block can change again
        // now, so ignore this error here. If a block was really deleted
        // by mistake, the DirectoryScanner should catch it.
if (e instanceof FileNotFoundException ) {
LOG.info("Verification failed for " + block +
" - may be due to race with write");
deleteBlock(block.getLocalBlock());
return;
}
LOG.warn((second ? "Second " : "First ") + "Verification failed for "
+ block, e);
if (second) {
totalScanErrors++;
datanode.getMetrics().incrBlockVerificationFailures();
handleScanFailure(block);
return;
}
} finally {
IOUtils.closeStream(blockSender);
datanode.getMetrics().incrBlocksVerified();
totalScans++;
}
}
}
private synchronized long getEarliestScanTime() {
if (!blockInfoSet.isEmpty()) {
return blockInfoSet.first().lastScanTime;
}
return Long.MAX_VALUE;
}
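  /**
   * Whether the first (least recently scanned) block has already been
   * processed in the current run. Since unprocessed blocks sort to the
   * front of {@code blockInfoSet}, this effectively signals that the run
   * has worked through the entire set.
   */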
private synchronized boolean isFirstBlockProcessed() {
if (!blockInfoSet.isEmpty()) {
long blockId = blockInfoSet.first().block.getBlockId();
if ((processedBlocks.get(blockId) != null)
&& (processedBlocks.get(blockId) == 1)) {
return true;
}
}
return false;
}
// Picks one block and verifies it
private void verifyFirstBlock() {
Block block = null;
synchronized (this) {
if (!blockInfoSet.isEmpty()) {
block = blockInfoSet.first().block;
}
}
if ( block != null ) {
verifyBlock(new ExtendedBlock(blockPoolId, block));
processedBlocks.put(block.getBlockId(), 1);
}
}
// Used for tests only
int getBlocksScannedInLastRun() {
return totalBlocksScannedInLastRun.get();
}
  /**
   * Reads the current and previous log files (if any) and marks the blocks
   * as processed if they were scanned within the last scan period. Copies
   * the log records of recently scanned blocks from the previous file to
   * the current one. Returns false if the process was interrupted because
   * the thread is marked to exit.
   */
private boolean assignInitialVerificationTimes() {
    // First, update the last verification times from the log file.
if (verificationLog != null) {
long now = Time.now();
RollingLogs.LineIterator logIterator = null;
try {
logIterator = verificationLog.logs.iterator(false);
// update verification times from the verificationLog.
while (logIterator.hasNext()) {
if (!datanode.shouldRun
|| datanode.blockScanner.blockScannerThread.isInterrupted()) {
return false;
}
LogEntry entry = LogEntry.parseEntry(logIterator.next());
if (entry != null) {
updateBlockInfo(entry);
if (now - entry.verificationTime < scanPeriod) {
BlockScanInfo info = blockMap.get(new Block(entry.blockId, 0,
entry.genStamp));
if (info != null) {
if (processedBlocks.get(entry.blockId) == null) {
updateBytesLeft(-info.block.getNumBytes());
processedBlocks.put(entry.blockId, 1);
}
if (logIterator.isPrevious()) {
              // Write the log entry to the current file so that it is
              // preserved for later runs.
verificationLog.append(entry.verificationTime, entry.genStamp,
entry.blockId);
}
}
}
}
}
} catch (IOException e) {
LOG.warn("Failed to read previous verification times.", e);
} finally {
IOUtils.closeStream(logIterator);
}
}
    /* Before this loop, entries in blockInfoSet that were not updated
     * above have a lastScanTime <= 0. Loop until the first entry has a
     * lastScanTime > 0.
     */
synchronized (this) {
final int numBlocks = Math.max(blockMap.size(), 1);
      // Initially spread the block reads over half of the scan period
      // so that we don't keep scanning the blocks too quickly when
      // restarted.
long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
long lastScanTime = Time.now() - scanPeriod;
if (!blockInfoSet.isEmpty()) {
BlockScanInfo info;
while ((info = blockInfoSet.first()).lastScanTime < 0) {
delBlockInfo(info);
info.lastScanTime = lastScanTime;
lastScanTime += verifyInterval;
addBlockInfo(info);
}
}
}
return true;
}
private synchronized void updateBytesLeft(long len) {
bytesLeft += len;
}
private synchronized void startNewPeriod() {
LOG.info("Starting a new period : work left in prev period : "
+ String.format("%.2f%%", totalBytesToScan == 0 ? 0
: (bytesLeft * 100.0) / totalBytesToScan));
// reset the byte counts :
bytesLeft = totalBytesToScan;
currentPeriodStart = Time.now();
}
private synchronized boolean workRemainingInCurrentPeriod() {
if (bytesLeft <= 0 && Time.now() < currentPeriodStart + scanPeriod) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
currentPeriodStart + ", period=" + scanPeriod + ", now=" +
Time.now() + " " + blockPoolId);
}
return false;
} else {
return true;
}
}
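  /**
   * Runs one scan pass over this block pool slice: resets the per-run
   * bookkeeping, restores verification times from the rolling log, and
   * then scans blocks until none remain overdue in the current period.
   */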
void scanBlockPoolSlice() {
if (!workRemainingInCurrentPeriod()) {
return;
}
// Create a new processedBlocks structure
processedBlocks = new HashMap<Long, Integer>();
if (!assignInitialVerificationTimes()) {
return;
}
// Start scanning
try {
scan();
} finally {
totalBlocksScannedInLastRun.set(processedBlocks.size());
lastScanTime.set(Time.now());
}
}
/**
* Shuts down this BlockPoolSliceScanner and releases any internal resources.
*/
void shutdown() {
if (verificationLog != null) {
verificationLog.close();
}
}
private void scan() {
if (LOG.isDebugEnabled()) {
LOG.debug("Starting to scan blockpool: " + blockPoolId);
}
try {
adjustThrottler();
while (datanode.shouldRun
&& !datanode.blockScanner.blockScannerThread.isInterrupted()
&& datanode.isBPServiceAlive(blockPoolId)) {
long now = Time.now();
synchronized (this) {
if ( now >= (currentPeriodStart + scanPeriod)) {
startNewPeriod();
}
}
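      // Verify the head block if it is overdue (last scanned more than
      // scanPeriod ago) or not yet processed in this run; otherwise every
      // tracked block is fresh and this run is complete.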
if (((now - getEarliestScanTime()) >= scanPeriod)
|| ((!blockInfoSet.isEmpty()) && !(this.isFirstBlockProcessed()))) {
verifyFirstBlock();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("All remaining blocks were processed recently, "
+ "so this run is complete");
}
break;
}
}
} catch (RuntimeException e) {
LOG.warn("RuntimeException during BlockPoolScanner.scan()", e);
throw e;
} finally {
rollVerificationLogs();
if (LOG.isDebugEnabled()) {
LOG.debug("Done scanning block pool: " + blockPoolId);
}
}
}
private synchronized void rollVerificationLogs() {
if (verificationLog != null) {
try {
verificationLog.logs.roll();
} catch (IOException ex) {
LOG.warn("Received exception: ", ex);
verificationLog.close();
}
}
}
synchronized void printBlockReport(StringBuilder buffer,
boolean summaryOnly) {
long oneHour = 3600*1000;
long oneDay = 24*oneHour;
long oneWeek = 7*oneDay;
long fourWeeks = 4*oneWeek;
int inOneHour = 0;
int inOneDay = 0;
int inOneWeek = 0;
int inFourWeeks = 0;
int inScanPeriod = 0;
int neverScanned = 0;
DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
int total = blockInfoSet.size();
long now = Time.now();
Date date = new Date();
for(Iterator<BlockScanInfo> it = blockInfoSet.iterator(); it.hasNext();) {
BlockScanInfo info = it.next();
long scanTime = info.getLastScanTime();
long diff = now - scanTime;
if (diff <= oneHour) inOneHour++;
if (diff <= oneDay) inOneDay++;
if (diff <= oneWeek) inOneWeek++;
if (diff <= fourWeeks) inFourWeeks++;
if (diff <= scanPeriod) inScanPeriod++;
if (scanTime <= 0) neverScanned++;
if (!summaryOnly) {
date.setTime(scanTime);
String scanType =
(info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none";
buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
" scan time : " +
"%-15d %s%n", info.block,
(info.lastScanOk ? "ok" : "failed"),
scanType, scanTime,
(scanTime <= 0) ? "not yet verified" :
dateFormat.format(date)));
}
}
double pctPeriodLeft = (scanPeriod + currentPeriodStart - now)
*100.0/scanPeriod;
double pctProgress = (totalBytesToScan == 0) ? 100 :
(totalBytesToScan-bytesLeft)*100.0/totalBytesToScan;
buffer.append(String.format("%nTotal Blocks : %6d" +
"%nVerified in last hour : %6d" +
"%nVerified in last day : %6d" +
"%nVerified in last week : %6d" +
"%nVerified in last four weeks : %6d" +
"%nVerified in SCAN_PERIOD : %6d" +
"%nNot yet verified : %6d" +
"%nVerified since restart : %6d" +
"%nScans since restart : %6d" +
"%nScan errors since restart : %6d" +
"%nTransient scan errors : %6d" +
"%nCurrent scan rate limit KBps : %6d" +
"%nProgress this period : %6.0f%%" +
"%nTime left in cur period : %6.2f%%" +
"%n",
total, inOneHour, inOneDay, inOneWeek,
inFourWeeks, inScanPeriod, neverScanned,
totalScans, totalScans,
totalScanErrors, totalTransientErrors,
Math.round(throttler.getBandwidth()/1024.0),
pctProgress, pctPeriodLeft));
}
/**
* This class takes care of log file used to store the last verification
* times of the blocks.
*/
private static class LogFileHandler {
private final DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
private final RollingLogs logs;
private LogFileHandler(RollingLogs logs) {
this.logs = logs;
}
void append(long verificationTime, long genStamp, long blockId) {
final String m = LogEntry.toString(verificationTime, genStamp, blockId,
dateFormat);
try {
logs.appender().append(m);
} catch (IOException e) {
LOG.warn("Failed to append to " + logs + ", m=" + m, e);
}
}
void close() {
try {
logs.appender().close();
} catch (IOException e) {
LOG.warn("Failed to close the appender of " + logs, e);
}
}
}
}