blob: 5cf08991869ff68b36cee161dca748c8fc70dfbd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.StringTokenizer;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* A JUnit test to test the Map-Reduce framework's support for the
* "mark-reset" functionality in Reduce Values Iterator
*/
public class TestValueIterReset extends TestCase {
// Number of input files created by createInput(); used there to control the
// number of map tasks.
private static final int NUM_MAPS = 1;
// Number of distinct keys emitted by the mapper (0 .. NUM_TESTS-1). Each key
// selects one of the testN scenarios in the reducer; also used as the number
// of reduce tasks.
private static final int NUM_TESTS = 4;
// Number of values the mapper emits for every key.
private static final int NUM_VALUES = 40;
// Root directory for the job's "in" and "out" directories.
private static final Path TEST_ROOT_DIR =
    new Path(System.getProperty("test.build.data", "/tmp"));
// Configuration used only to obtain the local file system below.
private static final Configuration conf = new Configuration();
// Local file system used to create the input and to read back the output.
private static final FileSystem localFs;
static {
  try {
    localFs = FileSystem.getLocal(conf);
  } catch (IOException io) {
    // Nothing in this test can run without the local file system.
    throw new RuntimeException("problem getting local fs", io);
  }
}
private static final Log LOG =
    LogFactory.getLog(TestValueIterReset.class);
public static class TestMapper
extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
IntWritable outKey = new IntWritable();
IntWritable outValue = new IntWritable();
for (int j = 0; j < NUM_TESTS; j++) {
for (int i = 0; i < NUM_VALUES; i++) {
outKey.set(j);
outValue.set(i);
context.write(outKey, outValue);
}
}
}
}
public static class TestReducer
extends Reducer< IntWritable,IntWritable,IntWritable,IntWritable> {
public void reduce(IntWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int errors = 0;
MarkableIterator<IntWritable> mitr =
new MarkableIterator<IntWritable>(values.iterator());
switch (key.get()) {
case 0:
errors += test0(key, mitr);
break;
case 1:
errors += test1(key, mitr);
break;
case 2:
errors += test2(key, mitr);
break;
case 3:
errors += test3(key, mitr);
break;
default:
break;
}
context.write(key, new IntWritable(errors));
}
}
/**
 * Tests the most common use case: mark before the iteration starts, read
 * every value, then reset and verify that exactly the same values are
 * replayed.
 *
 * @param key the reduce key this scenario runs for (used for logging only)
 * @param values markable iterator over the values of {@code key}
 * @return the number of errors detected; 0 if the replay matched
 * @throws IOException if mark or reset fails
 */
private static int test0(IntWritable key,
    MarkableIterator<IntWritable> values)
    throws IOException {
  int errors = 0;
  IntWritable i;
  ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
  LOG.info("Executing TEST:0 for Key:" + key.toString());
  values.mark();
  LOG.info("TEST:0. Marking");
  while (values.hasNext()) {
    i = values.next();
    // Store a snapshot rather than the reference: the iterator may hand back
    // the same Writable instance on every call, in which case storing (and
    // later comparing) references would make the check below always pass.
    expectedValues.add(new IntWritable(i.get()));
    LOG.info(key + ":" + i);
  }
  values.reset();
  LOG.info("TEST:0. Reset");
  int count = 0;
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    if (count >= expectedValues.size()) {
      LOG.info("TEST:0. Check:2 Iterator returned more values than expected");
      errors++;
      return errors;
    }
    // Compare by value, not by reference.
    if (!i.equals(expectedValues.get(count))) {
      LOG.info("TEST:0. Check:1 Expected: " + expectedValues.get(count) +
          ", Got: " + i);
      errors++;
      return errors;
    }
    count++;
  }
  if (count < expectedValues.size()) {
    LOG.info("TEST:0. Check:3 Iterator returned fewer values than expected");
    errors++;
    return errors;
  }
  LOG.info("TEST:0 Done");
  return errors;
}
/**
 * Tests the case where a new mark is placed after the values recorded since
 * the previous mark have been replayed (a mark "outside" of a reset). Per
 * the original notes this is meant to cover both the memory and the file
 * cache.
 *
 * @param key the reduce key this scenario runs for (used for logging only)
 * @param values markable iterator over the values of {@code key}
 * @return the number of errors detected; 0 if every replay matched
 * @throws IOException if mark or reset fails
 */
private static int test1(IntWritable key,
    MarkableIterator<IntWritable> values)
    throws IOException {
  IntWritable i;
  int errors = 0;
  int count = 0;
  // Snapshots (not references) of the values seen after each mark; the
  // iterator may reuse a single Writable instance across next() calls.
  ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
  ArrayList<IntWritable> expectedValues1 = new ArrayList<IntWritable>();
  LOG.info("Executing TEST:1 for Key:" + key);
  values.mark();
  LOG.info("TEST:1. Marking");
  // Phase 1: read the first three values after the initial mark.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    expectedValues.add(new IntWritable(i.get()));
    if (count == 2) {
      break;
    }
    count++;
  }
  values.reset();
  LOG.info("TEST:1. Reset");
  count = 0;
  // Phase 2: verify the replay of phase 1, then mark again at position 3.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    if (count < expectedValues.size()) {
      if (!i.equals(expectedValues.get(count))) {
        errors++;
        LOG.info("TEST:1. Check:1 Expected: " + expectedValues.get(count) +
            ", Got: " + i);
        return errors;
      }
    }
    // We have moved past the first mark, but still in the memory cache
    if (count == 3) {
      values.mark();
      LOG.info("TEST:1. Marking -- " + key + ": " + i);
    }
    if (count >= 3) {
      expectedValues1.add(new IntWritable(i.get()));
    }
    if (count == 5) {
      break;
    }
    count++;
  }
  if (count < expectedValues.size()) {
    LOG.info("TEST:1 Check:2. Iterator returned lesser values");
    errors++;
    return errors;
  }
  values.reset();
  count = 0;
  LOG.info("TEST:1. Reset");
  expectedValues.clear();
  // Phase 3: verify the replay of phase 2, then mark again at position 25.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    if (count < expectedValues1.size()) {
      if (!i.equals(expectedValues1.get(count))) {
        errors++;
        LOG.info("TEST:1. Check:3 Expected: " + expectedValues1.get(count)
            + ", Got: " + i);
        return errors;
      }
    }
    // We have moved past the previous mark, but now we are in the file
    // cache
    if (count == 25) {
      values.mark();
      LOG.info("TEST:1. Marking -- " + key + ":" + i);
    }
    if (count >= 25) {
      expectedValues.add(new IntWritable(i.get()));
    }
    count++;
  }
  if (count < expectedValues1.size()) {
    LOG.info("TEST:1 Check:4. Iterator returned fewer values");
    errors++;
    return errors;
  }
  values.reset();
  LOG.info("TEST:1. Reset");
  count = 0;
  // Phase 4: verify the replay of everything seen since the last mark.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    if (count >= expectedValues.size()) {
      LOG.info("TEST:1 Check:6. Iterator returned more values than expected");
      errors++;
      return errors;
    }
    if (!i.equals(expectedValues.get(count))) {
      errors++;
      LOG.info("TEST:1. Check:5 Expected: " + expectedValues.get(count)
          + ", Got: " + i);
      return errors;
    }
    // Advance so each replayed value is checked against its own position.
    count++;
  }
  if (count < expectedValues.size()) {
    LOG.info("TEST:1 Check:7. Iterator returned fewer values than expected");
    errors++;
    return errors;
  }
  LOG.info("TEST:1 Done");
  return errors;
}
/**
 * Tests the case where a new mark is placed while previously recorded
 * values are still being replayed (a mark "inside" a reset). Per the
 * original notes this is meant to cover both the memory and the file cache.
 *
 * @param key the reduce key this scenario runs for (used for logging only)
 * @param values markable iterator over the values of {@code key}
 * @return the number of errors detected; 0 if every replay matched
 * @throws IOException if mark or reset fails
 */
private static int test2(IntWritable key,
    MarkableIterator<IntWritable> values)
    throws IOException {
  IntWritable i;
  int errors = 0;
  int count = 0;
  // Snapshots (not references) of the values seen after each mark; the
  // iterator may reuse a single Writable instance across next() calls.
  ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
  ArrayList<IntWritable> expectedValues1 = new ArrayList<IntWritable>();
  LOG.info("Executing TEST:2 for Key:" + key);
  values.mark();
  LOG.info("TEST:2 Marking");
  // Phase 1: read the first nine values after the initial mark.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    expectedValues.add(new IntWritable(i.get()));
    if (count == 8) {
      break;
    }
    count++;
  }
  values.reset();
  count = 0;
  LOG.info("TEST:2 reset");
  // Phase 2: verify the replay of phase 1 and mark again at position 3,
  // i.e. while the replay is still in progress.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    if (count < expectedValues.size()) {
      if (!i.equals(expectedValues.get(count))) {
        errors++;
        LOG.info("TEST:2. Check:1 Expected: " + expectedValues.get(count)
            + ", Got: " + i);
        return errors;
      }
    }
    // We have moved past the first mark, but still reading from the
    // memory cache
    if (count == 3) {
      values.mark();
      LOG.info("TEST:2. Marking -- " + key + ":" + i);
    }
    if (count >= 3) {
      expectedValues1.add(new IntWritable(i.get()));
    }
    count++;
  }
  values.reset();
  LOG.info("TEST:2. Reset");
  expectedValues.clear();
  count = 0;
  // Phase 3: verify the replay of phase 2 and mark again at position 20.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    if (count < expectedValues1.size()) {
      if (!i.equals(expectedValues1.get(count))) {
        errors++;
        LOG.info("TEST:2. Check:2 Expected: " + expectedValues1.get(count)
            + ", Got: " + i);
        return errors;
      }
    }
    // We have moved past the previous mark, but now we are in the file
    // cache
    if (count == 20) {
      values.mark();
      LOG.info("TEST:2. Marking -- " + key + ":" + i);
    }
    if (count >= 20) {
      expectedValues.add(new IntWritable(i.get()));
    }
    count++;
  }
  values.reset();
  count = 0;
  LOG.info("TEST:2. Reset");
  // Phase 4: verify the replay of everything seen since the last mark.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    if (count >= expectedValues.size()) {
      LOG.info("TEST:2 Check:4. Iterator returned more values than expected");
      errors++;
      return errors;
    }
    if (!i.equals(expectedValues.get(count))) {
      errors++;
      LOG.info("TEST:2. Check:3 Expected: " + expectedValues.get(count)
          + ", Got: " + i);
      return errors;
    }
    // Advance so each replayed value is checked against its own position.
    count++;
  }
  if (count < expectedValues.size()) {
    LOG.info("TEST:2 Check:5. Iterator returned fewer values than expected");
    errors++;
    return errors;
  }
  LOG.info("TEST:2 Done");
  return errors;
}
/**
 * Tests "clearMark": a mark that has been cleared and then set again must
 * still be replayable, and a reset issued after a clearMark with no
 * subsequent mark must fail with an IOException.
 *
 * @param key the reduce key this scenario runs for (used for logging only)
 * @param values markable iterator over the values of {@code key}
 * @return the number of errors detected; 0 if all checks passed
 * @throws IOException if mark, clearMark or the first reset fails
 */
private static int test3(IntWritable key,
    MarkableIterator<IntWritable> values)
    throws IOException {
  int errors = 0;
  IntWritable i;
  // Snapshots (not references) of the values seen after the second mark;
  // the iterator may reuse a single Writable instance across next() calls.
  ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
  LOG.info("Executing TEST:3 for Key:" + key);
  values.mark();
  LOG.info("TEST:3. Marking");
  int count = 0;
  // Phase 1: clear the initial mark at position 5, mark again at position 8.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    if (count == 5) {
      LOG.info("TEST:3. Clearing Mark");
      values.clearMark();
    }
    if (count == 8) {
      LOG.info("TEST:3. Marking -- " + key + ":" + i);
      values.mark();
    }
    if (count >= 8) {
      expectedValues.add(new IntWritable(i.get()));
    }
    count++;
  }
  values.reset();
  LOG.info("TEST:3. After reset");
  if (!values.hasNext()) {
    errors++;
    LOG.info("TEST:3, Check:1. HasNext returned false");
    return errors;
  }
  count = 0;
  // Phase 2: verify the replay from the second mark, clearing the mark
  // part-way through.
  while (values.hasNext()) {
    i = values.next();
    LOG.info(key + ":" + i);
    if (count < expectedValues.size()) {
      // Compare by value, not by reference.
      if (!i.equals(expectedValues.get(count))) {
        errors++;
        LOG.info("TEST:3. Check:2 Expected: " + expectedValues.get(count)
            + ", Got: " + i);
        return errors;
      }
    }
    if (count == 10) {
      values.clearMark();
      LOG.info("TEST:3. After clear mark");
    }
    count++;
  }
  // Phase 3: with the mark cleared, reset is expected to throw.
  boolean successfulClearMark = false;
  try {
    LOG.info("TEST:3. Before Reset");
    values.reset();
  } catch (IOException e) {
    successfulClearMark = true;
  }
  if (!successfulClearMark) {
    LOG.info("TEST:3 Check:4 reset was successfule even after clearMark");
    errors++;
    return errors;
  }
  LOG.info("TEST:3 Done.");
  return errors;
}
/**
 * Creates NUM_MAPS one-line input files under TEST_ROOT_DIR/in, replacing
 * any file of the same name. The content is irrelevant; the files exist
 * only to control the number of map tasks.
 *
 * @throws Exception if a file cannot be deleted, created or written
 */
public void createInput() throws Exception {
  // Just create one line files. We use this only to
  // control the number of map tasks
  for (int i = 0; i < NUM_MAPS; i++) {
    Path file = new Path(TEST_ROOT_DIR + "/in", "test" + i + ".txt");
    localFs.delete(file, false);
    OutputStream os = localFs.create(file);
    Writer wr = new OutputStreamWriter(os);
    try {
      wr.write("dummy");
    } finally {
      // Closing the writer also closes the underlying stream, even when
      // the write above fails.
      wr.close();
    }
  }
}
/**
 * Runs a job with TestMapper and TestReducer using a small mark-reset
 * buffer (128), then checks that every reducer scenario reported zero
 * errors. Any exception raised while setting up, running or validating the
 * job fails the test.
 */
public void testValueIterReset() {
  try {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "TestValueIterReset");
    job.setJarByClass(TestValueIterReset.class);
    job.setMapperClass(TestMapper.class);
    job.setReducerClass(TestReducer.class);
    job.setNumReduceTasks(NUM_TESTS);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    job.getConfiguration().
        setInt(MRJobConfig.REDUCE_MARKRESET_BUFFER_SIZE, 128);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job,
        new Path(TEST_ROOT_DIR + "/in"));
    Path output = new Path(TEST_ROOT_DIR + "/out");
    // Remove output left over from a previous run.
    localFs.delete(output, true);
    FileOutputFormat.setOutputPath(job, output);
    createInput();
    assertTrue(job.waitForCompletion(true));
    validateOutput();
  } catch (Exception e) {
    // Record the full stack trace in the test log and surface the cause in
    // the failure message instead of an anonymous assertTrue(false).
    LOG.error("TestValueIterReset failed with an unexpected exception", e);
    fail("Unexpected exception: " + e);
  }
}
/**
 * Reads every reduce output file under TEST_ROOT_DIR/out and asserts that
 * each "key TAB errorCount" record reports zero errors, and that one record
 * was produced per test key (NUM_TESTS in total).
 *
 * @throws IOException if the output cannot be listed or read
 */
private void validateOutput() throws IOException {
  Path[] outputFiles = FileUtil.stat2Paths(
      localFs.listStatus(new Path(TEST_ROOT_DIR + "/out"),
          new Utils.OutputFileUtils.OutputFilesFilter()));
  // A job that produced no output must not count as a pass.
  assertTrue("No reduce output files found under " + TEST_ROOT_DIR + "/out",
      outputFiles.length > 0);
  int records = 0;
  // The job may write one file per reduce task, so check all of them rather
  // than just the first.
  for (int f = 0; f < outputFiles.length; f++) {
    InputStream is = localFs.open(outputFiles[f]);
    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
    try {
      String line = reader.readLine();
      while (line != null) {
        StringTokenizer tokeniz = new StringTokenizer(line, "\t");
        String key = tokeniz.nextToken();
        String value = tokeniz.nextToken();
        LOG.info("Output: key: " + key + " value: " + value);
        int errors = Integer.parseInt(value);
        assertEquals("Reducer reported errors for key " + key, 0, errors);
        records++;
        line = reader.readLine();
      }
    } finally {
      // Close the file even when an assertion above fails.
      reader.close();
    }
  }
  assertEquals("Unexpected number of reduce output records",
      NUM_TESTS, records);
}
}