blob: 52b3c5ca003086bd04eb7f643b873a365c814b12 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.examples.terasort;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Generate 1 mapper per a file that checks to make sure the keys
* are sorted within each file. The mapper also generates
* "$file:begin", first key and "$file:end", last key. The reduce verifies that
* all of the start/end items are in order.
* Any output from the reduce is problem report.
* <p>
* To run the program:
* <b>bin/hadoop jar hadoop-examples-*.jar teravalidate out-dir report-dir</b>
* <p>
* If there is any output, something is wrong and the output of the reduce
* will have the problem report.
*/
public class TeraValidate extends Configured implements Tool {
private static final Text error = new Text("error");
static class ValidateMapper extends MapReduceBase
implements Mapper<Text,Text,Text,Text> {
private Text lastKey;
private OutputCollector<Text,Text> output;
private String filename;
/**
* Get the final part of the input name
* @param split the input split
* @return the "part-00000" for the input
*/
private String getFilename(FileSplit split) {
return split.getPath().getName();
}
public void map(Text key, Text value, OutputCollector<Text,Text> output,
Reporter reporter) throws IOException {
if (lastKey == null) {
filename = getFilename((FileSplit) reporter.getInputSplit());
output.collect(new Text(filename + ":begin"), key);
lastKey = new Text();
this.output = output;
} else {
if (key.compareTo(lastKey) < 0) {
output.collect(error, new Text("misorder in " + filename +
" last: '" + lastKey +
"' current: '" + key + "'"));
}
}
lastKey.set(key);
}
public void close() throws IOException {
if (lastKey != null) {
output.collect(new Text(filename + ":end"), lastKey);
}
}
}
/**
* Check the boundaries between the output files by making sure that the
* boundary keys are always increasing.
* Also passes any error reports along intact.
*/
static class ValidateReducer extends MapReduceBase
implements Reducer<Text,Text,Text,Text> {
private boolean firstKey = true;
private Text lastKey = new Text();
private Text lastValue = new Text();
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
if (error.equals(key)) {
while(values.hasNext()) {
output.collect(key, values.next());
}
} else {
Text value = values.next();
if (firstKey) {
firstKey = false;
} else {
if (value.compareTo(lastValue) < 0) {
output.collect(error,
new Text("misordered keys last: " +
lastKey + " '" + lastValue +
"' current: " + key + " '" + value + "'"));
}
}
lastKey.set(key);
lastValue.set(value);
}
}
}
public int run(String[] args) throws Exception {
JobConf job = (JobConf) getConf();
TeraInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setJobName("TeraValidate");
job.setJarByClass(TeraValidate.class);
job.setMapperClass(ValidateMapper.class);
job.setReducerClass(ValidateReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// force a single reducer
job.setNumReduceTasks(1);
// force a single split
job.setLong("mapred.min.split.size", Long.MAX_VALUE);
job.setInputFormat(TeraInputFormat.class);
JobClient.runJob(job);
return 0;
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new JobConf(), new TeraValidate(), args);
System.exit(res);
}
}