| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Text; |
| |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import static org.junit.Assert.*; |
| |
| /** |
| * Tests the old mapred APIs with {@link Reporter#getProgress()}. |
| */ |
| public class TestReporter { |
| private static final Path rootTempDir = |
| new Path(System.getProperty("test.build.data", "/tmp")); |
| private static final Path testRootTempDir = |
| new Path(rootTempDir, "TestReporter"); |
| |
| private static FileSystem fs = null; |
| |
| @BeforeClass |
| public static void setup() throws Exception { |
| fs = FileSystem.getLocal(new Configuration()); |
| fs.delete(testRootTempDir, true); |
| fs.mkdirs(testRootTempDir); |
| } |
| |
| @AfterClass |
| public static void cleanup() throws Exception { |
| fs.delete(testRootTempDir, true); |
| } |
| |
| // an input with 4 lines |
| private static final String INPUT = "Hi\nHi\nHi\nHi\n"; |
| private static final int INPUT_LINES = INPUT.split("\n").length; |
| |
| @SuppressWarnings("deprecation") |
| static class ProgressTesterMapper extends MapReduceBase |
| implements Mapper<LongWritable, Text, Text, Text> { |
| private float progressRange = 0; |
| private int numRecords = 0; |
| private Reporter reporter = null; |
| |
| @Override |
| public void configure(JobConf job) { |
| super.configure(job); |
| // set the progress range accordingly |
| if (job.getNumReduceTasks() == 0) { |
| progressRange = 1f; |
| } else { |
| progressRange = 0.667f; |
| } |
| } |
| |
| @Override |
| public void map(LongWritable key, Text value, |
| OutputCollector<Text, Text> output, Reporter reporter) |
| throws IOException { |
| this.reporter = reporter; |
| |
| // calculate the actual map progress |
| float mapProgress = ((float)++numRecords)/INPUT_LINES; |
| // calculate the attempt progress based on the progress range |
| float attemptProgress = progressRange * mapProgress; |
| assertEquals("Invalid progress in map", |
| attemptProgress, reporter.getProgress(), 0f); |
| output.collect(new Text(value.toString() + numRecords), value); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| super.close(); |
| assertEquals("Invalid progress in map cleanup", |
| progressRange, reporter.getProgress(), 0f); |
| } |
| } |
| |
| /** |
| * Test {@link Reporter}'s progress for a map-only job. |
| * This will make sure that only the map phase decides the attempt's progress. |
| */ |
| @SuppressWarnings("deprecation") |
| @Test |
| public void testReporterProgressForMapOnlyJob() throws IOException { |
| Path test = new Path(testRootTempDir, "testReporterProgressForMapOnlyJob"); |
| |
| JobConf conf = new JobConf(); |
| conf.setMapperClass(ProgressTesterMapper.class); |
| conf.setMapOutputKeyClass(Text.class); |
| // fail early |
| conf.setMaxMapAttempts(1); |
| conf.setMaxReduceAttempts(0); |
| |
| RunningJob job = |
| UtilsForTests.runJob(conf, new Path(test, "in"), new Path(test, "out"), |
| 1, 0, INPUT); |
| job.waitForCompletion(); |
| |
| assertTrue("Job failed", job.isSuccessful()); |
| } |
| |
| /** |
| * A {@link Reducer} implementation that checks the progress on every call |
| * to {@link Reducer#reduce(Object, Iterator, OutputCollector, Reporter)}. |
| */ |
| @SuppressWarnings("deprecation") |
| static class ProgressTestingReducer extends MapReduceBase |
| implements Reducer<Text, Text, Text, Text> { |
| private int recordCount = 0; |
| private Reporter reporter = null; |
| // reduce task has a fixed split of progress amongst copy, shuffle and |
| // reduce phases. |
| private final float REDUCE_PROGRESS_RANGE = 1.0f/3; |
| private final float SHUFFLE_PROGRESS_RANGE = 1 - REDUCE_PROGRESS_RANGE; |
| |
| @Override |
| public void configure(JobConf job) { |
| super.configure(job); |
| } |
| |
| @Override |
| public void reduce(Text key, Iterator<Text> values, |
| OutputCollector<Text, Text> output, Reporter reporter) |
| throws IOException { |
| float reducePhaseProgress = ((float)++recordCount)/INPUT_LINES; |
| float weightedReducePhaseProgress = |
| reducePhaseProgress * REDUCE_PROGRESS_RANGE; |
| assertEquals("Invalid progress in reduce", |
| SHUFFLE_PROGRESS_RANGE + weightedReducePhaseProgress, |
| reporter.getProgress(), 0.02f); |
| this.reporter = reporter; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| super.close(); |
| assertEquals("Invalid progress in reduce cleanup", |
| 1.0f, reporter.getProgress(), 0f); |
| } |
| } |
| |
| /** |
| * Test {@link Reporter}'s progress for map-reduce job. |
| */ |
| @SuppressWarnings("deprecation") |
| @Test |
| public void testReporterProgressForMRJob() throws IOException { |
| Path test = new Path(testRootTempDir, "testReporterProgressForMRJob"); |
| |
| JobConf conf = new JobConf(); |
| conf.setMapperClass(ProgressTesterMapper.class); |
| conf.setReducerClass(ProgressTestingReducer.class); |
| conf.setMapOutputKeyClass(Text.class); |
| // fail early |
| conf.setMaxMapAttempts(1); |
| conf.setMaxReduceAttempts(1); |
| |
| RunningJob job = |
| UtilsForTests.runJob(conf, new Path(test, "in"), new Path(test, "out"), |
| 1, 1, INPUT); |
| job.waitForCompletion(); |
| |
| assertTrue("Job failed", job.isSuccessful()); |
| } |
| } |