/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
/**
 * This is a word count application that tests the number of records
 * spilled to disk. It generates simple text input files, then runs the
 * word count map/reduce application on (1) 3 input files (with 3 maps
 * and 1 reduce) and verifies the counters, and (2) 4 input files (with
 * 4 maps and 1 reduce) and verifies the counters. The word count
 * application reads the text input files, breaks each line into words
 * and counts them. The output is a locally sorted list of words and the
 * count of how often they occurred.
 */
public class TestJobCounters {
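/**
 * Asserts that the spilled, map input and map output record counters of
 * the given job match the expected values.
 */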
private void validateCounters(Counters counter, long spillRecCnt,
long mapInputRecords, long mapOutputRecords) {
// Check that the number of spilled records matches the expected value
assertEquals(spillRecCnt,
counter.findCounter(TaskCounter.SPILLED_RECORDS).getCounter());
assertEquals(mapInputRecords,
counter.findCounter(TaskCounter.MAP_INPUT_RECORDS).getCounter());
assertEquals(mapOutputRecords,
counter.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter());
}
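/** Deletes the given words file if it exists. */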
private void removeWordsFile(Path inpFile, Configuration conf)
throws IOException {
final FileSystem fs = inpFile.getFileSystem(conf);
if (fs.exists(inpFile) && !fs.delete(inpFile, false)) {
throw new IOException("Failed to delete " + inpFile);
}
}
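/**
 * Creates a words file of 5120 lines (4 words per line, 4096 unique words
 * repeated 5 times) unless it already exists.
 */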
private static void createWordsFile(Path inpFile, Configuration conf)
throws IOException {
final FileSystem fs = inpFile.getFileSystem(conf);
if (fs.exists(inpFile)) {
return;
}
FSDataOutputStream out = fs.create(inpFile);
try {
// 4096 unique words (1024 lines * 4 words/line), repeated 5 times
// => 5120 lines, 20480 words per file
int REPLICAS=5, NUMLINES=1024, NUMWORDSPERLINE=4;
final String WORD = "zymurgy"; // 7 bytes + 4 id bytes
final Formatter fmt = new Formatter(new StringBuilder());
for (int i = 0; i < REPLICAS; i++) {
for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
((StringBuilder)fmt.out()).setLength(0);
for (int k = 0; k < NUMWORDSPERLINE; ++k) {
fmt.format("%s%04d ", WORD, j + k);
}
((StringBuilder)fmt.out()).append("\n");
out.writeBytes(fmt.toString());
}
}
} finally {
out.close();
}
}
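// input/output directories under the test root, set up in initPaths()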
private static Path IN_DIR = null;
private static Path OUT_DIR = null;
private static Path testdir = null;
@BeforeClass
public static void initPaths() throws IOException {
final Configuration conf = new Configuration();
final Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data", "/tmp"));
testdir = new Path(TEST_ROOT_DIR, "spilledRecords.countertest");
IN_DIR = new Path(testdir, "in");
OUT_DIR = new Path(testdir, "out");
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(testdir) && !fs.delete(testdir, true)) {
throw new IOException("Could not delete " + testdir);
}
if (!fs.mkdirs(IN_DIR)) {
throw new IOException("Mkdirs failed to create " + IN_DIR);
}
// create 3 input files, each with 5120 lines (20480 words)
createWordsFile(new Path(IN_DIR, "input5_2k_1"), conf);
createWordsFile(new Path(IN_DIR, "input5_2k_2"), conf);
createWordsFile(new Path(IN_DIR, "input5_2k_3"), conf);
}
@AfterClass
public static void cleanup() throws IOException {
//clean up the input and output files
final Configuration conf = new Configuration();
final FileSystem fs = testdir.getFileSystem(conf);
if (fs.exists(testdir)) {
fs.delete(testdir, true);
}
}
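/**
 * Builds the base old-API JobConf for the word count jobs: 1MB sort buffer,
 * 50% spill threshold and a minimum of 3 spills before the combiner runs
 * during merges.
 */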
public static JobConf createConfiguration() throws IOException {
JobConf baseConf = new JobConf(TestJobCounters.class);
baseConf.setOutputKeyClass(Text.class);
baseConf.setOutputValueClass(IntWritable.class);
baseConf.setMapperClass(WordCount.MapClass.class);
baseConf.setCombinerClass(WordCount.Reduce.class);
baseConf.setReducerClass(WordCount.Reduce.class);
baseConf.setNumReduceTasks(1);
baseConf.setInt(JobContext.IO_SORT_MB, 1);
baseConf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.50");
baseConf.setInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
return baseConf;
}
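/**
 * Builds the base new-API Job with the same sort/spill settings as
 * {@link #createConfiguration()}; the minimum split size is set to
 * Long.MAX_VALUE so that each input file is read as a single split.
 */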
public static Job createJob() throws IOException {
final Configuration conf = new Configuration();
final Job baseJob = Job.getInstance(new Cluster(conf), conf);
baseJob.setOutputKeyClass(Text.class);
baseJob.setOutputValueClass(IntWritable.class);
baseJob.setMapperClass(NewMapTokenizer.class);
baseJob.setCombinerClass(NewSummer.class);
baseJob.setReducerClass(NewSummer.class);
baseJob.setNumReduceTasks(1);
baseJob.getConfiguration().setInt(JobContext.IO_SORT_MB, 1);
baseJob.getConfiguration().set(JobContext.MAP_SORT_SPILL_PERCENT, "0.50");
baseJob.getConfiguration().setInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMinInputSplitSize(
baseJob, Long.MAX_VALUE);
return baseJob;
}
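/**
 * Old-API word count over the 3 base input files (3 maps, 1 reduce,
 * io.sort.factor = 2); the expected counter values are derived below.
 */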
@Test
public void testOldCounterA() throws Exception {
JobConf conf = createConfiguration();
conf.setNumMapTasks(3);
conf.setInt(JobContext.IO_SORT_FACTOR, 2);
removeWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
FileInputFormat.setInputPaths(conf, IN_DIR);
FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO0"));
RunningJob myJob = JobClient.runJob(conf);
Counters c1 = myJob.getCounters();
// Each record requires 16 bytes of metadata and 16 bytes for the
// serialized record: (vint word len + word + IntWritable) = (1 + 11 + 4)
// (2^20 byte buffer * .5 spill pcnt) / 32 bytes/record = 2^14 recs per spill
// Each file contains 5 replicas of 4096 words, so the first spill will
// contain 4 replicas (2^14 rec / 2^12 rec/replica), the second just one.
// Each map spills twice, emitting 4096 records per spill from the
// combiner. The merge adds another 8192 records, as there are too few
// spills to combine (2 < 3), so each map spills 2^14 = 16384 records in
// total and the 3 maps spill 49152 records combined.
// The reduce spill count is composed of the read from one segment and
// the intermediate merge of the other two. The intermediate merge
// adds 8192 records per segment read; again, there are too few spills to
// combine, so all 16384 are written to disk (total 32768 spilled records
// for the intermediate merge). The merge into the reduce includes only
// the unmerged segment, size 8192. Total spilled records in the reduce
// is 32768 from the merge + 8192 unmerged segment = 40960 records.
// Total: map + reduce = 49152 + 40960 = 90112
// 3 files * 5120 records/file (5 replicas * 1024 lines) = 15360 input records
// 4 words/line => 15360 * 4 = 61440 output records
validateCounters(c1, 90112, 15360, 61440);
}
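/**
 * Old-API word count over 4 input files (4 maps, 1 reduce,
 * io.sort.factor = 2).
 */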
@Test
public void testOldCounterB() throws Exception {
JobConf conf = createConfiguration();
createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
conf.setNumMapTasks(4);
conf.setInt(JobContext.IO_SORT_FACTOR, 2);
FileInputFormat.setInputPaths(conf, IN_DIR);
FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO1"));
RunningJob myJob = JobClient.runJob(conf);
Counters c1 = myJob.getCounters();
// As above, each map spills 2^14 records, so 4 maps spill 2^16 records
// In the reduce, there are two intermediate merges before the reduce.
// 1st merge: read + write = 8192 * 4
// 2nd merge: read + write = 8192 * 4
// final merge: 0
// Total reduce: 65536
// Total: map + reduce = 2^16 + 2^16 = 131072
// 4 files * 5120 records/file (5 replicas * 1024 lines) = 20480 input records
// 4 words/line => 20480 * 4 = 81920 output records
validateCounters(c1, 131072, 20480, 81920);
}
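/**
 * Old-API word count over 5 input files (1 reduce, io.sort.factor = 3);
 * as the comments below note, the 5 splits yield 5 map tasks.
 */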
@Test
public void testOldCounterC() throws Exception {
JobConf conf = createConfiguration();
createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
createWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
conf.setNumMapTasks(4);
conf.setInt(JobContext.IO_SORT_FACTOR, 3);
FileInputFormat.setInputPaths(conf, IN_DIR);
FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO2"));
RunningJob myJob = JobClient.runJob(conf);
Counters c1 = myJob.getCounters();
// As above, each map spills 2^14 records, so 5 maps spill 81920 records
// 1st merge: read + write = 6 * 8192
// final merge: unmerged = 2 * 8192
// Total reduce: 6 * 8192 + 2 * 8192 = 65536
// Total: map + reduce = 81920 + 65536 = 147456
// 5 files * 5120 records/file (5 replicas * 1024 lines) = 25600 input records
// 4 words/line => 25600 * 4 = 102400 output records
validateCounters(c1, 147456, 25600, 102400);
}
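/** Same as {@link #testOldCounterA()} but using the new mapreduce API. */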
@Test
public void testNewCounterA() throws Exception {
final Job job = createJob();
final Configuration conf = job.getConfiguration();
conf.setInt(JobContext.IO_SORT_FACTOR, 2);
removeWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
job, IN_DIR);
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
job, new Path(OUT_DIR, "outputN0"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
validateCounters(c1, 90112, 15360, 61440);
}
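/** Same as {@link #testOldCounterB()} but using the new mapreduce API. */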
@Test
public void testNewCounterB() throws Exception {
final Job job = createJob();
final Configuration conf = job.getConfiguration();
conf.setInt(JobContext.IO_SORT_FACTOR, 2);
createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
job, IN_DIR);
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
job, new Path(OUT_DIR, "outputN1"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
validateCounters(c1, 131072, 20480, 81920);
}
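/** Same as {@link #testOldCounterC()} but using the new mapreduce API. */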
@Test
public void testNewCounterC() throws Exception {
final Job job = createJob();
final Configuration conf = job.getConfiguration();
conf.setInt(JobContext.IO_SORT_FACTOR, 3);
createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
createWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
job, IN_DIR);
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
job, new Path(OUT_DIR, "outputN2"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
validateCounters(c1, 147456, 25600, 102400);
}
/**
* Increases the JVM's heap usage to the specified target value.
*/
static class MemoryLoader {
private static final int DEFAULT_UNIT_LOAD_SIZE = 10 * 1024 * 1024; // 10mb
// the target value to reach
private long targetValue;
// a list to hold the load objects
private List<String> loadObjects = new ArrayList<String>();
MemoryLoader(long targetValue) {
this.targetValue = targetValue;
}
/**
* Allocates strings until the JVM's total (committed) memory reaches the
* target value.
*/
void load() {
while (Runtime.getRuntime().totalMemory() < targetValue) {
System.out.println("Loading memory with " + DEFAULT_UNIT_LOAD_SIZE
+ " characters. Current usage : "
+ Runtime.getRuntime().totalMemory());
// load some objects in the memory
loadObjects.add(RandomStringUtils.random(DEFAULT_UNIT_LOAD_SIZE));
// sleep for 100ms
try {
Thread.sleep(100);
} catch (InterruptedException ie) {}
}
}
}
/**
* A mapper that increases the JVM's heap usage to a target value configured
* via {@link MemoryLoaderMapper#TARGET_VALUE} using a {@link MemoryLoader}.
*/
@SuppressWarnings({"deprecation", "unchecked"})
static class MemoryLoaderMapper
extends MapReduceBase
implements Mapper<WritableComparable, Writable,
WritableComparable, Writable> {
static final String TARGET_VALUE = "map.memory-loader.target-value";
private static MemoryLoader loader = null;
public void map(WritableComparable key, Writable val,
OutputCollector<WritableComparable, Writable> output,
Reporter reporter)
throws IOException {
assertNotNull("Mapper not configured!", loader);
// load the memory
loader.load();
// work as identity mapper
output.collect(key, val);
}
public void configure(JobConf conf) {
loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
}
}
/**
* A reducer that increases the JVM's heap usage to a target value configured
* via {@link MemoryLoaderReducer#TARGET_VALUE} using a {@link MemoryLoader}.
*/
@SuppressWarnings({"deprecation", "unchecked"})
static class MemoryLoaderReducer extends MapReduceBase
implements Reducer<WritableComparable, Writable,
WritableComparable, Writable> {
static final String TARGET_VALUE = "reduce.memory-loader.target-value";
private static MemoryLoader loader = null;
public void reduce(WritableComparable key, Iterator<Writable> val,
OutputCollector<WritableComparable, Writable> output,
Reporter reporter)
throws IOException {
assertNotNull("Reducer not configured!", loader);
// load the memory
loader.load();
// work as identity reducer
output.collect(key, key);
}
public void configure(JobConf conf) {
loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
}
}
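/**
 * Returns the COMMITTED_HEAP_BYTES counter of the given task, read from
 * the job's map or reduce task reports.
 */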
@SuppressWarnings("deprecation")
private long getTaskCounterUsage (JobClient client, JobID id, int numReports,
int taskId, TaskType type)
throws Exception {
TaskReport[] reports = null;
if (TaskType.MAP.equals(type)) {
reports = client.getMapTaskReports(id);
} else if (TaskType.REDUCE.equals(type)) {
reports = client.getReduceTaskReports(id);
}
assertNotNull("No reports found for task type '" + type.name()
+ "' in job " + id, reports);
// make sure that the total number of reports matches the expected count
assertEquals("Mismatch in number of task reports", numReports, reports.length);
Counters counters = reports[taskId].getCounters();
return counters.getCounter(TaskCounter.COMMITTED_HEAP_BYTES);
}
// set up heap options, target value for memory loader and the output
// directory before running the job
@SuppressWarnings("deprecation")
private static RunningJob runHeapUsageTestJob(JobConf conf, Path testRootDir,
String heapOptions, long targetMapValue,
long targetReduceValue, FileSystem fs,
JobClient client, Path inDir)
throws IOException {
// define a job
JobConf jobConf = new JobConf(conf);
// configure the job
jobConf.setNumMapTasks(1);
jobConf.setNumReduceTasks(1);
jobConf.setMapperClass(MemoryLoaderMapper.class);
jobConf.setReducerClass(MemoryLoaderReducer.class);
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setOutputKeyClass(LongWritable.class);
jobConf.setOutputValueClass(Text.class);
jobConf.setMaxMapAttempts(1);
jobConf.setMaxReduceAttempts(1);
jobConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, heapOptions);
// set the targets
jobConf.setLong(MemoryLoaderMapper.TARGET_VALUE, targetMapValue);
jobConf.setLong(MemoryLoaderReducer.TARGET_VALUE, targetReduceValue);
// set the input directory for the job
FileInputFormat.setInputPaths(jobConf, inDir);
// define job output folder
Path outDir = new Path(testRootDir, "out");
fs.delete(outDir, true);
FileOutputFormat.setOutputPath(jobConf, outDir);
// run the job
RunningJob job = client.submitJob(jobConf);
job.waitForCompletion();
JobID jobID = job.getID();
assertTrue("Job " + jobID + " failed!", job.isSuccessful());
return job;
}
/**
 * Tests {@link TaskCounter#COMMITTED_HEAP_BYTES}. The test runs a
 * low-memory job followed by a high-memory job and checks that the
 * COMMITTED_HEAP_BYTES reported by the low-memory job's tasks is smaller
 * than that reported by the high-memory job's tasks.
 * @throws Exception
 */
@Test
@SuppressWarnings("deprecation")
public void testHeapUsageCounter() throws Exception {
JobConf conf = new JobConf();
// create a local filesystem handle
FileSystem fileSystem = FileSystem.getLocal(conf);
// define test root directories
Path rootDir =
new Path(System.getProperty("test.build.data", "/tmp"));
Path testRootDir = new Path(rootDir, "testHeapUsageCounter");
// cleanup the test root directory
fileSystem.delete(testRootDir, true);
// set the current working directory
fileSystem.setWorkingDirectory(testRootDir);
fileSystem.deleteOnExit(testRootDir);
// create a mini cluster using the local file system
MiniMRCluster mrCluster =
new MiniMRCluster(1, fileSystem.getUri().toString(), 1);
try {
conf = mrCluster.createJobConf();
JobClient jobClient = new JobClient(conf);
// define job input
Path inDir = new Path(testRootDir, "in");
// create input data
createWordsFile(inDir, conf);
// configure and run a low memory job which will run without loading the
// jvm's heap
RunningJob lowMemJob =
runHeapUsageTestJob(conf, testRootDir, "-Xms32m -Xmx1G",
0, 0, fileSystem, jobClient, inDir);
JobID lowMemJobID = lowMemJob.getID();
long lowMemJobMapHeapUsage = getTaskCounterUsage(jobClient, lowMemJobID,
1, 0, TaskType.MAP);
System.out.println("Job1 (low memory job) map task heap usage: "
+ lowMemJobMapHeapUsage);
long lowMemJobReduceHeapUsage =
getTaskCounterUsage(jobClient, lowMemJobID, 1, 0, TaskType.REDUCE);
System.out.println("Job1 (low memory job) reduce task heap usage: "
+ lowMemJobReduceHeapUsage);
// configure and run a high memory job which will load the jvm's heap
RunningJob highMemJob =
runHeapUsageTestJob(conf, testRootDir, "-Xms32m -Xmx1G",
lowMemJobMapHeapUsage + 256*1024*1024,
lowMemJobReduceHeapUsage + 256*1024*1024,
fileSystem, jobClient, inDir);
JobID highMemJobID = highMemJob.getID();
long highMemJobMapHeapUsage = getTaskCounterUsage(jobClient, highMemJobID,
1, 0, TaskType.MAP);
System.out.println("Job2 (high memory job) map task heap usage: "
+ highMemJobMapHeapUsage);
long highMemJobReduceHeapUsage =
getTaskCounterUsage(jobClient, highMemJobID, 1, 0, TaskType.REDUCE);
System.out.println("Job2 (high memory job) reduce task heap usage: "
+ highMemJobReduceHeapUsage);
assertTrue("Incorrect map heap usage reported by the map task",
lowMemJobMapHeapUsage < highMemJobMapHeapUsage);
assertTrue("Incorrect reduce heap usage reported by the reduce task",
lowMemJobReduceHeapUsage < highMemJobReduceHeapUsage);
} finally {
// shutdown the mr cluster
mrCluster.shutdown();
try {
fileSystem.delete(testRootDir, true);
} catch (IOException ioe) {}
}
}
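/** New-API tokenizing mapper: emits (word, 1) for each word in a line of input. */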
public static class NewMapTokenizer
extends org.apache.hadoop.mapreduce.Mapper<Object,Text,Text,IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
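/** New-API summing reducer; used as both combiner and reducer in {@link #createJob()}. */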
public static class NewSummer
extends org.apache.hadoop.mapreduce.Reducer<Text,IntWritable,
Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}