blob: f2a2e236dd46d8f06d6838a53eb9b91e9bc5bc25 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.io.File;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.examples.RandomWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Distributed threaded map benchmark.
 * <p>
 * This benchmark generates random data per map and tests the performance
 * of having multiple spills (using multiple threads) over having just one
 * spill. The following parameters can be specified:
 * <ul>
 * <li>File size per map.</li>
 * <li>Number of spills per map.</li>
 * <li>Number of maps per host.</li>
 * </ul>
 * <p>
 * Sort is used for benchmarking the performance.
 */
public class ThreadedMapBenchmark extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(ThreadedMapBenchmark.class);
  private static final Path BASE_DIR =
    new Path(System.getProperty("test.build.data",
                                File.separator + "benchmarks" + File.separator
                                + "ThreadedMapBenchmark"));
  private static final Path INPUT_DIR = new Path(BASE_DIR, "input");
  private static final Path OUTPUT_DIR = new Path(BASE_DIR, "output");
  /**
   * mapreduce.task.io.sort.mb set to (FACTOR * data size per map in mb)
   * should result in only one spill per map.
   */
  private static final float FACTOR = 2.3f;

  /** Counters updated by {@link Map} while generating the input data. */
  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }

  /**
   * Generates random input data of given size with keys and values of given
   * sizes. By default it generates 128mb input data with 10 byte keys and 10
   * byte values.
   * <p>
   * Sizes are read in {@link #configure(JobConf)} from
   * {@code test.tmb.bytes_per_map}, {@code test.tmb.min_key},
   * {@code test.tmb.max_key}, {@code test.tmb.min_value} and
   * {@code test.tmb.max_value}. Each key/value length is drawn from
   * [min, max), or is exactly min when max equals min.
   */
  public static class Map extends MapReduceBase
    implements Mapper<WritableComparable, Writable,
                      BytesWritable, BytesWritable> {
    private long numBytesToWrite;
    private int minKeySize;
    private int keySizeRange;
    private int minValueSize;
    private int valueSizeRange;
    private final Random random = new Random();
    private final BytesWritable randomKey = new BytesWritable();
    private final BytesWritable randomValue = new BytesWritable();

    /** Fills {@code data[offset, offset + length)} with random bytes. */
    private void randomizeBytes(byte[] data, int offset, int length) {
      for (int i = offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }

    /**
     * Ignores the input record and emits random key/value pairs until the
     * configured number of bytes has been written. The last record may
     * overshoot the target by less than one record's size.
     */
    public void map(WritableComparable key,
                    Writable value,
                    OutputCollector<BytesWritable, BytesWritable> output,
                    Reporter reporter) throws IOException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize
          + (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize
          + (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue);
        int recordBytes = keyLength + valueLength;
        numBytesToWrite -= recordBytes;
        // BYTES_WRITTEN tracks key+value payload bytes, not the record count.
        reporter.incrCounter(Counters.BYTES_WRITTEN, recordBytes);
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
        if (++itemCount % 200 == 0) {
          reporter.setStatus("wrote record " + itemCount + ". "
                             + numBytesToWrite + " bytes left.");
        }
      }
      reporter.setStatus("done with " + itemCount + " records.");
    }

    /**
     * Reads the output size and key/value size bounds from the job
     * configuration (defaults: 128mb, 10 byte keys, 10 byte values).
     */
    @Override
    public void configure(JobConf job) {
      numBytesToWrite = job.getLong("test.tmb.bytes_per_map",
                                    128L * 1024 * 1024);
      minKeySize = job.getInt("test.tmb.min_key", 10);
      keySizeRange = job.getInt("test.tmb.max_key", 10) - minKeySize;
      minValueSize = job.getInt("test.tmb.min_value", 10);
      valueSizeRange = job.getInt("test.tmb.max_value", 10) - minValueSize;
    }
  }

  /**
   * Generates the input data for the benchmark: deletes the benchmark base
   * directory, then runs a map-only job with {@link Map} that writes
   * sequence files of random {@code BytesWritable} pairs under the input
   * directory.
   *
   * @param dataSizePerMap size of the data each map generates, in mb
   * @param numSpillsPerMap number of spills per map (only logged here)
   * @param numMapsPerHost number of maps to run per task tracker
   * @param masterConf configuration the generator job is derived from
   * @throws Exception propagated from the job client or the file system
   */
  public static void generateInputData(int dataSizePerMap,
                                       int numSpillsPerMap,
                                       int numMapsPerHost,
                                       JobConf masterConf)
    throws Exception {
    JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
    job.setJobName("threaded-map-benchmark-random-writer");
    job.setJarByClass(ThreadedMapBenchmark.class);
    job.setInputFormat(UtilsForTests.RandomInputFormat.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(IdentityReducer.class);
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);

    JobClient client = new JobClient(job);
    ClusterStatus cluster = client.getClusterStatus();
    int numHosts = cluster.getTaskTrackers();
    // Widen to long before multiplying so large sizes cannot overflow int.
    long totalDataSize = (long) dataSizePerMap * numMapsPerHost * numHosts;
    job.setLong("test.tmb.bytes_per_map", dataSizePerMap * 1024L * 1024L);
    job.setNumReduceTasks(0); // none reduce
    job.setNumMapTasks(numMapsPerHost * numHosts);
    FileOutputFormat.setOutputPath(job, INPUT_DIR);

    FileSystem fs = FileSystem.get(job);
    fs.delete(BASE_DIR, true);

    LOG.info("Generating random input for the benchmark");
    LOG.info("Total data : " + totalDataSize + " mb");
    LOG.info("Data per map: " + dataSizePerMap + " mb");
    LOG.info("Number of spills : " + numSpillsPerMap);
    LOG.info("Number of maps per host : " + numMapsPerHost);
    LOG.info("Number of hosts : " + numHosts);
    JobClient.runJob(job); // generates the input for the benchmark
  }

  /** Runs {@code job} to completion and logs its wall-clock duration. */
  private static void runTimedJob(JobConf job) throws IOException {
    long startTime = System.currentTimeMillis();
    JobClient.runJob(job);
    long endTime = System.currentTimeMillis();
    LOG.info("Total time taken : " + (endTime - startTime) + " millisec");
  }

  /**
   * This is the main routine for launching the benchmark. It generates random
   * input data. The input is non-splittable. Sort is used for benchmarking.
   * This benchmark reports the effect of having multiple sort and spill
   * cycles over a single sort and spill.
   *
   * @param args command line options, as described by the usage string
   * @return 0 on success, -1 if the command line options are invalid
   * @throws Exception if generating the input or running a sort job fails
   */
  public int run(String[] args) throws Exception {
    LOG.info("Starting the benchmark for threaded spills");
    String version = "ThreadedMapBenchmark.0.0.1";
    System.out.println(version);
    String usage =
      "Usage: threadedmapbenchmark " +
      "[-dataSizePerMap <data size (in mb) per map, default is 128 mb>] " +
      "[-numSpillsPerMap <number of spills per map, default is 2>] " +
      "[-numMapsPerHost <number of maps per host, default is 1>]";
    int dataSizePerMap = 128; // in mb
    int numSpillsPerMap = 2;
    int numMapsPerHost = 1;
    JobConf masterConf = new JobConf(getConf());

    for (int i = 0; i < args.length; i++) { // parse command line
      String option = args[i];
      if (i + 1 >= args.length) {
        // Every supported option takes a value, so a trailing option is bad.
        System.err.println(usage);
        return -1;
      }
      int optionValue;
      try {
        optionValue = Integer.parseInt(args[++i]);
      } catch (NumberFormatException e) {
        System.err.println(usage);
        return -1;
      }
      if (option.equals("-dataSizePerMap")) {
        dataSizePerMap = optionValue;
      } else if (option.equals("-numSpillsPerMap")) {
        numSpillsPerMap = optionValue;
      } else if (option.equals("-numMapsPerHost")) {
        numMapsPerHost = optionValue;
      } else {
        System.err.println(usage);
        return -1;
      }
    }

    if (dataSizePerMap < 1 || numSpillsPerMap < 1 || numMapsPerHost < 1) {
      System.err.println(usage);
      return -1;
    }

    // Obtain the file system before doing any work so the benchmark
    // directory is cleaned up even when generating the input data fails.
    FileSystem fs = FileSystem.get(masterConf);
    try {
      // using random-writer to generate the input data
      generateInputData(dataSizePerMap, numSpillsPerMap, numMapsPerHost,
                        masterConf);

      // configure job for sorting
      JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
      job.setJobName("threaded-map-benchmark-unspilled");
      job.setJarByClass(ThreadedMapBenchmark.class);
      job.setInputFormat(NonSplitableSequenceFileInputFormat.class);
      job.setOutputFormat(SequenceFileOutputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(BytesWritable.class);
      job.setMapperClass(IdentityMapper.class);
      job.setReducerClass(IdentityReducer.class);
      FileInputFormat.addInputPath(job, INPUT_DIR);
      FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
      JobClient client = new JobClient(job);
      ClusterStatus cluster = client.getClusterStatus();
      job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
      job.setNumReduceTasks(1);

      // set mapreduce.task.io.sort.mb to avoid spill
      int ioSortMb = (int) Math.ceil(FACTOR * dataSizePerMap);
      job.setInt(JobContext.IO_SORT_MB, ioSortMb);
      LOG.info("Running sort with 1 spill per map");
      runTimedJob(job);
      fs.delete(OUTPUT_DIR, true);

      // set mapreduce.task.io.sort.mb to have multiple spills
      JobConf spilledJob = new JobConf(job, ThreadedMapBenchmark.class);
      ioSortMb = (int) Math.ceil(FACTOR
                                 * Math.ceil((double) dataSizePerMap
                                             / numSpillsPerMap));
      spilledJob.setInt(JobContext.IO_SORT_MB, ioSortMb);
      spilledJob.setJobName("threaded-map-benchmark-spilled");
      spilledJob.setJarByClass(ThreadedMapBenchmark.class);
      LOG.info("Running sort with " + numSpillsPerMap + " spills per map");
      runTimedJob(spilledJob);
    } finally {
      fs.delete(BASE_DIR, true);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new ThreadedMapBenchmark(), args);
    System.exit(res);
  }
}