/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
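
/*
 * Example invocation (a sketch: the test jar name varies by build, and the
 * -D values below are illustrative, not recommendations):
 *
 *   hadoop jar hadoop-mapreduce-client-jobclient-tests.jar \
 *       org.apache.hadoop.mapreduce.LargeSorter \
 *       -D mapreduce.large-sorter.mbs-per-map=2048 \
 *       -D mapreduce.large-sorter.map-tasks=4 \
 *       -D mapreduce.large-sorter.reduce-tasks=2
 *
 * The -D generic options are parsed by ToolRunner in main() below.
 */
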
/**
 * A sample MR job that helps with testing large sorts in the MapReduce
 * framework. Each map task generates the specified number of bytes and
 * pipes them to the reducers.
 *
 * <ul>
 * <li><code>mapreduce.large-sorter.mbs-per-map</code> specifies the amount
 * of data (in MB) to generate per map. By default this is twice the value
 * of <code>mapreduce.task.io.sort.mb</code>, or 1 GB if that is not
 * specified either.</li>
 * <li><code>mapreduce.large-sorter.map-tasks</code> specifies the number of
 * map tasks to run.</li>
 * <li><code>mapreduce.large-sorter.reduce-tasks</code> specifies the number
 * of reduce tasks to run.</li>
 * </ul>
 */
public class LargeSorter extends Configured implements Tool {
private static final String LS_PREFIX = "mapreduce.large-sorter.";
public static final String MBS_PER_MAP = LS_PREFIX + "mbs-per-map";
public static final String NUM_MAP_TASKS = LS_PREFIX + "map-tasks";
public static final String NUM_REDUCE_TASKS = LS_PREFIX + "reduce-tasks";
private static final String MAX_VALUE = LS_PREFIX + "max-value";
private static final String MIN_VALUE = LS_PREFIX + "min-value";
private static final String MIN_KEY = LS_PREFIX + "min-key";
private static final String MAX_KEY = LS_PREFIX + "max-key";
/**
* User counters
*/
static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
/**
* A custom input format that creates virtual inputs of a single string
* for each map.
*/
static class RandomInputFormat extends InputFormat<Text, Text> {
    /**
     * Generate the requested number of file splits, each pointing at a
     * dummy file under the job output directory.
     */
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> result = new ArrayList<InputSplit>();
Path outDir = FileOutputFormat.getOutputPath(job);
int numSplits =
job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
      for (int i = 0; i < numSplits; ++i) {
result.add(new FileSplit(
new Path(outDir, "dummy-split-" + i), 0, 1, null));
}
return result;
}
/**
* Return a single record (filename, "") where the filename is taken from
* the file split.
*/
static class RandomRecordReader extends RecordReader<Text, Text> {
Path name;
Text key = null;
Text value = new Text();
public RandomRecordReader(Path p) {
name = p;
}
public void initialize(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
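        // Nothing to do: the split path was captured in the constructor.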
}
public boolean nextKeyValue() {
if (name != null) {
key = new Text();
key.set(name.getName());
name = null;
return true;
}
return false;
}
public Text getCurrentKey() {
return key;
}
public Text getCurrentValue() {
return value;
}
public void close() {}
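      // Progress is not meaningful here: each virtual split yields exactly
      // one record.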
public float getProgress() {
return 0.0f;
}
}
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new RandomRecordReader(((FileSplit) split).getPath());
}
}
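
  /**
   * Mapper that ignores its input and emits random key/value pairs until a
   * configured byte budget is exhausted. Key and value sizes are drawn
   * uniformly from configurable ranges.
   */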
static class RandomMapper extends Mapper<WritableComparable, Writable,
BytesWritable, BytesWritable> {
private long numBytesToWrite;
private int minKeySize;
private int keySizeRange;
private int minValueSize;
private int valueSizeRange;
private Random random = new Random();
private BytesWritable randomKey = new BytesWritable();
private BytesWritable randomValue = new BytesWritable();
private void randomizeBytes(byte[] data, int offset, int length) {
      for (int i = offset + length - 1; i >= offset; --i) {
data[i] = (byte) random.nextInt(256);
}
}
@Override
public void setup(Context context) {
Configuration conf = context.getConfiguration();
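      // Byte budget for this map task: MBS_PER_MAP is in MB and defaults to
      // twice io.sort.mb, i.e. 2 * 512 MB = 1 GB when neither is set.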
numBytesToWrite = 1024 * 1024 * conf.getLong(MBS_PER_MAP,
2 * conf.getInt(MRJobConfig.IO_SORT_MB, 512));
minKeySize = conf.getInt(MIN_KEY, 10);
keySizeRange =
conf.getInt(MAX_KEY, 1000) - minKeySize;
minValueSize = conf.getInt(MIN_VALUE, 0);
valueSizeRange =
conf.getInt(MAX_VALUE, 20000) - minValueSize;
}
    /**
     * Ignore the input and write random records until this task's byte
     * budget is exhausted.
     */
public void map(WritableComparable key,
Writable value,
        Context context) throws IOException, InterruptedException {
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
(keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
randomKey.setSize(keyLength);
randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
int valueLength = minValueSize +
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
context.write(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
        context.getCounter(Counters.BYTES_WRITTEN).increment(
            keyLength + valueLength);
context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
if (++itemCount % 200 == 0) {
context.setStatus("wrote record " + itemCount + ". " +
numBytesToWrite + " bytes left.");
}
}
context.setStatus("done with " + itemCount + " records.");
}
}
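
  /**
   * A reducer that discards every record: by the time records reach it, the
   * work this job is meant to exercise (sorting and shuffling the map
   * output) has already been done, so nothing needs to be written out.
   */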
static class Discarder extends Reducer<BytesWritable, BytesWritable,
WritableComparable, Writable> {
@Override
public void reduce(BytesWritable key, Iterable<BytesWritable> values,
Context context) throws IOException, InterruptedException {
// Do nothing
}
}
private void verifyNotZero(Configuration conf, String config) {
if (conf.getInt(config, 1) <= 0) {
      throw new IllegalArgumentException(config + " should be > 0");
}
}
public int run(String[] args) throws Exception {
Path outDir = new Path(
LargeSorter.class.getName() + System.currentTimeMillis());
Configuration conf = getConf();
verifyNotZero(conf, MBS_PER_MAP);
verifyNotZero(conf, NUM_MAP_TASKS);
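    // RandomInputFormat.getSplits() reads MRJobConfig.NUM_MAPS, so copy the
    // requested map count there.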
conf.setInt(MRJobConfig.NUM_MAPS, conf.getInt(NUM_MAP_TASKS, 2));
int ioSortMb = conf.getInt(MRJobConfig.IO_SORT_MB, 512);
int mapMb = Math.max(2 * ioSortMb, conf.getInt(MRJobConfig.MAP_MEMORY_MB,
MRJobConfig.DEFAULT_MAP_MEMORY_MB));
conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMb);
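    // Give the map JVM a heap just under the container size; the 200 MB gap
    // is a rough allowance for non-heap memory, not a tuned value.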
conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
Job job = Job.getInstance(conf);
job.setJarByClass(LargeSorter.class);
job.setJobName("large-sorter");
FileOutputFormat.setOutputPath(job, outDir);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(RandomInputFormat.class);
job.setMapperClass(RandomMapper.class);
job.setReducerClass(Discarder.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS, 1));
Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = 1;
try {
ret = job.waitForCompletion(true) ? 0 : 1;
} finally {
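      // The sorted output exists only to exercise the framework, so clean it
      // up whether or not the job succeeded.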
FileSystem.get(conf).delete(outDir, true);
}
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
        (endTime.getTime() - startTime.getTime()) / 1000 +
" seconds.");
return ret;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new LargeSorter(), args);
System.exit(res);
}
}