/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.io.BufferedWriter;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* This is a wordcount application that tests job counters.
* It generates simple text input files, then runs the wordcount map/reduce
* application on (1) 3 input files (with 3 maps and 1 reduce) and verifies
* the counters, and (2) 4 input files (with 4 maps and 1 reduce) and verifies
* the counters. The wordcount application reads the text input files, breaks
* each line into words and counts them. The output is a locally sorted list
* of words and the count of how often they occurred.
*
*/
public class TestJobCounters {
String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
File.separator + "tmp")).toString().replace(' ', '+');
private void validateCounters(Counters counter, long spillRecCnt,
long mapInputRecords, long mapOutputRecords) {
// Check if the number of Spilled Records is the same as expected
assertEquals(spillRecCnt,
counter.findCounter(TaskCounter.SPILLED_RECORDS).getCounter());
assertEquals(mapInputRecords,
counter.findCounter(TaskCounter.MAP_INPUT_RECORDS).getCounter());
assertEquals(mapOutputRecords,
counter.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter());
}
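// Writes 2,500 lines of 4 words each (2,000 distinct words repeated 5 times,
// i.e. 10,000 words) into the given file.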
private void createWordsFile(File inpFile) throws Exception {
Writer out = new BufferedWriter(new FileWriter(inpFile));
try {
// 500 lines * 4 words = 2k unique words, repeated 5 times => 10k (5*2k) words
int REPLICAS=5, NUMLINES=500, NUMWORDSPERLINE=4;
for (int i = 0; i < REPLICAS; i++) {
for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
out.write("word" + j + " word" + (j+1) + " word" + (j+2)
+ " word" + (j+3) + '\n');
}
}
} finally {
out.close();
}
}
/**
* Runs the wordcount job using the old (mapred) API and verifies the spilled
* record, map input record and map output record counters for 3 maps,
* 4 maps and a map-only configuration.
* @throws Exception If the job fails or cannot communicate with the
* job tracker.
*/
@Test
public void testOldJobWithMapAndReducers() throws Exception {
JobConf conf = new JobConf(TestJobCounters.class);
conf.setJobName("wordcount-map-reducers");
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(WordCount.MapClass.class);
conf.setCombinerClass(WordCount.Reduce.class);
conf.setReducerClass(WordCount.Reduce.class);
conf.setNumMapTasks(3);
conf.setNumReduceTasks(1);
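// Use a tiny sort buffer (1MB) and a merge factor of 2 so that each map
// goes through multiple spill/merge passes; the SPILLED_RECORDS arithmetic
// below depends on these settings.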
conf.setInt(JobContext.IO_SORT_MB, 1);
conf.setInt(JobContext.IO_SORT_FACTOR, 2);
conf.set(JobContext.MAP_SORT_RECORD_PERCENT, "0.05");
conf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.80");
FileSystem fs = FileSystem.get(conf);
Path testDir = new Path(TEST_ROOT_DIR, "countertest");
conf.set("test.build.data", testDir.toString());
try {
if (fs.exists(testDir)) {
fs.delete(testDir, true);
}
if (!fs.mkdirs(testDir)) {
throw new IOException("Mkdirs failed to create " + testDir.toString());
}
String inDir = testDir + File.separator + "genins" + File.separator;
String outDir = testDir + File.separator;
Path wordsIns = new Path(inDir);
if (!fs.mkdirs(wordsIns)) {
throw new IOException("Mkdirs failed to create " + wordsIns.toString());
}
//create 3 input files each with 5*2k words
File inpFile = new File(inDir + "input5_2k_1");
createWordsFile(inpFile);
inpFile = new File(inDir + "input5_2k_2");
createWordsFile(inpFile);
inpFile = new File(inDir + "input5_2k_3");
createWordsFile(inpFile);
FileInputFormat.setInputPaths(conf, inDir);
Path outputPath1 = new Path(outDir, "output5_2k_3");
FileOutputFormat.setOutputPath(conf, outputPath1);
RunningJob myJob = JobClient.runJob(conf);
Counters c1 = myJob.getCounters();
// 3 maps, 4 first-level spills in each map --- so 12 in total.
// Spilled records count:
// Each map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
// 3rd level: 2k (4k from 1st level & 4k from 2nd level, combineAndSpill)
// So per map: 8k+8k+2k = 18k
// For 3 maps, total = 3*18k = 54k
// Reduce: each of the 3 map outputs (2k each) is spilled in shuffleToDisk()
// So 3*2k = 6k in 1st level; 2nd level: 4k (2k+2k);
// 3rd level is fed directly to reduce (4k+2k --- combineAndSpill => 2k,
// so 0 records spilled to disk in 3rd level)
// So total of 6k+4k = 10k
// Total job counter will be 54k+10k = 64k
// 3 maps and 2.5k lines each --- so 7.5k map input records in total
// 3 maps and 10k words each --- so 30k map output records in total
validateCounters(c1, 64000, 7500, 30000);
// create a 4th input file with 5*2k words and test with 4 maps
inpFile = new File(inDir + "input5_2k_4");
createWordsFile(inpFile);
conf.setNumMapTasks(4);
Path outputPath2 = new Path(outDir, "output5_2k_4");
FileOutputFormat.setOutputPath(conf, outputPath2);
myJob = JobClient.runJob(conf);
c1 = myJob.getCounters();
// 4 maps, 4 first-level spills in each map --- so 16 in total.
// Spilled records count:
// Each map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
// 3rd level: 2k (4k from 1st level & 4k from 2nd level, combineAndSpill)
// So per map: 8k+8k+2k = 18k
// For 4 maps, total = 4*18k = 72k
// Reduce: each of the 4 map outputs (2k each) is spilled in shuffleToDisk()
// So 4*2k = 8k in 1st level; 2nd level: 4k+4k = 8k;
// 3rd level is fed directly to reduce (4k+4k --- combineAndSpill => 2k,
// so 0 records spilled to disk in 3rd level)
// So total of 8k+8k = 16k
// Total job counter will be 72k+16k = 88k
// 4 maps and 2.5k lines each --- so 10k map input records in total
// 4 maps and 10k words each --- so 40k map output records in total
validateCounters(c1, 88000, 10000, 40000);
// check for a map only job
conf.setNumReduceTasks(0);
Path outputPath3 = new Path(outDir, "output5_2k_5");
FileOutputFormat.setOutputPath(conf, outputPath3);
myJob = JobClient.runJob(conf);
c1 = myJob.getCounters();
// 4 maps and 2.5k lines each --- so 10k map input records in total
// 4 maps and 10k words each --- so 40k map output records in total
validateCounters(c1, 0, 10000, 40000);
} finally {
//clean up the input and output files
if (fs.exists(testDir)) {
fs.delete(testDir, true);
}
}
}
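/** Tokenizer mapper for the new (mapreduce) API: emits (word, 1) for each word in the input. */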
public static class NewMapTokenizer
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
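/** Sums the counts for each word; used as both combiner and reducer in the new-API job. */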
public static class NewIdentityReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
/**
* Runs the wordcount job using the new (mapreduce) API and verifies the same
* counters as the old-API test for 3 maps, 4 maps and a map-only
* configuration.
* @throws Exception If the job fails or cannot communicate with the
* job tracker.
*/
@Test
public void testNewJobWithMapAndReducers() throws Exception {
JobConf conf = new JobConf(TestJobCounters.class);
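// Same tiny sort buffer and merge factor as in the old-API test, so the
// spill/merge behaviour (and hence SPILLED_RECORDS) should match.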
conf.setInt(JobContext.IO_SORT_MB, 1);
conf.setInt(JobContext.IO_SORT_FACTOR, 2);
conf.set(JobContext.MAP_SORT_RECORD_PERCENT, "0.05");
conf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.80");
FileSystem fs = FileSystem.get(conf);
Path testDir = new Path(TEST_ROOT_DIR, "countertest2");
conf.set("test.build.data", testDir.toString());
try {
if (fs.exists(testDir)) {
fs.delete(testDir, true);
}
if (!fs.mkdirs(testDir)) {
throw new IOException("Mkdirs failed to create " + testDir.toString());
}
String inDir = testDir + File.separator + "genins" + File.separator;
Path wordsIns = new Path(inDir);
if (!fs.mkdirs(wordsIns)) {
throw new IOException("Mkdirs failed to create " + wordsIns.toString());
}
String outDir = testDir + File.separator;
//create 3 input files each with 5*2k words
File inpFile = new File(inDir + "input5_2k_1");
createWordsFile(inpFile);
inpFile = new File(inDir + "input5_2k_2");
createWordsFile(inpFile);
inpFile = new File(inDir + "input5_2k_3");
createWordsFile(inpFile);
FileInputFormat.setInputPaths(conf, inDir);
Path outputPath1 = new Path(outDir, "output5_2k_3");
FileOutputFormat.setOutputPath(conf, outputPath1);
Job job = new Job(conf);
job.setJobName("wordcount-map-reducers");
// the keys are words (strings)
job.setOutputKeyClass(Text.class);
// the values are counts (ints)
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(NewMapTokenizer.class);
job.setCombinerClass(NewIdentityReducer.class);
job.setReducerClass(NewIdentityReducer.class);
job.setNumReduceTasks(1);
job.waitForCompletion(false);
org.apache.hadoop.mapreduce.Counters c1 = job.getCounters();
// 3 maps, 4 first-level spills in each map --- so 12 in total.
// Spilled records count:
// Each map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
// 3rd level: 2k (4k from 1st level & 4k from 2nd level, combineAndSpill)
// So per map: 8k+8k+2k = 18k
// For 3 maps, total = 3*18k = 54k
// Reduce: each of the 3 map outputs (2k each) is spilled in shuffleToDisk()
// So 3*2k = 6k in 1st level; 2nd level: 4k (2k+2k);
// 3rd level is fed directly to reduce (4k+2k --- combineAndSpill => 2k,
// so 0 records spilled to disk in 3rd level)
// So total of 6k+4k = 10k
// Total job counter will be 54k+10k = 64k
// 3 maps and 2.5k lines each --- so 7.5k map input records in total
// 3 maps and 10k words each --- so 30k map output records in total
validateCounters(Counters.downgrade(c1), 64000, 7500, 30000);
// create a 4th input file with 5*2k words and test with 4 maps
inpFile = new File(inDir + "input5_2k_4");
createWordsFile(inpFile);
JobConf newJobConf = new JobConf(job.getConfiguration());
Path outputPath2 = new Path(outDir, "output5_2k_4");
FileOutputFormat.setOutputPath(newJobConf, outputPath2);
Job newJob = new Job(newJobConf);
newJob.waitForCompletion(false);
c1 = newJob.getCounters();
// 4 maps, 4 first-level spills in each map --- so 16 in total.
// Spilled records count:
// Each map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
// 3rd level: 2k (4k from 1st level & 4k from 2nd level, combineAndSpill)
// So per map: 8k+8k+2k = 18k
// For 4 maps, total = 4*18k = 72k
// Reduce: each of the 4 map outputs (2k each) is spilled in shuffleToDisk()
// So 4*2k = 8k in 1st level; 2nd level: 4k+4k = 8k;
// 3rd level is fed directly to reduce (4k+4k --- combineAndSpill => 2k,
// so 0 records spilled to disk in 3rd level)
// So total of 8k+8k = 16k
// Total job counter will be 72k+16k = 88k
// 4 maps and 2.5k lines each --- so 10k map input records in total
// 4 maps and 10k words each --- so 40k map output records in total
validateCounters(Counters.downgrade(c1), 88000, 10000, 40000);
JobConf newJobConf2 = new JobConf(newJob.getConfiguration());
Path outputPath3 = new Path(outDir, "output5_2k_5");
FileOutputFormat.setOutputPath(newJobConf2, outputPath3);
Job newJob2 = new Job(newJobConf2);
newJob2.setNumReduceTasks(0);
newJob2.waitForCompletion(false);
c1 = newJob2.getCounters();
// 4 maps and 2.5k lines each --- so 10k map input records in total
// 4 maps and 10k words each --- so 40k map output records in total
validateCounters(Counters.downgrade(c1), 0, 10000, 40000);
} finally {
//clean up the input and output files
if (fs.exists(testDir)) {
fs.delete(testDir, true);
}
}
}
}