blob: 02592db5226b1489a3bfd7f348dca9e236307c73 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
/**
* Performs two types of scanning:
* <ul>
* <li> Gets block files from the data directories and reconciles the
* difference between the blocks on the disk and in memory in
* {@link FSDataset}</li>
* <li> Scans the data directories for block files and verifies that
* the files are not corrupt</li>
* </ul>
* This keeps track of blocks and their last verification times.
* Currently it does not modify the metadata for blocks.
*/
class DataBlockScanner implements Runnable {
// Logger used by the scanner and by its nested helper classes.
public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
// Upper and lower bounds applied to the scan bandwidth in adjustThrottler().
private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
// Scan period (in hours) used when the configured value is missing or <= 0.
static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
// One day, in milliseconds.
private static final long ONE_DAY = 24*3600*1000L;
// Format of the human readable date written to the verification log and
// shown in the block report.
// NOTE(review): SimpleDateFormat is not thread-safe; the uses visible in this
// file are inside methods synchronized on the scanner instance.
static final DateFormat dateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
// File name prefix of the verification log; LogFileHandler appends its own
// ".curr" / ".prev" suffixes.
static final String verificationLogFile = "dncp_block_verification.log";
// The verification log line limit is this factor times the number of
// tracked blocks.
static final int verficationLogLimit = 5; // * numBlocks.
// Length of one scan period in milliseconds.
private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
DataNode datanode;
FSDataset dataset;
// Sorted set of tracked blocks; ordering is BlockScanInfo.compareTo(), i.e.
// by last scan time and then by block, so first() is the next block to scan.
TreeSet<BlockScanInfo> blockInfoSet;
// Lookup from block to its scan info; kept in step with blockInfoSet.
HashMap<Block, BlockScanInfo> blockMap;
// Counters shown in the block report ("since restart" values).
long totalScans = 0;
long totalVerifications = 0; // includes remote verification by clients.
long totalScanErrors = 0;
long totalTransientErrors = 0;
// Start of the current scan period (milliseconds since the epoch).
long currentPeriodStart = System.currentTimeMillis();
long bytesLeft = 0; // Bytes to scan in this period
// Sum of the sizes of all tracked blocks.
long totalBytesToScan = 0;
// Persists verification times; null if the log could not be opened or after
// shutdown().
private LogFileHandler verificationLog;
Random random = new Random();
// Created at the end of init(); a non-null value marks the scanner as
// initialized (see isInitiliazed()).
BlockTransferThrottler throttler = null;
// Reconciles blocks on disk to blocks in memory
DirectoryScanner dirScanner;
/** How the most recent verification of a block was obtained. */
private static enum ScanType {
REMOTE_READ, // Verified when a block was read by a client etc.
VERIFICATION_SCAN, // scanned as part of periodic verification
NONE, // initial value of BlockScanInfo.lastScanType
}
/**
 * Scan bookkeeping for one block. Instances are ordered by last scan time
 * and, for equal times, by block; equality follows that ordering while the
 * hash code is derived from the block alone.
 */
static class BlockScanInfo implements Comparable<BlockScanInfo> {
  Block block;
  long lastScanTime = 0;
  long lastLogTime = 0;
  ScanType lastScanType = ScanType.NONE;
  boolean lastScanOk = true;

  BlockScanInfo(Block block) {
    this.block = block;
  }

  public int hashCode() {
    return block.hashCode();
  }

  public boolean equals(Object other) {
    if (!(other instanceof BlockScanInfo)) {
      return false;
    }
    BlockScanInfo that = (BlockScanInfo) other;
    return compareTo(that) == 0;
  }

  /** @return the last scan time, or 0 while the scan type is still NONE. */
  long getLastScanTime() {
    if (lastScanType == ScanType.NONE) {
      return 0;
    }
    return lastScanTime;
  }

  public int compareTo(BlockScanInfo other) {
    long mine = lastScanTime;
    long theirs = other.lastScanTime;
    if (mine < theirs) {
      return -1;
    }
    if (mine > theirs) {
      return 1;
    }
    // Same scan time: fall back to the block ordering.
    return block.compareTo(other.block);
  }
}
DataBlockScanner(DataNode datanode, FSDataset dataset, Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
scanPeriod = conf.getInt("dfs.datanode.scan.period.hours", 0);
if ( scanPeriod <= 0 ) {
scanPeriod = DEFAULT_SCAN_PERIOD_HOURS;
}
scanPeriod *= 3600 * 1000;
// initialized when the scanner thread is started.
dirScanner = new DirectoryScanner(dataset, conf);
}
/** @return true once the throttler has been created (done at the end of init()). */
private synchronized boolean isInitiliazed() {
  final boolean throttlerReady = (throttler != null);
  return throttlerReady;
}
/**
 * Adjusts the byte counters when a block is added or removed.
 *
 * @param len size change in bytes; negative when a block is removed
 * @param lastScanTime last scan time of the affected block
 */
private void updateBytesToScan(long len, long lastScanTime) {
  totalBytesToScan += len;
  // Only blocks not yet scanned in the current period count as work left.
  final boolean pendingThisPeriod = lastScanTime < currentPeriodStart;
  if (pendingThisPeriod) {
    bytesLeft += len;
  }
  // The throttler bandwidth is deliberately not recomputed on every change
  // of bytesLeft; that is not really required.
}
/**
 * Inserts the info into the sorted set and the block map. If the set did
 * not already contain it, the log line limit and byte counters are updated.
 */
private synchronized void addBlockInfo(BlockScanInfo info) {
  final boolean isNew = blockInfoSet.add(info);
  blockMap.put(info.block, info);
  if (!isNew) {
    return;
  }
  final LogFileHandler handler = verificationLog;
  if (handler != null) {
    handler.setMaxNumLines(blockMap.size() * verficationLogLimit);
  }
  updateBytesToScan(info.block.getNumBytes(), info.lastScanTime);
}
/**
 * Removes the info from the sorted set and the block map. If the set
 * actually contained it, the log line limit and byte counters are updated.
 */
private synchronized void delBlockInfo(BlockScanInfo info) {
  final boolean wasPresent = blockInfoSet.remove(info);
  blockMap.remove(info.block);
  if (!wasPresent) {
    return;
  }
  final LogFileHandler handler = verificationLog;
  if (handler != null) {
    handler.setMaxNumLines(blockMap.size() * verficationLogLimit);
  }
  updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
}
/**
 * Applies a verification log entry to the tracked block it refers to, if
 * that block is known and the entry carries a newer, positive time.
 */
private synchronized void updateBlockInfo(LogEntry e) {
  final Block key = new Block(e.blockId, 0, e.genStamp);
  final BlockScanInfo info = blockMap.get(key);
  if (info == null) {
    return;
  }
  if (e.verificationTime <= 0 || info.lastScanTime >= e.verificationTime) {
    return;
  }
  // The scan time is part of the sort key, so remove and re-insert.
  delBlockInfo(info);
  info.lastScanTime = e.verificationTime;
  info.lastScanType = ScanType.VERIFICATION_SCAN;
  addBlockInfo(info);
}
/**
 * Builds the in-memory scan structures, opens the verification log and,
 * as the final step, creates the throttler (which marks the scanner as
 * initialized).
 */
private void init() {
  // get the list of blocks and arrange them in random order
  List<Block> arr = dataset.getFinalizedBlocks();
  Collections.shuffle(arr);

  blockInfoSet = new TreeSet<BlockScanInfo>();
  blockMap = new HashMap<Block, BlockScanInfo>();

  // Negative, strictly decreasing placeholder times keep the shuffled order
  // and mark blocks whose real time has not been assigned yet.
  long scanTime = -1;
  for (Block block : arr) {
    BlockScanInfo info = new BlockScanInfo( block );
    info.lastScanTime = scanTime--;
    //still keep 'info.lastScanType' to NONE.
    addBlockInfo(info);
  }

  /* Pick the first directory that has any existing scanner log.
   * otherwise, pick the first directory.
   */
  File dir = null;
  FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
  for(FSDataset.FSVolume vol : volumes) {
    if (LogFileHandler.isFilePresent(vol.getDir(), verificationLogFile)) {
      dir = vol.getDir();
      break;
    }
  }
  if (dir == null) {
    dir = volumes[0].getDir();
  }

  try {
    // max lines will be updated later during initialization.
    verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
  } catch (IOException e) {
    // Scanning continues without a log; include the cause so the failure
    // can be diagnosed from the datanode log.
    LOG.warn("Could not open verfication log. " +
             "Verification times are not stored.", e);
  }

  synchronized (this) {
    throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
  }
}
/**
 * Picks the initial "last scan time" for a newly added block.
 * If there are a lot of blocks, this returns a random time within
 * the scan period. Otherwise something sooner.
 *
 * @return a time between (now - scanPeriod) and (now - scanPeriod + window)
 */
private synchronized long getNewBlockScanTime() {
  // Ten minutes per tracked block, computed in long arithmetic so that a
  // large block count cannot overflow int before widening.
  long perBlockWindow = 600L * 1000L * Math.max(blockMap.size(), 1);
  long period = Math.min(scanPeriod, perBlockWindow);
  // Random.nextInt() needs a positive int bound; a configured scan period
  // above Integer.MAX_VALUE ms (~596 hours) would otherwise wrap negative
  // in the cast and make nextInt() throw IllegalArgumentException.
  int bound = (int) Math.min(period, (long) Integer.MAX_VALUE);
  return System.currentTimeMillis() - scanPeriod + random.nextInt(bound);
}
/**
 * Starts tracking a block. Does nothing until the scanner is initialized.
 * An already tracked block is replaced (with a warning).
 */
synchronized void addBlock(Block block) {
  if (isInitiliazed()) {
    BlockScanInfo existing = blockMap.get(block);
    if (existing != null) {
      LOG.warn("Adding an already existing block " + block);
      delBlockInfo(existing);
    }

    BlockScanInfo fresh = new BlockScanInfo(block);
    fresh.lastScanTime = getNewBlockScanTime();
    addBlockInfo(fresh);
    adjustThrottler();
  }
}
/**
 * Stops tracking a block. Does nothing if the scanner is not initialized
 * or the block is not tracked.
 */
synchronized void deleteBlock(Block block) {
  BlockScanInfo tracked = isInitiliazed() ? blockMap.get(block) : null;
  if (tracked != null) {
    delBlockInfo(tracked);
  }
}
/**
 * @return the recorded last scan time of the block, or 0 when the scanner
 *         is not initialized or the block is not tracked
 */
synchronized long getLastScanTime(Block block) {
  if (isInitiliazed()) {
    BlockScanInfo tracked = blockMap.get(block);
    if (tracked != null) {
      return tracked.lastScanTime;
    }
  }
  return 0;
}
/** Stops tracking each of the given blocks, one at a time. */
void deleteBlocks(Block[] blocks) {
  for (int idx = 0; idx < blocks.length; idx++) {
    deleteBlock(blocks[idx]);
  }
}
/** Records a successful verification of the block of type REMOTE_READ. */
void verifiedByClient(Block block) {
  final ScanType how = ScanType.REMOTE_READ;
  final boolean verifiedOk = true;
  updateScanStatus(block, how, verifiedOk);
}
/**
 * Records the outcome of a verification of the given block and, when
 * appropriate, appends a line to the verification log.
 *
 * @param block  the verified block
 * @param type   how the block was verified
 * @param scanOk whether the verification succeeded
 */
private synchronized void updateScanStatus(Block block,
                                           ScanType type,
                                           boolean scanOk) {
  // verifiedByClient() can be called before init() has created blockMap;
  // ignore such updates instead of failing with a NullPointerException,
  // consistent with addBlock() and deleteBlock().
  if (!isInitiliazed()) {
    return;
  }

  BlockScanInfo info = blockMap.get(block);

  if ( info != null ) {
    // The scan time is part of the sort key: remove now, re-add below.
    delBlockInfo(info);
  } else {
    // It might already be removed. Thats ok, it will be caught next time.
    info = new BlockScanInfo(block);
  }

  long now = System.currentTimeMillis();
  info.lastScanType = type;
  info.lastScanTime = now;
  info.lastScanOk = scanOk;
  addBlockInfo(info);

  if (type == ScanType.REMOTE_READ) {
    totalVerifications++;
  }

  // Don't update meta data too often in case of REMOTE_READ
  // or if the verification failed.
  long diff = now - info.lastLogTime;
  if (!scanOk || (type == ScanType.REMOTE_READ &&
                  diff < scanPeriod/3 && diff < ONE_DAY)) {
    return;
  }

  info.lastLogTime = now;
  LogFileHandler log = verificationLog;
  if (log != null) {
    log.appendLine("date=\"" + dateFormat.format(new Date(now)) + "\"\t " +
                   "time=\"" + now + "\"\t " +
                   "genstamp=\"" + block.getGenerationStamp() + "\"\t " +
                   "id=\"" + block.getBlockId() +"\"");
  }
}
/**
 * Reports the block as bad to the namenode, naming this datanode as the
 * location. A failure to report is logged and otherwise ignored.
 */
private void handleScanFailure(Block block) {
  LOG.info("Reporting bad block " + block + " to namenode.");

  try {
    DatanodeInfo self = new DatanodeInfo(datanode.dnRegistration);
    LocatedBlock located =
        new LocatedBlock(block, new DatanodeInfo[] { self });
    datanode.namenode.reportBadBlocks(new LocatedBlock[] { located });
  } catch (IOException ioe) {
    /* One common reason is that NameNode could be in safe mode.
     * Should we keep on retrying in that case?
     */
    LOG.warn("Failed to report bad block " + block + " to namenode : " +
             " Exception : " + StringUtils.stringifyException(ioe));
  }
}
/** One parsed line of the verification log. */
static private class LogEntry {
  long blockId = -1;
  long verificationTime = -1;
  long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;

  /**
   * A log line is a sequence of entries of the form name="value".
   * This is simple text that is easy to extend and easy to parse with
   * a regex; each match yields the name (group 1) and value (group 2).
   */
  private static final Pattern entryPattern =
      Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");

  /**
   * Parses a log line. The names "id", "time" and "genstamp" are
   * recognized; any other entry is skipped.
   *
   * @return the parsed entry, or null if a recognized value is not a number
   */
  static LogEntry parseEntry(String line) {
    final LogEntry parsed = new LogEntry();
    final Matcher m = entryPattern.matcher(line);

    while (m.find()) {
      final String key = m.group(1);
      final String text = m.group(2);
      try {
        if ("id".equals(key)) {
          parsed.blockId = Long.parseLong(text);
        } else if ("time".equals(key)) {
          parsed.verificationTime = Long.parseLong(text);
        } else if ("genstamp".equals(key)) {
          parsed.genStamp = Long.parseLong(text);
        }
      } catch(NumberFormatException nfe) {
        LOG.warn("Cannot parse line: " + line, nfe);
        return null;
      }
    }

    return parsed;
  }
}
/**
 * Sets the throttler bandwidth so that the bytes left in this period are
 * spread over the time left in it, bounded by MIN_SCAN_RATE and
 * MAX_SCAN_RATE.
 */
private synchronized void adjustThrottler() {
  // Clamp to at least 1 ms: at the exact end of the period timeLeft would be
  // 0 (division by zero below), and past the end it would be negative, which
  // would otherwise yield a negative rate and select the minimum bandwidth
  // exactly when the scanner is behind schedule.
  long timeLeft = Math.max(1L,
      currentPeriodStart + scanPeriod - System.currentTimeMillis());
  // bytes per millisecond -> bytes per second
  long bw = Math.max(bytesLeft * 1000 / timeLeft, MIN_SCAN_RATE);
  throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
}
/**
* Verifies one block by sending it through a BlockSender into a null
* output stream, using the scanner's throttler. An IOException from
* creating the sender or from sending counts as a failed attempt; up to
* two attempts are made.
* NOTE(review): presumably BlockSender checks the block checksums while
* sending with these arguments - confirm against BlockSender.
*/
private void verifyBlock(Block block) {
BlockSender blockSender = null;
/* In case of failure, attempt to read a second time to reduce
* transient errors. How do we flush block data from kernel
* buffers before the second read?
*/
for (int i=0; i<2; i++) {
boolean second = (i > 0);
try {
// Recompute the allowed bandwidth before each attempt.
adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false,
false, true, datanode);
// The data itself is discarded.
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
blockSender.sendBlock(out, null, throttler);
LOG.info((second ? "Second " : "") +
"Verification succeeded for " + block);
if ( second ) {
// The first attempt failed but the retry succeeded.
totalTransientErrors++;
}
updateScanStatus(block, ScanType.VERIFICATION_SCAN, true);
return;
} catch (IOException e) {
// Counted and recorded as failed before checking whether the block
// still exists.
totalScanErrors++;
updateScanStatus(block, ScanType.VERIFICATION_SCAN, false);
// If the block does not exist anymore, then it is not an error
if ( dataset.getFile(block) == null ) {
LOG.info("Verification failed for " + block + ". Its ok since " +
"it not in datanode dataset anymore.");
deleteBlock(block);
return;
}
LOG.warn((second ? "Second " : "First ") +
"Verification failed for " + block + ". Exception : " +
StringUtils.stringifyException(e));
if (second) {
// Both attempts failed: update the metric and report the block.
datanode.getMetrics().blockVerificationFailures.inc();
handleScanFailure(block);
return;
}
} finally {
// Runs once per attempt, whether it succeeded or failed.
IOUtils.closeStream(blockSender);
datanode.getMetrics().blocksVerified.inc();
totalScans++;
totalVerifications++;
}
}
}
/**
 * @return the last scan time of the first entry of the sorted set, or
 *         Long.MAX_VALUE when no block is tracked
 */
private synchronized long getEarliestScanTime() {
  if (blockInfoSet.size() <= 0) {
    return Long.MAX_VALUE;
  }
  final BlockScanInfo oldest = blockInfoSet.first();
  return oldest.lastScanTime;
}
/**
 * Picks the first block of the sorted set (if any) and verifies it. The
 * block is selected under the lock; verification runs outside of it.
 */
private void verifyFirstBlock() {
  final Block candidate;
  synchronized (this) {
    candidate = (blockInfoSet.size() > 0) ? blockInfoSet.first().block : null;
  }
  if (candidate == null) {
    return;
  }
  verifyBlock(candidate);
}
/**
* Assigns an initial last-scan time to every tracked block: first from the
* entries of the verification log, then, for blocks that still carry the
* negative placeholder time set by init(), staggered times starting one
* scan period in the past.
*
* @return false if the process was interrupted because the thread is
* marked to exit (datanode.shouldRun is false or the thread was
* interrupted); true otherwise
*/
private boolean assignInitialVerificationTimes() {
int numBlocks = 1;
LogFileHandler log = null;
// Snapshot the log handler and the block count under the lock.
synchronized (this) {
log = verificationLog;
numBlocks = Math.max(blockMap.size(), 1);
}
// First update the last verification times from the log file.
LogFileHandler.Reader logReader = null;
try {
if (log != null) {
// 'false': do not skip the previous log file, read it before the
// current one.
logReader = log.new Reader(false);
}
} catch (IOException e) {
LOG.warn("Could not read previous verification times : " +
StringUtils.stringifyException(e));
}
if (log != null) {
// Count the lines of the current log file.
log.updateCurNumLines();
}
try {
// update verification times from the verificationLog.
while (logReader != null && logReader.hasNext()) {
// Note: Thread.interrupted() clears the interrupt status.
if (!datanode.shouldRun || Thread.interrupted()) {
return false;
}
LogEntry entry = LogEntry.parseEntry(logReader.next());
if (entry != null) {
updateBlockInfo(entry);
}
}
} finally {
IOUtils.closeStream(logReader);
}
/* Space the initial scan times by scanPeriod/2 divided by the number
* of blocks, capped at ten minutes per block, so that we don't keep
* scanning the blocks too quickly when restarted.
*/
long verifyInterval = (long) (Math.min( scanPeriod/2.0/numBlocks,
10*60*1000 ));
long lastScanTime = System.currentTimeMillis() - scanPeriod;
/* Before this loop, entries in blockInfoSet that were not
* updated above have lastScanTime < 0. Loop until the first entry has
* lastScanTime >= 0.
*/
synchronized (this) {
if (blockInfoSet.size() > 0 ) {
BlockScanInfo info;
while ((info = blockInfoSet.first()).lastScanTime < 0) {
// The scan time is part of the sort key: remove, update, re-add.
delBlockInfo(info);
info.lastScanTime = lastScanTime;
lastScanTime += verifyInterval;
addBlockInfo(info);
}
}
}
return true;
}
/**
 * Logs how much work was left in the previous period, then resets the
 * byte count and the period start for a new scan period.
 */
private synchronized void startNewPeriod() {
  final double pctLeft = (bytesLeft * 100.0) / totalBytesToScan;
  final String pctText = String.format("%.2f%%", pctLeft);
  LOG.info("Starting a new period : work left in prev period : " + pctText);

  // Everything is pending again in the new period.
  bytesLeft = totalBytesToScan;
  currentPeriodStart = System.currentTimeMillis();
}
/**
 * Scanner thread body: initializes the scanner, restores the previous
 * verification times and then loops, starting new scan periods,
 * reconciling directories and verifying the block that is due next, until
 * the datanode stops or the thread is interrupted. The verification log
 * is always closed on exit.
 */
public void run() {
  try {
    init();

    //Read last verification times
    if (!assignInitialVerificationTimes()) {
      return;
    }

    adjustThrottler();

    while (datanode.shouldRun && !Thread.interrupted()) {
      long now = System.currentTimeMillis();
      synchronized (this) {
        if ( now >= (currentPeriodStart + scanPeriod)) {
          startNewPeriod();
        }
      }
      if (dirScanner.newScanPeriod(now)) {
        dirScanner.reconcile();
        // Reconciling may take a while; refresh the time.
        now = System.currentTimeMillis();
      }
      if ( (now - getEarliestScanTime()) >= scanPeriod ) {
        verifyFirstBlock();
      } else {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
          // sleep() clears the interrupt status when it throws. Restore it
          // so that the loop condition above sees the interrupt and the
          // thread exits instead of silently losing the request.
          Thread.currentThread().interrupt();
        }
      }
    }
  } catch (RuntimeException e) {
    LOG.warn("RuntimeException during DataBlockScanner.run() : " +
             StringUtils.stringifyException(e));
    throw e;
  } finally {
    shutdown();
    LOG.info("Exiting DataBlockScanner thread.");
  }
}
/** Detaches the verification log from the scanner and closes it, if open. */
synchronized void shutdown() {
  final LogFileHandler handler = verificationLog;
  verificationLog = null;
  if (handler == null) {
    return;
  }
  handler.close();
}
/**
 * Appends a textual scan report to the buffer: optionally one line per
 * tracked block, followed by summary statistics.
 *
 * @param buffer      receives the report
 * @param summaryOnly when true the per-block lines are omitted
 */
synchronized void printBlockReport(StringBuilder buffer,
                                   boolean summaryOnly) {
  final long hourMs = 3600*1000;
  final long dayMs = 24*hourMs;
  final long weekMs = 7*dayMs;
  final long fourWeeksMs = 4*weekMs;

  int inOneHour = 0;
  int inOneDay = 0;
  int inOneWeek = 0;
  int inFourWeeks = 0;
  int inScanPeriod = 0;
  int neverScanned = 0;

  final int total = blockInfoSet.size();
  final long now = System.currentTimeMillis();

  for (BlockScanInfo info : blockInfoSet) {
    final long scanTime = info.getLastScanTime();
    final long age = now - scanTime;

    // The buckets are cumulative: a recent scan counts in every wider one.
    if (age <= hourMs) { inOneHour++; }
    if (age <= dayMs) { inOneDay++; }
    if (age <= weekMs) { inOneWeek++; }
    if (age <= fourWeeksMs) { inFourWeeks++; }
    if (age <= scanPeriod) { inScanPeriod++; }
    if (scanTime <= 0) { neverScanned++; }

    if (summaryOnly) {
      continue;
    }

    final String scanType;
    if (info.lastScanType == ScanType.REMOTE_READ) {
      scanType = "remote";
    } else if (info.lastScanType == ScanType.VERIFICATION_SCAN) {
      scanType = "local";
    } else {
      scanType = "none";
    }
    final String status = info.lastScanOk ? "ok" : "failed";
    final String when = (scanTime <= 0) ? "not yet verified" :
                        dateFormat.format(new Date(scanTime));

    buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
                                " scan time : " +
                                "%-15d %s\n", info.block,
                                status, scanType, scanTime, when));
  }

  final double pctPeriodLeft = (scanPeriod + currentPeriodStart - now)
                               *100.0/scanPeriod;
  final double pctProgress = (totalBytesToScan == 0) ? 100 :
                             (totalBytesToScan-bytesLeft)*10000.0/totalBytesToScan/
                             (100-pctPeriodLeft+1e-10);
  final long rateKBps = Math.round(throttler.getBandwidth()/1024.0);

  buffer.append(String.format("\nTotal Blocks : %6d" +
                              "\nVerified in last hour : %6d" +
                              "\nVerified in last day : %6d" +
                              "\nVerified in last week : %6d" +
                              "\nVerified in last four weeks : %6d" +
                              "\nVerified in SCAN_PERIOD : %6d" +
                              "\nNot yet verified : %6d" +
                              "\nVerified since restart : %6d" +
                              "\nScans since restart : %6d" +
                              "\nScan errors since restart : %6d" +
                              "\nTransient scan errors : %6d" +
                              "\nCurrent scan rate limit KBps : %6d" +
                              "\nProgress this period : %6.0f%%" +
                              "\nTime left in cur period : %6.2f%%" +
                              "\n",
                              total, inOneHour, inOneDay, inOneWeek,
                              inFourWeeks, inScanPeriod, neverScanned,
                              totalVerifications, totalScans,
                              totalScanErrors, totalTransientErrors,
                              rateKBps,
                              pctProgress, pctPeriodLeft));
}
/**
* This class takes care of log file used to store the last verification
* times of the blocks. It rolls the current file when it is too big etc.
* If there is an error while writing, it stops updating with an error
* message.
*/
private static class LogFileHandler {
// Suffixes appended to the file prefix for the current and previous log file.
private static final String curFileSuffix = ".curr";
private static final String prevFileSuffix = ".prev";
// Don't roll files more often than this
private static final long minRollingPeriod = 6 * 3600 * 1000L; // 6 hours
// Minimum time between two warnings logged through warn().
private static final long minWarnPeriod = minRollingPeriod;
// Lower bound enforced by setMaxNumLines().
private static final int minLineLimit = 1000;
/**
 * @return true if the current or the previous log file with the given
 *         prefix exists in the directory
 */
static boolean isFilePresent(File dir, String filePrefix) {
  final File current = new File(dir, filePrefix + curFileSuffix);
  if (current.exists()) {
    return true;
  }
  final File previous = new File(dir, filePrefix + prevFileSuffix);
  return previous.exists();
}
// File currently appended to, and the file it is renamed to when rolled.
private File curFile;
private File prevFile;
private int maxNumLines = -1; // not very hard limit on number of lines.
// Number of lines in curFile. Negative while the real count is unknown;
// updateCurNumLines() replaces it with a counted value.
private int curNumLines = -1;
// Time of the last warning emitted by warn().
long lastWarningTime = 0;
// Output stream for curFile; null while the file is closed.
private PrintStream out;
// Number of open Readers; the log is not rolled while this is positive.
int numReaders = 0;
/**
 * Opens the current log file for appending.
 * The line count stays unknown (negative) until updateCurNumLines() is
 * called, and rolling happens only after that. This way the lines can be
 * counted in a separate thread without delaying start up.
 *
 * @param dir where the log files are located.
 * @param filePrefix prefix of the file names.
 * @param maxNumLines max lines in a file (it is a soft limit).
 * @throws IOException if the current file cannot be opened
 */
LogFileHandler(File dir, String filePrefix, int maxNumLines)
                                            throws IOException {
  this.curFile = new File(dir, filePrefix + curFileSuffix);
  this.prevFile = new File(dir, filePrefix + prevFileSuffix);
  openCurFile();
  // Not counted yet.
  this.curNumLines = -1;
  setMaxNumLines(maxNumLines);
}
/**
 * Sets the soft line limit, never below minLineLimit. The new limit takes
 * effect when the next entry is added.
 */
synchronized void setMaxNumLines(int maxNumLines) {
  this.maxNumLines = (maxNumLines < minLineLimit) ? minLineLimit
                                                   : maxNumLines;
}
/**
 * Append "\n" + line.
 * If the log file needs to be rolled, it will be done after
 * appending the text.
 * This does not throw IOException when there is an error while
 * appending. Currently does not throw an error even if rolling
 * fails (may be it should?).
 *
 * @param line the text to append
 * @return true if the line was handed to the stream and any required
 *         rolling succeeded; false if the log is closed or rolling failed
 */
synchronized boolean appendLine(String line) {
  // The stream is null after close(), or when reopening the file failed
  // during a roll; report the failure instead of throwing a
  // NullPointerException into the caller.
  if (out == null) {
    warn("Could not append to " + curFile + " : the log is not open");
    return false;
  }
  out.println();
  out.print(line);
  // While the real count is unknown (negative) appended lines are tallied
  // downwards; updateCurNumLines() later sets the counted value.
  curNumLines += (curNumLines < 0) ? -1 : 1;
  try {
    rollIfRequired();
  } catch (IOException e) {
    warn("Rolling failed for " + curFile + " : " + e.getMessage());
    return false;
  }
  return true;
}
/**
 * Logs the warning unless another one was logged through this method
 * less than minWarnPeriod ago.
 */
synchronized private void warn(String msg) {
  final long now = System.currentTimeMillis();
  final long sinceLast = now - lastWarningTime;
  if (sinceLast < minWarnPeriod) {
    return;
  }
  lastWarningTime = now;
  LOG.warn(msg);
}
/**
 * Closes any open stream and reopens the current file in append mode.
 *
 * @throws FileNotFoundException if the file cannot be opened; the handler
 *         is then left closed
 */
private synchronized void openCurFile() throws FileNotFoundException {
  close();
  final boolean append = true;
  final FileOutputStream fileOut = new FileOutputStream(curFile, append);
  out = new PrintStream(fileOut);
}
/**
 * Counts the lines of the current file and stores the result in
 * curNumLines. If the file cannot be read, the lines counted so far are
 * stored.
 */
void updateCurNumLines() {
  int lines = 0;
  Reader lineReader = null;
  try {
    // 'true': read the current file only.
    lineReader = new Reader(true);
    while (lineReader.hasNext()) {
      lineReader.next();
      lines++;
    }
  } catch (IOException ignored) {
    // Best effort: keep whatever was counted.
  } finally {
    synchronized (this) {
      curNumLines = lines;
    }
    IOUtils.closeStream(lineReader);
  }
}
// Time (milliseconds since the epoch) of the last successful roll done by
// this handler; 0 if it has not rolled yet. Used only by rollIfRequired().
private long lastRollTime = 0;

/**
 * Rolls the log (current file becomes the previous file and a new current
 * file is started) when the line limit is reached, no Reader is open and
 * at least minRollingPeriod has passed since the last roll.
 *
 * @throws IOException if the previous file cannot be deleted, the current
 *         file cannot be renamed, or the new current file cannot be opened
 */
private void rollIfRequired() throws IOException {
  if (curNumLines < maxNumLines || numReaders > 0) {
    return;
  }

  long now = System.currentTimeMillis();
  // Enforce the minimum rolling period relative to the previous roll.
  // (Comparing 'now' itself with the period can never be true for a
  // real clock, so the limit would never be applied.)
  if (now - lastRollTime < minRollingPeriod) {
    return;
  }

  if (!prevFile.delete() && prevFile.exists()) {
    throw new IOException("Could not delete " + prevFile);
  }

  close();

  if (!curFile.renameTo(prevFile)) {
    // Keep logging to the existing file.
    openCurFile();
    throw new IOException("Could not rename " + curFile +
                          " to " + prevFile);
  }

  lastRollTime = now;
  openCurFile();
  updateCurNumLines();
}
/** Closes the output stream if it is open; safe to call repeatedly. */
synchronized void close() {
  final PrintStream stream = out;
  if (stream == null) {
    return;
  }
  stream.close();
  out = null;
}
/**
* This is used to read the lines of the log in order: the previous file
* first (unless skipped) and then the current file.
* If the data is not read completely (i.e, until hasNext() returns
* false), it needs to be explicitly closed. While a Reader is open,
* numReaders is positive and rollIfRequired() does not roll the log.
*/
private class Reader implements Iterator<String>, Closeable {
// Reader of the file currently being read; null before the first file is
// opened and after close().
BufferedReader reader;
// File being read, or the first candidate file before anything is opened.
File file;
// Line that the next call to next() returns; null when there is none.
String line;
boolean closed = false;
/**
* @param skipPrevFile if true only the current file is read, otherwise
* reading starts with the previous file
*/
private Reader(boolean skipPrevFile) throws IOException {
synchronized (LogFileHandler.this) {
numReaders++;
}
reader = null;
file = (skipPrevFile) ? curFile : prevFile;
// Prefetch the first line; this closes the Reader if there is none.
readNext();
}
/**
* Opens the next file to read.
* @return false if there are no more files
*/
private boolean openFile() throws IOException {
for(int i=0; i<2; i++) {
// Advance if a file was already being read, or if the first candidate
// did not exist.
if (reader != null || i > 0) {
// move to next file
file = (file == prevFile) ? curFile : null;
}
if (file == null) {
return false;
}
if (file.exists()) {
break;
}
}
if (reader != null ) {
reader.close();
reader = null;
}
reader = new BufferedReader(new FileReader(file));
return true;
}
// read next line if possible.
private void readNext() throws IOException {
line = null;
try {
if (reader != null && (line = reader.readLine()) != null) {
return;
}
if (line == null) {
// move to the next file.
if (openFile()) {
readNext();
}
}
} finally {
// Nothing more to read (or an error occurred): release the reader.
if (!hasNext()) {
close();
}
}
}
public boolean hasNext() {
return line != null;
}
/**
* Returns the prefetched line and fetches the following one. An
* IOException while fetching is logged and ends the iteration. Returns
* null if called when hasNext() is false.
*/
public String next() {
String curLine = line;
try {
readNext();
} catch (IOException e) {
LOG.info("Could not reade next line in LogHandler : " +
StringUtils.stringifyException(e));
}
return curLine;
}
public void remove() {
throw new RuntimeException("remove() is not supported.");
}
/**
* Closes the underlying reader and decrements numReaders. Only the
* first call has any effect.
*/
public void close() throws IOException {
if (!closed) {
try {
if (reader != null) {
reader.close();
}
} finally {
file = null;
reader = null;
closed = true;
synchronized (LogFileHandler.this) {
numReaders--;
assert(numReaders >= 0);
}
}
}
}
}
}
/**
 * Serves the block scanner report as plain text. The scanner is looked up
 * in the servlet context attribute "datanode.blockScanner"; per-block
 * lines are included only when the "listblocks" parameter is present.
 */
public static class Servlet extends HttpServlet {
  private static final long serialVersionUID = 1L;

  public void doGet(HttpServletRequest request,
                    HttpServletResponse response) throws IOException {
    response.setContentType("text/plain");

    final Object attr =
        getServletContext().getAttribute("datanode.blockScanner");
    final DataBlockScanner blockScanner = (DataBlockScanner) attr;

    final String listBlocks = request.getParameter("listblocks");
    final boolean summary = (listBlocks == null);

    final StringBuilder buffer = new StringBuilder(8*1024);
    if (blockScanner == null) {
      buffer.append("Periodic block scanner is not running. " +
                    "Please check the datanode log if this is unexpected.");
    } else if (!blockScanner.isInitiliazed()) {
      buffer.append("Periodic block scanner is not yet initialized. " +
                    "Please check back again after some time.");
    } else {
      blockScanner.printBlockReport(buffer, summary);
    }

    response.getWriter().write(buffer.toString()); // extra copy!
  }
}
}