blob: 6ab42b49e57835b241546219c863fbff2de2a588 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.HadoopTestCase;
import org.apache.hadoop.mapreduce.MapReduceTestUtil.DataCopyMapper;
import org.apache.hadoop.mapreduce.MapReduceTestUtil.DataCopyReducer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Tests the context API and {@link StatusReporter#getProgress()} via the
 * {@link TaskAttemptContext#getProgress()} API.
 */
public class TestTaskContext extends HadoopTestCase {
private static final Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp"));
private static final Path testRootTempDir =
new Path(rootTempDir, "TestTaskContext");
private static FileSystem fs = null;
@BeforeClass
public static void setup() throws Exception {
fs = FileSystem.getLocal(new Configuration());
fs.delete(testRootTempDir, true);
fs.mkdirs(testRootTempDir);
}
@AfterClass
public static void cleanup() throws Exception {
fs.delete(testRootTempDir, true);
}
public TestTaskContext() throws IOException {
super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 1, 1);
}
static String myStatus = "my status";
static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
@Override
protected void setup(Context context) throws IOException {
context.setStatus(myStatus);
assertEquals(myStatus, context.getStatus());
}
}
/**
* Tests context.setStatus method.
*
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
@Test
public void testContextStatus()
throws IOException, InterruptedException, ClassNotFoundException {
Path test = new Path(testRootTempDir, "testContextStatus");
// test with 1 map and 0 reducers
// test with custom task status
int numMaps = 1;
Job job = MapReduceTestUtil.createJob(createJobConf(),
new Path(test, "in"), new Path(test, "out"), numMaps, 0);
job.setMapperClass(MyMapper.class);
job.waitForCompletion(true);
assertTrue("Job failed", job.isSuccessful());
TaskReport[] reports = job.getTaskReports(TaskType.MAP);
assertEquals(numMaps, reports.length);
assertEquals(myStatus, reports[0].getState());
// test with 1 map and 1 reducer
// test with default task status
int numReduces = 1;
job = MapReduceTestUtil.createJob(createJobConf(),
new Path(test, "in"), new Path(test, "out"), numMaps, numReduces);
job.setMapperClass(DataCopyMapper.class);
job.setReducerClass(DataCopyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// fail early
job.setMaxMapAttempts(1);
job.setMaxReduceAttempts(0);
// run the job and wait for completion
job.waitForCompletion(true);
assertTrue("Job failed", job.isSuccessful());
// check map task reports
reports = job.getTaskReports(TaskType.MAP);
assertEquals(numMaps, reports.length);
assertEquals("map > sort", reports[0].getState());
// check reduce task reports
reports = job.getTaskReports(TaskType.REDUCE);
assertEquals(numReduces, reports.length);
assertEquals("reduce > reduce", reports[0].getState());
}
// an input with 4 lines
private static final String INPUT = "Hi\nHi\nHi\nHi\n";
private static final int INPUT_LINES = INPUT.split("\n").length;
@SuppressWarnings("unchecked")
static class ProgressCheckerMapper
extends Mapper<LongWritable, Text, Text, Text> {
private int recordCount = 0;
private float progressRange = 0;
@Override
protected void setup(Context context) throws IOException {
// check if the map task attempt progress is 0
assertEquals("Invalid progress in map setup",
0.0f, context.getProgress(), 0f);
// define the progress boundaries
if (context.getNumReduceTasks() == 0) {
progressRange = 1f;
} else {
progressRange = 0.667f;
}
}
@Override
protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException ,InterruptedException {
// get the map phase progress
float mapPhaseProgress = ((float)++recordCount)/INPUT_LINES;
// get the weighted map phase progress
float weightedMapProgress = progressRange * mapPhaseProgress;
// check the map progress
assertEquals("Invalid progress in map",
weightedMapProgress, context.getProgress(), 0f);
context.write(new Text(value.toString() + recordCount), value);
};
protected void cleanup(Mapper.Context context)
throws IOException, InterruptedException {
// check if the attempt progress is at the progress boundary
assertEquals("Invalid progress in map cleanup",
progressRange, context.getProgress(), 0f);
};
}
/**
* Tests new MapReduce map task's context.getProgress() method.
*
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public void testMapContextProgress()
throws IOException, InterruptedException, ClassNotFoundException {
int numMaps = 1;
Path test = new Path(testRootTempDir, "testMapContextProgress");
Job job = MapReduceTestUtil.createJob(createJobConf(),
new Path(test, "in"), new Path(test, "out"), numMaps, 0, INPUT);
job.setMapperClass(ProgressCheckerMapper.class);
job.setMapOutputKeyClass(Text.class);
// fail early
job.setMaxMapAttempts(1);
job.waitForCompletion(true);
assertTrue("Job failed", job.isSuccessful());
}
@SuppressWarnings("unchecked")
static class ProgressCheckerReducer extends Reducer<Text, Text,
Text, Text> {
private int recordCount = 0;
private final float REDUCE_PROGRESS_RANGE = 1.0f/3;
private final float SHUFFLE_PROGRESS_RANGE = 1 - REDUCE_PROGRESS_RANGE;
protected void setup(final Reducer.Context context)
throws IOException, InterruptedException {
// Note that the reduce will read some segments before calling setup()
float reducePhaseProgress = ((float)++recordCount)/INPUT_LINES;
float weightedReducePhaseProgress =
REDUCE_PROGRESS_RANGE * reducePhaseProgress;
// check that the shuffle phase progress is accounted for
assertEquals("Invalid progress in reduce setup",
SHUFFLE_PROGRESS_RANGE + weightedReducePhaseProgress,
context.getProgress(), 0.01f);
};
public void reduce(Text key, Iterator<Text> values, Context context)
throws IOException, InterruptedException {
float reducePhaseProgress = ((float)++recordCount)/INPUT_LINES;
float weightedReducePhaseProgress =
REDUCE_PROGRESS_RANGE * reducePhaseProgress;
assertEquals("Invalid progress in reduce",
SHUFFLE_PROGRESS_RANGE + weightedReducePhaseProgress,
context.getProgress(), 0.01f);
}
protected void cleanup(Reducer.Context context)
throws IOException, InterruptedException {
// check if the reduce task has progress of 1 in the end
assertEquals("Invalid progress in reduce cleanup",
1.0f, context.getProgress(), 0f);
};
}
/**
* Tests new MapReduce reduce task's context.getProgress() method.
*
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
@Test
public void testReduceContextProgress()
throws IOException, InterruptedException, ClassNotFoundException {
int numTasks = 1;
Path test = new Path(testRootTempDir, "testReduceContextProgress");
Job job = MapReduceTestUtil.createJob(createJobConf(),
new Path(test, "in"), new Path(test, "out"), numTasks, numTasks,
INPUT);
job.setMapperClass(ProgressCheckerMapper.class);
job.setReducerClass(ProgressCheckerReducer.class);
job.setMapOutputKeyClass(Text.class);
// fail early
job.setMaxMapAttempts(1);
job.setMaxReduceAttempts(1);
job.waitForCompletion(true);
assertTrue("Job failed", job.isSuccessful());
}
}