/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.raid;
import java.io.IOException;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Date;
import java.text.SimpleDateFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.RaidDFSUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
/**
* distributed block fixer that uses MapReduce jobs to fix corrupt files
*
* configuration options:
*
* raid.blockfix.filespertask - number of corrupt files to fix in a single
* MapReduce task (i.e., at one mapper node)
*
* raid.blockfix.fairscheduler.pool - the fair scheduler pool to use for
* block fixer jobs
*
* raid.blockfix.maxpendingfiles - maximum number of files to fix
* simultaneously
*/
public class DistBlockFixer extends BlockFixer {
// volatile should be sufficient since only the block fixer thread
// updates numJobsRunning (other threads may read)
private volatile int numJobsRunning = 0;
private static final String WORK_DIR_PREFIX = "blockfixer";
private static final String IN_FILE_SUFFIX = ".in";
private static final String PART_PREFIX = "part-";
private static final String BLOCKFIX_FILES_PER_TASK =
"raid.blockfix.filespertask";
private static final String BLOCKFIX_MAX_PENDING_FILES =
"raid.blockfix.maxpendingfiles";
private static final String BLOCKFIX_POOL =
"raid.blockfix.fairscheduler.pool";
// mapred.fairscheduler.pool is only used in the local configuration
// passed to a block fixing job
private static final String MAPRED_POOL =
"mapred.fairscheduler.pool";
// default number of files to fix in a task
private static final long DEFAULT_BLOCKFIX_FILES_PER_TASK = 10L;
// default number of files to fix simultaneously
private static final long DEFAULT_BLOCKFIX_MAX_PENDING_FILES = 1000L;
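// a minimal configuration sketch (not part of the original source) showing
// how these settings might be supplied; the pool name "blockfixer-pool" is
// hypothetical, and the numeric values are simply the defaults above:
//
//   Configuration conf = new Configuration();
//   conf.setLong("raid.blockfix.filespertask", 10L);
//   conf.setLong("raid.blockfix.maxpendingfiles", 1000L);
//   conf.set("raid.blockfix.fairscheduler.pool", "blockfixer-pool");
//   BlockFixer fixer = new DistBlockFixer(conf);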
protected static final Log LOG = LogFactory.getLog(DistBlockFixer.class);
// number of files to fix in a task
private long filesPerTask;
// number of files to fix simultaneously
final private long maxPendingFiles;
// number of files being fixed right now
private long pendingFiles;
// pool name to use (may be null, in which case no special pool is used)
private String poolName;
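// start time of the last block fixing iteration, in milliseconds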
private long lastCheckTime;
private final SimpleDateFormat dateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
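// corrupt files currently being fixed or recently fixed, keyed by path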
private Map<String, CorruptFileInfo> fileIndex =
new HashMap<String, CorruptFileInfo>();
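// the files that each running job is responsible for fixing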
private Map<Job, List<CorruptFileInfo>> jobIndex =
new HashMap<Job, List<CorruptFileInfo>>();
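// per-file outcomes reported by the mappers via job counters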
static enum Counter {
FILES_SUCCEEDED, FILES_FAILED, FILES_NOACTION
}
public DistBlockFixer(Configuration conf) {
super(conf);
filesPerTask = DistBlockFixer.filesPerTask(getConf());
maxPendingFiles = DistBlockFixer.maxPendingFiles(getConf());
pendingFiles = 0L;
poolName = conf.get(BLOCKFIX_POOL);
// start off due for the first iteration
lastCheckTime = System.currentTimeMillis() - blockFixInterval;
}
/**
* determines how many files to fix in a single task
*/
protected static long filesPerTask(Configuration conf) {
return conf.getLong(BLOCKFIX_FILES_PER_TASK,
DEFAULT_BLOCKFIX_FILES_PER_TASK);
}
/**
* determines how many files to fix simultaneously
*/
protected static long maxPendingFiles(Configuration conf) {
return conf.getLong(BLOCKFIX_MAX_PENDING_FILES,
DEFAULT_BLOCKFIX_MAX_PENDING_FILES);
}
/**
* runs the block fixer periodically
*/
public void run() {
while (running) {
// check if it is time to run the block fixer
long now = System.currentTimeMillis();
if (now >= lastCheckTime + blockFixInterval) {
lastCheckTime = now;
try {
checkAndFixBlocks(now);
} catch (InterruptedException ignore) {
LOG.info("interrupted");
} catch (Exception e) {
// log exceptions and keep running
LOG.error(StringUtils.stringifyException(e));
} catch (Error e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
}
// try to sleep for the remainder of the interval
long sleepPeriod = (lastCheckTime - System.currentTimeMillis()) +
blockFixInterval;
if ((sleepPeriod > 0L) && running) {
try {
Thread.sleep(sleepPeriod);
} catch (InterruptedException ignore) {
LOG.info("interrupted");
}
}
}
}
/**
* checks for corrupt blocks and fixes them (if any)
*/
private void checkAndFixBlocks(long startTime)
throws IOException, InterruptedException, ClassNotFoundException {
checkJobs();
if (pendingFiles >= maxPendingFiles) {
return;
}
List<Path> corruptFiles = getCorruptFiles();
filterUnfixableSourceFiles(corruptFiles.iterator());
String startTimeStr = dateFormat.format(new Date(startTime));
LOG.info(startTimeStr + ": found " + corruptFiles.size() +
" corrupt files");
if (corruptFiles.size() > 0) {
String jobName = "blockfixer." + startTime;
startJob(jobName, corruptFiles);
}
}
/**
* Handle a failed job.
*/
private void failJob(Job job) throws IOException {
// assume no files have been fixed
LOG.error("DistBlockFixer job " + job.getJobID() + "(" + job.getJobName() +
") finished (failed)");
for (CorruptFileInfo fileInfo: jobIndex.get(job)) {
fileInfo.fail();
}
numJobsRunning--;
}
/**
* Handle a successful job.
*/
private void succeedJob(Job job, long filesSucceeded, long filesFailed)
throws IOException {
LOG.info("DistBlockFixer job " + job.getJobID() + "(" + job.getJobName() +
") finished (succeeded)");
if (filesFailed == 0) {
// no files have failed
for (CorruptFileInfo fileInfo: jobIndex.get(job)) {
fileInfo.succeed();
}
} else {
// we have to look at the output to check which files have failed
Set<String> failedFiles = getFailedFiles(job);
for (CorruptFileInfo fileInfo: jobIndex.get(job)) {
if (failedFiles.contains(fileInfo.getFile().toString())) {
fileInfo.fail();
} else {
// call succeed for files that have succeeded or for which no action
// was taken
fileInfo.succeed();
}
}
}
// report succeeded files to metrics
incrFilesFixed(filesSucceeded);
numJobsRunning--;
}
/**
* checks whether jobs have completed and updates the job and file index;
* failed files are removed from the file index so that they can be
* retried in a later iteration
*/
private void checkJobs() throws IOException {
Iterator<Job> jobIter = jobIndex.keySet().iterator();
while(jobIter.hasNext()) {
Job job = jobIter.next();
try {
if (job.isComplete()) {
long filesSucceeded =
job.getCounters().findCounter(Counter.FILES_SUCCEEDED).getValue();
long filesFailed =
job.getCounters().findCounter(Counter.FILES_FAILED).getValue();
long filesNoAction =
job.getCounters().findCounter(Counter.FILES_NOACTION).getValue();
int files = jobIndex.get(job).size();
if (job.isSuccessful() &&
(filesSucceeded + filesFailed + filesNoAction ==
((long) files))) {
// job has processed all files
succeedJob(job, filesSucceeded, filesFailed);
} else {
failJob(job);
}
jobIter.remove();
} else {
LOG.info("job " + job.getJobName() + " still running");
}
} catch (Exception e) {
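// if a job's status cannot be determined, treat it as failed and
// try to kill it so it does not linger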
LOG.error(StringUtils.stringifyException(e));
failJob(job);
try {
job.killJob();
} catch (Exception ee) {
LOG.error(StringUtils.stringifyException(ee));
}
jobIter.remove();
}
}
purgeFileIndex();
}
/**
* determines which files have failed for a given job
*/
private Set<String> getFailedFiles(Job job) throws IOException {
Set<String> failedFiles = new HashSet<String>();
Path outDir = SequenceFileOutputFormat.getOutputPath(job);
FileSystem fs = outDir.getFileSystem(getConf());
if (!fs.getFileStatus(outDir).isDir()) {
throw new IOException(outDir.toString() + " is not a directory");
}
FileStatus[] files = fs.listStatus(outDir);
for (FileStatus f: files) {
Path fPath = f.getPath();
if ((!f.isDir()) && (fPath.getName().startsWith(PART_PREFIX))) {
LOG.info("opening " + fPath.toString());
SequenceFile.Reader reader =
new SequenceFile.Reader(fs, fPath, getConf());
Text key = new Text();
Text value = new Text();
while (reader.next(key, value)) {
failedFiles.add(key.toString());
}
reader.close();
}
}
return failedFiles;
}
/**
* purges expired file entries from the file index
*/
private void purgeFileIndex() {
Iterator<String> fileIter = fileIndex.keySet().iterator();
while(fileIter.hasNext()) {
String file = fileIter.next();
if (fileIndex.get(file).isExpired()) {
fileIter.remove();
}
}
}
/**
* creates and submits a job, updates file index and job index
*/
private Job startJob(String jobName, List<Path> corruptFiles)
throws IOException, InterruptedException, ClassNotFoundException {
Path inDir = new Path(WORK_DIR_PREFIX + "/in/" + jobName);
Path outDir = new Path(WORK_DIR_PREFIX + "/out/" + jobName);
List<Path> filesInJob = createInputFile(jobName, inDir, corruptFiles);
Configuration jobConf = new Configuration(getConf());
if (poolName != null) {
jobConf.set(MAPRED_POOL, poolName);
}
Job job = new Job(jobConf, jobName);
job.setJarByClass(getClass());
job.setMapperClass(DistBlockFixerMapper.class);
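// map-only job: mappers write the names of failed files to the job output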
job.setNumReduceTasks(0);
job.setInputFormatClass(DistBlockFixerInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
DistBlockFixerInputFormat.setInputPaths(job, inDir);
SequenceFileOutputFormat.setOutputPath(job, outDir);
job.submit();
LOG.info("DistBlockFixer job " + job.getJobID() + "(" + job.getJobName() +
") started");
// submit the job before inserting it into the index
// this way, if submit fails, we won't have added anything to the index
insertJob(job, filesInJob);
return job;
}
/**
* inserts new job into file index and job index
*/
private void insertJob(Job job, List<Path> corruptFiles) {
List<CorruptFileInfo> fileInfos = new LinkedList<CorruptFileInfo>();
for (Path file: corruptFiles) {
CorruptFileInfo fileInfo = new CorruptFileInfo(file, job);
fileInfos.add(fileInfo);
fileIndex.put(file.toString(), fileInfo);
}
jobIndex.put(job, fileInfos);
numJobsRunning++;
}
/**
* creates the input file containing the names of the files to be fixed;
* stops once maxPendingFiles is reached and returns the list of files
* actually added
*/
private List<Path> createInputFile(String jobName, Path inDir,
List<Path> corruptFiles)
throws IOException {
Path file = new Path(inDir, jobName + IN_FILE_SUFFIX);
FileSystem fs = file.getFileSystem(getConf());
SequenceFile.Writer fileOut = SequenceFile.createWriter(fs, getConf(), file,
LongWritable.class,
Text.class);
long index = 0L;
List<Path> filesAdded = new LinkedList<Path>();
for (Path corruptFile: corruptFiles) {
if (pendingFiles >= maxPendingFiles) {
break;
}
String corruptFileName = corruptFile.toString();
fileOut.append(new LongWritable(index++), new Text(corruptFileName));
filesAdded.add(corruptFile);
pendingFiles++;
if (index % filesPerTask == 0) {
fileOut.sync(); // create sync point to make sure we can split here
}
}
fileOut.close();
return filesAdded;
}
/**
* gets a list of corrupt files from the name node
* and filters out files that are currently being fixed or
* that were recently fixed
*/
private List<Path> getCorruptFiles() throws IOException {
DistributedFileSystem dfs = (DistributedFileSystem)
(new Path("/")).getFileSystem(getConf());
String[] files = RaidDFSUtil.getCorruptFiles(dfs);
List<Path> corruptFiles = new LinkedList<Path>();
for (String f: files) {
Path p = new Path(f);
// filter out files that are being fixed or that were recently fixed
if (!fileIndex.containsKey(p.toString())) {
corruptFiles.add(p);
}
}
RaidUtils.filterTrash(getConf(), corruptFiles);
return corruptFiles;
}
/**
* returns the number of MapReduce jobs currently running
*/
public int jobsRunning() {
return numJobsRunning;
}
/**
* holds information about a corrupt file that is being fixed
*/
class CorruptFileInfo {
private Path file;
private Job job;
private boolean done;
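// time at which the fix succeeded, used to expire the entry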
private long time;
public CorruptFileInfo(Path file, Job job) {
this.file = file;
this.job = job;
this.done = false;
this.time = 0;
}
public boolean isDone() {
return done;
}
public boolean isExpired() {
return done && ((System.currentTimeMillis() - time) > historyInterval);
}
public Path getFile() {
return file;
}
/**
* records a failed attempt at fixing a file: the entry is removed from
* the file index immediately (instead of being left to expire) so that
* the file can be retried right away
*/
public void fail() {
// remove this file from the index
CorruptFileInfo removed = fileIndex.remove(file.toString());
if (removed == null) {
LOG.error("trying to remove file not in file index: " +
file.toString());
} else {
LOG.error("fixing " + file.toString() + " failed");
}
pendingFiles--;
}
/**
* marks a file as fixed successfully
* and sets time stamp for expiry after specified interval
*/
public void succeed() {
// leave the file in the index; it will be purged once it expires
job = null;
done = true;
time = System.currentTimeMillis();
LOG.info("fixing " + file.toString() + " succeeded");
pendingFiles--;
}
}
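/**
* input format that reads the sequence file listing corrupt files and
* creates one input split for every filesPerTask files
*/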
static class DistBlockFixerInputFormat
extends SequenceFileInputFormat<LongWritable, Text> {
protected static final Log LOG =
LogFactory.getLog(DistBlockFixerInputFormat.class);
/**
* splits the input file into tasks handled by single nodes; the input
* file must be read to do this, since a split boundary is created after
* every filesPerTask items in the sequence
*/
@Override
public List<InputSplit> getSplits(JobContext job)
throws IOException {
long filesPerTask = DistBlockFixer.filesPerTask(job.getConfiguration());
Path[] inPaths = getInputPaths(job);
List<InputSplit> splits = new LinkedList<InputSplit>();
long fileCounter = 0;
for (Path inPath: inPaths) {
FileSystem fs = inPath.getFileSystem(job.getConfiguration());
if (!fs.getFileStatus(inPath).isDir()) {
throw new IOException(inPath.toString() + " is not a directory");
}
FileStatus[] inFiles = fs.listStatus(inPath);
for (FileStatus inFileStatus: inFiles) {
Path inFile = inFileStatus.getPath();
if (!inFileStatus.isDir() &&
(inFile.getName().equals(job.getJobName() + IN_FILE_SUFFIX))) {
fileCounter++;
SequenceFile.Reader inFileReader =
new SequenceFile.Reader(fs, inFile, job.getConfiguration());
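// track where the current split starts in the sequence file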
long startPos = inFileReader.getPosition();
long counter = 0;
// create an input split every filesPerTask items in the sequence
LongWritable key = new LongWritable();
Text value = new Text();
try {
while (inFileReader.next(key, value)) {
if (counter % filesPerTask == filesPerTask - 1L) {
splits.add(new FileSplit(inFile, startPos,
inFileReader.getPosition() -
startPos,
null));
startPos = inFileReader.getPosition();
}
counter++;
}
// create input split for remaining items if necessary
// this includes the case where no splits were created by the loop
if (startPos != inFileReader.getPosition()) {
splits.add(new FileSplit(inFile, startPos,
inFileReader.getPosition() - startPos,
null));
}
} finally {
inFileReader.close();
}
}
}
}
LOG.info("created " + splits.size() + " input splits from " +
fileCounter + " files");
return splits;
}
/**
* indicates that the input file can be split
*/
@Override
public boolean isSplitable(JobContext job, Path file) {
return true;
}
}
/**
* mapper that fixes files with corrupt blocks
*/
static class DistBlockFixerMapper
extends Mapper<LongWritable, Text, Text, Text> {
protected static final Log LOG =
LogFactory.getLog(DistBlockFixerMapper.class);
/**
* fixes a single file with corrupt blocks
*/
@Override
public void map(LongWritable key, Text fileText, Context context)
throws IOException, InterruptedException {
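// the helper encapsulates the actual per-file fixing logic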
BlockFixerHelper helper =
new BlockFixerHelper(context.getConfiguration());
String fileStr = fileText.toString();
LOG.info("fixing " + fileStr);
Path file = new Path(fileStr);
try {
boolean fixed = helper.fixFile(file, context);
if (fixed) {
context.getCounter(Counter.FILES_SUCCEEDED).increment(1L);
} else {
context.getCounter(Counter.FILES_NOACTION).increment(1L);
}
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
// report file as failed
context.getCounter(Counter.FILES_FAILED).increment(1L);
String outkey = fileStr;
String outval = "failed";
context.write(new Text(outkey), new Text(outval));
}
context.progress();
}
}
}